
[Kafka-connect] Single Message Transform: lesson 4 – RegexRouter – change topic name.

Posted on February 24, 2022 (updated February 26, 2022) by nim

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and bring docker-compose back up:
docker-compose down -v
docker-compose up -d
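Before creating any connectors, it helps to wait until the Connect worker's REST API is answering. A simple check, assuming the worker is exposed on localhost:8083 as in the setup post:

# Poll the Kafka Connect REST API until the worker is ready
until curl -s -o /dev/null http://localhost:8083/connectors; do
    echo "Waiting for Kafka Connect..."
    sleep 5
done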

2) Practice.

Create the data generator:
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day4-transactions.with"                : "#{Internet.uuid}",
        "genv.day4-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day4-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day4-transactions.item.with"            : "#{Beer.name}",
        "topic.day4-transactions.throttle.ms"         : 500
    }'

You can now see that it has created a topic, day4-transactions:

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
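If you want to peek at the generated records themselves, you can consume a couple of messages. A minimal sketch, assuming the kafkacat container from the setup and that values are Avro-encoded with a Schema Registry reachable at schema-registry:8081 (as in the reference environment):

# Read two records from the beginning of the topic, decoding Avro values
docker exec kafkacat kafkacat -b broker:29092 -t day4-transactions \
    -C -c2 -o beginning \
    -s value=avro -r http://schema-registry:8081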

Now we create a sink connector:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day4-transactions",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true"
        }'

Check the connectors:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" |        jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' |        column -s : -t| sed 's/\"//g'| sort

Now go into the database:

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
show tables;
mysql> select * from day4-transactions;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-transactions' at line 1
mysql> 

So a hyphen (-) in the table name gives us an SQL syntax error.
The tutorial author points out that wrapping the name in backticks lets the query work:

mysql> select * from `day4-transactions` LIMIT 1;
+-------------+-------+-----------------------+
| card_type   | cost  | item                  |
+-------------+-------+-----------------------+
| diners_club | 34.45 | Celebrator Doppelbock |
+-------------+-------+-----------------------+
1 row in set (0.00 sec)

The backtick trick works, but it would be cleaner to rename day4-transactions to just transactions.
So how do we do that?

By default, the JDBC sink connector takes the name of the topic and uses it as the table name in the database.
Likewise, the Elasticsearch sink connector uses the topic name as the name of the Elasticsearch index.

Following the tutorial, we will use RegexRouter.
Using the RegexRouter override, the topic name can be modified either as data is streamed into Kafka from a source connector, or as it leaves Kafka in a sink connector.
https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
It updates the record's topic using the configured regular expression and replacement string.

Now we will edit the connector above so that only transactions is kept and the day4- prefix is dropped:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day4-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "dropTopicPrefix",
          "transforms.dropTopicPrefix.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropTopicPrefix.regex"      : "day4-(.*)",
          "transforms.dropTopicPrefix.replacement": "$1"
        }'
A 200 OK response means the connector was updated successfully.
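To double-check that the new configuration was applied, you can also ask the Connect REST API for the connector's status:

curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/status | jq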

First, you give this transform a label: dropTopicPrefix.

Now day4-transactions matches the regex, and the capture group puts transactions into $1.
The transform rewrites day4-transactions to transactions, so the table created in the database is named transactions.
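To see what the regex does to the topic name, you can approximate it on the command line. A rough stand-in using sed's extended regex (RegexRouter actually uses Java's regex engine, where the replacement reference is $1 rather than \1):

# Approximate the RegexRouter rewrite of the topic name
echo "day4-transactions" | sed -E 's/^day4-(.*)$/\1/'
# -> transactions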

docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
show tables;
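You should now see a transactions table, and querying it no longer needs backticks:

# Query the renamed table directly, without backticks
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo \
    -e 'select * from transactions LIMIT 1;'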

We can also make the regex and replacement more explicit:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day4-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "dropTopicPrefix",
          "transforms.dropTopicPrefix.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropTopicPrefix.regex"      : "day(\\d+)-(.*)",
          "transforms.dropTopicPrefix.replacement": "$2_day$1"
        }'

You need to escape the \ when passing it through curl, so \ becomes \\.

Now the table it creates has a more sensible name: day4-transactions becomes transactions_day4.
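Again, you can approximate the new rule with sed to see why (same caveat: sed's \1 and \2 stand in for Java's $1 and $2):

# Approximate the two-group rewrite of the topic name
echo "day4-transactions" | sed -E 's/^day([0-9]+)-(.*)$/\2_day\1/'
# -> transactions_day4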

This is the material I referenced:

https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day4.adoc
