In this post, the author shows how to change the topic name in the source and the table name in the sink by combining them with a timestamp.
1) Set up the components.
For the Kafka setup, you can refer back to this link:
Setting up the required Kafka components
Remember to delete the old containers and re-run docker-compose:
docker-compose down -v
docker-compose up -d
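Once the stack is back up, a quick sanity check before creating any connectors (service names depend on your docker-compose file, so adjust as needed):
docker-compose ps
curl -s http://localhost:8083/connectors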
2) Practice.
Create the data generator, using the Voluble source connector:
https://www.confluent.io/hub/mdrogalis/voluble
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day7-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day7-transactions.with" : "#{Internet.uuid}",
"genv.day7-transactions.cost.with" : "#{Commerce.price}",
"genv.day7-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day7-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day7-transactions.item.with" : "#{Beer.name}",
"topic.day7-transactions.throttle.ms" : 1000
}'
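To confirm the generator is producing data, you can consume a couple of records. Values in this stack are schema-serialized (Avro in the original setup), so use the Avro console consumer; the container name schema-registry and the broker address broker:29092 are assumptions from a typical Confluent demo stack, so adjust to yours:
docker exec schema-registry kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --topic day7-transactions \
    --from-beginning --max-messages 2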
Just like RegExRouter, TimestampRouter can be used to modify the topic name; in other words, it lets us append a timestamp to the topic name. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc. TimestampRouter takes the timestamp from the Kafka message itself. Now, to sink data from the topic into the database, we use TimestampRouter:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day7-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
}'
At first glance the transform config has two parts:
– transforms.addTimestampToTopic.topic.format: defines how the topic name and the timestamp are combined.
– transforms.addTimestampToTopic.timestamp.format: defines the format of the timestamp itself (note lowercase yyyy for the calendar year; uppercase YYYY is the week-based year in SimpleDateFormat and misbehaves around New Year).
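With ${topic}_${timestamp} and the date format above, a record arriving on 2022-02-26 from day7-transactions is routed to:
day7-transactions_2022-02-26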
Now let's check the database:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
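You should see one table per day the generator has been running; the dates in the names will match when you produced the data, e.g.:
+------------------------------+
| Tables_in_demo               |
+------------------------------+
| day7-transactions_2022-02-26 |
+------------------------------+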
Besides taking the timestamp from the Kafka message itself, there is another option: taking it from a field inside the message. What does that look like? Let's select some data from the table:
SELECT txn_date, item, cost FROM `day7-transactions_2022-02-26` LIMIT 5;
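The rows are randomly generated so yours will differ, but each one looks something like this (values taken from the connector trace shown further down):
+------------------------------+-------------------+-------+
| txn_date                     | item              | cost  |
+------------------------------+-------------------+-------+
| Thu Dec 03 02:25:24 GMT 2020 | Delirium Noctorum | 73.58 |
+------------------------------+-------------------+-------+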
Notice that the txn_date column stores a full timestamp for each row. So the idea is: take the data of the txn_date field -> extract the date from it -> use that date to format the topic name.
There is a Single Message Transform called MessageTimestampRouter, which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day7-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "addTimestampToTopicFromField",
"transforms.addTimestampToTopicFromField.type" : "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.addTimestampToTopicFromField.message.timestamp.keys" : "txn_date",
"transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.addTimestampToTopicFromField.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopicFromField.topic.timestamp.format" : "YYYY-MM-dd"
}'
When you check the connector status, you will see that it has failed:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
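To pull the full stack trace from the failed task (which is where the error shown below comes from), you can query the connector's status endpoint directly:
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/status | jq '.tasks[].trace'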
Currently fails…
org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
... 14 more
TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
The cause of this is that the Single Message Transform currently expects to handle raw JSON formatted records – not Avro/Protobuf/JSON Schema.
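Because the JDBC sink itself needs records with a schema, switching to schemaless JSON is not an option for this exact pipeline; to clean up, you can delete the failed connector with the standard Kafka Connect REST API:
curl -s -X DELETE http://localhost:8083/connectors/sink-jdbc-mysql-day7-00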