1) Set up the components.
For the Kafka setup, you can revisit this post:
Set up the required components of Kafka
Remember to delete the old containers (the -v flag also removes their volumes) and run docker-compose again:
docker-compose down -v
docker-compose up -d
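After docker-compose up -d returns, the containers are started, but the Kafka Connect REST API on port 8083 can take a while before it answers, so the curl calls below may fail if you run them too early. A minimal sketch of a retry loop, assuming bash; wait_for is a hypothetical helper name, not part of Docker or Kafka:

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command up to ATTEMPTS times, one second apart,
# until it succeeds. Returns 0 on the first success, 1 if every attempt fails.
# Usage: wait_for ATTEMPTS COMMAND [ARGS...]
wait_for() {
  local attempts="$1" i
  shift
  for ((i = 1; i <= attempts; i++)); do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Skip the sleep after the last failed attempt.
    if ((i < attempts)); then
      sleep 1
    fi
  done
  return 1
}
```

For example: wait_for 60 curl -sf http://localhost:8083/connectors && echo "Kafka Connect is ready". With -f, curl also exits non-zero on HTTP error responses, so the loop keeps waiting until the REST API returns a successful answer.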
2) Practice.
Create the data generator with the Voluble source connector:
https://www.confluent.io/hub/mdrogalis/voluble
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day5-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day5-00-person.with" : "#{Internet.uuid}",
"genv.day5-00-person.firstName.with" : "#{Address.firstName}",
"genv.day5-00-person.lastName.with" : "#{Address.lastName}",
"genv.day5-00-person.fullAddress.with" : "#{Address.fullAddress}",
"genv.day5-00-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
"genv.day5-00-person.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day5-00-person.cc_exp.with" : "#{Business.creditCardExpiry}",
"topic.day5-00-person.throttle.ms" : 500
}'
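In this config, genkp.<topic>.with generates the record key, each genv.<topic>.<field>.with generates one field of the value, and topic.<topic>.throttle.ms slows production down to one record every 500 ms. The connector created later for day5-01-person reuses exactly the same generator settings with a different topic name, so one option is to print them from a small template. A sketch assuming bash; voluble_person_config is a hypothetical function name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the Voluble "person" generator config used in
# this post for TOPIC, so several topics can share one template.
# Usage: voluble_person_config TOPIC
voluble_person_config() {
  local topic="$1"
  # Unquoted heredoc so ${topic} expands; the #{...} generator expressions
  # contain no $ and pass through unchanged.
  cat <<EOF
{
  "connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
  "genkp.${topic}.with" : "#{Internet.uuid}",
  "genv.${topic}.firstName.with" : "#{Address.firstName}",
  "genv.${topic}.lastName.with" : "#{Address.lastName}",
  "genv.${topic}.fullAddress.with" : "#{Address.fullAddress}",
  "genv.${topic}.phone_num.with" : "#{PhoneNumber.phoneNumber}",
  "genv.${topic}.cc_num.with" : "#{Business.creditCardNumber}",
  "genv.${topic}.cc_exp.with" : "#{Business.creditCardExpiry}",
  "topic.${topic}.throttle.ms" : 500
}
EOF
}
```

Usage: curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-voluble-datagen-day5-00/config -d "$(voluble_person_config day5-00-person)"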
Read the first record of the topic to check the generated data:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq '.'
Consider this scenario: you use Kafka Connect to stream a table from a database. Some columns in that table hold sensitive information, and you don't want that data to show up in the topic.
So what can you do?
We use the MaskField Single Message Transform to mask that sensitive data.
In the reference demo, the author replaces the values with the string <masked>.
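MaskField$Value works on the record value (MaskField$Key would work on the key). For every field named in fields it replaces the value with replacement; if no replacement is set, Kafka documents that it uses an empty value valid for the field type (0, false, empty string and so on). The real transform operates on Connect Struct or Map values; the bash function below is only a toy emulation of the same idea over key=value lines, and mask_fields is a hypothetical name:

```shell
#!/usr/bin/env bash
# Toy emulation of MaskField$Value with "fields" and "replacement" set.
# Input on stdin: one field per line as key=value (a stand-in for a record).
# Usage: mask_fields FIELDS REPLACEMENT   (FIELDS is comma-separated)
mask_fields() {
  # Wrap the list in commas so "cc_num" cannot match inside "cc_num2".
  local fields=",$1," replacement="$2" line key
  # The extra -n test keeps a last line that has no trailing newline.
  while IFS= read -r line || [[ -n "$line" ]]; do
    key="${line%%=*}"
    if [[ "$fields" == *",$key,"* ]]; then
      # Listed field: keep the key, replace the value.
      printf '%s=%s\n' "$key" "$replacement"
    else
      # Any other field passes through untouched.
      printf '%s\n' "$line"
    fi
  done
}
```

For example, piping the lines firstName=Ann and cc_num=1234 (made-up values) through mask_fields cc_num,cc_exp '<masked>' prints firstName=Ann and cc_num=<masked>.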
Create a new connector:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day5-01-person.with" : "#{Internet.uuid}",
"genv.day5-01-person.firstName.with" : "#{Address.firstName}",
"genv.day5-01-person.lastName.with" : "#{Address.lastName}",
"genv.day5-01-person.fullAddress.with" : "#{Address.fullAddress}",
"genv.day5-01-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
"genv.day5-01-person.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day5-01-person.cc_exp.with" : "#{Business.creditCardExpiry}",
"topic.day5-01-person.throttle.ms" : 500,
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num,cc_exp",
"transforms.maskCC.replacement" : "<masked>"
}'
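The transform is configured entirely through connector properties: transforms lists the transform aliases (comma-separated when several SMTs are chained), and each alias then gets its own transforms.<alias>.type, .fields and .replacement. Note that MaskField$Value is a nested class name, not a shell variable: the curl above is safe because the JSON sits in single quotes, but inside a double-quoted string the shell would expand $Value (usually to nothing). A sketch assuming bash and a single transform; mask_field_transform is a hypothetical helper, and it does not JSON-escape its arguments:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the MaskField properties for one transform alias
# as JSON members, in the same layout as the connector config above.
# Usage: mask_field_transform ALIAS FIELDS REPLACEMENT
mask_field_transform() {
  local name="$1" fields="$2" replacement="$3"
  printf '"transforms" : "%s",\n' "$name"
  # Single-quoted format string: $Value stays literal.
  printf '"transforms.%s.type" : "org.apache.kafka.connect.transforms.MaskField$Value",\n' "$name"
  printf '"transforms.%s.fields" : "%s",\n' "$name" "$fields"
  # Last member: no trailing comma.
  printf '"transforms.%s.replacement" : "%s"\n' "$name" "$replacement"
}
```

mask_field_transform maskCC cc_num,cc_exp '<masked>' prints the same four lines as the end of the config above.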
Check the data again and notice the difference: cc_num and cc_exp now contain <masked>.
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq '.'
Now let's look at the case of sinking data from the topic into a database (MySQL here) with the JDBC sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day5-01-person",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
Connect to MySQL and inspect the table that the sink connector created:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day5-01-person`;
select * from `day5-01-person` LIMIT 5;
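With auto.create set to true, the JDBC sink creates the table itself and, with the default table.name.format of ${topic}, names it after the topic, so the table is day5-01-person. MySQL does not allow hyphens in unquoted identifiers, which is why describe and select above wrap the name in backticks. When building such SQL from a script, a small quoting function helps; a sketch assuming bash, with mysql_ident as a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: quote a MySQL identifier with backticks, doubling any
# backtick inside the name (MySQL's escape rule for quoted identifiers).
# Usage: mysql_ident NAME
mysql_ident() {
  local bt='`' escaped
  escaped="${1//$bt/$bt$bt}"
  printf '%s%s%s\n' "$bt" "$escaped" "$bt"
}
```

For example, sql="select * from $(mysql_ident day5-01-person) limit 5;" followed by docker exec mysql bash -c "mysql -u root -p\$MYSQL_ROOT_PASSWORD demo -e '$sql'" runs the same query without an interactive session.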
Delete the sink connector and drop the table before the next test:
curl -i -X DELETE -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
drop table `day5-01-person`;
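Now try MaskField on the sink connector. Here the transform runs on each record after it is read from the topic, so day5-01-person still holds the real fullAddress and only the rows written to MySQL contain [XredactedX].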
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day5-01-person",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "maskAddress",
"transforms.maskAddress.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskAddress.fields" : "fullAddress",
"transforms.maskAddress.replacement" : "[XredactedX]"
}'
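To confirm the result, you can check that every fullAddress value written to MySQL equals the replacement string. A minimal sketch, assuming bash and the mysql container and demo database used above; all_masked is a hypothetical function name:

```shell
#!/usr/bin/env bash
# Hypothetical check: succeed only if every line on stdin equals REPLACEMENT.
# Usage: all_masked REPLACEMENT   (one column value per line on stdin)
all_masked() {
  local replacement="$1" line count=0
  while IFS= read -r line || [[ -n "$line" ]]; do
    count=$((count + 1))
    # Quoted right-hand side: compare literally, so [XredactedX] is not a glob.
    if [[ "$line" != "$replacement" ]]; then
      printf 'row %d is not masked: %s\n' "$count" "$line" >&2
      return 1
    fi
  done
  # No rows proves nothing, so an empty result counts as a failure.
  ((count > 0))
}
```

Usage (mysql -N drops the column header, and \` keeps the backticks literal inside the inner double quotes):
docker exec mysql bash -c 'mysql -N -u root -p$MYSQL_ROOT_PASSWORD demo -e "select fullAddress from \`day5-01-person\`"' | all_masked '[XredactedX]'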
Reference Links:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day5.adoc