1) Setup the components.
Phần setup kafka, bạn có thể coi lại link này he:
Setup the needing components of Kafka
Bạn nhớ delele container cũ và run lại docker-compose nhé:docker-compose down -v
docker-compose up -d
2) Practice.
Tạo the data generator
https://www.confluent.io/hub/mdrogalis/voluble
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day4-transactions.with" : "#{Internet.uuid}",
"genv.day4-transactions.cost.with" : "#{Commerce.price}",
"genv.day4-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day4-transactions.item.with" : "#{Beer.name}",
"topic.day4-transactions.throttle.ms" : 500
}'
Giờ bạn thấy nó tạo ra 1 topic: day4-transactions
docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
Giờ ta tạo 1 sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day4-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
Kiểm tra các connector:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
Giờ vào database:
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
show tables;
mysql> select * from day4-transactions;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-transactions' at line 1
mysql>
Vậy là dấu hyphen “-” trong tên table thì chúng ta gặp error SQL syntax
Anh trai đó nói là bạn có thể dùng backtick `x` thì sẽ query được hoi.
mysql> select * from `day4-transactions` LIMIT 1;
+-------------+-------+-----------------------+
| card_type | cost | item |
+-------------+-------+-----------------------+
| diners_club | 34.45 | Celebrator Doppelbock |
+-------------+-------+-----------------------+
1 row in set (0.00 sec)
Như mà cách thêm backtick thì hay lắm!
Chúng ta có thể chuyển từ day4-transactions sang –> transactions
Vậy giờ làm sao.
Mặc định JDBC Sink connector sẽ lấy name của topic và tạo thành table trong database.
Còn với Elasticsearch sink connector thì lấy name của topic để tạo thành index elasticsearch.
Theo như anh Tây chúng ta sẽ sử dụng RegExRouter
Using the RegExRouter
override the topic name can be modified either as data is streamed into Kafka from a source connector, or as it leaves Kafka in a sink connector.
https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
Update the record’s topic using the configured regular expression and replacement string.
Giờ chúng ta sẽ edit connector bên trên bằng cách chỉ giữ lại chữ transactions và drop day4-
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day4-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "day4-(.*)",
"transforms.dropTopicPrefix.replacement": "$1"
}'
Bước 1: bạn đặt label cho transform này là dropTopicPrefix
Lúc này transactions sẽ match với regex và chữ transactions sẽ được đưa vào $1
transform replace day4-transactions thành transactions -> create table trong database là transactions
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo
show tables;
Chúng filter regex and place cho nó tường minh hơn.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day4-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "day(\\d+)-(.*)",
"transforms.dropTopicPrefix.replacement": "$2_day$1"
}'
You need to escape the \
when passing it through curl
, so \
becomes \\
.
Đây là nội dung mình tham khảo: