Hello các bạn sau đâu mình sẽ trình bày về vần để reset 1 connector trên kafka.
Why need you reset a connector in Kafka-connect?
Vì 1 số case thì kafka-connect sẽ read data từ Database rồi write data vào trong 1 topic trên kafka.
Với Mysql thì thì kafka-connect sẽ đọc log bin, với Mongodb thì kafka đọc oplog.
Đôi khi connetor sẽ bị lỗi mất log bin với mongodb thì mình gặp lỗi này sau:
Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask)
[2022-04-01 13:41:58,733] INFO Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask)
Hoặc đơn giản là các tasks của 1 connector đang lỗi và bạn muốn reset.
Mình đã đọc khá nhiều bài và cũng được khá nhiều đại ca chỉ cách
reference links:
https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/
https://soojong.tistory.com/entry/Source-Connector-Offset-%EC%B4%88%EA%B8%B0%ED%99%94-%ED%95%98%EA%B8%B0
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets
Riêng với Mongo thì bạn đọc thêm cài này:
https://www.mongodb.com/docs/kafka-connector/current/troubleshooting/recover-from-invalid-resume-token/
1) Reset the Connector immediately.
Các này giải thích sơ.
Vị dụ là
– lúc 1h thì connector bị lỗi thì data từ lúc 1h sẽ không được ghi vào topic trên kafka.
– lúc 2 giờ thì bạn phát hiện ra connector bị lỗi và bạn thực hiện cách Reset the Connector immediately.
–> Thì khả năng data sẽ được bắt đầu ghi vào topic từ lúc 2 giờ.
–> data từ 1h -> 2h khả năng cao sẽ không được ghi topic
Trên đâu là Lưu ý của mình.!
1.1) Look up topic *connect-offset
Tóm lại các trên thì bạn Insert Record vào topic có tên giống như là *connect-offset*
Vậy tại sao mình ko cho bạn 1 cái tên chính sác vì mỗi hệ thông cái lên thì nó ra các tên khác nhau.
Ví dụ mình cài trên k8s thì sẽ tìm theo Environment
- name: CONNECT_OFFSET_STORAGE_TOPIC
value: kafka-connect-cp-kafka-connect-offset
Còn đây là trên lenses
Vậy là done việc tìm được topic *connect-offset.
1.2) Reset Connector Source Offsets by conduktor
Phân mền nay thì đã hỗ trợ săn cho chúng ta reset rất đơn giản.
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets
1.3) Deep dive into how it works
1.3.1) Mongodb
Như sau khi thực hiện các bước ở 1.2 thì sẽ dô soi topic *conect-offset có j mới lạ hem.
record kiểu như sau:
[
{
"key": [
"source.mongo_connector.todo.v1",
{
"ns": "mongodb://192.168.101.27:27017/todo.newtodo"
}
]
}
]
Theo mình là chúng ta có thể tự viết 1 tool.
Khi phát hiện connector fail. -> input record reset connect offset -> restart connector đó