[Kafka-connect] Single Message Transform: lesson 7 – TimestampRouter and MessageTimestampRouter – Custom format topic name with timestamp

Posted on February 26, 2022 by nim

In this lesson, the demo author shows how to customize the topic name in the source and the table name in the sink by combining them with a timestamp.

https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day7.adoc

Contents

  • 1) Set up the components.
  • 2) Practice.

1) Set up the components.

For the Kafka setup, you can review this guide:
Setup the needing components of Kafka

Remember to delete the old containers and run docker-compose again:
docker-compose down -v
docker-compose up -d

2) Practice.

Create the data generator:
https://www.confluent.io/hub/mdrogalis/voluble
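The Voluble plugin has to be available on the Kafka Connect worker. If your Connect image does not already bundle it, it can typically be installed from Confluent Hub before the worker starts (a minimal sketch; the version shown is illustrative, not taken from this post):

# Run inside the Kafka Connect container; version is illustrative
confluent-hub install --no-prompt mdrogalis/voluble:0.3.1
# Restart the Connect worker afterwards so the new plugin is loaded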

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day7-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day7-transactions.with"                : "#{Internet.uuid}",
        "genv.day7-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day7-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day7-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day7-transactions.item.with"            : "#{Beer.name}",
        "topic.day7-transactions.throttle.ms"         : 1000
    }'

Like RegexRouter, TimestampRouter can be used to modify the topic name; in other words, it is the transform to reach for when we want to add a timestamp to the topic name.
For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.

TimestampRouter takes the timestamp from the Kafka message itself.
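As a quick sanity check, you can print that per-record timestamp with kafkacat. This is a sketch that assumes the kafkacat container and broker address used in the demo's docker-compose; adjust the names to your setup:

# Consume 3 records and print each message's timestamp (%T) and key (%k)
docker exec kafkacat kafkacat -b broker:29092 -t day7-transactions -C -c 3 \
    -f 'Timestamp: %T\tKey: %k\n'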

Now, when sinking data from the topic into the database, we use TimestampRouter:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                 : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                : "mysqluser",
          "connection.password"                            : "mysqlpw",
          "topics"                                         : "day7-transactions",
          "tasks.max"                                      : "4",
          "auto.create"                                    : "true",
          "auto.evolve"                                    : "true",
          "transforms"                                     : "addTimestampToTopic",
          "transforms.addTimestampToTopic.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
          "transforms.addTimestampToTopic.topic.format"    : "${topic}_${timestamp}",
          "transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
        }'

Notice that the transform has two parts:
– transforms.addTimestampToTopic.topic.format: defines how the topic name and the timestamp are combined.
– transforms.addTimestampToTopic.timestamp.format: defines the format of the timestamp itself.
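Putting the two settings together: a record on day7-transactions whose message timestamp falls on, say, 26 Feb 2022 is routed as follows (and, thanks to auto.create, the JDBC sink creates a matching table):

# topic.format     : ${topic}_${timestamp}
# timestamp.format : YYYY-MM-dd
#   day7-transactions  ->  day7-transactions_2022-02-26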

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
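Instead of a single day7-transactions table, you should now see one table per day, roughly like this (the exact names depend on when the connector was running):

+------------------------------+
| Tables_in_demo               |
+------------------------------+
| day7-transactions_2022-02-26 |
+------------------------------+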

Besides taking the timestamp from the Kafka message, there is another option: taking the timestamp from a field inside the message.
How does that work?

Let's select some data from the table:

SELECT txn_date, item, cost FROM `day7-transactions_2022-02-26` LIMIT 5;

Notice that the txn_date column holds a time value in each row. The idea is: take the data from the txn_date field, extract the date from it, and use that to build the topic name.
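The query returns rows whose txn_date looks like this (values are illustrative, taken from the trace log further down):

txn_date                      | item              | cost
Thu Dec 03 02:25:24 GMT 2020  | Delirium Noctorum | 73.58

This string format is exactly why the connector below parses the field with the pattern EEE MMM dd HH:mm:ss zzz yyyy before reformatting it as YYYY-MM-dd for the topic name.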

There is a Single Message Transform called MessageTimestampRouter which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                                 : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                                  : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                                 : "mysqluser",
          "connection.password"                                             : "mysqlpw",
          "topics"                                                          : "day7-transactions",
          "tasks.max"                                                       : "4",
          "auto.create"                                                     : "true",
          "auto.evolve"                                                     : "true",
          "transforms"                                                      : "addTimestampToTopicFromField",
          "transforms.addTimestampToTopicFromField.type"                    : "io.confluent.connect.transforms.MessageTimestampRouter",
          "transforms.addTimestampToTopicFromField.message.timestamp.keys"  : "txn_date",
          "transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.addTimestampToTopicFromField.topic.format"            : "${topic}_${timestamp}",
          "transforms.addTimestampToTopicFromField.topic.timestamp.format"  : "YYYY-MM-dd"
        }'

When you check the connector status, you will see that it fails:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
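The output will look roughly like this (illustrative): the source keeps RUNNING while the sink's tasks go to FAILED:

sink    |  sink-jdbc-mysql-day7-00         |  RUNNING  |  FAILED   |  io.confluent.connect.jdbc.JdbcSinkConnector
source  |  source-voluble-datagen-day7-00  |  RUNNING  |  RUNNING  |  io.mdrogalis.voluble.VolubleSourceConnector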

Currently fails…

org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
        at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
        at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 14 more

TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)

The cause of this is that the Single Message Transform currently expects to handle raw JSON formatted records – not Avro/Protobuf/JSON Schema.
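For illustration only (this is an assumption, not part of the original demo): if the topic carried schemaless JSON, overriding the sink's converter like this would hand the transform a Map instead of a Struct, and the transform would then apply cleanly:

          "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable" : "false"

Note, however, that the JDBC sink needs a schema to auto.create tables, so this particular pipeline still would not work end to end; the snippet only shows what the transform expects.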
