In this post we look at how the author of the Confluent demo inserts a field (i.e. an extra data column) when sinking data from a Kafka topic into a database (MySQL).
Here is his original write-up:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc
1) Set up the required Kafka components
And below are my notes:
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/
cd kafka-connect-single-message-transforms/
Now run docker-compose.
Here is the full content of the docker-compose.yaml file:
---
version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      # An important note about accessing Kafka from clients on other machines:
      # -----------------------------------------------------------------------
      #
      # The config used here exposes port 9092 for _external_ connections to the broker
      # i.e. those from _outside_ the docker network. This could be from the host machine
      # running docker, or maybe further afield if you've got a more complicated setup.
      # If the latter is true, you will need to change the value 'localhost' in
      # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
      # remote clients
      #
      # For connections _internal_ to the docker network, such as from other services
      # and components, use broker:29092.
      #
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # You can either specify your AWS credentials here, or create a .env file
      # in which the environment variables are set. The latter is preferable since you can isolate
      # your credentials from this Docker Compose file.
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run :
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    # volumes:
    #   - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.2
        confluent-hub install --no-prompt mdrogalis/voluble:0.3.1
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.1
        confluent-hub install --no-prompt confluentinc/connect-transforms:1.3.2
        confluent-hub install --no-prompt jcustenborder/kafka-connect-transform-common:0.1.0.36
        confluent-hub install --no-prompt jcustenborder/kafka-connect-simulator:0.1.120
        #
        # JDBC Drivers
        # ------------
        # MySQL
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        # See https://dev.mysql.com/downloads/connector/j/
        curl http://ftp.jaist.ac.jp/pub/mysql/Downloads/Connector-J/mysql-connector-java-8.0.28.tar.gz | tar xz
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    image: confluentinc/ksqldb-server:0.14.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'

  # Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=Admin123
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - ${PWD}/00_setup_db.sql:/docker-entrypoint-initdb.d/00_setup_db.sql
      # - ${PWD}/data:/data

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
      - schema-registry
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done
docker-compose up -d
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/
The docker-compose file downloads the mysql-connector-java driver (the Maven URL above is an alternative source), so remember to check that the download link is still alive!
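A quick way to check this before bringing the stack up (my own snippet, using the same URL as in the compose file):
# Expect an HTTP 200 (or a redirect) status line if the Connector/J tarball is still downloadable
curl -sIL http://ftp.jaist.ac.jp/pub/mysql/Downloads/Connector-J/mysql-connector-java-8.0.28.tar.gz | head -n 1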
Check whether Kafka Connect is up yet:
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
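Once it reports 200, you can also confirm that the plugins installed in the compose command section were picked up (my own check, using the standard Kafka Connect REST API):
# List the connector plugins the worker has loaded; the JDBC sink and the Voluble
# data generator from the confluent-hub install steps should both appear
curl -s http://localhost:8083/connector-plugins | jq '.[].class'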
2) Practice
Now create a connector in Kafka Connect: the data generator.
This is the source connector: it generates data into the Kafka topics.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.customers.with" : "#{Internet.uuid}",
"genv.customers.name.with" : "#{HitchhikersGuideToTheGalaxy.character}",
"genv.customers.email.with" : "#{Internet.emailAddress}",
"genv.customers.location->city.with" : "#{HitchhikersGuideToTheGalaxy.location}",
"genv.customers.location->planet.with" : "#{HitchhikersGuideToTheGalaxy.planet}",
"topic.customers.records.exactly" : 10,
"genkp.transactions.with" : "#{Internet.uuid}",
"genv.transactions.customer_id.matching" : "customers.key",
"genv.transactions.cost.with" : "#{Commerce.price}",
"genv.transactions.card_type.with" : "#{Business.creditCardType}",
"genv.transactions.item.with" : "#{Beer.name}",
"topic.transactions.throttle.ms" : 500
}'
With this command we can check the status of the connectors:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
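Optionally (not part of the original notes), you can peek at the generated messages using the kafkacat container from the compose stack; -c 5 just limits the output to five records:
# Consume a few records from the transactions topic, decoding the Avro values
# via the Schema Registry inside the Docker network
docker exec kafkacat kafkacat \
  -b broker:29092 \
  -r http://schema-registry:8081 -s value=avro \
  -t transactions -C -o beginning -c 5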
Now create the sink from Kafka to MySQL (the JDBC Sink connector).
This connector writes the data from the topic into the database.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true"
}'
Now check again:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
Now exec into MySQL to verify, connecting straight to the demo database:
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
mysql> show tables;
+----------------+
| Tables_in_demo |
+----------------+
| transactions |
+----------------+
1 row in set (0.00 sec)
mysql> describe transactions;
+-------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| customer_id | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| item | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
+-------------+------+------+-----+---------+-------+
4 rows in set (0.01 sec)
As we can see above, the sink connector has created the table.
To select the data in this table, run: select * from transactions;
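For convenience, here is a one-liner (my own addition, same credentials as above) that runs the check from the host without opening an interactive session:
# Count the rows and show a few records from the sinked table; adjust the LIMIT as you like
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo \
  -e "SELECT COUNT(*) FROM transactions; SELECT * FROM transactions LIMIT 5;"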
Now let's add a transform:
Add Message Timestamp to payload sent to sinks: InsertField
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "insertTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
}'
Note auto.evolve=true: otherwise the target table won't get the new field unless it happens to exist already.
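With auto.evolve on, the connector should add the column by itself; once it has picked up the new config and written a record, a quick way to confirm (my own check; the reported column type is typically datetime(3) but may differ):
# The messageTS column should now be listed alongside the original four fields
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e "DESCRIBE transactions;"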
We added these three lines:
"transforms" : "insertTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
Let me explain briefly:
– "transforms" : "insertTS": this gives the transform a label, here insertTS.
– "transforms.insertTS.timestamp.field": "messageTS"
Let me explain this one first.
https://docs.confluent.io/platform/current/connect/transforms/insertfield.html
The link above lists the kinds of fields that InsertField can add, and here I chose "timestamp.field", whose value is the message's date and time.
We name that field "messageTS", and its data is the date/time; the sink will then create a column named messageTS with type datetime.
– "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value"
The $Value suffix means the field is inserted into the record's value (there is a $Key variant for the record key). So the sink takes the message timestamp, for example 2022-02-20 10:22:17.871, and writes it into the messageTS column.
This is repeated for every record.
Go back and check the database:
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
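Inside the mysql prompt (or as a one-liner from the host, shown below; my own query, and your values will of course differ) you can see the timestamps the transform has been adding:
# Rows written before the transform was added will show NULL in messageTS
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo \
  -e "SELECT messageTS, customer_id, cost, item FROM transactions ORDER BY messageTS DESC LIMIT 5;"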
I also have an example image of the static transform:
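The image isn't reproduced here, so as a reference this is a minimal sketch of what adding an InsertField static transform to the same sink could look like (the transform alias insertStatic, the field name sourceSystem and its value are placeholders of mine, not from the original post):
curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
        "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url" : "jdbc:mysql://mysql:3306/demo",
        "connection.user" : "mysqluser",
        "connection.password" : "mysqlpw",
        "topics" : "transactions",
        "tasks.max" : "4",
        "auto.create" : "true",
        "auto.evolve" : "true",
        "transforms" : "insertTS,insertStatic",
        "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertTS.timestamp.field": "messageTS",
        "transforms.insertStatic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStatic.static.field": "sourceSystem",
        "transforms.insertStatic.static.value": "kafka-demo"
    }'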