1) Set up the components.
For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka
Remember to delete the old containers and start docker-compose again:
docker-compose down -v
docker-compose up -d
2) Practice.
This exercise is about adding extra fields to records while Kafka Connect is streaming the data.
For example, when streaming data from a DB through Kafka Connect, you can add a field that holds the DB name or the DB hostname.
Create the data generator
https://www.confluent.io/hub/mdrogalis/voluble
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day6-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day6-transactions.with" : "#{Internet.uuid}",
"genv.day6-transactions.cost.with" : "#{Commerce.price}",
"genv.day6-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day6-transactions.item.with" : "#{Beer.name}",
"topic.day6-transactions.throttle.ms" : 5000,
"transforms" : "insertStaticField1,insertStaticField2",
"transforms.insertStaticField1.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna",
"transforms.insertStaticField2.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField2.static.field": "ingestAgent",
"transforms.insertStaticField2.static.value": "GiveYouUp"
}'
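Before moving on, it can help to confirm that the connector was accepted and that its task is running. The sketch below wraps two Kafka Connect REST endpoints, `GET /connectors/{name}/status` and `GET /connectors/{name}/config`, in small shell functions. The function names and the `CONNECT_URL` variable are hypothetical; the URL simply reuses the host and port from the curl command above.

```shell
#!/usr/bin/env bash
# Base URL of the Kafka Connect REST API (same host/port as the PUT above).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the status (connector state and task states) of a connector.
connector_status() {
  curl -s "${CONNECT_URL}/connectors/$1/status"
}

# Print the stored configuration, including the "transforms.*" entries.
connector_config() {
  curl -s "${CONNECT_URL}/connectors/$1/config"
}
```

For example, `connector_status source-voluble-datagen-day6-00` should report the connector and its task as RUNNING once everything is up.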


As you can see, we added extra fields whose values are static.
Besides static values, InsertField can also add the following other fields:
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
offset.field | Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
static.value | If field name is configured, the static field value. | string | null | | medium |
timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
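To make the suffix rule from the table concrete, here is a hedged sketch of one more transform that would add the record timestamp as a required field. The alias `insertTimestamp` and the field name `kafkaTimestamp` are hypothetical. The fragment is meant to be merged into a connector's config JSON, with the alias appended to its `transforms` list.

```shell
# Hypothetical fragment: merge these keys into a connector config and add
# "insertTimestamp" to its "transforms" list. The trailing "!" marks the
# inserted field as required; with "?" or no suffix it stays optional.
# The quoted 'EOF' keeps the shell from expanding "$Value".
TIMESTAMP_TRANSFORM=$(cat <<'EOF'
"transforms.insertTimestamp.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTimestamp.timestamp.field" : "kafkaTimestamp!"
EOF
)

# Show the fragment so it can be copied into the config.
printf '%s\n' "$TIMESTAMP_TRANSFORM"
```

The suffix is only a marker: the inserted field is still named `kafkaTimestamp`, without the `!`.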
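Next, create the JDBC sink connector. Its transforms add the Kafka partition, offset, and topic of each record as extra fields:
curl -i -X PUT -H "Accept:application/json" \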
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day6-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day6-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "insertPartition,insertOffset,insertTopic",
"transforms.insertPartition.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertPartition.partition.field": "kafkaPartition",
"transforms.insertOffset.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertOffset.offset.field" : "kafkaOffset",
"transforms.insertTopic.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTopic.topic.field" : "kafkaTopic"
}'

Then check the rows that landed in MySQL:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
SELECT * FROM `day6-transactions` LIMIT 5;
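
To look only at the columns that the transforms added, a narrower query may be easier to read. This is a sketch that assumes `auto.create` named the columns after the inserted fields (`kafkaTopic`, `kafkaPartition`, `kafkaOffset`, plus `sourceSystem` and `ingestAgent` from the source connector). It reuses the `mysql` container and the `MYSQL_ROOT_PASSWORD` variable from the command above; the variable and function names are hypothetical.

```shell
# SQL that selects only the metadata columns added by InsertField.
# Single quotes keep the backticks around the table name literal.
METADATA_QUERY='SELECT kafkaTopic, kafkaPartition, kafkaOffset, sourceSystem, ingestAgent FROM `day6-transactions` LIMIT 5;'

# Run the query non-interactively inside the mysql container.
# The SQL is piped to the mysql client on stdin, so no extra quoting is needed.
show_inserted_fields() {
  printf '%s\n' "$METADATA_QUERY" |
    docker exec -i mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
}
```

Calling `show_inserted_fields` after the sink connector has written some rows should print up to five rows with just those columns.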

See the following links to understand the properties:
https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#properties
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day6.adoc