[Kafka-connect] Single Message Transform: lesson 1 – InsertField in Sink

Posted on February 20, 2022 (updated February 27, 2022) by nim

In this post we'll look at how the author of Confluent's demo-scene walkthrough inserts a field (in other words, a new column) when sinking data from a Kafka topic into a database (MySQL).

Here is his original write-up:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc

Contents

  • 1) Set up the necessary Kafka components
  • 2) Practice

1) Set up the necessary Kafka components

Below are my notes:

git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/
cd kafka-connect-single-message-transforms/

Now run docker-compose. Here is the full content of the docker-compose.yaml file:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    # You can either specify your AWS credentials here, or create a .env file
    # in which the environment variables are set. The latter is preferable since you can isolate 
    # your credentials from this Docker Compose file.
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run : 
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    # volumes:
    #   - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.2
        confluent-hub install --no-prompt mdrogalis/voluble:0.3.1
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.1
        confluent-hub install --no-prompt confluentinc/connect-transforms:1.3.2
        confluent-hub install --no-prompt jcustenborder/kafka-connect-transform-common:0.1.0.36
        confluent-hub install --no-prompt jcustenborder/kafka-connect-simulator:0.1.120
        #
        # JDBC Drivers
        # ------------
        # MySQL
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        # See https://dev.mysql.com/downloads/connector/j/
        curl http://ftp.jaist.ac.jp/pub/mysql/Downloads/Connector-J/mysql-connector-java-8.0.28.tar.gz | tar xz
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    image: confluentinc/ksqldb-server:0.14.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      
# Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=Admin123
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
    volumes:
     - ${PWD}/00_setup_db.sql:/docker-entrypoint-initdb.d/00_setup_db.sql
    #  - ${PWD}/data:/data

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
      - schema-registry
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

docker-compose up -d

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/
The command section of the docker-compose file downloads mysql-connector-java, so remember to check that the link is still alive!
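
If that mirror ever disappears, Maven Central (the link above) hosts the same driver. A hedged alternative for the command section, assuming you only need the driver JAR rather than the whole tarball:

cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
# Grab the bare JAR from Maven Central instead of the mirrored tarball
curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar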

Check whether Kafka Connect is up yet:

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
When it prints "Kafka Connect is ready!", you are good to go.
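
For reference, once the loop exits, the final echo should print something along these lines (the date will obviously differ):

Sun Feb 20 10:00:00 UTC 2022

--------------
\o/ Kafka Connect is ready! Listener HTTP state:  200
--------------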

2) Practice

Now create a connector with Kafka Connect: a data generator.
This is the source: it pushes data into the topics.

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-00/config \
    -d '{
        "connector.class"                      : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.customers.with"                 : "#{Internet.uuid}",
        "genv.customers.name.with"             : "#{HitchhikersGuideToTheGalaxy.character}",
        "genv.customers.email.with"            : "#{Internet.emailAddress}",
        "genv.customers.location->city.with"   : "#{HitchhikersGuideToTheGalaxy.location}",
        "genv.customers.location->planet.with" : "#{HitchhikersGuideToTheGalaxy.planet}",
        "topic.customers.records.exactly"      : 10,

        "genkp.transactions.with"                : "#{Internet.uuid}",
        "genv.transactions.customer_id.matching" : "customers.key",
        "genv.transactions.cost.with"            : "#{Commerce.price}",
        "genv.transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.transactions.item.with"            : "#{Beer.name}",
        "topic.transactions.throttle.ms"         : 500
    }'
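
To peek at the generated records, you can use the kafkacat container defined in the compose file. Since the values are Avro-encoded (see CONNECT_VALUE_CONVERTER above), this is a sketch assuming the bundled kafkacat was built with Avro/serdes support:

# Read the first 5 Avro records from the transactions topic
docker exec kafkacat kafkacat -b broker:29092 -t transactions \
    -C -o beginning -c 5 \
    -s value=avro -r http://schema-registry:8081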

This command lets us check the status of the connectors:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
We can see that this source connector is running fine.
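
The command prints one line per connector; expect output roughly like this (column spacing will differ):

source  |  source-voluble-datagen-00  |  RUNNING  |  RUNNING  |  io.mdrogalis.voluble.VolubleSourceConnector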

Now create a sink from Kafka to MySQL (the JDBC Sink connector).
This writes the data from the topic into the DB.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true"
        }'

Now check again:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort

Now exec into the database to check, going straight into the demo database:
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo

mysql> show tables;
+----------------+
| Tables_in_demo |
+----------------+
| transactions   |
+----------------+
1 row in set (0.00 sec)
mysql> describe transactions;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| customer_id | text | YES  |     | NULL    |       |
| cost        | text | YES  |     | NULL    |       |
| item        | text | YES  |     | NULL    |       |
| card_type   | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
4 rows in set (0.01 sec)

As we can see above, the sink has created the table.

To select the data in this table, use the following:
select * from transactions;

Now let's add a transform:
Add the message timestamp to the payload sent to the sink: InsertField

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true",
          "auto.evolve"         : "true",
          "transforms"          : "insertTS",
          "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field": "messageTS"
        }'

Note auto.evolve=true; otherwise the target table won't hold the new field unless it happens to exist already.
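
If you'd rather not let the connector evolve the schema, a hedged alternative is to add the column yourself before updating the connector (assuming the MySQL dialect maps Connect's Timestamp type to datetime(3)):

-- Pre-create the column the transform will populate
ALTER TABLE demo.transactions ADD COLUMN messageTS datetime(3) NULL;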

We added these three lines:

      "transforms"          : "insertTS",
      "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.insertTS.timestamp.field": "messageTS"

A simple explanation:

– "transforms" : "insertTS"
This means you give the transform a label: insertTS.

– "transforms.insertTS.timestamp.field": "messageTS"
Let me explain this one first.
https://docs.confluent.io/platform/current/connect/transforms/insertfield.html
The link above lists the kinds of fields you can insert, and here we choose timestamp.field; its value is the timestamp (date and time) of the Kafka message.
We name that new key messageTS, so the value of the key is the date and time.
The sink will then create a column named messageTS with a datetime type.

Imagine we now have that extra column: every insert also has to write the date and time into it.

– "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value"
Here the sink takes the value of timestamp.field, e.g. 2022-02-20 10:22:17.871, and fills it into the messageTS column.

This is repeated for every row of data.

If the connector status shows RUNNING, it's OK.

Go back and check the database:

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
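
Once the updated sink has written a few more records, the new column should show up; a quick check (the exact column type may vary by dialect):

mysql> describe transactions;
mysql> select messageTS, item, cost from transactions order by messageTS desc limit 5;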

To wrap up, the InsertField transform can also insert static fields:
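
A minimal sketch based on the InsertField docs linked above (the label insertSource and the values here are hypothetical, not from the original post): instead of timestamp.field you use static.field/static.value to stamp every row with a constant:

"transforms"                           : "insertSource",
"transforms.insertSource.type"         : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertSource.static.field" : "sourceSystem",
"transforms.insertSource.static.value" : "kafka-connect-demo"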
