1) Set up the components.
For the Kafka setup, you can revisit this post:
Setup the needing components of Kafka
Remember to delete the old containers and run docker-compose again:
docker-compose down -v
docker-compose up -d
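After docker-compose up -d, the Kafka Connect worker usually needs some time before its REST API answers, and the curl commands below will fail if you send them too early. Below is a minimal sketch of a wait loop. It assumes Connect is reachable on localhost:8083, as in the rest of this post. The function name wait_for_connect and the retry defaults are my own hypothetical choices:

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait until the Kafka Connect REST API answers.
# Assumes Connect listens on localhost:8083 (same as the curl commands in this post).
wait_for_connect() {
  local url="${1:-http://localhost:8083/connectors}"
  local attempts="${2:-30}"   # how many times to try
  local delay="${3:-2}"       # seconds to wait between tries
  local i
  for ((i = 1; i <= attempts; i++)); do
    # -s: quiet, -f: treat HTTP errors as failure, -o /dev/null: ignore the body
    if curl -sf -o /dev/null "$url"; then
      echo "Kafka Connect is ready at $url"
      return 0
    fi
    # no need to sleep after the last failed attempt
    if ((i < attempts)); then
      sleep "$delay"
    fi
  done
  echo "Kafka Connect did not answer at $url after $attempts attempts" >&2
  return 1
}

# Usage: wait_for_connect && echo "now create the connectors"
```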
2) Practice.
Reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day10.adoc
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day10-transactions.with" : "#{Internet.uuid}",
"genv.day10-transactions.cost.with" : "#{Commerce.price}",
"genv.day10-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day10-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day10-transactions.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day10-transactions.cc_exp.with" : "#{Business.creditCardExpiry}",
"genv.day10-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day10-transactions.item.with" : "#{Beer.name}",
"topic.day10-transactions.throttle.ms" : 1000
}'
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day10-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "changeTableName",
"transforms.changeTableName.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTableName.regex" : ".*",
"transforms.changeTableName.replacement": "production_data"
}'
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
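Why does the table show up as production_data instead of day10-transactions? By default, the JDBC sink names the table after the topic, and RegexRouter rewrites the topic name first: if the whole name matches regex, it is replaced with replacement. Since .* matches any topic, every record is routed to production_data.

Below is a simplified shell simulation of that routing decision. Some caveats: route_topic is a hypothetical name, it only supports a literal replacement (no $1-style group references), and bash's extended regex is close to, but not the same as, Java's regex flavor:

```shell
#!/usr/bin/env bash
# Simplified simulation of RegexRouter's routing decision (not the real implementation):
# if the WHOLE topic name matches the regex, the topic becomes the replacement,
# otherwise the topic name is left unchanged. Literal replacements only.
route_topic() {
  local topic="$1" regex="$2" replacement="$3"
  # ^(...)$ forces a whole-string match, similar to Java's Matcher.matches()
  if [[ "$topic" =~ ^($regex)$ ]]; then
    printf '%s\n' "$replacement"
  else
    printf '%s\n' "$topic"
  fi
}

route_topic day10-transactions '.*' production_data          # prints production_data
route_topic day10-transactions 'orders-.*' production_data   # prints day10-transactions
```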
Dropping fields in a sink connector
First, look at the latest message on the day10-transactions topic:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-transactions -C -c1 -o-1 -u -q -J | jq '.payload'

[Kafka-connect] Single Message Transform: lesson 5 MaskField
In lesson 5 we learned how to hide sensitive information with MaskField. But writing masked values into the database, where they just sit there and can't be used for anything, feels wasteful.
So instead, we will drop those fields entirely with ReplaceField:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day10-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropCC",
"transforms.dropCC.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropCC.blacklist": "cc_num,cc_exp,card_type"
}'
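Conceptually, ReplaceField$Value with blacklist rebuilds each record value without the listed fields before the sink sees it. As a result, cc_num, cc_exp and card_type never reach MySQL.

Below is a toy shell simulation of that behavior on a flat record written as field=value lines. The text format and the name drop_fields are assumptions for illustration only, since Connect works on structured records. Newer Apache Kafka versions name this setting exclude and keep blacklist as a deprecated alias.

```shell
#!/usr/bin/env bash
# Toy simulation of ReplaceField "blacklist" (a.k.a. "exclude"):
# reads "field=value" lines on stdin and drops the fields listed in $1.
# The field=value format is illustrative only, not how Connect stores records.
drop_fields() {
  local exclude="$1"   # comma-separated, e.g. "cc_num,cc_exp,card_type"
  local line field
  while IFS= read -r line || [[ -n "$line" ]]; do
    field="${line%%=*}"
    # wrap both sides in commas so a field "cc" would not match "cc_num"
    if [[ ",$exclude," != *",$field,"* ]]; then
      printf '%s\n' "$line"
    fi
  done
}

printf '%s\n' 'item=IPA' 'cost=12.50' 'cc_num=1111' 'card_type=visa' \
  | drop_fields 'cc_num,cc_exp,card_type'
# prints:
# item=IPA
# cost=12.50
```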
Now check the database:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe `day10-transactions`;

Dropping the fields this way is a lot neater, right?
Including only certain fields in a source connector
Now say a record has 100 fields but you only want to keep 3. Are you really going to list the other 97 to drop?
There is another way: declare only the fields you want to keep, and everything else is dropped.
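ReplaceField's whitelist setting does exactly that: only the listed fields survive. Newer Apache Kafka versions call it include, with whitelist kept as a deprecated alias.

Below is the same kind of toy simulation on field=value lines, using the hypothetical name keep_fields. Note that the kept fields stay in their original order:

```shell
#!/usr/bin/env bash
# Toy simulation of ReplaceField "whitelist" (a.k.a. "include"):
# reads "field=value" lines on stdin and keeps only the fields listed in $1.
# The field=value format is illustrative only.
keep_fields() {
  local include="$1"   # comma-separated, e.g. "item,cost,units,txn_date"
  local line field
  while IFS= read -r line || [[ -n "$line" ]]; do
    field="${line%%=*}"
    # keep the line only when its field name appears in the include list
    if [[ ",$include," == *",$field,"* ]]; then
      printf '%s\n' "$line"
    fi
  done
}

printf '%s\n' 'cc_num=1111' 'item=IPA' 'customer_remarks=hello' 'units=3' \
  | keep_fields 'item,cost,units,txn_date'
# prints:
# item=IPA
# units=3
```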
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-day10-00/config \
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topic.prefix" : "day10-",
"poll.interval.ms" : 10000,
"tasks.max" : 1,
"table.include" : "production_data",
"mode" : "bulk",
"transforms" : "selectFields",
"transforms.selectFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.selectFields.whitelist": "item,cost,units,txn_date"
}'
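If you run the command above as-is, it fails with:
Caused by: org.apache.avro.SchemaParseException: Illegal character in: day10-transactions
Most likely this happens because table.include is not a setting the JDBC source connector recognizes (the reference uses table.whitelist). The setting is therefore ignored, and the connector reads every table in the demo database, including the day10-transactions table created earlier by sink-jdbc-mysql-day10-01. The JDBC source names the record schema after the table, and a hyphen is not allowed in an Avro name, hence the error. Setting table.whitelist to production_data should avoid it.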
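The rule behind that error: an Avro name must start with a letter or underscore and contain only letters, digits and underscores. So production_data is fine, while day10-transactions is not.

Below is a small check you could run on table names before reading them with a JDBC source and the Avro converter. is_valid_avro_name is a hypothetical helper that only checks a simple name. Avro full names can also contain dots between namespace parts, which this sketch does not handle:

```shell
#!/usr/bin/env bash
# Hypothetical helper: does $1 satisfy the Avro rule for a simple (non-namespaced) name?
# Avro: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
is_valid_avro_name() {
  local re='^[A-Za-z_][A-Za-z0-9_]*$'
  [[ "$1" =~ $re ]]
}

for name in production_data day10-transactions; do
  if is_valid_avro_name "$name"; then
    echo "$name: ok"
  else
    echo "$name: illegal Avro name"
  fi
done
# prints:
# production_data: ok
# day10-transactions: illegal Avro name
```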
The quick workaround used here is to delete the old containers and start again:
docker-compose down -v
docker-compose up -d
Then recreate the data generator:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day10-transactions.with" : "#{Internet.uuid}",
"genv.day10-transactions.cost.with" : "#{Commerce.price}",
"genv.day10-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day10-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day10-transactions.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day10-transactions.cc_exp.with" : "#{Business.creditCardExpiry}",
"genv.day10-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day10-transactions.item.with" : "#{Beer.name}",
"topic.day10-transactions.throttle.ms" : 1000
}'
Then recreate the sink connector that writes to the production_data table:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day10-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "changeTableName",
"transforms.changeTableName.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTableName.regex" : ".*",
"transforms.changeTableName.replacement": "production_data"
}'
Now recreate the source-jdbc-mysql-day10-00 connector by running its command from the previous section again (I'm still learning, so I was too lazy to hunt down the bug).
Then check the topic day10-production_data:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-production_data -C -o-1 -u -q -J | jq '.payload'
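If nothing shows up on the topic, ask Kafka Connect whether the connector or one of its tasks has failed. An error like the Avro one above shows up there, together with its stack trace, and the REST API exposes this at /connectors/<name>/status.

Below is a minimal sketch, assuming Connect on localhost:8083. check_connector and has_failed_task are hypothetical names. Matching the JSON with grep is a shortcut; jq, already used in this post, would be more robust:

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads a /connectors/<name>/status JSON document on stdin
# and succeeds if the connector or any task reports "state": "FAILED".
# Shortcut: plain grep instead of a real JSON parser.
has_failed_task() {
  grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'
}

# Hypothetical helper: fetch and report the status of one connector.
# Assumes the Kafka Connect REST API at localhost:8083 unless $2 overrides it.
check_connector() {
  local name="$1" base="${2:-http://localhost:8083}"
  local status
  # -f makes curl fail on HTTP errors such as 404 (unknown connector)
  if ! status=$(curl -sf "$base/connectors/$name/status"); then
    echo "$name: could not read status from $base" >&2
    return 2
  fi
  if printf '%s\n' "$status" | has_failed_task; then
    echo "$name: FAILED (see the trace field in the status JSON)"
    return 1
  fi
  echo "$name: no failed connector or task"
}

# Usage: check_connector source-jdbc-mysql-day10-00
```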

Renaming fields
Now, when streaming from the topic into the database, we will rename the field txn_date (in the topic) to transaction_timestamp (in the database).
Here's how:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day10-production_data",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "renameTS",
"transforms.renameTS.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameTS.renames": "txn_date:transaction_timestamp"
}'
Then check the new table in MySQL:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day10-production_data`;
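renames takes a comma-separated list of old:new pairs, so several fields can be renamed with one transform. Only the field name changes; the value passes through as-is.

Below is a toy simulation on field=value lines to illustrate that. rename_fields and the text format are hypothetical, for illustration only:

```shell
#!/usr/bin/env bash
# Toy simulation of ReplaceField "renames": reads "field=value" lines on stdin
# and renames fields according to comma-separated "old:new" pairs in $1.
# The field=value format is illustrative only.
rename_fields() {
  local renames="$1"   # e.g. "txn_date:transaction_timestamp"
  local line field new pair
  while IFS= read -r line || [[ -n "$line" ]]; do
    field="${line%%=*}"
    new="$field"
    # look the field up in the "old:new" pairs; unmatched fields keep their name
    for pair in ${renames//,/ }; do
      if [[ "${pair%%:*}" == "$field" ]]; then
        new="${pair#*:}"
      fi
    done
    # everything after the first "=" is the value, copied unchanged
    printf '%s=%s\n' "$new" "${line#*=}"
  done
}

printf '%s\n' 'item=IPA' 'txn_date=2021-01-01' | rename_fields 'txn_date:transaction_timestamp'
# prints:
# item=IPA
# transaction_timestamp=2021-01-01
```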
