1) Set up the components.
For the Kafka setup, you can refer back to this link:
Setup the needing components of Kafka
Remember to delete the old containers and run docker-compose again:
docker-compose down -v
docker-compose up -d
2) Practice.
Reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day8.adoc
Create the data generator (Voluble source connector):
https://www.confluent.io/hub/mdrogalis/voluble
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day8-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day8-transactions.with" : "#{Internet.uuid}",
"genv.day8-transactions.cost.with" : "#{Commerce.price}",
"genv.day8-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day8-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day8-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day8-transactions.item.with" : "#{Beer.name}",
"topic.day8-transactions.throttle.ms" : 1000
}'
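Before reading from the topic, it can help to confirm that the connector is actually running. The sketch below is a minimal example that assumes the Connect REST API is on localhost:8083 (as in the curl command above) and that jq is installed on the host. The helper names connector_status_url and connector_status are made up for illustration.

```shell
# Assumption: Kafka Connect REST API is reachable here, as in the commands above.
CONNECT_URL="http://localhost:8083"

# Hypothetical helper: build the status URL for a connector name.
connector_status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Hypothetical helper: query the status endpoint and keep only the
# connector state and the state of each task.
connector_status() {
  curl -s "$(connector_status_url "$1")" |
    jq '{connector: .connector.state, tasks: [.tasks[].state]}'
}
```

Usage: run connector_status source-voluble-datagen-day8-00 and check that the connector and its tasks report RUNNING.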
Now let's check the data in the topic, specifically the txn_date field:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
jq '.payload.Gen0.txn_date.string'

Now let's see what the schema looks like:
curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'

Now create the sink connector as in the reference, then inspect the table it creates, and you will see the problem:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe `day8-transactions`;

But what we really want is for the txn_date column to have a date/time type rather than a string type.
As the reference shows, we can use TimestampConverter: it converts timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. It applies to individual fields or to the entire value.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withTS"
}'
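Step 1: declare the transform label convertTS in "transforms".
Step 2: set its type to "org.apache.kafka.connect.transforms.TimestampConverter$Value".
Step 3: specify the name of the field that TimestampConverter acts on ("txn_date").
Step 4: set the format "EEE MMM dd HH:mm:ss zzz yyyy". This is a SimpleDateFormat pattern. Because txn_date is a string in the source, the pattern tells the transform how to parse the incoming value; it is used to format the output only when the target type is string.
Step 5: define the target type of that field: "transforms.convertTS.target.type": "Timestamp".
The second transform, changeTopic (RegexRouter), appends _withTS to the topic name, which is why the table checked below is day8-transactions_withTS.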
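To get a feel for what the format pattern matches, here is a minimal sketch using GNU date (an assumption; BSD/macOS date takes different flags). The strftime directives are only an approximate equivalent of the Java pattern "EEE MMM dd HH:mm:ss zzz yyyy", not what Kafka Connect runs internally, and the helper name format_like_txn_date is made up for illustration.

```shell
# Hypothetical helper: render an epoch (in seconds) in the same layout as the
# pattern "EEE MMM dd HH:mm:ss zzz yyyy".
# Rough mapping: EEE=%a MMM=%b dd=%d HH=%H mm=%M ss=%S zzz=%Z yyyy=%Y
# Assumes GNU date (for -d @SECONDS); LC_ALL=C keeps English day/month names.
format_like_txn_date() {
  LC_ALL=C date -u -d "@$1" '+%a %b %d %H:%M:%S %Z %Y'
}

format_like_txn_date 0   # prints: Thu Jan 01 00:00:00 UTC 1970
```

If the strings returned by the kafkacat check earlier have this shape, the pattern in the connector config should be able to parse them.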
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day8-transactions_withTS`;

That looks much better now:
select txn_date from `day8-transactions_withTS` LIMIT 5;

If you want a different type and format, you only need to change transforms.convertTS.target.type:
...
"transforms.convertTS.target.type": "Date",
...
With Date, only the date part of the value is kept.

...
"transforms.convertTS.target.type": "Time",
...
With Time, only the time-of-day part is kept.

...
"transforms.convertTS.target.type": "unix",
...
With unix, the value becomes a numeric Unix epoch value.
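Since only one setting changes between these variants, a small helper can print the convertTS settings for any target type. This is a sketch, not part of the reference: the function name convert_ts_fragment is hypothetical, and the output is a fragment to paste into the connector config, not a complete JSON document.

```shell
# Hypothetical helper: print the convertTS settings for a given target type.
# Target types accepted by TimestampConverter: string, unix, Date, Time, Timestamp.
convert_ts_fragment() {
  case "$1" in
    string|unix|Date|Time|Timestamp) ;;
    *) echo "unsupported target.type: $1" >&2; return 1 ;;
  esac
  # \$ keeps the literal "$Value" in the unquoted heredoc.
  cat <<EOF
"transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter\$Value",
"transforms.convertTS.field"      : "txn_date",
"transforms.convertTS.format"     : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "$1"
EOF
}

convert_ts_fragment Date
```

The type names are case-sensitive in this helper, so convert_ts_fragment date is rejected with an error instead of producing a config with a misspelled type.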

Accessing timestamps in nested fields
Unfortunately the TimestampConverter only works on root-level elements; it can’t be used on timestamp fields that are nested in other fields. You’d need to either use Flatten first, or write your own transformation.
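As a rough sketch of the Flatten approach, the fragment below chains Flatten before TimestampConverter. The nested field payment.txn_date is a made-up example (the day8-transactions data above has no such field), and the function name is hypothetical. Note that Flatten flattens every nested field in the value, not just the timestamp.

```shell
# Hypothetical example: the timestamp lives at payment.txn_date.
# Flatten runs first with "_" as delimiter, so the nested field becomes
# payment_txn_date; TimestampConverter then sees it as a root-level field.
flatten_then_convert_fragment() {
  # Quoted heredoc delimiter: "$Value" is printed literally.
  cat <<'EOF'
"transforms"                      : "flatten,convertTS",
"transforms.flatten.type"         : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter"    : "_",
"transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field"      : "payment_txn_date",
"transforms.convertTS.format"     : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"
EOF
}

flatten_then_convert_fragment
```

The order in "transforms" matters: the transforms are applied left to right, so flatten has to come before convertTS.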