Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
      • Pod
    • Helm Chart
    • OAuth2 Proxy
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
    • MetalLB
    • Kong Gateway
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Argo Events
    • Spinnaker
    • Jenkins
    • Harbor
    • TeamCity
    • Git
      • Bitbucket
  • Coding
    • DevSecOps
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
      • Azure Cloud
    • Golang
    • Laravel
    • Python
    • Jquery & JavaScript
    • Selenium
  • Log, Monitor & Tracing
    • DataDog
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
    • NextCloud
  • Toggle search form

[Kafka-connect] Streaming the data of MySQL throughs Kafka-connect and Debezium plugin.

Posted on February 13, 2022July 11, 2022 By nim No Comments on [Kafka-connect] Streaming the data of MySQL throughs Kafka-connect and Debezium plugin.

Contents

Toggle
  • 1) Install and configurate mysql to use to the kafka-connect Lab.
  • 2) Config kafka-connect.
    • 2.1) Repairing
    • 2.2) Action config
    • 2.3) Add a new column inside a table that is streaming.
    • 2.4) Create a new table
    • 2.5) Adding the old tables but before that it wasn’t added into connector.
  • 3) Note.

1) Install and configurate mysql to use to the kafka-connect Lab.

Run docker MySQL:

docker run -p 3306:3306 --name mastermysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7

docker exec -it mastermysql bash
apt update -y
apt install vim -y
cd /etc/mysql/

giờ sửa file: mysql.cnf

vi mysql.cnf

[mysqld]
server-id = 42
log_bin = mysql-bin
binlog_format = row
expire_logs_days = 10

xong rồi thoát ra restart docker

docker restart mastermysql
docker exec -it mastermysql bash

mysqladmin variables -uroot -p|grep log_bin
như là đã bật log_bin.

In order for Debezium MySQL to track all the changes in bin-log, it needs a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT permissions. Grant them to the user you intend to use for Debezium.

giờ chúng ta tạo 1 User để cho kafka-connect.
ở dưới là user: debezium và pass là: 123456

>>>Mysql 5.7
mysql -uroot -p
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY '123456';

>>> Mysql 8
docker run -p 3306:3306 --name mastermysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0

CREATE USER 'debezium'@'%' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';

Xong rồi mình tạo database và table như hình bên dươi.

2) Config kafka-connect.

2.1) Repairing

Mình đã hướng dẫn càc bạn cài các components sau:

Kafka:
https://nimtechnology.com/2022/02/13/kafka-install-kafka-and-zookeeper-cluster-on-kubernetes/
Hoặc starting kafka và zookeeper bằng command lại để lab.
https://nimtechnology.com/2022/01/30/kafka-zookeeper-starting-kafka-and-zookeeper/

Kafka-connect:
https://nimtechnology.com/2022/02/09/kafka-connect-install-kafka-connect-on-kubernetes-through-helm-chart/

Schema registry:
https://nimtechnology.com/2022/02/13/kafka-installing-schema-registry-to-use-for-the-kafka-and-kafka-connect-model/

Lense nữa:
https://nimtechnology.com/2022/02/09/kafka-connect-install-lenses-to-manage-kafka/
cái này thì cần lincense.
Bạn có thể dụng câu lệnh curl trực tiếp vào kafka connect nhé. Command sao mình chỉ sau he.

Reference Links:
https://dev-yeye.tistory.com/35?category=904307

Bạn chuẩn bị đầu đủ các phần trên nhé

2.2) Action config

Giờ login vào lenses.
tạo connector

Tạo cdc Mysql

Vậy CDC là gì?

Đầu tiên cdc là Change Data Capture (CDC)
Ở bước tạo mysql mình có tạo user debezium để nó thể đọc được log_bin, nghĩa là các thay đổi trong database.
nếu bạn làm mysql master-slave rồi thì thấy cách cdc này nó cũng như thế.

reference links for master:
https://developer.confluent.io/learn-kafka/data-pipelines/kafka-data-ingestion-with-cdc/

Nếu lenses của các bạn ko có icon giống mình thì đọc lại bài kafka-connect và build lại image nhé.
Links add plugin for kafka connect

giờ chúng ta cần copy data và mình cung cấp và paste vào đó

Config sau là dành cho mysql 5.7

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=debezium
database.server.id=223344
database.history.kafka.topic=dbhistory.voting_contra.v2
database.history.kafka.bootstrap.servers=kafka:9092
database.server.name=connect_wordpress_note
database.port=3306
include.schema.changes=false
table.whitelist=wordpress_note.students
database.hostname=192.168.101.27
database.password=***********
name=cdc-wordpress-note-connector
database.whitelist=wordpress_note

