NimTechnology

[Kafka-connect] Single Message Transform: lesson 7 – TimestampRouter and MessageTimestampRouter – Custom format topic name with timestamp

Posted on February 26, 2022 by nim

This lesson follows the demo linked below, whose author shows how to change the topic name on the source side, and the table name on the sink side, by combining it with a timestamp.

https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day7.adoc

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and bring docker-compose up again:
docker-compose down -v
docker-compose up -d

2) Practice.

Create the data generator:
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day7-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day7-transactions.with"                : "#{Internet.uuid}",
        "genv.day7-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day7-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day7-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day7-transactions.item.with"            : "#{Beer.name}",
        "topic.day7-transactions.throttle.ms"         : 1000
    }'

Like RegexRouter, TimestampRouter can be used to modify the topic name.
In this case, the need is to add a timestamp to the topic name.
For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.

TimestampRouter takes the timestamp from the Kafka message itself (the record timestamp).

Now, when sinking data from the topic to the database, we use TimestampRouter:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                 : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                : "mysqluser",
          "connection.password"                            : "mysqlpw",
          "topics"                                         : "day7-transactions",
          "tasks.max"                                      : "4",
          "auto.create"                                    : "true",
          "auto.evolve"                                    : "true",
          "transforms"                                     : "addTimestampToTopic",
          "transforms.addTimestampToTopic.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
          "transforms.addTimestampToTopic.topic.format"    : "${topic}_${timestamp}",
          "transforms.addTimestampToTopic.timestamp.format": "yyyy-MM-dd"
        }'

At a glance, there are two parts:
– transforms.addTimestampToTopic.topic.format: the layout and order of the topic name and the timestamp.
– transforms.addTimestampToTopic.timestamp.format: the format of the timestamp.
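
To make these two settings concrete, here is a minimal Python sketch of how the routed name is put together. It is only an illustration, not the connector's code: route_topic is a hypothetical helper, the strftime pattern %Y-%m-%d stands in for the Java date pattern used in the config, and I am assuming the timestamp is rendered in UTC. The sample value is the record timestamp that shows up in the TRACE log further down in this post.

```python
from datetime import datetime, timezone


def route_topic(topic, timestamp_ms,
                topic_format="${topic}_${timestamp}",
                strftime_format="%Y-%m-%d"):
    """Build a routed topic name from a topic and an epoch-millisecond timestamp."""
    # Assumption: the record timestamp is rendered in UTC.
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    formatted = ts.strftime(strftime_format)
    # Substitute the two placeholders that topic.format understands.
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", formatted))


print(route_topic("day7-transactions", 1607681956641))
# day7-transactions_2020-12-11

# Monthly routing, like the cars_2020-12 example above.
print(route_topic("cars", 1607681956641, strftime_format="%Y-%m"))
# cars_2020-12
```

Changing only the timestamp pattern is enough to switch from daily to monthly tables, and changing topic.format moves the timestamp before or after the topic name.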

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;

Besides taking the timestamp from the Kafka record, there is one more option: taking the timestamp from a field inside the message.
So how does that work?

Let's select some data from the table:

SELECT txn_date, item, cost FROM `day7-transactions_2022-02-26` LIMIT 5;

Notice that the txn_date column holds a date-time value for each row. The idea is: take the value of the txn_date field -> extract the date -> use it to format the topic name.

There is a Single Message Transform called MessageTimestampRouter which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                                 : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                                  : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                                 : "mysqluser",
          "connection.password"                                             : "mysqlpw",
          "topics"                                                          : "day7-transactions",
          "tasks.max"                                                       : "4",
          "auto.create"                                                     : "true",
          "auto.evolve"                                                     : "true",
          "transforms"                                                      : "addTimestampToTopicFromField",
          "transforms.addTimestampToTopicFromField.type"                    : "io.confluent.connect.transforms.MessageTimestampRouter",
          "transforms.addTimestampToTopicFromField.message.timestamp.keys"  : "txn_date",
          "transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.addTimestampToTopicFromField.topic.format"            : "${topic}_${timestamp}",
          "transforms.addTimestampToTopicFromField.topic.timestamp.format"  : "yyyy-MM-dd"
        }'
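
What this config is meant to do (parse txn_date, extract the date, append it to the topic name) can be sketched in Python like this. It is a rough illustration under a few assumptions: route_by_field is a hypothetical helper, the strptime pattern is my approximation of the Java pattern EEE MMM dd HH:mm:ss zzz yyyy, the message value is a plain dict, and the day and month names are parsed with an English locale. The sample txn_date value is taken from the TRACE log further down.

```python
from datetime import datetime


def route_by_field(topic, value, field="txn_date",
                   field_format="%a %b %d %H:%M:%S %Z %Y",
                   topic_timestamp_format="%Y-%m-%d"):
    """Append a date taken from a field of the message value to the topic name."""
    # Parse the time field from the message value (a plain dict in this sketch).
    parsed = datetime.strptime(value[field], field_format)
    # Reformat it with the pattern wanted in the topic name.
    return f"{topic}_{parsed.strftime(topic_timestamp_format)}"


record = {"txn_date": "Thu Dec 03 02:25:24 GMT 2020", "item": "Delirium Noctorum"}
print(route_by_field("day7-transactions", record))
# day7-transactions_2020-12-03
```

The difference from TimestampRouter is only where the time comes from: here it is the business date carried in the message, not the time the record was written to Kafka.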

When you check the connector status, you will see that it has failed:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
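
If jq is not at hand, the same summary can be built in Python. This is a small sketch that assumes the response body of the request above has already been loaded into a dict; summarize_connectors is a hypothetical helper, and the sample payload (including its states) is made up to mirror the fields that the jq filter reads.

```python
def summarize_connectors(payload):
    """Return one summary line per connector: type, name, connector state, task states, class."""
    rows = []
    for name, entry in payload.items():
        info, status = entry["info"], entry["status"]
        row = [info["type"], name, status["connector"]["state"]]
        # One column per task state, like .value.status.tasks[].state in the jq filter.
        row += [task["state"] for task in status["tasks"]]
        row.append(info["config"]["connector.class"])
        rows.append(" | ".join(row))
    return sorted(rows)


# Illustrative payload only; the states here are invented for the example.
sample = {
    "sink-jdbc-mysql-day7-00": {
        "info": {
            "type": "sink",
            "config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector"},
        },
        "status": {
            "connector": {"state": "RUNNING"},
            "tasks": [{"id": 0, "state": "FAILED"}],
        },
    }
}

for line in summarize_connectors(sample):
    print(line)
# sink | sink-jdbc-mysql-day7-00 | RUNNING | FAILED | io.confluent.connect.jdbc.JdbcSinkConnector
```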

Currently fails…

org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
        at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
        at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 14 more

TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)

The cause of this is that the Single Message Transform currently expects to handle raw JSON formatted records – not Avro/Protobuf/JSON Schema.
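
To picture why the task fails, here is a small Python sketch of the kind of check the error message and stack trace point to. It is based only on the log above and is not the transform's actual source: Struct and require_map are hypothetical stand-ins, with a dict playing the role of a schemaless (raw JSON) value and Struct playing the role of a value that carries a schema.

```python
class Struct:
    """Stand-in for a schema-backed record value (hypothetical, for illustration)."""

    def __init__(self, **fields):
        self.fields = fields


def require_map(value, purpose):
    """Accept only schemaless map values; reject anything else."""
    # Raw JSON without a schema arrives as a plain map (a dict in this sketch).
    if not isinstance(value, dict):
        raise TypeError(
            f"Only Map objects supported in absence of schema for [{purpose}], "
            f"found: {type(value).__name__}")
    return value


purpose = "appending message's timestamp field to topic"

# A schemaless value passes the check.
require_map({"txn_date": "Thu Dec 03 02:25:24 GMT 2020"}, purpose)

# A schema-backed value, like the Struct in the TRACE log, is rejected.
try:
    require_map(Struct(txn_date="Thu Dec 03 02:25:24 GMT 2020"), purpose)
except TypeError as err:
    print(err)
```

In this lesson the Voluble records reach the sink as Struct values with a schema, which is why the connector ends up in the failing branch.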
