Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
    • Helm Chart
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Spinnaker
    • Jenkins
  • Coding
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
    • Golang
    • Laravel
    • Jquery & JavaScript
    • Git
    • Selenium
  • Log & Monitor
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
  • Toggle search form

[Kafka-connect] Single Message Transform: lesson 5 MaskField – Cover the sensitive data.

Posted on February 26, 2022February 27, 2022 By nim No Comments on [Kafka-connect] Single Message Transform: lesson 5 MaskField – Cover the sensitive data.

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

Phần setup kafka, bạn có thể coi lại link này he:
Setup the needing components of Kafka

Bạn nhớ delele container cũ và run lại docker-compose nhé:
docker-compose down -v
docker-compose up -d

2) Practice.

Tạo the data generator
https://www.confluent.io/hub/mdrogalis/voluble

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day5-00/config \
    -d '{
        "connector.class"                          : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day5-00-person.with"                : "#{Internet.uuid}",
        "genv.day5-00-person.firstName.with"       : "#{Address.firstName}",
        "genv.day5-00-person.lastName.with"        : "#{Address.lastName}",
        "genv.day5-00-person.fullAddress.with"     : "#{Address.fullAddress}",
        "genv.day5-00-person.phone_num.with"       : "#{PhoneNumber.phoneNumber}",
        "genv.day5-00-person.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day5-00-person.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "topic.day5-00-person.throttle.ms"         : 500
    }'
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq  '.'

Mình đặt ra một trường hợp, Bạn sử dụng kafka-connect để stream table trong database. Trong table đó thì có vài column thì data của chúng khá là nhạnh cảm (sensitive information) và bạn ko muốn show nó trên topic.
Vậy làm sao?
Chúng ta sử dụng MaskField để che đi data nhạnh cảm đó (sensitive infomation).
Ở đây thì anh Tây che lên bằng chữ <masked>

Ta tạo 1 connector mới.

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
    -d '{
        "connector.class"                          : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day5-01-person.with"                : "#{Internet.uuid}",
        "genv.day5-01-person.firstName.with"       : "#{Address.firstName}",
        "genv.day5-01-person.lastName.with"        : "#{Address.lastName}",
        "genv.day5-01-person.fullAddress.with"     : "#{Address.fullAddress}",
        "genv.day5-01-person.phone_num.with"       : "#{PhoneNumber.phoneNumber}",
        "genv.day5-01-person.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day5-01-person.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "topic.day5-01-person.throttle.ms"         : 500,
        "transforms"                               : "maskCC",
        "transforms.maskCC.type"                   : "org.apache.kafka.connect.transforms.MaskField$Value",
        "transforms.maskCC.fields"                 : "cc_num,cc_exp",
        "transforms.maskCC.replacement"            : "<masked>"
    }'

Kiểm tra lại data và nhận thấy điều khách biệt:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq  '.'
Bạn thấy data đã được che đi!

Giờ chúng ta làm với case sink data từ topic vào Database

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
    -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                    : "mysqluser",
          "connection.password"                : "mysqlpw",
          "topics"                             : "day5-01-person",
          "tasks.max"                          : "4",
          "auto.create"                        : "true",
          "auto.evolve"                        : "true"
        }'
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day5-01-person`;
select * from `day5-01-person` LIMIT 5;
các masked vẫn được giữ cho đến khi ta sink
curl -i -X DELETE -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
drop table `day5-01-person`;

Giờ test thử MaskField với connector SINK

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
    -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                    : "mysqluser",
          "connection.password"                : "mysqlpw",
          "topics"                             : "day5-01-person",
          "tasks.max"                          : "4",
          "auto.create"                        : "true",
          "auto.evolve"                        : "true",
          "transforms"                         : "maskAddress",
          "transforms.maskAddress.type"        : "org.apache.kafka.connect.transforms.MaskField$Value",
          "transforms.maskAddress.fields"      : "fullAddress",
          "transforms.maskAddress.replacement" : "[XredactedX]"
        }'
Vậy là ok rồi nhé

Reference Links:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day5.adoc

Apache Kafka, Kafka Connect

Post navigation

Previous Post: [Kafka-connect] Single Message Transform: lesson 4 – RegexRouter – change topic name.
Next Post: [Kafka-connect] Single Message Transform: lesson 6 InsertField – Insert fields using attributes in the process data.

More Related Articles

[Kafka-connect] Streaming the data of MongoDB through Kafka-connect into topic Kafka. Apache Kafka
[Kafka-connect] Single Message Transform: lesson 1 – InsertField in Sink Apache Kafka
[Kafka-connect] Single Message Transform: lesson 8 – TimestampConverter – convert the type of the fields from string to timestamp or date,… Apache Kafka
[Kafka] Console Consumer CLI and Consumers in Group Apache Kafka
[Lenses/kafka] Fix the problem “Cannot extract connector information from the configuration provided” when creating connector Kafka-connect Apache Kafka
[Kafka-connect] Single Message Transform: lesson 4 – RegexRouter – change topic name. Apache Kafka

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • Protected: My Assignment  June 24, 2022
  • [Spinnaker] Spinnaker writes too many logs – Reduce spinnaker log level June 22, 2022
  • [Jenkins] Jobs will be created automatically by Jenkins Job Builder June 20, 2022
  • [Postgresql] Install postgresql client and trying a few command postgresql. June 20, 2022
  • [Mount/Nextcloud] How do you mount a hard disk that was used windows into Linux. June 19, 2022

Archives

  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Jenkins
    • Spinnaker
  • Coding
    • Git
    • Golang
    • Jquery & JavaScript
    • Laravel
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kubernetes
      • Ingress
    • Longhorn – Storage
    • Vault
    • VictoriaMetrics
  • Log & Monitor
    • ELK
      • Kibana
      • Logstash
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2022 NimTechnology.