[Kafka-connect] Single Message Transform: lesson 11 – Predicate and Filter – filtering topic name and content of the messages.

Posted on February 27, 2022 by nim

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and bring docker-compose back up:
docker-compose down -v
docker-compose up -d

2) Practice.

Reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day11.adoc

The source-voluble-datagen-day11-00 connector generates two topics:
– day11-sys01
– day11-systemB
Keep this in mind!

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day11-00/config \
    -d '{
        "connector.class"                 : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day11-sys01.with"          : "#{Internet.uuid}",
        "genv.day11-sys01.amount.with"    : "#{Commerce.price}",
        "genv.day11-sys01.units.with"     : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day11-sys01.txn_date.with"  : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day11-sys01.product.with"   : "#{Beer.name}",
        "genv.day11-sys01.source.with"    : "SYS01",
        "topic.day11-sys01.throttle.ms"   : 1000,
        "genkp.day11-systemB.with"        : "#{Internet.uuid}",
        "genv.day11-systemB.cost.with"    : "#{Commerce.price}",
        "genv.day11-systemB.units.with"   : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day11-systemB.txn_date.with": "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day11-systemB.item.with"    : "#{Beer.name}",
        "genv.day11-systemB.source.with"  : "SYSTEM_B",
        "topic.day11-systemB.throttle.ms" : 1000
    }'
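
If you want to confirm the connector is running before moving on, an optional check (a quick sketch, assuming jq is installed on the host) is to query the Kafka Connect status endpoint:

curl -s http://localhost:8083/connectors/source-voluble-datagen-day11-00/status | jq '.connector.state'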

The source-voluble-datagen-day11-01 connector generates only one topic: sys02.

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day11-01/config \
    -d '{
        "connector.class"                 : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.sys02.with"          : "#{Internet.uuid}",
        "genv.sys02.amount.with"    : "#{Commerce.price}",
        "genv.sys02.units.with"     : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.sys02.txn_date.with"  : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.sys02.product.with"   : "#{Beer.name}",
        "genv.sys02.source.with"    : "SYS02",
        "topic.sys02.throttle.ms"   : 1000,
        "topic.sys02.tombstone.rate":"0.30"
    }'
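
To double-check that all three topics were created, one quick way (a sketch using the kafkacat container from the compose file) is to list the broker metadata and grep for the topic names:

docker exec kafkacat kafkacat -b broker:29092 -L | grep -E 'day11|sys02'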

Topic day11-sys01

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-sys01  | \
  jq  '.payload.Gen0'

Topic day11-systemB

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-systemB  | \
  jq  '.payload.Gen0'

The day11-sys01 and day11-systemB topics have a few differently named fields:

cost corresponds to amount, and item corresponds to product.
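
To see the difference side by side, you can dump just the field names of each topic (the same kafkacat commands as above, only piping through jq's keys):

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-sys01  | \
  jq  '.payload.Gen0 | keys'

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-systemB  | \
  jq  '.payload.Gen0 | keys'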

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/config \
  -d '{
      "connector.class"                         : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                          : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                         : "mysqluser",
      "connection.password"                     : "mysqlpw",
      "topics.regex"                            : "day11-.*",
      "tasks.max"                               : "4",
      "auto.create"                             : "true",
      "auto.evolve"                             : "true",

      "transforms"                              : "renameSystemBFields,renameTargetTopic",
      "transforms.renameSystemBFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameSystemBFields.renames"  : "item:product,cost:amount",
      "transforms.renameSystemBFields.predicate": "isSystemBTopic",

      "transforms.renameTargetTopic.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.renameTargetTopic.regex"      : "day11-.*",
      "transforms.renameTargetTopic.replacement": "transactions",

      "predicates"                              : "isSystemBTopic",
      "predicates.isSystemBTopic.type"          : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.isSystemBTopic.pattern"       : ".*-systemB"
      }'

You can refer to the official docs here: Predicate (Filter (Apache Kafka))

Both topics, day11-sys01 and day11-systemB, go through the same transform chain.
The predicate marks the records coming from day11-systemB, and ReplaceField then renames the two fields only on those records.

At this point the two topics have identical fields, so RegexRouter merges them into a single target with a new name: transactions.

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe transactions;
SELECT * FROM transactions LIMIT 5;
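
Since both topics now land in the single transactions table, the source field is a handy way to confirm that rows from both systems really were merged (a small sanity-check query):

SELECT source, COUNT(*) FROM transactions GROUP BY source;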

Inverting a predicate:

You can use the negate option to invert a predicate. Consider this predicate:

"predicates"                              : "isSystemBTopic",
"predicates.isSystemBTopic.type"          : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern"       : ".*-systemB"

If you want to apply the Single Message Transform to every topic except the ones matching the predicate, set "…negate": "true":

"transforms.renameNonSystemBFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameNonSystemBFields.renames"  : "product:item,amount:cost",
"transforms.renameNonSystemBFields.predicate": "isSystemBTopic",
"transforms.renameNonSystemBFields.negate"   : "true",

As a result, the ReplaceField renames apply only to the day11-sys01 topic.
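
Putting the fragments together, a full sink config with the negated predicate might look like the sketch below (the connector name sink-jdbc-mysql-day11-negate-00 is just an illustration, not part of the original demo, and there is no RegexRouter here, so each topic would get its own table):

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-negate-00/config \
  -d '{
      "connector.class"                            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                            : "mysqluser",
      "connection.password"                        : "mysqlpw",
      "topics.regex"                               : "day11-.*",
      "tasks.max"                                  : "4",
      "auto.create"                                : "true",
      "auto.evolve"                                : "true",

      "transforms"                                 : "renameNonSystemBFields",
      "transforms.renameNonSystemBFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameNonSystemBFields.renames"  : "product:item,amount:cost",
      "transforms.renameNonSystemBFields.predicate": "isSystemBTopic",
      "transforms.renameNonSystemBFields.negate"   : "true",

      "predicates"                                 : "isSystemBTopic",
      "predicates.isSystemBTopic.type"             : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.isSystemBTopic.pattern"          : ".*-systemB"
      }'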

Filtering out null records

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C  -o-10 -u -q -J \
  -t sys02  | \
  jq -c '[.offset,.key,.payload]'

When we check, we can see tombstone (null) records: empty records with no data.
These may be by design, or by error – but either way, we want to exclude them from the sink connector pipeline.

To drop the tombstone (null) records, use the following command:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-01/config \
  -d '{
      "connector.class"                     : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                      : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                     : "mysqluser",
      "connection.password"                 : "mysqlpw",
      "topics"                              : "sys02",
      "tasks.max"                           : "4",
      "auto.create"                         : "true",
      "auto.evolve"                         : "true",

      "transforms"                          : "dropNullRecords",
      "transforms.dropNullRecords.type"     : "org.apache.kafka.connect.transforms.Filter",
      "transforms.dropNullRecords.predicate": "isNullRecord",

      "predicates"                          : "isNullRecord",
      "predicates.isNullRecord.type"        : "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
      }'
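
You can verify it in MySQL the same way as before (assuming auto.create named the table after the sys02 topic):

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
SELECT * FROM sys02 LIMIT 5;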

Filtering based on the contents of a message

With the predicates above, our filters have mostly been working at the topic level.
But what if you want to filter on the content inside a message?
For example, only accepting records where amount < 42, or where the product string ends with Stout.

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day11-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                             : "filterStout",
      "transforms.filterStout.type"            : "io.confluent.connect.transforms.Filter$Value",
      "transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
      "transforms.filterStout.filter.type"     : "include"
      }'

We have just created a sink connector that only accepts messages meeting this condition:
– the product field ends with Stout

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
select * from `day11-sys01`;

Looks good.

Or, if you only want to accept messages where amount is less than 42:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day11-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                                  : "castTypes,filterSmallOrder",
      "transforms.filterSmallOrder.type"            : "io.confluent.connect.transforms.Filter$Value",
      "transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
      "transforms.filterSmallOrder.filter.type"     : "include",
      "transforms.castTypes.type"                   : "org.apache.kafka.connect.transforms.Cast$Value",
      "transforms.castTypes.spec"                   : "amount:float32"
      }'
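
The Cast transform is included presumably because the generated amount field arrives as a string, and the numeric comparison in the filter condition needs a real number, so it is cast to float32 first. To sanity-check the result, you can look at the table again (note that rows left over from the earlier Stout run reused the same day11-sys01 table and may still show larger values):

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
SELECT product, amount FROM `day11-sys01` ORDER BY amount DESC LIMIT 5;

Rows written after this change should all have amount below 42.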
