In this lesson, we will customize the data type right at the source connector.
1) Setup the components.
For the Kafka setup, you can refer back to this link:
Setup the needing components of Kafka
Remember to delete the old containers and bring docker-compose back up:
docker-compose down -v
docker-compose up -d
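Before creating any connectors, it is worth waiting until the Kafka Connect worker is actually up. A minimal check, assuming the port mappings from the compose file above (Connect on 8083):
# Connect worker info (returns version/commit once the worker is ready)
curl -s http://localhost:8083/ | jq
# Confirm the Voluble and JDBC plugins are installed on the worker
curl -s http://localhost:8083/connector-plugins | jq '.[].class'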
2) Practice.
Reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day9.adoc
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day9-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day9-transactions.with" : "#{Internet.uuid}",
"genv.day9-transactions.cost.with" : "#{Commerce.price}",
"genv.day9-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day9-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day9-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day9-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day9-transactions.item.with" : "#{Beer.name}",
"topic.day9-transactions.throttle.ms" : 1000
}'
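A 201/200 response means the config was accepted, but it is still worth confirming that the connector and its task are RUNNING (connector name taken from the URL above):
curl -s http://localhost:8083/connectors/source-voluble-datagen-day9-00/status | \
  jq '{connector: .connector.state, tasks: [.tasks[].state]}'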
Once the connector is running, check the schema that has been registered for the topic:
curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'
You will see that both cost and units are currently held as strings in the registered schema.
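As a side check, you can also list every subject registered in the Schema Registry (same localhost:8081 assumption) to confirm that day9-transactions-value is there:
curl -s http://localhost:8081/subjects | jq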
[Kafka-connect] Single Message Transform: lesson 8 – TimestampConverter
As in lesson 8, we customized the data type during the sink stage, which means the type in the registered schema stayed unchanged. Now we will customize the type right at the source connector, so the type stored in the schema is changed as well.
Now we will use the Cast Single Message Transform to cast the two fields currently held as string to their correct numeric types.
Here we'll apply the fix to the source connector. Note that we're using a new target topic (day9-01-transactions), because otherwise you'll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409).
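For context: the 409 is the Schema Registry's compatibility check rejecting a type change on an existing subject (the default compatibility level is BACKWARD). You can inspect it, and if you really wanted to reuse the old topic you could relax it per subject, but in this lesson we simply write to a new topic:
# Current global compatibility level (BACKWARD by default)
curl -s http://localhost:8081/config | jq
# Per-subject override (shown for reference only; not needed when using a new topic)
curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility": "NONE"}' \
     http://localhost:8081/config/day9-transactions-value
Now create the new source connector with the Cast transform applied: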
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day9-01-transactions.with" : "#{Internet.uuid}",
"genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
"genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day9-01-transactions.item.with" : "#{Beer.name}",
"topic.day9-01-transactions.throttle.ms" : 1000,
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
}'
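To see the casted values on the wire, you can read one record back with kafka-avro-console-consumer. The container and host names below (schema-registry, broker:29092) are assumptions based on a typical demo-scene docker-compose; adjust them to match your stack:
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic day9-01-transactions \
  --from-beginning --max-messages 1
Then check the schema registered for the new topic: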
curl -s "http://localhost:8081/subjects/day9-01-transactions-value/versions/latest" | jq '.schema|fromjson[]'
Now create the sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day9-01-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"}'
Now check the database:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day9-01-transactions`;
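With auto.create enabled, the JDBC sink should have mapped the casted Connect types to numeric MySQL columns (float32 to FLOAT, int16 to SMALLINT or similar, depending on the dialect), so cost and units should no longer be varchar. An illustrative query to sanity-check the data, run inside the same mysql prompt:
select txn_date, card_type, item, cost, units from `day9-01-transactions` limit 5;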