[Kafka-Connect] Overwrite value of Key on Mongo Source Connector – Mongodb

Posted on April 1, 2022 (updated April 7, 2022) by nim

In an earlier post, I showed you how to create a CDC source connector that reads data from one database and one collection in MongoDB and writes it into a Kafka topic:

[Kafka-connect] Streaming the data of MongoDB through Kafka-connect into topic Kafka.

Notice that the value of the message key keeps changing.

1st: Create Document A –(read data change)–> Kafka Connect –(write topic)–> Kafka –> one message with key 1111 (partition 1)

This key appears to be auto-generated.

2nd: Edit Document A –(read data change)–> Kafka Connect –(write topic)–> Kafka –> one message with key 1211 (partition 2)

This time the generated key has a different value, so the message lands in a different partition.

So how do we make Kafka Connect generate a single, stable key whenever Document A is created or edited (the same key as the first create),
so that all messages related to Document A go into one partition?
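Some background: Kafka's default partitioner picks the partition by hashing the key bytes, so a stable key always maps to the same partition. A minimal sketch of the idea (Kafka actually uses murmur2 hashing; crc32 stands in here purely for illustration):

import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Keyed messages go to hash(key) % num_partitions, so the same key
    # always lands in the same partition.
    return zlib.crc32(key) % num_partitions

print(pick_partition(b"6245a3f1", 3))  # deterministic
print(pick_partition(b"6245a3f1", 3))  # same partition every time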

Contents

  • 1) using “output.schema.key”
  • 2) using ValueToKey

1) using “output.schema.key”

You will need to use:
MongoSourceConnector
Version: 1.7.0 | Author: MongoDB team

Your config needs to contain the following:

output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
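The field name in the key schema is a dotted path into the change-stream event. A rough sketch of how such a path resolves (the event and its values are made up for illustration):

def resolve(doc: dict, dotted_path: str):
    # Walk one level deeper for every dot-separated segment.
    for part in dotted_path.split("."):
        doc = doc[part]
    return doc

event = {"documentKey": {"_id": "6245a3f1c9d44000"}}
print(resolve(event, "documentKey._id"))  # -> 6245a3f1c9d44000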

Below, I try changing the config and then look at the data in the topic again.

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
batch.size=0
change.stream.full.document=updateLookup
collection=newtodo
database=todo
topic.prefix=mongouat
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
poll.max.batch.size=1000
Keep the converters as StringConverter like this so the data is easy to view.

I want to take the value of documentKey._id and use it as the value of the key schema.

You might wonder why I don't use documentKey._id.$oid instead.
If you use that, the connector throws an error:
https://jira.mongodb.org/browse/KAFKA-162
You can check the ticket above; reportedly the bug has since been fixed.
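For reference, the documentKey of a change event, rendered as MongoDB extended JSON, looks roughly like this (the ObjectId value is made up):

# documentKey of a change-stream event in extended JSON (illustrative):
document_key = {"_id": {"$oid": "6245a3f1c9d4400012345678"}}

# "documentKey._id" selects the whole {"$oid": ...} sub-document, while
# "documentKey._id.$oid" digs one level deeper, which is the case that
# hit KAFKA-162 at the time this post was written.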

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
batch.size=0
change.stream.full.document=updateLookup
collection=newtodo
output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
database=todo
topic.prefix=mongov2
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v2
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
poll.max.batch.size=1000


Our key now comes out as a Struct{}.

To handle the Struct{}, you can refer to the links below:
https://nimtechnology.com/2022/02/20/kafka-connect-single-message-transform-lesson-2-valuetokey-and-extractfield-in-sink
https://docs.confluent.io/platform/current/connect/transforms/extractfield.html#examples
The part we need is the ExtractField transform.
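Conceptually, ExtractField$Key replaces the whole key Struct with the value of one named field. A minimal sketch of the idea (the key is shown as a plain dict, not the connector's actual code):

def extract_field_from_key(key: dict, field: str):
    # ExtractField$Key keeps only the named field of the record key.
    return key[field]

key = {"documentKey._id": "6245a3f1c9d44000"}
print(extract_field_from_key(key, "documentKey._id"))  # plain string key

Below are the config variants I tried, starting with StringConverter for both key and value: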

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
transforms.ExtractField.field=documentKey._id
batch.size=0
change.stream.full.document=updateLookup
transforms=ExtractField
collection=newtodo
output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
database=todo
topic.prefix=mongov2
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v2
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
poll.max.batch.size=1000
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key

The same config again, but with both converters switched to JsonConverter:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
transforms.ExtractField.field=documentKey._id
batch.size=0
change.stream.full.document=updateLookup
transforms=ExtractField
collection=newtodo
output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
database=todo
topic.prefix=mongov2
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v2
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
poll.max.batch.size=1000
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key

Still with JsonConverter, but now adding key.converter.schemas.enable=false and value.converter.schemas.enable=false to drop the schema envelope from the JSON:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
transforms.ExtractField.field=documentKey._id
batch.size=0
change.stream.full.document=updateLookup
transforms=ExtractField
collection=newtodo
output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
key.converter.schemas.enable=false
database=todo
topic.prefix=mongov2
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
poll.max.batch.size=1000
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key

And finally, back to StringConverter while keeping schemas.enable=false:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
transforms.ExtractField.field=documentKey._id
batch.size=0
change.stream.full.document=updateLookup
transforms=ExtractField
collection=newtodo
output.schema.key={"type":"record","name":"id","fields":[{"name":"documentKey._id","type":"string"}]}
output.format.key=schema
key.converter.schemas.enable=false
database=todo
topic.prefix=mongov2
poll.await.time.ms=5000
connection.uri=mongodb://192.168.101.27:27017/?replicaSet=rs0
name=source.mongo_connector.todo.v2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
poll.max.batch.size=1000
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key
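To verify the result, consume the topic and check that every event for the same document now carries the same key and lands in the same partition. A hypothetical check with kafka-python (the broker address is an assumption; the topic name follows the connector's topic.prefix.database.collection convention, so mongov2.todo.newtodo here):

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "mongov2.todo.newtodo",                   # topic.prefix + database + collection
    bootstrap_servers="192.168.101.27:9092",  # assumed broker address
    auto_offset_reset="earliest",
)
for msg in consumer:
    # After ExtractField, all events for one document should print the
    # same key and the same partition number.
    print(msg.partition, msg.key)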

2) using ValueToKey

Alternatively, you can use the ValueToKey transform. Add these lines to the connector config:

transforms=insertKey
transforms.insertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.insertKey.fields=documentKey
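Conceptually, ValueToKey builds a new record key from the named fields of the message value. A rough sketch with a simplified change event (illustrative values, not the connector's actual code):

def value_to_key(value: dict, fields: list):
    # ValueToKey replaces the record key with a struct built from the
    # listed fields of the record value.
    return {f: value[f] for f in fields}

event_value = {"documentKey": {"_id": "6245a3f1c9d44000"}, "operationType": "insert"}
print(value_to_key(event_value, ["documentKey"]))
# -> {'documentKey': {'_id': '6245a3f1c9d44000'}}

Note that the resulting key is still a struct, so you may still want to chain ExtractField as in section 1 to flatten it.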

You can refer to another post of mine for more details:
https://nimtechnology.com/2022/02/20/kafka-connect-single-message-transform-lesson-2-valuetokey-and-extractfield-in-sink/
