Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
      • Pod
    • Helm Chart
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
    • MetalLB
    • Kong Gateway
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Spinnaker
    • Jenkins
    • Harbor
    • TeamCity
    • Git
      • Bitbucket
  • Coding
    • DevSecOps
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
    • Golang
    • Laravel
    • Python
    • Jquery & JavaScript
    • Selenium
  • Log, Monitor & Tracing
    • DataDog
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
    • NextCloud
  • Toggle search form

[Kafka-connect] Reset Connector in Kafka Connect

Posted on April 3, 2022April 3, 2022 By nim No Comments on [Kafka-connect] Reset Connector in Kafka Connect

Hello các bạn sau đâu mình sẽ trình bày về vần để reset 1 connector trên kafka.

Why need you reset a connector in Kafka-connect?
Vì 1 số case thì kafka-connect sẽ read data từ Database rồi write data vào trong 1 topic trên kafka.
Với Mysql thì thì kafka-connect sẽ đọc log bin, với Mongodb thì kafka đọc oplog.
Đôi khi connetor sẽ bị lỗi mất log bin với mongodb thì mình gặp lỗi này sau:

Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask)
[2022-04-01 13:41:58,733] INFO Failed to resume change stream: Resume of change stream was not possible, as the resume point may no longer be in the oplog. 286 (com.mongodb.kafka.connect.source.MongoSourceTask)

Hoặc đơn giản là các tasks của 1 connector đang lỗi và bạn muốn reset.

Mình đã đọc khá nhiều bài và cũng được khá nhiều đại ca chỉ cách
reference links:
https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/
https://soojong.tistory.com/entry/Source-Connector-Offset-%EC%B4%88%EA%B8%B0%ED%99%94-%ED%95%98%EA%B8%B0
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets

Riêng với Mongo thì bạn đọc thêm cài này:
https://www.mongodb.com/docs/kafka-connector/current/troubleshooting/recover-from-invalid-resume-token/

Contents

  • 1) Reset the Connector immediately.
    • 1.1) Look up topic *connect-offset
    • 1.2) Reset Connector Source Offsets by conduktor
    • 1.3) Deep dive into how it works
      • 1.3.1) Mongodb

1) Reset the Connector immediately.

Các này giải thích sơ.
Vị dụ là
– lúc 1h thì connector bị lỗi thì data từ lúc 1h sẽ không được ghi vào topic trên kafka.
– lúc 2 giờ thì bạn phát hiện ra connector bị lỗi và bạn thực hiện cách Reset the Connector immediately.
–> Thì khả năng data sẽ được bắt đầu ghi vào topic từ lúc 2 giờ.
–> data từ 1h -> 2h khả năng cao sẽ không được ghi topic

Trên đâu là Lưu ý của mình.!

1.1) Look up topic *connect-offset

Tóm lại các trên thì bạn Insert Record vào topic có tên giống như là *connect-offset*

Vậy tại sao mình ko cho bạn 1 cái tên chính sác vì mỗi hệ thông cái lên thì nó ra các tên khác nhau.
Ví dụ mình cài trên k8s thì sẽ tìm theo Environment

- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: kafka-connect-cp-kafka-connect-offset

Còn đây là trên lenses

Vậy là done việc tìm được topic *connect-offset.

1.2) Reset Connector Source Offsets by conduktor

Phân mền nay thì đã hỗ trợ săn cho chúng ta reset rất đơn giản.
https://docs.conduktor.io/features/kafka-connect/features#reset-connector-source-offsets

1.3) Deep dive into how it works

1.3.1) Mongodb

Như sau khi thực hiện các bước ở 1.2 thì sẽ dô soi topic *conect-offset có j mới lạ hem.

Mình thấy là conduktor sẽ insert 1 record có key là thông tin connector mongo và không có value.

record kiểu như sau:

[
    {
        "key": [
            "source.mongo_connector.todo.v1",
            {
                "ns": "mongodb://192.168.101.27:27017/todo.newtodo"
            }
        ]
    }
]

Theo mình là chúng ta có thể tự viết 1 tool.
Khi phát hiện connector fail. -> input record reset connect offset -> restart connector đó

Kafka Connect

Post navigation

Previous Post: [Kubernetes] Changing DNS or Hosts of the POD on Kubernetes
Next Post: [Terraform/GCP] Registering a Google Cloud Account is Free!

More Related Articles

[Kafka-connect] research on Kafka Connect Source and demo watch the changing file. Apache Kafka
[Kafka-connect] Streaming the data of Postgresql through Kafka-connect and Debezium plugin. Apache Kafka
[Kafka-connect] Single Message Transform: lesson 2 – ValueToKey and ExtractField in Sink Apache Kafka
[Kafka-connect] Install lenses to manage Kafka. Apache Kafka
[Kafka-connect] Single Message Transform: lesson 6 InsertField – Insert fields using attributes in the process data. Apache Kafka
[Kafka-connect] A few APIs are helpful in Kafka-connect. Apache Kafka

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • [Monitoring] Understanding Different Types of Metrics in Monitoring Systems: Gauge, Counter, Histogram, and Summary December 6, 2023
  • [Golang/Protocol Buffers] Binarilize data by Proto Buf December 5, 2023
  • [Helm] Fail when running helm upgrade November 30, 2023
  • [AWS/EKS] Unlocking Simplicity and Security of Amazon EKS Pod Identity. November 30, 2023
  • [Gitleaks/njsscan/semgrep] Secure Your code by CAST and SAST tools November 27, 2023

Archives

  • December 2023
  • November 2023
  • October 2023
  • September 2023
  • August 2023
  • July 2023
  • June 2023
  • May 2023
  • April 2023
  • March 2023
  • February 2023
  • January 2023
  • December 2022
  • November 2022
  • October 2022
  • September 2022
  • August 2022
  • July 2022
  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
    • NextCloud
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Git
      • Bitbucket
    • Harbor
    • Jenkins
    • Spinnaker
    • TeamCity
  • Coding
    • DevSecOps
    • Golang
    • Jquery & JavaScript
    • Laravel
    • Python
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kong Gateway
    • Kubernetes
      • Ingress
      • Pod
    • Longhorn – Storage
    • MetalLB
    • Vault
    • VictoriaMetrics
  • Log, Monitor & Tracing
    • DataDog
    • ELK
      • Kibana
      • Logstash
    • Fluent
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2023 NimTechnology.