NimTechnology

Explaining CLOUD technologies in an easy-to-understand way.


[Kafka-connect] Single Message Transform: lesson 8 – TimestampConverter – convert the type of the fields from string to timestamp or date,…

Posted on February 26, 2022 (updated February 27, 2022) by nim

Contents

  • 1) Set up the components.
  • 2) Practice.

1) Set up the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers (the -v flag also removes their volumes) and start docker-compose again:
docker-compose down -v
docker-compose up -d
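After docker-compose up -d, Kafka Connect usually needs some time before its REST API on port 8083 accepts requests, so the curl commands below can fail if they are run too early. A minimal bash sketch of a retry helper; the name wait_for is hypothetical, and the usage line assumes the Connect worker from this lab listens on localhost:8083.

```shell
#!/usr/bin/env bash
# wait_for (hypothetical helper): run a command repeatedly until it succeeds.
#   $1    = maximum number of attempts
#   $2... = the command to run
# Returns 0 as soon as the command succeeds, 1 if every attempt failed.
wait_for() {
  local max_tries=$1
  shift
  local i
  for ((i = 1; i <= max_tries; i++)); do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Pause between attempts, but not after the last one.
    if ((i < max_tries)); then
      sleep 1
    fi
  done
  return 1
}

# Example (assumes the Connect REST API of this lab is on localhost:8083):
#   wait_for 120 curl -sf http://localhost:8083/connectors && echo "Kafka Connect is ready"
```

curl -f makes curl exit with a non-zero status on HTTP errors, so the loop only stops once the REST API actually answers.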

2) Practice.

Reference:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day8.adoc

Create the data generator with the Voluble source connector:
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day8-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day8-transactions.with"                : "#{Internet.uuid}",
        "genv.day8-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day8-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day8-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day8-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day8-transactions.item.with"            : "#{Beer.name}",
        "topic.day8-transactions.throttle.ms"         : 1000
    }'
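The '\'' sequences in the txn_date line are shell quoting, not part of the Voluble expression: inside a single-quoted string, '\'' closes the quote, adds an escaped literal ', and reopens the quote. A small bash check of the value the connector actually receives:

```shell
#!/usr/bin/env bash
# The -d payload above is wrapped in single quotes, so a literal ' has to be
# written as '\'' (close quote, escaped quote, reopen quote).
expr='#{date.past '\''10'\'','\''DAYS'\''}'

# Print the expression exactly as the Kafka Connect REST API will see it.
printf '%s\n' "$expr"
```

This prints #{date.past '10','DAYS'}. If the escaping gets in the way, the JSON can instead be saved to a file and sent with curl -d @config.json, which needs no shell quoting at all.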

Now let's check the data in the topic, specifically the txn_date field:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
  jq  '.payload.Gen0.txn_date.string'
As you can see, the values in the txn_date field look like properly formatted dates.
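The txn_date strings appear to follow the layout of Java's Date.toString(), which is the pattern EEE MMM dd HH:mm:ss zzz yyyy used later in the TimestampConverter config. A sketch, assuming GNU coreutils date, that renders an epoch value in the same layout (handy for producing test values); the function name and the letter-by-letter mapping in the comments are illustrations, not something taken from the reference.

```shell
#!/usr/bin/env bash
# Java SimpleDateFormat letters -> GNU date specifiers:
#   EEE -> %a (abbreviated weekday)     MMM      -> %b (abbreviated month)
#   dd  -> %d (zero-padded day)         HH:mm:ss -> %H:%M:%S
#   zzz -> %Z (time zone abbreviation)  yyyy     -> %Y (four-digit year)
JAVA_DATE_LAYOUT='%a %b %d %H:%M:%S %Z %Y'

# Render an epoch-seconds value in that layout, in UTC and with the C locale
# so weekday and month names are in English. Requires GNU date (-d @SECONDS).
to_java_date_string() {
  LC_ALL=C date -u -d "@$1" "+$JAVA_DATE_LAYOUT"
}

# to_java_date_string 1645833600   ->   Sat Feb 26 00:00:00 UTC 2022
```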

Now let's check what the schema looks like:

curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'
In the schema, this field is defined as a string.

Now, following the reference tutorial, let's create a sink connector and you will see the problem:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
    -d '{
          "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"    : "mysqluser",
          "connection.password": "mysqlpw",
          "topics"             : "day8-transactions",
          "tasks.max"          : "4",
          "auto.create"        : "true",
          "auto.evolve"        : "true"
        }'
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe `day8-transactions`;
You will see that the txn_date column has the type text.

But what we actually want is for the txn_date column to have a date/time type.
As the reference tutorial shows, we can use TimestampConverter: Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. Applies to individual fields or to the entire value.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
    -d '{
          "connector.class"                 : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                  : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                 : "mysqluser",
          "connection.password"             : "mysqlpw",
          "topics"                          : "day8-transactions",
          "tasks.max"                       : "4",
          "auto.create"                     : "true",
          "auto.evolve"                     : "true",
          "transforms"                      : "convertTS,changeTopic",
          "transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field"      : "txn_date",
          "transforms.convertTS.format"     : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Timestamp",
          "transforms.changeTopic.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex"      : "(.*)",
          "transforms.changeTopic.replacement": "$1_withTS"
        }'
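The changeTopic regex can be checked locally before deploying the connector. A rough bash equivalent using sed -E, which writes the capture-group reference as \1 where Java's replacement syntax uses $1; the ^...$ anchors mimic the fact that RegexRouter only renames a topic when the regex matches the whole name. The function name route_topic is hypothetical.

```shell
#!/usr/bin/env bash
# Approximate RegexRouter locally: regex "(.*)", replacement "$1_withTS".
# sed -E refers to the first capture group as \1 (Java uses $1).
route_topic() {
  printf '%s\n' "$1" | sed -E 's/^(.*)$/\1_withTS/'
}

# route_topic day8-transactions   ->   day8-transactions_withTS
```

By default the JDBC sink names the table after the (renamed) topic, which is why the converted rows land in day8-transactions_withTS. Note also that $1 in the curl payload is safe only because the payload is single-quoted, so the shell does not expand it.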

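Step 1: declare the transform aliases in "transforms": convertTS (plus changeTopic, see step 6).
Step 2: set the type of convertTS to “org.apache.kafka.connect.transforms.TimestampConverter$Value” ($Value applies the SMT to the record value; $Key would apply it to the key).
Step 3: name the field that TimestampConverter should act on: txn_date.
Step 4: set the format to “EEE MMM dd HH:mm:ss zzz yyyy”. This is a Java SimpleDateFormat pattern describing how the incoming string is laid out, so the SMT can parse it. It does not control how the value is displayed in the destination; the format is only used to produce output when target.type is string.
Step 5: define the target type of the field: “transforms.convertTS.target.type”: “Timestamp”.
Step 6: changeTopic is a RegexRouter that renames the topic from day8-transactions to day8-transactions_withTS, so the sink writes to a new table instead of the one created by the first connector.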
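Conceptually, the format setting tells the SMT how to parse the incoming string into a point in time before it is converted to target.type. A bash sketch of that parse step, assuming GNU date, whose free-form parser happens to accept this layout; it prints epoch milliseconds, the instant a Timestamp value represents. This only illustrates the idea; the SMT itself parses with Java's SimpleDateFormat, and parse_txn_date_ms is a hypothetical name.

```shell
#!/usr/bin/env bash
# Parse a string such as "Sat Feb 26 00:00:00 UTC 2022"
# (pattern EEE MMM dd HH:mm:ss zzz yyyy) and print the instant as
# milliseconds since the Unix epoch.
# Assumes GNU coreutils date; BSD/macOS date parses -d differently.
parse_txn_date_ms() {
  local seconds
  seconds=$(LC_ALL=C date -u -d "$1" +%s) || return 1
  echo $((seconds * 1000))
}

# parse_txn_date_ms "Sat Feb 26 00:00:00 UTC 2022"   ->   1645833600000
```

If the pattern does not match the data, the SMT fails to parse the field and the task errors out, so it is worth checking the pattern against a few real values from the topic first.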

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day8-transactions_withTS`;

Much better: txn_date is no longer created as a text column.

select txn_date from `day8-transactions_withTS` LIMIT 5;

If you want a different type instead,
you only need to change transforms.convertTS.target.type:

....
"transforms.convertTS.target.type": "Date",
....

With Date, only the calendar day is kept and the time of day is set to 00:00:00 UTC.

......
"transforms.convertTS.target.type": "Time",
.....

With Time, only the time of day is kept and the date part becomes 1970-01-01.

.....
"transforms.convertTS.target.type": "unix",
....
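The arithmetic behind these target types can be sketched on epoch milliseconds: Timestamp keeps the whole instant, Date truncates to midnight UTC, Time keeps only the milliseconds since midnight UTC, and unix emits the instant as a number (milliseconds by default). A bash sketch assuming UTC, non-negative epochs and an already-parsed input; the function names are hypothetical.

```shell
#!/usr/bin/env bash
# All values are epoch milliseconds in UTC (non-negative; bash integer
# division truncates toward zero, which is only correct for values >= 0).
MS_PER_DAY=86400000

# target.type=Date: drop the time of day (midnight UTC of the same day).
to_connect_date() { echo $(( ($1 / MS_PER_DAY) * MS_PER_DAY )); }

# target.type=Time: keep only the time of day, i.e. ms since midnight UTC.
to_connect_time() { echo $(( $1 % MS_PER_DAY )); }

# target.type=unix: the instant itself as epoch milliseconds.
to_unix_ms() { echo "$1"; }
```

When trying each target.type with the JDBC sink, keep in mind that auto.evolve only adds missing columns and does not change the type of an existing one, so writing each variant to a new table (for example with a different changeTopic.replacement) avoids confusing results.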

Accessing timestamps in nested fields

Unfortunately the TimestampConverter only works on root-level elements; it can’t be used on timestamp fields that are nested in other fields. You’d need to either use Flatten first, or write your own transformation.
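A sketch of the Flatten-first approach, assuming a hypothetical nested value in which the timestamp string lives at order.txn_date. With Flatten$Value and delimiter set to _, that field becomes the top-level order_txn_date, which TimestampConverter can then target. Transforms run in the order listed in "transforms", so flatten has to come first. The function only prints the extra sink properties to merge into the connector config; the field names are illustrative.

```shell
#!/usr/bin/env bash
# Print transform properties for a hypothetical nested field order.txn_date.
# Flatten turns {"order": {"txn_date": ...}} into {"order_txn_date": ...}
# (delimiter "_"), then TimestampConverter parses the now top-level field.
# The quoted 'EOF' stops the shell from expanding $Value.
flatten_then_convert_props() {
  cat <<'EOF'
"transforms"                      : "flatten,convertTS",
"transforms.flatten.type"         : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter"    : "_",
"transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field"      : "order_txn_date",
"transforms.convertTS.format"     : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"
EOF
}
```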

Copyright © 2025 NimTechnology.