1) Setup
For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka
2) Practice
Now let's jump into MySQL and play around:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Create a table and insert some data:
create table customers (
id INT,
full_name VARCHAR(50),
birthdate DATE,
fav_animal VARCHAR(50),
fav_colour VARCHAR(50),
fav_movie VARCHAR(50)
);
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (1, 'Leone Puxley', '1995-02-06', 'Violet-eared waxbill', 'Puce', 'Oh! What a Lovely War');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (2, 'Angelo Sharkey', '1996-04-08', 'Macaw, green-winged', 'Red', 'View from the Top, A');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (3, 'Jozef Bailey', '1954-07-10', 'Little brown bat', 'Indigo', '99 francs');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (4, 'Evelyn Deakes', '1975-09-13', 'Vervet monkey', 'Teal', 'Jane Austen in Manhattan');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (5, 'Dermot Perris', '1991-01-29', 'African ground squirrel (unidentified)', 'Khaki', 'Restless');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (6, 'Renae Bonsale', '1965-01-05', 'Brown antechinus', 'Fuscia', 'Perfect Day, A (Un giorno perfetto)');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (7, 'Florella Fridlington', '1950-08-07', 'Burmese brown mountain tortoise', 'Purple', 'Dot the I');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (8, 'Hettie Keepence', '1971-10-14', 'Crab-eating raccoon', 'Puce', 'Outer Space');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (9, 'Briano Quene', '1990-05-02', 'Cormorant, large', 'Yellow', 'Peacekeeper, The');
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (10, 'Jeddy Cassell', '1978-12-24', 'Badger, european', 'Indigo', 'Shadow of a Doubt');
Now we have the new table.
Create source connector
Reference links:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day2.adoc
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/
Kafka Connect Deep Dive – JDBC Source Connector
Now run this command to create the JDBC source connector:
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-00/config \
-H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topic.prefix": "mysql-00-",
"poll.interval.ms": 1000,
"tasks.max":1,
"table.whitelist" : "customers",
"mode":"incrementing",
"incrementing.column.name": "id",
"validate.non.null": false
}'
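With "mode": "incrementing", the connector remembers the highest value of incrementing.column.name (here id) it has already sent and, on each poll, only picks up rows whose id is larger. The sketch below is a minimal in-memory model of that idea, not the connector's actual code: poll_incrementing is a hypothetical helper, the table is a list of dicts, and the starting offset of -1 is just this sketch's choice. It also shows why this mode does not capture updates to existing rows.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def poll_incrementing(table: List[Row], column: str, last_offset: int) -> Tuple[List[Row], int]:
    """Model one poll in incrementing mode: return the rows whose `column`
    is strictly greater than the last offset, in ascending order, plus the
    offset to remember for the next poll."""
    new_rows = sorted(
        (row for row in table if row[column] > last_offset),
        key=lambda row: row[column],
    )
    if new_rows:
        # Remember the highest id emitted so far.
        last_offset = new_rows[-1][column]
    return new_rows, last_offset


if __name__ == "__main__":
    customers = [{"id": 1, "full_name": "Leone Puxley"}, {"id": 2, "full_name": "Angelo Sharkey"}]
    batch, offset = poll_incrementing(customers, "id", -1)
    print([row["id"] for row in batch], offset)  # [1, 2] 2

    # Updating an existing row does not change its id, so it is not re-emitted.
    customers[0]["full_name"] = "Leone P."
    customers.append({"id": 3, "full_name": "Jozef Bailey"})
    batch, offset = poll_incrementing(customers, "id", offset)
    print([row["id"] for row in batch], offset)  # [3] 3
```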
OK, now list the topics:
docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
Now let's preview the data. Notice that the message key is null:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-00-customers -C -c1 -o beginning -u -q -J | jq '.'
ValueToKey
Replace the record key with a new key formed from a subset of fields in the record value.
https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-01/config \
-H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topic.prefix": "mysql-01-",
"poll.interval.ms": 1000,
"tasks.max":1,
"table.whitelist" : "customers",
"mode":"incrementing",
"incrementing.column.name": "id",
"validate.non.null": false,
"transforms": "copyIdToKey",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id"
}'
"transforms": "copyIdToKey",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id"
Paraphrasing the original English write-up:
Step 1: give the transform a label: copyIdToKey
Step 2: each field in the record value corresponds to a column in the database table. ValueToKey reads the value of the field(s) listed in "fields" (here id) and uses it to build a new key, overwriting the key that was null above.
For example, take this insert:
insert into customers (id, full_name, birthdate, fav_animal, fav_colour, fav_movie) values (1, 'Leone Puxley', '1995-02-06', 'Violet-eared waxbill', 'Puce', 'Oh! What a Lovely War');
We take the id column, whose value is 1, and use it to overwrite the null key. Note that the new key is a struct holding that field (Struct{id=1}), not the bare number 1. That's roughly it.
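To make the two steps concrete, here is a small plain-Python model of what ValueToKey does to one record. Assumptions: dicts stand in for Connect records and Structs, and value_to_key is a hypothetical helper; this is not the Kafka Connect Java API.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def value_to_key(record: Record, fields: List[str]) -> Record:
    """Model of ValueToKey: build a new key from a subset of the value's fields.
    A dict stands in for a Connect Struct; the value itself is left unchanged."""
    value = record["value"]
    new_key = {field: value[field] for field in fields}
    # Return a new record rather than mutating the input.
    return {**record, "key": new_key}


if __name__ == "__main__":
    row1 = {
        "key": None,  # what the plain JDBC source produced
        "value": {"id": 1, "full_name": "Leone Puxley", "birthdate": "1995-02-06"},
    }
    print(value_to_key(row1, ["id"])["key"])  # {'id': 1}
```

The printed {'id': 1} plays the role of Struct{id=1}: the key is still a structure wrapping the field, which is exactly what the next section deals with.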
Check the topics:
docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
Review the data again:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-01-customers -C -c1 -o beginning -u -q -J | jq '.'
Combining ValueToKey and ExtractField
The above SMT will write a struct to the key, and often you want just the primitive value instead. That's what ExtractField does.
ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. Any null values are passed through unmodified.
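The same plain-Python modelling works for ExtractField. In this sketch (assumption: extract_field is a hypothetical helper and dicts stand in for Structs), target="key" mimics ExtractField$Key and target="value" mimics ExtractField$Value; a null key or value is returned untouched, as the description above says.

```python
from typing import Any, Dict

Record = Dict[str, Any]


def extract_field(record: Record, field: str, target: str = "key") -> Record:
    """Model of ExtractField: target="key" mimics ExtractField$Key,
    target="value" mimics ExtractField$Value."""
    data = record[target]
    if data is None:
        # Null keys/values are passed through unmodified.
        return record
    # Replace the whole key (or value) with the single extracted field.
    return {**record, target: data[field]}


if __name__ == "__main__":
    rec = {"key": {"id": 1}, "value": {"id": 1, "full_name": "Leone Puxley"}}
    print(extract_field(rec, "id")["key"])                         # 1
    print(extract_field(rec, "full_name", "value")["value"])       # Leone Puxley
    print(extract_field({"key": None, "value": {}}, "id")["key"])  # None
```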
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-02/config \
-H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topic.prefix": "mysql-02-",
"poll.interval.ms": 1000,
"tasks.max":1,
"table.whitelist" : "customers",
"mode":"incrementing",
"incrementing.column.name": "id",
"validate.non.null": false,
"transforms": "copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id",
"transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field":"id"
}'
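The three curl commands in this post (source-jdbc-mysql-00, -01 and -02) differ only in the connector name, topic.prefix and the transforms chain. If you get tired of copy-pasting JSON, a sketch like the one below can build the configs and send the same PUT /connectors/<name>/config request with Python's standard library. BASE_CONFIG, build_config and put_connector are hypothetical names, and the Connect URL assumes the same localhost:8083 used above.

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional, Tuple

# Settings shared by all three connectors in this post.
BASE_CONFIG: Dict[str, Any] = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "poll.interval.ms": 1000,
    "tasks.max": 1,
    "table.whitelist": "customers",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "validate.non.null": False,
}

TransformSpec = Tuple[str, Dict[str, str]]  # (label, {"type": ..., other props})


def build_config(topic_prefix: str, transforms: Optional[List[TransformSpec]] = None) -> Dict[str, Any]:
    """Merge the shared settings with a topic prefix and an ordered SMT chain."""
    config = {**BASE_CONFIG, "topic.prefix": topic_prefix}
    if transforms:
        # The order of labels here is the order Connect applies the SMTs.
        config["transforms"] = ",".join(label for label, _ in transforms)
        for label, props in transforms:
            for prop, value in props.items():
                config[f"transforms.{label}.{prop}"] = value
    return config


def put_connector(name: str, config: Dict[str, Any], connect_url: str = "http://localhost:8083") -> Dict[str, Any]:
    """Same call as the curl commands: PUT /connectors/<name>/config."""
    request = urllib.request.Request(
        f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


if __name__ == "__main__":
    config_02 = build_config("mysql-02-", [
        ("copyIdToKey", {"type": "org.apache.kafka.connect.transforms.ValueToKey", "fields": "id"}),
        ("extractKeyFromStruct", {"type": "org.apache.kafka.connect.transforms.ExtractField$Key", "field": "id"}),
    ])
    print(json.dumps(config_02, indent=2))
    # Uncomment when Kafka Connect is running:
    # put_connector("source-jdbc-mysql-02", config_02)
```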
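We add a second transform with a new label: extractKeyFromStruct. Transforms listed in "transforms" are applied in order, so it receives the record after copyIdToKey has run.
Notice that, after copyIdToKey, our key is Struct{id=1}.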
At this point, copyIdToKey hands extractKeyFromStruct a record whose key is:
"key": "Struct{id=1}"
Because the field we want (id) sits inside the record key, we use the $Key variant:
"transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
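A quick note: so when would you use
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value"
instead? When the struct you want to pull a field out of is the record value rather than the key, e.g. when the value is Struct{id=1}. In that case ExtractField$Value replaces the whole value with the extracted field.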
Reference links:
https://docs.confluent.io/platform/current/connect/transforms/extractfield.html#examples
"transforms.extractKeyFromStruct.field":"id"
This then extracts the value of the id field from the key struct, so the key becomes just the number 1.
Now preview the data again:
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-02-customers -C -c1 -o beginning -u -q -J | jq '.'
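Because the SMTs run in the order given in "transforms", the order matters. The plain-Python sketch below (assumptions: dicts stand in for records and Structs; copy_id_to_key, extract_key_from_struct and apply_chain are hypothetical helpers modelling the two labelled transforms) shows that copyIdToKey then extractKeyFromStruct yields the key 1, while the reversed order leaves a struct key, because ExtractField passes the still-null key through untouched.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Transform = Callable[[Record], Record]


def copy_id_to_key(record: Record) -> Record:
    # Models ValueToKey with fields=id: the key becomes a struct {"id": ...}.
    return {**record, "key": {"id": record["value"]["id"]}}


def extract_key_from_struct(record: Record) -> Record:
    # Models ExtractField$Key with field=id; a null key passes through unmodified.
    key = record["key"]
    return record if key is None else {**record, "key": key["id"]}


def apply_chain(record: Record, chain: List[Transform]) -> Record:
    # Apply each transform in list order, like the comma-separated "transforms" setting.
    for transform in chain:
        record = transform(record)
    return record


if __name__ == "__main__":
    row1 = {"key": None, "value": {"id": 1, "full_name": "Leone Puxley"}}
    print(apply_chain(row1, [copy_id_to_key, extract_key_from_struct])["key"])  # 1
    print(apply_chain(row1, [extract_key_from_struct, copy_id_to_key])["key"])  # {'id': 1}
```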