1) Setup the components.
Phần setup kafka, bạn có thể coi lại link này he:
Setup the needing components of Kafka
Bạn nhớ delele container cũ và run lại docker-compose nhé:docker-compose down -v
docker-compose up -d
2) Practice.
reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day11.adoc
Với connector source-voluble-datagen-day11-00 thì nó sẽ sinh ra 2 topic:
– day11-sys01
– day11-systemB
OK bạn cần nhớ điều này nhé!
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day11-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day11-sys01.with" : "#{Internet.uuid}",
"genv.day11-sys01.amount.with" : "#{Commerce.price}",
"genv.day11-sys01.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day11-sys01.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day11-sys01.product.with" : "#{Beer.name}",
"genv.day11-sys01.source.with" : "SYS01",
"topic.day11-sys01.throttle.ms" : 1000,
"genkp.day11-systemB.with" : "#{Internet.uuid}",
"genv.day11-systemB.cost.with" : "#{Commerce.price}",
"genv.day11-systemB.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day11-systemB.txn_date.with": "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day11-systemB.item.with" : "#{Beer.name}",
"genv.day11-systemB.source.with" : "SYSTEM_B",
"topic.day11-systemB.throttle.ms" : 1000
}'
Với connector source-voluble-datagen-day11-01 chỉ sinh ra 1 topic là sys02.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day11-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.sys02.with" : "#{Internet.uuid}",
"genv.sys02.amount.with" : "#{Commerce.price}",
"genv.sys02.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.sys02.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.sys02.product.with" : "#{Beer.name}",
"genv.sys02.source.with" : "SYS02",
"topic.sys02.throttle.ms" : 1000,
"topic.sys02.tombstone.rate":"0.30"
}'s
Topic day11-sys01
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
-t day11-sys01 | \
jq '.payload.Gen0'
Topic day11-systemB
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
-t day11-systemB | \
jq '.payload.Gen0'
Giữa topic day11-sys01
và topic day11-systemB
có một số field khác nhau.
cost
is the same as amount
and item
the same as product
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics.regex" : "day11-.*",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "renameSystemBFields,renameTargetTopic",
"transforms.renameSystemBFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameSystemBFields.renames" : "item:product,cost:amount",
"transforms.renameSystemBFields.predicate": "isSystemBTopic",
"transforms.renameTargetTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTargetTopic.regex" : "day11-.*",
"transforms.renameTargetTopic.replacement": "transactions",
"predicates" : "isSystemBTopic",
"predicates.isSystemBTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern" : ".*-systemB"
}'
Bạn có thể them khảo docs của hãng ở đây: Predicate (Filter (Apache Kafka))
Bạn thấy cả 2 topic day11-sys01
và day11-systemB
đều đi vào 1 tranforms
Người ta dùng predicates để dánh dấu data của topic thuộc day11-systemB
-> tiếp đến họ rename 2 field của topic day11-systemB
Đến lúc này thì các field của 2 topic đã giống nhau -> sử dụng RegexRouter để hợp 2 topic thành 1 và đặt 1 tên mới: transactions
Giờ kiêm tra DB:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
describe transactions;
SELECT * FROM transactions LIMIT 5;
Case đảo người của predicates:
You can use the negate
option to invert a predicate. Consider this predicate:
"predicates" : "isSystemBTopic",
"predicates.isSystemBTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern" : ".*-systemB"
Nếu như bạn apply Single Message Transform lên bất cứ topic nào ngoài trừ topic match với predicates thi set "…negate": "true"
"transforms.renameNonSystemBFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameNonSystemBFields.renames" : "product:item,amount:cost",
"transforms.renameNonSystemBFields.predicate": "isSystemBTopic",
"transforms.renameNonSystemBFields.negate" : "true",
Kết quả là renames của ReplaceField chỉ apply lên topic day11-sys01
Filtering out null records
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -o-10 -u -q -J \
-t sys02 | \
jq -c '[.offset,.key,.payload]'
Khi chúng ta kiểm tra sẽ thấy tombstone (null) records – các record rỗng – không có data.
These may be by design, or by error – but either way, we want to exclude them from the sink connector pipeline.
Bạn muốn loại bỏ các tombstone (null) records thì sử dụng command sau:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "sys02",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropNullRecords",
"transforms.dropNullRecords.type" : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropNullRecords.predicate": "isNullRecord",
"predicates" : "isNullRecord",
"predicates.isNullRecord.type" : "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}'
Filtering based on the contents of a message
Với predicates thì các filter của chúng ta đang làm việc với topic là chính
nếu bạn muốn filter content bên trong của 1 message thì sao?
như là chỉ nhận data amount < 42 hay product thì string phải được kết thúc bằng Stout
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "filterStout",
"transforms.filterStout.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
"transforms.filterStout.filter.type" : "include"
}'
Giờ chúng ta tạo 1 sink connector mà chỉ nhận các message có điều kiện sau:
– field product có data kết thúc bằng Stout
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
show tables;
select * from `day11-sys01`;
OK rồi ha.
Hoặc là bạn muốn chỉ nhận các message mà có amount bé hơn 42
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "castTypes,filterSmallOrder",
"transforms.filterSmallOrder.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
"transforms.filterSmallOrder.filter.type" : "include",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "amount:float32"
}'