Config sau là dành cho mysql 8.0

connector.class=io.debezium.connector.mysql.MySqlConnector
database.allowPublicKeyRetrieval=true
database.user=debezium
database.server.id=223344
database.history.kafka.topic=dbhistory.school.v1
database.history.kafka.bootstrap.servers=kafka:9092
database.server.name=connect_school
database.port=3306
include.schema.changes=false
table.whitelist=school.students
database.hostname=192.168.101.27
database.password=***********
name=cdc-school-connector
database.whitelist=school

Note config with mysql 8.x
Bạn sẽ gặp lỗi này “Public Key Retrieval Is Not Allowed” hay đọc bài viết bên dứoi
https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
    -d '{
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "42",
          "database.allowPublicKeyRetrieval":"true", <<<==== add this line to FIX Error
          "database.server.name": "asgard",
          "table.whitelist": "demo.customers",
          "database.history.kafka.bootstrap.servers": "kafka:29092",
          "database.history.kafka.topic": "asgard.dbhistory.demo" ,
          "include.schema.changes": "true"
    }'


Explain the configurations

name: đơn giản là name của connector đó.

connector.class
cái này chỗ khai báo các plugin của chúng ta.


database.port: Port của DB
database.hostname: IP hoặc hostname của DB
database.user: username mà debezium để đọc log_bin
database.password: password của user trên

database.server.id: số này bạn cứ để số nào cũng được nhưng duy nhất nhé (unique) trên cluster kafka, Nó chỉ có ý nghĩa trên kafka thôi

database.whitelist: đây là cái database bạn muốn stream data
table.whitelist: đây là table mà bạn muốn stream data

database.server.name
table.whitelist

với cặp ở trên mình thấy nó có liên quan đến Schema registry

Mình vào kiểm tra


database.history.kafka.topic
chỗ này mình thấy tạo ra 1 topic như sau:

Nhìn sơ giống như lưu lại các bước drop, create database, table, ….


database.history.kafka.bootstrap.servers: cái này để bạn để IP và port của kafka

Các bạn hỏi ủa rồi nó có stream data trong table đó vào topic hem????

Mình thấy có topic này: <database.server.name>.<table.whitelist>

Bạn sẽ thấy là trong topic này đã có 3 record nghĩ là trong table của cũng đang có 3 row.

giờ mình sẽ thêm 1 row data nữa.

Mình xin phép add thêm ảnh của case khác để các bạn tham khảo:

2.3) Add a new column inside a table that is streaming.

Giờ cũng table đó mình add thêm 1 column

thì mình thấy là e.history.kafka.topic với topic tên là dbhistory.voting_contra.v2 đã tăng 1 record

Giờ thêm data vào table;

2.4) Create a new table

Giờ mình tạo 1 table mới.

Vậy giờ chúng ta edit config conector như thế nào?

Bạn thấy mình sửa như thế này.

Khi mình edit và save lại thì thấy có lỗi.

You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY

Mình đã dò google:
https://gitter.im/debezium/dev/archives/2019/01/26

Jiri Pechanec @jpechane Jan 26 2019 13:18 UTC
@rajan-g Hi, please look at https://debezium.io/docs/connectors/mysql/ and search for schema_onyl_recovery string

Đây là điều chỉnh của mình.
connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=debezium
database.server.id=223344
database.history.kafka.topic=dbhistory.voting_contra.v3
database.history.kafka.bootstrap.servers=kafka:9092
database.server.name=connect_wordpress_note
database.port=3306
include.schema.changes=false
table.whitelist=wordpress_note.students,wordpress_note.company
key.converter.schemas.enable=true
database.hostname=192.168.101.27
database.password=***********
name=cdc-wordpress-note-connector
value.converter.schemas.enable=true
database.whitelist=wordpress_note
snapshot.mode=schema_only_recovery

Note nho nhỏ:
Nếu ở lần đầu tiên create connector mà bạn chơi:
snapshot.mode=schema_only_recovery
thì sẽ bị báo lỗi

Could not find existing binlog information while attempting schema only recovery snapshot

file đã nhiều hơn. mà mình ko thấy lỗi nữa

chưa thấy tạo topic data.

Giờ thì topic data của chúng ta muốn đã được tạo.

2.5) Adding the old tables but before that it wasn’t added into connector.

