
[Kafka-connect] Single Message Transform: lesson 10 – ReplaceField – You can drop, keep and rename fields in a Kafka message.

Posted on February 27, 2022 by nim

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and bring docker-compose back up:
docker-compose down -v
docker-compose up -d

2) Practice.

reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day10.adoc

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
    -d '{
        "connector.class"                              : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day10-transactions.with"                : "#{Internet.uuid}",
        "genv.day10-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day10-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day10-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day10-transactions.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day10-transactions.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "genv.day10-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day10-transactions.item.with"            : "#{Beer.name}",
        "topic.day10-transactions.throttle.ms"         : 1000
    }'

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day10-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "changeTableName",
          "transforms.changeTableName.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTableName.regex"      : ".*",
          "transforms.changeTableName.replacement": "production_data"
        }'
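The RegexRouter transform in the sink above rewrites the topic name, which the JDBC sink then uses as the table name. Its effect can be sketched with sed (sed's regex semantics differ slightly from the Java regex Connect uses, but the idea is the same):

```shell
# ".*" matches the entire topic name, so every topic this sink reads
# is routed to a single table called production_data
echo 'day10-transactions' | sed -E 's/.*/production_data/'
```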
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;

Dropping fields in a sink connector:

Check the data in the topic day10-transactions:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-transactions -C -c1 -o-1 -u -q -J |  jq  '.payload'

[Kafka-connect] Single Message Transform: lesson 5 MaskField
In lesson 5 we learned how to mask sensitive information with MaskField, but it feels wasteful to write masked values into the database where they serve no purpose.

So instead we will drop those fields entirely with ReplaceField:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-01/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-transactions",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "dropCC",
      "transforms.dropCC.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.dropCC.blacklist": "cc_num,cc_exp,card_type"
      }'
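The drop happens inside Connect, but its effect on each record value can be sketched with jq on an invented payload (the field values here are made up). Note that in Kafka 3.0+ the ReplaceField settings blacklist/whitelist were renamed exclude/include; the old names still work but are deprecated.

```shell
# Invented record value, still carrying the sensitive card fields
echo '{"cost":"13.13","units":42,"cc_num":"1234-5678-9012-3456","cc_exp":"2023-10-01","card_type":"visa","item":"Pale Ale"}' |
  jq 'del(.cc_num, .cc_exp, .card_type)'   # same fields as transforms.dropCC.blacklist
```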

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe `day10-transactions`;
We can see that the day10-transactions table has no cc_num, cc_exp, or card_type columns.

Dropping the fields this way is much cleaner, isn't it?

Including only certain fields in a source connector

Now suppose you have 100 fields but only want to keep 3 of them; surely you don't want to sit there listing the other 97 fields to drop.
There is another way: declare only the fields you want to keep, and everything else is dropped.
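Conceptually, the whitelist inverts the previous drop example: only the named fields survive. A jq sketch on an invented record:

```shell
# Keep only the four whitelisted fields; everything else is dropped
echo '{"item":"Pale Ale","cost":"13.13","units":42,"txn_date":"2022-02-20","cc_num":"1234"}' |
  jq '{item, cost, units, txn_date}'
```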

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-day10-00/config \
  -H "Content-Type: application/json" -d '{
    "connector.class"                  : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"                   : "jdbc:mysql://mysql:3306/demo",
    "connection.user"                  : "mysqluser",
    "connection.password"              : "mysqlpw",
    "topic.prefix"                     : "day10-",
    "poll.interval.ms"                 : 10000,
    "tasks.max"                        : 1,
    "table.include"                    : "production_data",
    "mode"                             : "bulk",
    "transforms"                       : "selectFields",
    "transforms.selectFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.selectFields.whitelist": "item,cost,units,txn_date"
  }'

If you run the command above right away, you will hit this error:
Caused by: org.apache.avro.SchemaParseException: Illegal character in: day10-transactions

This happens because the source connector also reads the day10-transactions table that the earlier sink auto-created, and the hyphen in that name is not a legal character in an Avro schema name. So delete the old containers and start over:

docker-compose down -v
docker-compose up -d

Recreate the datagen source connector:
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
    -d '{
        "connector.class"                              : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day10-transactions.with"                : "#{Internet.uuid}",
        "genv.day10-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day10-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day10-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day10-transactions.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day10-transactions.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "genv.day10-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day10-transactions.item.with"            : "#{Beer.name}",
        "topic.day10-transactions.throttle.ms"         : 1000
    }'

Recreate the JDBC sink connector (with the RegexRouter, so the table is named production_data):
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day10-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "changeTableName",
          "transforms.changeTableName.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTableName.regex"      : ".*",
          "transforms.changeTableName.replacement": "production_data"
        }'

Now recreate the source-jdbc-mysql-day10-00 connector by re-running the command above. I'm in learning mode and too lazy to chase that bug properly.

Now check the topic day10-production_data:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-production_data -C -o-1 -u -q -J | jq  '.payload'
Now you can see the topic contains only 4 fields.

Renaming fields

Now, when streaming from the topic to the database, we will rename the field txn_date (in the topic) to transaction_timestamp (in the database). Here is how:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-02/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-production_data",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "renameTS",
      "transforms.renameTS.type"   : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameTS.renames": "txn_date:transaction_timestamp"
      }'
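The rename leaves the value untouched and only changes the field name. Sketched with jq on an invented record:

```shell
# txn_date becomes transaction_timestamp; the value is carried over as-is
echo '{"item":"Pale Ale","cost":"13.13","txn_date":"2022-02-20"}' |
  jq 'with_entries(if .key == "txn_date" then .key = "transaction_timestamp" else . end)'
```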
Now check the table in MySQL:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day10-production_data`;