NimTechnology

[Kafka-connect] Single Message Transform: lesson 5 MaskField – Cover the sensitive data.

Posted on February 26, 2022 (updated February 27, 2022) by nim

Contents

  • 1) Set up the components.
  • 2) Practice.

1) Set up the components.

For the Kafka setup, refer back to this earlier post:
Setup the needing components of Kafka

Remember to delete the old containers and start docker-compose again:
docker-compose down -v
docker-compose up -d

2) Practice.

Create the data generator with the Voluble source connector:
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day5-00/config \
    -d '{
        "connector.class"                          : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day5-00-person.with"                : "#{Internet.uuid}",
        "genv.day5-00-person.firstName.with"       : "#{Address.firstName}",
        "genv.day5-00-person.lastName.with"        : "#{Address.lastName}",
        "genv.day5-00-person.fullAddress.with"     : "#{Address.fullAddress}",
        "genv.day5-00-person.phone_num.with"       : "#{PhoneNumber.phoneNumber}",
        "genv.day5-00-person.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day5-00-person.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "topic.day5-00-person.throttle.ms"         : 500
    }'

Read one record back from the topic to check the generated data:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq  '.'

Consider this scenario: you use Kafka Connect to stream a table from a database. A few columns in that table hold sensitive information, and you do not want those values to appear on the topic.
So what do you do?
Use MaskField to mask the sensitive data.
In this example, following the original demo, the values are replaced with the string <masked>.
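
To make the effect of the transform concrete, here is a minimal Python sketch of what masking does to a single record value. It is not the Kafka Connect implementation (which is Java and also handles schemas and non-string types); the function name `mask_fields` and the sample record are hypothetical, and only string fields with an explicit replacement are covered.

```python
def mask_fields(value, fields, replacement):
    """Return a copy of `value` with every listed field overwritten.

    Hypothetical helper: it only imitates the visible effect of
    MaskField with a `replacement` string on a flat record.
    """
    return {
        name: (replacement if name in fields else field_value)
        for name, field_value in value.items()
    }


# Made-up sample record shaped like the generated person data.
record = {
    "firstName": "Ada",
    "cc_num": "1234-5678-9012-3456",
    "cc_exp": "2030-01",
}

masked = mask_fields(record, {"cc_num", "cc_exp"}, "<masked>")
print(masked)
```

The original record is left untouched; only the copy that continues down the pipeline carries the masked values.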

Create a new connector:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
    -d '{
        "connector.class"                          : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day5-01-person.with"                : "#{Internet.uuid}",
        "genv.day5-01-person.firstName.with"       : "#{Address.firstName}",
        "genv.day5-01-person.lastName.with"        : "#{Address.lastName}",
        "genv.day5-01-person.fullAddress.with"     : "#{Address.fullAddress}",
        "genv.day5-01-person.phone_num.with"       : "#{PhoneNumber.phoneNumber}",
        "genv.day5-01-person.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day5-01-person.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "topic.day5-01-person.throttle.ms"         : 500,
        "transforms"                               : "maskCC",
        "transforms.maskCC.type"                   : "org.apache.kafka.connect.transforms.MaskField$Value",
        "transforms.maskCC.fields"                 : "cc_num,cc_exp",
        "transforms.maskCC.replacement"            : "<masked>"
    }'
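
The four `transforms.*` keys in the request above follow a fixed pattern: an alias, then `type`, `fields`, and `replacement` under that alias. A small sketch of generating them, assuming a hypothetical helper named `mask_transform_config` (not part of any Kafka client library):

```python
import json


def mask_transform_config(alias, fields, replacement=None):
    """Build the SMT part of a connector config for MaskField on the value.

    Hypothetical helper for illustration; the keys mirror the ones
    used in the curl request above.
    """
    config = {
        "transforms": alias,
        f"transforms.{alias}.type": "org.apache.kafka.connect.transforms.MaskField$Value",
        f"transforms.{alias}.fields": ",".join(fields),
    }
    # `replacement` is optional; leave the key out when it is not set.
    if replacement is not None:
        config[f"transforms.{alias}.replacement"] = replacement
    return config


smt_config = mask_transform_config("maskCC", ["cc_num", "cc_exp"], "<masked>")
print(json.dumps(smt_config, indent=2))
```

The resulting dictionary can be merged into the rest of the connector settings before sending the PUT request.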

Check the data again and notice the difference:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq  '.'

You can see that the cc_num and cc_exp values are now masked.

Now take the case of sinking data from the topic into a database:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
    -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                    : "mysqluser",
          "connection.password"                : "mysqlpw",
          "topics"                             : "day5-01-person",
          "tasks.max"                          : "4",
          "auto.create"                        : "true",
          "auto.evolve"                        : "true"
        }'

Open a MySQL shell and inspect the table that the sink created:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day5-01-person`;
select * from `day5-01-person` LIMIT 5;

The masked values are preserved all the way through to the sink.

Delete the sink connector and drop the table before the next test:

curl -i -X DELETE -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
drop table `day5-01-person`;

Now try MaskField on the sink connector itself:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
    -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                    : "mysqluser",
          "connection.password"                : "mysqlpw",
          "topics"                             : "day5-01-person",
          "tasks.max"                          : "4",
          "auto.create"                        : "true",
          "auto.evolve"                        : "true",
          "transforms"                         : "maskAddress",
          "transforms.maskAddress.type"        : "org.apache.kafka.connect.transforms.MaskField$Value",
          "transforms.maskAddress.fields"      : "fullAddress",
          "transforms.maskAddress.replacement" : "[XredactedX]"
        }'

And that's it. With this configuration, fullAddress is replaced with [XredactedX] before it is written to MySQL.
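
A transform on a source connector changes what lands on the topic, while a transform on a sink connector only changes what the sink writes. The sketch below imitates both stages with plain dictionaries; the helper `mask` and the sample record are hypothetical, and this is not how Kafka Connect is implemented internally.

```python
def mask(value, fields, replacement):
    """Copy `value`, overwriting the listed fields (illustrative only)."""
    return {k: (replacement if k in fields else v) for k, v in value.items()}


# Made-up record as the generator might produce it.
generated = {
    "fullAddress": "1 Example Street",
    "cc_num": "1234-5678-9012-3456",
    "cc_exp": "2030-01",
}

# Source-side transform: what is stored on the topic.
on_topic = mask(generated, {"cc_num", "cc_exp"}, "<masked>")

# Sink-side transform: what is written to the MySQL table.
in_table = mask(on_topic, {"fullAddress"}, "[XredactedX]")

print("topic:", on_topic)
print("table:", in_table)
```

In this sketch the address is still readable on the topic and is masked only in the table, which matches the split between the source and sink configurations used in this lesson.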

Reference Links:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day5.adoc

Apache Kafka, Kafka Connect

Copyright © 2026 NimTechnology.