Hello các bạn sau đâu mình sẽ trình bày về vần để reset 1 connector trên kafka.
Why need you reset a connector in Kafka-connect?
Vì 1 số case thì kafka-connect sẽ read data từ Database rồi write data vào trong 1 topic trên kafka.
Với Mysql thì thì kafka-connect sẽ đọc log bin, với Mongodb thì kafka đọc oplog.
Đôi khi connetor sẽ bị lỗi mất log bin với mongodb thì mình gặp lỗi này sau:
1 | Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask) |
2 | [ 2022 - 04 - 01 13 : 41 : 58 , 733 ] INFO Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask) |
Hoặc đơn giản là các tasks của 1 connector đang lỗi và bạn muốn reset.
Mình đã đọc khá nhiều bài và cũng được khá nhiều đại ca chỉ cách
reference links:
https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/
https://soojong.tistory.com/entry/Source-Connector-Offset-%EC%B4%88%EA%B8%B0%ED%99%94-%ED%95%98%EA%B8%B0
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets
Riêng với Mongo thì bạn đọc thêm cài này:
https://www.mongodb.com/docs/kafka-connector/current/troubleshooting/recover-from-invalid-resume-token/
1) Reset the Connector immediately.
Các này giải thích sơ.
Vị dụ là
– lúc 1h thì connector bị lỗi thì data từ lúc 1h sẽ không được ghi vào topic trên kafka.
– lúc 2 giờ thì bạn phát hiện ra connector bị lỗi và bạn thực hiện cách Reset the Connector immediately.
–> Thì khả năng data sẽ được bắt đầu ghi vào topic từ lúc 2 giờ.
–> data từ 1h -> 2h khả năng cao sẽ không được ghi topic
Trên đâu là Lưu ý của mình.!
1.1) Look up topic *connect-offset
Tóm lại các trên thì bạn Insert Record vào topic có tên giống như là *connect-offset*
Vậy tại sao mình ko cho bạn 1 cái tên chính sác vì mỗi hệ thông cái lên thì nó ra các tên khác nhau.
Ví dụ mình cài trên k8s thì sẽ tìm theo Environment
1 | - name: CONNECT_OFFSET_STORAGE_TOPIC |
2 | value: kafka - connect - cp - kafka - connect - offset |
Còn đây là trên lenses
Vậy là done việc tìm được topic *connect-offset.
1.2) Reset Connector Source Offsets by conduktor
Phân mền nay thì đã hỗ trợ săn cho chúng ta reset rất đơn giản.
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets
1.3) Deep dive into how it works
1.3.1) Mongodb
Như sau khi thực hiện các bước ở 1.2 thì sẽ dô soi topic *conect-offset có j mới lạ hem.
record kiểu như sau:
01 | [ |
02 | { |
03 | "key" : [ |
04 | "source.mongo_connector.todo.v1" , |
05 | { |
06 | "ns" : "mongodb://192.168.101.27:27017/todo.newtodo" |
07 | } |
08 | ] |
09 | } |
10 | ] |
Theo mình là chúng ta có thể tự viết 1 tool.
Khi phát hiện connector fail. -> input record reset connect offset -> restart connector đó