NimTechnology

[Kafka-connect] Single Message Transform: lesson 6 InsertField – Insert fields using attributes in the process data.

Posted on February 26, 2022 (updated February 27, 2022) by nim

Contents

  • 1) Set up the components.
  • 2) Practice.

1) Set up the components.

For the Kafka setup, you can go back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and bring docker-compose up again:
docker-compose down -v
docker-compose up -d

2) Practice.

This lesson is about adding extra fields to records while Kafka Connect is streaming data.
For example, when streaming data from a DB through Kafka Connect, you can add a field holding the DB name or the DB hostname.

Create the data generator:
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day6-00/config \
    -d '{
        "connector.class"                           : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day6-transactions.with"              : "#{Internet.uuid}",
        "genv.day6-transactions.cost.with"          : "#{Commerce.price}",
        "genv.day6-transactions.card_type.with"     : "#{Business.creditCardType}",
        "genv.day6-transactions.item.with"          : "#{Beer.name}",
        "topic.day6-transactions.throttle.ms"       : 5000,
        "transforms"                                : "insertStaticField1,insertStaticField2",
        "transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField1.static.field": "sourceSystem",
        "transforms.insertStaticField1.static.value": "NeverGonna",
        "transforms.insertStaticField2.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField2.static.field": "ingestAgent",
        "transforms.insertStaticField2.static.value": "GiveYouUp"
    }'

As you can see, we add fields whose values are static.
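
To make the effect of the two static transforms concrete, here is a minimal Python sketch. It is only a simplified model of what InsertField$Value does to a schemaless record value, not the Kafka Connect implementation. The function name `insert_static_field` and the sample record are hypothetical.

```python
from typing import Any, Dict


def insert_static_field(value: Dict[str, Any], field: str, static_value: str) -> Dict[str, Any]:
    """Return a copy of the record value with one extra static field (simplified model)."""
    updated = dict(value)          # copy, so the input record is left untouched
    updated[field] = static_value  # the same value is added to every record
    return updated


# Hypothetical record value, shaped like the fields configured for the generator.
record_value = {"cost": "12.34", "card_type": "visa", "item": "Some Beer"}

# Transforms are applied in the order listed in the "transforms" property.
for field, static_value in [("sourceSystem", "NeverGonna"), ("ingestAgent", "GiveYouUp")]:
    record_value = insert_static_field(record_value, field, static_value)

print(record_value)
```

Every record produced by the connector gets the same two extra fields, which is why this is useful for tagging data with its origin.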

Besides static values, we can also add the other fields that InsertField provides:

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| offset.field | Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.value | If field name is configured, the static field value. | string | null | | medium |
| timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
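
The `!` and `?` suffixes described in the table can be read as a small parsing rule. The Python sketch below illustrates that rule under the assumption that a name without a suffix is optional, as the table states. The helper name `parse_field_spec` is hypothetical and is not part of any Kafka API.

```python
from typing import Tuple


def parse_field_spec(spec: str) -> Tuple[str, bool]:
    """Split a configured field name into (name, optional), following the suffix rule."""
    if spec.endswith("!"):
        return spec[:-1], False  # "!" marks the inserted field as required
    if spec.endswith("?"):
        return spec[:-1], True   # "?" keeps the inserted field optional
    return spec, True            # no suffix: optional, which is the default


print(parse_field_spec("kafkaOffset!"))  # ('kafkaOffset', False)
print(parse_field_spec("kafkaTopic?"))   # ('kafkaTopic', True)
print(parse_field_spec("kafkaTopic"))    # ('kafkaTopic', True)
```

So setting "offset.field" to "kafkaOffset!" would ask for a required field named kafkaOffset, while "kafkaOffset" alone keeps it optional.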
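
The JDBC sink connector below writes the day6-transactions topic to MySQL and adds the Kafka partition, offset, and topic name of each record as extra fields:

curl -i -X PUT -H "Accept:application/json" \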
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day6-00/config \
    -d '{
          "connector.class"                           : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                            : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                           : "mysqluser",
          "connection.password"                       : "mysqlpw",
          "topics"                                    : "day6-transactions",
          "tasks.max"                                 : "4",
          "auto.create"                               : "true",
          "auto.evolve"                               : "true",
          "transforms"                                : "insertPartition,insertOffset,insertTopic",
          "transforms.insertPartition.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertPartition.partition.field": "kafkaPartition",
          "transforms.insertOffset.type"              : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertOffset.offset.field"      : "kafkaOffset",
          "transforms.insertTopic.type"               : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTopic.topic.field"        : "kafkaTopic"
        }'

Then open a MySQL shell and query the table to check the new columns:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
SELECT * FROM `day6-transactions` LIMIT 5;
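
As a rough picture of what the three sink-side transforms contribute to each row, here is a simplified Python model. It is a sketch under assumptions, not the connector's code: the class `SinkRecordModel`, the function `insert_metadata`, and the sample partition, offset, and value are all hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class SinkRecordModel:
    """Hypothetical stand-in for a record as seen by a sink connector."""
    topic: str
    partition: int
    offset: int
    value: Dict[str, Any]


def insert_metadata(record: SinkRecordModel,
                    topic_field: Optional[str] = None,
                    partition_field: Optional[str] = None,
                    offset_field: Optional[str] = None) -> SinkRecordModel:
    """Copy record metadata into the value under the configured field names."""
    updated = dict(record.value)
    if topic_field is not None:
        updated[topic_field] = record.topic
    if partition_field is not None:
        updated[partition_field] = record.partition
    if offset_field is not None:
        updated[offset_field] = record.offset
    return replace(record, value=updated)


# Sample record with made-up partition, offset, and value.
record = SinkRecordModel("day6-transactions", 2, 42, {"item": "Some Beer"})

# Same order as "transforms": insertPartition, insertOffset, insertTopic.
record = insert_metadata(record, partition_field="kafkaPartition")
record = insert_metadata(record, offset_field="kafkaOffset")
record = insert_metadata(record, topic_field="kafkaTopic")

print(record.value)
```

In this model the resulting value carries kafkaPartition, kafkaOffset, and kafkaTopic next to the original fields, which matches the extra columns you should look for in the SELECT output.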

Refer to these links to understand the properties:
https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#properties
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day6.adoc
