
[Kafka-connect] Single Message Transform: lesson 3 – Flatten and LongConverter

Posted on February 21, 2022 (updated February 27, 2022) by nim

When the data in a topic is in a format such as Avro and contains nested structures, sinking it into a database will fail when the connector tries to create the columns for the table.

So we need to transform the data, as shown in the image, so that each nested field becomes its own flat column.


1) Set up the components.

For the Kafka setup, you can refer back to this link:
Setup the needing components of Kafka

2) Practice.

Launch ksqlDB:

docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

OK, now you can see that the output looks quite nice:

Create a stream:

CREATE STREAM CUSTOMERS (ID BIGINT KEY, FULL_NAME VARCHAR, ADDRESS STRUCT<STREET VARCHAR, CITY VARCHAR, COUNTY_OR_STATE VARCHAR, ZIP_OR_POSTCODE VARCHAR>)
                  WITH (KAFKA_TOPIC='day3-customers',
                        VALUE_FORMAT='AVRO',
                        REPLICAS=1,
                        PARTITIONS=4);

We can list the topics in ksqlDB:
show topics;
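The stream is empty right after creation. To have something to preview, you can insert a test row like the one below; the values are invented for illustration, and the -e/--execute flag is assumed to exist in your ksqlDB CLI version:

docker exec ksqldb ksql http://ksqldb:8088 -e "INSERT INTO CUSTOMERS (ID, FULL_NAME, ADDRESS) VALUES (1, 'Alice Wong', STRUCT(STREET := '42 Main St', CITY := 'Springfield', COUNTY_OR_STATE := 'IL', ZIP_OR_POSTCODE := '62701'));"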

Preview the data:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day3-customers -C -c1 -o beginning -u -q -J | jq  '.'

You can see that the key has been serialized into raw bytes:
"key": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001",

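Those eight bytes are just a big-endian 64-bit integer (here, the ID 1). As a quick sanity check you can decode them in the shell with GNU coreutils' od (the --endian flag is an assumption about your od version):

printf '\x00\x00\x00\x00\x00\x00\x00\x01' | od -An -td8 --endian=big
# prints: 1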
Now let's create a JDBC sink connector:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day3-customers",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true"
        }'

Now show the status of the connectors:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" |        jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' |        column -s : -t| sed 's/\"//g'| sort
The connector is still RUNNING, but the tasks inside it have FAILED.

Check exactly what error the task hit:

curl -s "http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/status"| jq '.tasks[0].trace'
(STRUCT) type doesn’t have a mapping to the SQL database column type

The error above means the sink cannot map a STRUCT into database columns;
the nested fields have the form ADDRESS.xxx.xxx

Many databases don't support nested fields, and while some have added support in recent times, the JDBC Sink connector doesn't support it.

Stream the nested data to MySQL – with a Flatten SMT
https://docs.confluent.io/platform/current/connect/transforms/flatten.html#avro-example
You should read it to understand the examples.

Avro Example

The Avro schema specification only allows alphanumeric characters and the underscore _ character in field names. The configuration snippet below shows how to use Flatten to concatenate field names with the underscore _ delimiter character.

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

Before:

{
  "content": {
    "id": 42,
    "name": {
      "first": "David",
      "middle": null,
      "last": "Wong"
    }
  }
}

After:

{
  "content_id": 42,
  "content_name_first": "David",
  "content_name_middle": null,
  "content_name_last": "Wong"
}

Now update the connector to add the Flatten SMT:
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day3-customers",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true",
          "transforms"                    : "flatten",
          "transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter"  : "_"
        }'
The connector has been updated and is running again.
Notice that the column names now contain the underscore "_" delimiter.
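A quick way to verify the flattened columns, assuming the MySQL container is named mysql (as in the demo environment):

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e 'DESCRIBE `day3-customers`;'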

Here's how to add the key into the target table:

You can see that our table has no primary key.

We will use:

"key.converter" : "org.apache.kafka.connect.converters.LongConverter"
Use LongConverter so that Kafka Connect can deserialize the long key.

"pk.mode" : "record_key",
This tells the JdbcSinkConnector that, for primary-key handling in the database, it should use the key of the Kafka message.

"pk.fields" : "id",
This tells the JdbcSinkConnector which field/column to mark as the primary key in the database.

Now you need to drop the day3-customers table in the MySQL database.
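For example, from the shell (the mysql container name is an assumption based on the demo environment):

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e 'DROP TABLE `day3-customers`;'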
Then delete the connector:

curl -i -X DELETE -H "Accept:application/json"     -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00

Now recreate the connector:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day3-customers",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true",
          "transforms"                    : "flatten",
          "transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter"  : "_",
          "pk.mode"                       : "record_key",
          "pk.fields"                     : "id",
          "key.converter"                 : "org.apache.kafka.connect.converters.LongConverter"
        }'
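To confirm that the primary key was created this time, you can inspect the table definition (again, the mysql container name is assumed):

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e 'SHOW CREATE TABLE `day3-customers`\G'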