[Kafka-connect] Single Message Transform: lesson 2 – ValueToKey and ExtractField in Sink

Posted on February 20, 2022 (updated February 27, 2022) by nim

Contents

  • 1) Setup
  • 2) Practice

1) Setup

For the Kafka setup, you can take another look at this link:
Setup the needing components of Kafka

2) Practice

Now let's jump into MySQL and play around:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

Create a table and insert some data:

create table customers (
	id INT,
	full_name VARCHAR(50),
	birthdate DATE,
	fav_animal VARCHAR(50),
	fav_colour VARCHAR(50),
	fav_movie VARCHAR(50)
);
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (1, 'Leone Puxley', '1995-02-06', 'Violet-eared waxbill', 'Puce', 'Oh! What a Lovely War');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (2, 'Angelo Sharkey', '1996-04-08', 'Macaw, green-winged', 'Red', 'View from the Top, A');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (3, 'Jozef Bailey', '1954-07-10', 'Little brown bat', 'Indigo', '99 francs');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (4, 'Evelyn Deakes', '1975-09-13', 'Vervet monkey', 'Teal', 'Jane Austen in Manhattan');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (5, 'Dermot Perris', '1991-01-29', 'African ground squirrel (unidentified)', 'Khaki', 'Restless');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (6, 'Renae Bonsale', '1965-01-05', 'Brown antechinus', 'Fuscia', 'Perfect Day, A (Un giorno perfetto)');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (7, 'Florella Fridlington', '1950-08-07', 'Burmese brown mountain tortoise', 'Purple', 'Dot the I');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (8, 'Hettie Keepence', '1971-10-14', 'Crab-eating raccoon', 'Puce', 'Outer Space');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (9, 'Briano Quene', '1990-05-02', 'Cormorant, large', 'Yellow', 'Peacekeeper, The');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (10, 'Jeddy Cassell', '1978-12-24', 'Badger, european', 'Indigo', 'Shadow of a Doubt');

Now we have the new table with ten rows in it.
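
If you want to double-check from the shell, here is a quick sanity query (same mysql container as above):

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo -e "SELECT id, full_name FROM customers;"'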

Create source connector

Reference links:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day2.adoc
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/
Kafka Connect Deep Dive – JDBC Source Connector

Now run this command to create the JDBC source connector:

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-00/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-00-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false
          }'
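
Before moving on, it's worth verifying that the connector is actually RUNNING. This is a standard Kafka Connect REST call (assuming Connect listens on localhost:8083, as in the command above):

curl -s http://localhost:8083/connectors/source-jdbc-mysql-00/status | jq '.'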

OK, now let's list the topics:

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort

These are the new topics that Kafka Connect has created.

Now let's preview the data:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-00-customers -C -c1 -o beginning -u -q -J | jq  '.'
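
Notice that the record key is null: the JDBC source connector does not set a message key by default. You can confirm this by printing only the key (a kafkacat format string; same container as above):

docker exec kafkacat kafkacat -b broker:29092 -t mysql-00-customers -C -c1 -o beginning -u -q -f 'Key: %k\n'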

ValueToKey
Replace the record key with a new key formed from a subset of fields in the record value.
https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-01-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false,
          "transforms": "copyIdToKey",
          "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
          "transforms.copyIdToKey.fields": "id"
          }'

The new part of the config is:
"transforms": "copyIdToKey",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id"

The author of the original demo explains it like this:
Step 1: give the transform a label: copyIdToKey.
Step 2: each entry in "fields" names a column of the table, i.e. a field in the record value; the transform reads that field's value and overwrites the record key, which until now has been null.

For example, take this insert:
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (1, 'Leone Puxley', '1995-02-06', 'Violet-eared waxbill', 'Puce', 'Oh! What a Lovely War');
The transform takes the id column, whose value is 1, and uses it to overwrite the null key.
That's the gist of it.
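
Conceptually, the transform does this to that record (a sketch of the key/value pair, not the exact wire format):

Before: key = null          value = {id=1, full_name=Leone Puxley, ...}
After:  key = Struct{id=1}  value = {id=1, full_name=Leone Puxley, ...}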

Check the topics:

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort

Let's review the data again:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-01-customers -C -c1 -o beginning -u -q -J | jq  '.'

But notice that the whole struct, with both the field name and its value, has been carried over into the key,
while we only want the plain value 1.

Combining ValueToKey and ExtractField
The above SMT will write a struct to the key, and often you want just the primitive value instead. That’s what ExtractField does.

ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. Any null values are passed through unmodified.

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-02/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-02-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false,
          "transforms": "copyIdToKey,extractKeyFromStruct",
          "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
          "transforms.copyIdToKey.fields": "id",
          "transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.extractKeyFromStruct.field":"id"
          }'
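At this point you can list all three connectors and their states in a single call (the expand=status query parameter is part of the Connect REST API in recent versions):

curl -s "http://localhost:8083/connectors?expand=status" | jq '.'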


We give this second transform a new label: extractKeyFromStruct.

Notice that the value of our key was Struct{id=1}.

At this point "copyIdToKey" hands "extractKeyFromStruct" a record whose key looks like:
"key": "Struct{id=1}"
Because the struct we want to unwrap sits in the record key, we use:
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key"

A quick note:
So when would you use
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value"?
When the struct you want to extract from sits in the record value instead of the key.
Reference link:
https://docs.confluent.io/platform/current/connect/transforms/extractfield.html#examples
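
For example, here is a hypothetical config fragment (not part of this demo) that would replace the entire record value with just its id field:

"transforms": "extractIdFromValue",
"transforms.extractIdFromValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractIdFromValue.field": "id"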

"transforms.extractKeyFromStruct.field": "id"
Finally, the transform extracts the value of the id field, so the key becomes the plain number 1.

Now preview the data again:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-02-customers -C -c1 -o beginning -u -q -J | jq  '.'
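
The key should now be the plain value 1. You can also print just the key to confirm (same kafkacat format trick as before):

docker exec kafkacat kafkacat -b broker:29092 -t mysql-02-customers -C -c1 -o beginning -u -q -f 'Key: %k\n'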