Mình gặp trường hợp trong database của mình có 2 table:

Tiếp đên mình tạo connector với table company và với config như bên dưới

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=debezium
database.server.id=11
database.history.kafka.topic=_dbhistory.nimtechnology.v1
database.history.kafka.bootstrap.servers=kafka:9092
database.server.name=connect_nimtechnology
database.port=3306
table.whitelist=wordpress_note.company
database.hostname=192.168.101.27
database.password=123456
name=source.mysqldebezium.nimtechnology.v1
database.whitelist=wordpress_note
include.schema.changes=false
Kiểm tra topic thấy tạo ok

Giờ mình stream (CDC) thêm table students

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=debezium
database.server.id=11
database.history.kafka.topic=_dbhistory.nimtechnology.v2
database.history.kafka.bootstrap.servers=kafka:9092
database.server.name=connect_nimtechnology
database.port=3306
table.whitelist=wordpress_note.company,wordpress_note.students
database.hostname=192.168.101.27
database.password=123456
name=source.mysqldebezium.nimtechnology.v1
database.whitelist=wordpress_note
include.schema.changes=false
snapshot.mode=schema_only_recovery

Mình đã sửa database.history.kafka.topic và add thêm table students vào line “table.whitelist”

mình thấy tạo them topic history nhưng ko tạo topic Data students

Bạn phài insert data mới vào table students thì kafka connect mới tao topic

data có id 6 và name là nim

Mình show data có trong table.

3) Note.

nó còn 1 phần mình thấy hay được sài đó là:

Event Message Flattening with Single Message Transform
https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/

mình sẽ tìm hiêu và trình bày sau.

Command check mysql:

mysql -u {user} -p'{pw}' -h {host} -P {port} -D {db} -e "SHOW GRANTS"
Apache Kafka, Kafka Connect

Post navigation

Previous Post: [Kafka] Install kafka and zookeeper cluster on kubernetes.
Next Post: [Kafka-connect] Streaming the data of Postgresql through Kafka-connect and Debezium plugin.

More Related Articles

[Kafka] UI control Kafka, Kafka-connect, … It’s akhq.io Apache Kafka
[Kafka-connect]Single Message Transform: lesson 3 – Flatten and LongConverter Apache Kafka
[Kafka-connect] Install Kafka-connect on Kubernetes through helm-chart. Apache Kafka
[Kafka-connect] Single Message Transform: lesson 2 – ValueToKey and ExtractField in Sink Apache Kafka
[Kafka-connect] Single Message Transform: lesson 12 – Community Transformations Kafka Connect
[Kafka-connect] A few APIs are helpful in Kafka-connect. Apache Kafka

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • [Azure] The subscription is not registered to use namespace ‘Microsoft.ContainerService’ May 8, 2025
  • [Azure] Insufficient regional vcpu quota left May 8, 2025
  • [WordPress] How to add a Dynamic watermark on WordPress. May 6, 2025
  • [vnet/Azure] VNet provisioning via Terraform. April 28, 2025
  • [tracetcp] How to perform a tracert command using a specific port. April 3, 2025

Archives

  • May 2025
  • April 2025
  • March 2025
  • February 2025
  • January 2025
  • December 2024
  • November 2024
  • October 2024
  • September 2024
  • August 2024
  • July 2024
  • June 2024
  • May 2024
  • April 2024
  • March 2024
  • February 2024
  • January 2024
  • December 2023
  • November 2023
  • October 2023
  • September 2023
  • August 2023
  • July 2023
  • June 2023
  • May 2023
  • April 2023
  • March 2023
  • February 2023
  • January 2023
  • December 2022
  • November 2022
  • October 2022
  • September 2022
  • August 2022
  • July 2022
  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
    • NextCloud
  • CI/CD
    • Argo Events
    • ArgoCD
    • ArgoWorkflows
    • Git
      • Bitbucket
    • Harbor
    • Jenkins
    • Spinnaker
    • TeamCity
  • Coding
    • DevSecOps
    • Golang
    • Jquery & JavaScript
    • Laravel
    • NextJS 14 & ReactJS & Type Script
    • Python
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • Azure Cloud
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kong Gateway
    • Kubernetes
      • Ingress
      • Pod
    • Longhorn – Storage
    • MetalLB
    • OAuth2 Proxy
    • Vault
    • VictoriaMetrics
  • Log, Monitor & Tracing
    • DataDog
    • ELK
      • Kibana
      • Logstash
    • Fluent
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2025 NimTechnology.