
NimTechnology

Presenting CLOUD technologies in an easy-to-understand way.

[Kafka-connect] Single Message Transform: lesson 7 – TimestampRouter and MessageTimestampRouter – Custom format topic name with timestamp

Posted on February 26, 2022 by nim

In this lesson, the author of the original demo (linked below) shows how to change the topic name in a source connector, or the table name in a sink connector, by combining it with a timestamp.

https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day7.adoc

Contents

  • 1) Set up the components.
  • 2) Practice.

1) Set up the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and run docker-compose again:
docker-compose down -v
docker-compose up -d

2) Practice.

Create the data generator (the Voluble source connector):
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day7-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day7-transactions.with"                : "#{Internet.uuid}",
        "genv.day7-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day7-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day7-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day7-transactions.item.with"            : "#{Beer.name}",
        "topic.day7-transactions.throttle.ms"         : 1000
    }'
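The `'\''` sequences in the curl command above are only shell escaping: they put a literal single quote into the Faker expression, so the connector actually receives `#{date.past '10','DAYS'}`. As a minimal sketch, assuming the Kafka Connect REST API is reachable at `http://localhost:8083` as in this lesson, the same `PUT /connectors/<name>/config` request can be built in Python without any escaping. `build_put_config` and `CONNECT_URL` are hypothetical names used only for illustration.

```python
import json
import urllib.request

# Assumption: the Kafka Connect worker from the docker-compose setup listens here.
CONNECT_URL = "http://localhost:8083"


def build_put_config(name: str, config: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (without sending) a PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


voluble_config = {
    "connector.class": "io.mdrogalis.voluble.VolubleSourceConnector",
    "genkp.day7-transactions.with": "#{Internet.uuid}",
    "genv.day7-transactions.cost.with": "#{Commerce.price}",
    # Literal quotes: no shell escaping is needed inside a Python string
    "genv.day7-transactions.txn_date.with": "#{date.past '10','DAYS'}",
    "genv.day7-transactions.card_type.with": "#{Business.creditCardType}",
    "genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
    "genv.day7-transactions.item.with": "#{Beer.name}",
    "topic.day7-transactions.throttle.ms": "1000",
}

req = build_put_config("source-voluble-datagen-day7-00", voluble_config)
# To actually send it (needs a running Connect worker):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```

Keeping request construction separate from sending makes it easy to inspect the payload before touching a live cluster.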

Like RegExRouter, TimestampRouter can be used to modify the topic name; here the need is to add a timestamp to it.
For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices, e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.

TimestampRouter uses the timestamp of the Kafka message itself (the record's metadata timestamp), not a field inside the message value.

Now, when sinking data from the topic to the database, we use TimestampRouter:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                 : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                : "mysqluser",
          "connection.password"                            : "mysqlpw",
          "topics"                                         : "day7-transactions",
          "tasks.max"                                      : "4",
          "auto.create"                                    : "true",
          "auto.evolve"                                    : "true",
          "transforms"                                     : "addTimestampToTopic",
          "transforms.addTimestampToTopic.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
          "transforms.addTimestampToTopic.topic.format"    : "${topic}_${timestamp}",
          "transforms.addTimestampToTopic.timestamp.format": "yyyy-MM-dd"
        }'

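At a glance, there are two parts:
– transforms.addTimestampToTopic.topic.format: how the topic name and the timestamp are combined and ordered; ${topic} and ${timestamp} are placeholders.
– transforms.addTimestampToTopic.timestamp.format: the format of the timestamp, written as a Java SimpleDateFormat pattern. Use lowercase yyyy for the calendar year; uppercase YYYY is the week-based year and can produce the wrong year in the last days of December.
Because the JDBC sink names each table after its topic by default (table.name.format = ${topic}), the rewritten topic name becomes the table name.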
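To make the two settings concrete, here is a minimal Python sketch of what TimestampRouter does with them: format the record timestamp (epoch milliseconds) and substitute it, together with the original topic, into `topic.format`. It assumes UTC for the formatting and maps only the few Java pattern tokens used in this lesson; `java_to_strftime` and `route_topic` are hypothetical helpers, not part of Kafka.

```python
from datetime import datetime, timezone

# Assumption: only the Java SimpleDateFormat tokens used in this lesson are mapped.
# Uppercase YYYY (week-based year) is deliberately NOT mapped.
JAVA_TO_STRFTIME = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d")]


def java_to_strftime(pattern: str) -> str:
    """Translate a small subset of Java date-pattern tokens to strftime codes."""
    for java_token, py_code in JAVA_TO_STRFTIME:
        pattern = pattern.replace(java_token, py_code)
    return pattern


def route_topic(topic: str, record_ts_ms: int,
                topic_format: str = "${topic}_${timestamp}",
                timestamp_format: str = "yyyy-MM-dd") -> str:
    """Sketch of TimestampRouter: new topic name from the record timestamp (UTC)."""
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    stamp = ts.strftime(java_to_strftime(timestamp_format))
    return topic_format.replace("${topic}", topic).replace("${timestamp}", stamp)


# Record timestamp taken from the TRACE line shown later in this lesson
print(route_topic("day7-transactions", 1607681956641))                 # day7-transactions_2020-12-11
print(route_topic("cars", 1607681956641, timestamp_format="yyyy-MM"))  # cars_2020-12
```

The second call shows the monthly-index idea from the Elasticsearch example: only timestamp.format changes, topic.format stays the same.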

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;

Besides taking the timestamp from the Kafka message, there is another way: taking the timestamp from a field inside the message.
How does that work?

Let's select some data from the table:

SELECT txn_date, item, cost FROM `day7-transactions_2022-02-26` LIMIT 5;

Notice that the txn_date column holds a date/time value for each row. The idea is: read the txn_date field, extract the date from it, and use that date in the topic name format.

There is a Single Message Transform called MessageTimestampRouter which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.
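The steps described above (read txn_date, parse it with the pattern from message.timestamp.format, then format only the date into topic.format) can be sketched in Python. This is an illustration under assumptions, not Confluent's implementation: the strptime pattern is an assumed equivalent of the Java pattern `EEE MMM dd HH:mm:ss zzz yyyy` (English names, default C locale), no timezone conversion is done, and `route_by_field` is a hypothetical name. The sketch also mirrors the limitation this lesson runs into: only schemaless, map-like values (a dict here) are accepted.

```python
from datetime import datetime

# Assumed strptime equivalent of the Java pattern "EEE MMM dd HH:mm:ss zzz yyyy"
MESSAGE_TS_FORMAT = "%a %b %d %H:%M:%S %Z %Y"


def route_by_field(topic: str, value: object, field: str = "txn_date",
                   topic_format: str = "${topic}_${timestamp}",
                   out_format: str = "%Y-%m-%d") -> str:
    """Sketch of MessageTimestampRouter: route on a timestamp field of the value."""
    # Like the SMT in this lesson, reject anything that is not a schemaless map.
    if not isinstance(value, dict):
        raise TypeError(f"only dict values supported, found {type(value).__name__}")
    # 1) read the field, 2) parse it into a datetime, 3) keep only the date part
    parsed = datetime.strptime(value[field], MESSAGE_TS_FORMAT)
    stamp = parsed.strftime(out_format)
    return topic_format.replace("${topic}", topic).replace("${timestamp}", stamp)


# txn_date value copied from the TRACE line shown later in this lesson
record_value = {"txn_date": "Thu Dec 03 02:25:24 GMT 2020", "item": "Delirium Noctorum"}
print(route_by_field("day7-transactions", record_value))  # day7-transactions_2020-12-03
```

Unlike TimestampRouter, the date here comes from the message content, so a record produced today can still be routed to a table for the day its transaction happened.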

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class"                                                 : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                                                  : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                                                 : "mysqluser",
          "connection.password"                                             : "mysqlpw",
          "topics"                                                          : "day7-transactions",
          "tasks.max"                                                       : "4",
          "auto.create"                                                     : "true",
          "auto.evolve"                                                     : "true",
          "transforms"                                                      : "addTimestampToTopicFromField",
          "transforms.addTimestampToTopicFromField.type"                    : "io.confluent.connect.transforms.MessageTimestampRouter",
          "transforms.addTimestampToTopicFromField.message.timestamp.keys"  : "txn_date",
          "transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.addTimestampToTopicFromField.topic.format"            : "${topic}_${timestamp}",
          "transforms.addTimestampToTopicFromField.topic.timestamp.format"  : "yyyy-MM-dd"
        }'

When you check the connector status, you will see that it fails:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
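The jq pipeline above flattens the response of `GET /connectors?expand=info&expand=status`, which maps each connector name to its `info` (type, config) and `status` (connector state, task states). For readers who prefer Python, a minimal sketch of the same summary, assuming the worker is at `http://localhost:8083` as in this lesson; `summarize` and `fetch` are hypothetical helper names.

```python
import json
import urllib.request


def summarize(expanded: dict) -> list:
    """Flatten the expand=info&expand=status response into sorted summary lines."""
    rows = []
    for name, entry in expanded.items():
        info, status = entry["info"], entry["status"]
        task_states = [task["state"] for task in status.get("tasks", [])]
        rows.append(" | ".join([info["type"], name, status["connector"]["state"],
                                *task_states, info["config"]["connector.class"]]))
    return sorted(rows)


def fetch(base_url: str = "http://localhost:8083") -> dict:
    """Call the Connect REST API (requires a running worker)."""
    url = f"{base_url}/connectors?expand=info&expand=status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)
```

Usage: `print("\n".join(summarize(fetch())))`. A failing SMT typically shows up in the task states rather than the connector state, which is why the task states are listed separately.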

Currently fails…

org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
        at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
        at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 14 more

TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)

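The cause is that this Single Message Transform currently expects schemaless records, such as raw JSON read without a schema, which reach the transform as a Map. Here the value is a schema-backed Struct (valueSchema=Schema{io.mdrogalis.Gen0:STRUCT} in the TRACE line above), as you get with Avro/Protobuf/JSON Schema data, so the Map check in requireMap rejects it.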

Copyright © 2023 NimTechnology.