
NimTechnology

Explaining CLOUD technologies in an easy-to-understand way.


[Kafka] Console Consumer CLI and Consumers in Group

Posted on January 31, 2022 (updated April 7, 2023) by nim

Now let's look at reading from and writing to a Kafka topic using the CLI.

Contents

  • 1) Overview
    • 1.1) Consumers
    • 1.3) Consumer Groups
  • 2) Practice
    • 2.1) Consumer CLI
    • 2.2) Consumer Groups CLI
    • 2.3) Consumer Offsets
    • 2.4) Delivery Semantics for consumers
    • 2.5) Demo many consumers inside a Group with many partitions
    • 2.6) Resetting Offsets
      • 2.6.1) to-earliest
      • 2.6.2) --shift-by
  • 3) Others
  • 4) Consumer rebalancing on Kafka.

1) Overview

1.1) Consumers

  • Consumers read data from a topic (identified by name)
  • Consumers know which broker to read from
  • In case of broker failures, consumers know how to recover
  • Data is read in order within each partition
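To make "in order within each partition" concrete, here is a minimal Python sketch. It is a toy model, not the Kafka client: partitions are plain lists, a message's offset is its index, and the round-robin reading order is an assumption chosen for illustration. Order holds inside each partition, but not across the topic as a whole.

```python
from itertools import zip_longest

# Toy model (not the Kafka client): each partition is an append-only list,
# and a message's offset is its index in that list.
partitions = {
    0: ["a0", "a1", "a2"],
    1: ["b0", "b1"],
    2: ["c0", "c1", "c2"],
}


def consume_interleaved(parts):
    """Read one message from each partition in turn, the way a consumer may
    interleave fetches from several partitions. Returns (partition, offset, value)."""
    columns = [
        [(p, offset, value) for offset, value in enumerate(msgs)]
        for p, msgs in sorted(parts.items())
    ]
    consumed = []
    for round_ in zip_longest(*columns):
        consumed.extend(record for record in round_ if record is not None)
    return consumed


records = consume_interleaved(partitions)

# Within each partition, offsets come out in increasing order...
for p in partitions:
    offsets = [offset for part, offset, _ in records if part == p]
    assert offsets == sorted(offsets)

# ...but across the whole topic the messages are interleaved.
print([value for _, _, value in records])
# ['a0', 'b0', 'c0', 'a1', 'b1', 'c1', 'a2', 'c2']
```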

1.3) Consumer Groups

  • Consumers read data in consumer groups
  • Each consumer within a group reads from exclusive partitions
  • If you have more consumers than partitions, some consumers will be inactive
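Why do extra consumers sit idle? Each partition is owned by exactly one consumer in the group. Here is a simplified Python sketch of a range-style assignment for a single topic; it is loosely modelled on the idea behind Kafka's RangeAssignor and is an assumption for illustration, not Kafka's actual code. The consumer names are hypothetical.

```python
def range_assign(partitions, consumers):
    """Simplified range-style assignment for one topic (illustrative only).

    Every partition goes to exactly one consumer; if there are more consumers
    than partitions, the extra consumers receive nothing."""
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition each.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


partitions = [0, 1, 2]  # first-topic has 3 partitions in the demos

for n in (2, 3, 4):
    group = [f"consumer-{i}" for i in range(1, n + 1)]
    print(n, "consumers:", range_assign(partitions, group))
# 2 consumers: {'consumer-1': [0, 1], 'consumer-2': [2]}
# 3 consumers: {'consumer-1': [0], 'consumer-2': [1], 'consumer-3': [2]}
# 4 consumers: {'consumer-1': [0], 'consumer-2': [1], 'consumer-3': [2], 'consumer-4': []}
```

With 4 consumers, consumer-4 gets an empty list and stays inactive. The 2-consumer case is the situation you end up in during the 2.5 demo after stopping one of three consumers.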


2) Practice

2.1) Consumer CLI

Now open two terminal windows:
– one consumer that listens to the topic
– one producer that writes data into the topic

###consumer 
kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic first-topic

###producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

If you run these two side by side, it works like a chat app: every line you type in the producer window shows up in the consumer window.

2.2) Consumer Groups CLI

The tool for working with consumer groups is kafka-consumer-groups.sh.

With it, we can list the consumer groups:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
We can see that a group named “console-consumer-69371” already exists.

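My guess was that when we start a consumer on a topic without giving it a group, a group named console-consumer-<random> is created automatically.
That is indeed how the console consumer behaves: without --group, it generates a group id of the form console-consumer-<random number>.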

### Consumer Group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application

###producer
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic first-topic 

Let's check the consumer group right away:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
Since there is only one consumer, it is assigned all 3 partitions.
I then press Ctrl+C to stop the consumer.
Describing the group again, it reports that the group has no active members.

Now let's write a few messages to the topic and then describe the group again. What do we see?

Something interesting shows up. Each line we send becomes one message with its own offset.
We sent 3 messages to the topic, and each one was stored in one partition.
You will see CURRENT-OFFSET 3 3 3, LOG-END-OFFSET 4 4 4 and LAG 1 1 1. LAG is LOG-END-OFFSET minus CURRENT-OFFSET, i.e. the number of messages in each partition that the group has not consumed yet.
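The LAG column is plain subtraction per partition. Here is a small Python sketch of the describe output as data; the `PartitionState` class is a hypothetical name for illustration, and the numbers are the ones from the output described above.

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    partition: int
    current_offset: int  # CURRENT-OFFSET: the group's committed position (next offset to read)
    log_end_offset: int  # LOG-END-OFFSET: the offset the next produced message will get

    @property
    def lag(self) -> int:
        # LAG = LOG-END-OFFSET - CURRENT-OFFSET: messages not consumed by the group yet.
        return self.log_end_offset - self.current_offset


# 3 partitions, one unread message each.
rows = [PartitionState(p, current_offset=3, log_end_offset=4) for p in range(3)]
print([row.lag for row in rows])  # [1, 1, 1]

# Once a consumer in the group reads and commits those messages,
# CURRENT-OFFSET catches up with LOG-END-OFFSET and the lag drops to 0.
for row in rows:
    row.current_offset = row.log_end_offset
print([row.lag for row in rows])  # [0, 0, 0]
```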

Now start a consumer in the my-first-application group again:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
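You will see the pending messages appear.
Describing the group again, CURRENT-OFFSET, LOG-END-OFFSET and LAG have changed; once the consumer has caught up, CURRENT-OFFSET equals LOG-END-OFFSET and LAG is 0.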

Now let's create a new consumer group and listen with it:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application
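We don't see the topic's older messages. Offsets are tracked per group, so what my-first-application has already read does not matter here.
The reason is that this new group has no committed offsets yet, and by default (auto.offset.reset=latest) a consumer without a committed offset starts at the end of each partition, so only new messages show up.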
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application --from-beginning

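I added the --from-beginning flag, but still nothing showed up. That is because --from-beginning only takes effect when the group has no committed offset yet, and my-second-application already committed its position during the previous run.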

But if I use a brand-new group together with --from-beginning,
it prints every message in the topic from the beginning:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-third-application --from-beginning
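The three runs above follow one rule, sketched here in Python as a simplified model for a single partition: a committed offset always wins, and only a group without one falls back to auto.offset.reset (the console consumer's --from-beginning asks for "earliest"). The `starting_offset` function and the offsets 0..3 are assumptions for illustration, as is the assumption that the first run committed the end of the partition.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Where a group's consumer starts reading one partition (simplified model)."""
    if committed is not None:
        # A committed offset always wins, so --from-beginning has no effect.
        return committed
    if auto_offset_reset == "earliest":
        # What --from-beginning asks for: start at the oldest available message.
        return log_start
    if auto_offset_reset == "latest":
        # The default: skip existing messages and read only new ones.
        return log_end
    raise ValueError(f"unsupported auto.offset.reset in this sketch: {auto_offset_reset}")


# Hypothetical partition holding offsets 0..3, so the next new message gets offset 4.
LOG_START, LOG_END = 0, 4

# my-second-application, first run: new group, default "latest".
first_run = starting_offset(None, LOG_START, LOG_END)
# Same group again with --from-beginning: it already committed LOG_END in the first run.
second_run = starting_offset(LOG_END, LOG_START, LOG_END, "earliest")
# my-third-application with --from-beginning: new group, so "earliest" applies.
third_group = starting_offset(None, LOG_START, LOG_END, "earliest")

print(first_run, second_run, third_group)  # 4 4 0
```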

2.3) Consumer Offsets

  • Kafka stores the offsets at which a consumer group has been reading
  • The offsets committed live in a Kafka topic named __consumer_offsets
  • When a consumer in a group has processed data received from Kafka, it should be committing the offsets
  • If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets!
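Here is a minimal Python sketch of that idea, assuming a simplified model where __consumer_offsets is an append-only list of commits keyed by (group, topic, partition) and the latest commit for a key wins, as in a compacted topic. The real topic uses a binary record format; the `OffsetStore` class and its methods are hypothetical names for illustration.

```python
class OffsetStore:
    """Toy stand-in for the __consumer_offsets topic (not Kafka's real format).

    Each commit is appended as a record keyed by (group, topic, partition).
    As in a compacted topic, only the latest record per key matters."""

    def __init__(self):
        self.log = []  # append-only list of ((group, topic, partition), offset)

    def commit(self, group, topic, partition, offset):
        self.log.append(((group, topic, partition), offset))

    def committed(self, group, topic, partition):
        """Latest committed offset for this key, or None if nothing was committed."""
        latest = None
        for key, offset in self.log:
            if key == (group, topic, partition):
                latest = offset  # later commits override earlier ones
        return latest


store = OffsetStore()
store.commit("my-first-application", "first-topic", 0, 3)
store.commit("my-first-application", "first-topic", 0, 4)

# A restarted consumer of my-first-application resumes from its last commit.
print(store.committed("my-first-application", "first-topic", 0))   # 4
# Offsets are tracked per group: my-second-application has committed nothing here.
print(store.committed("my-second-application", "first-topic", 0))  # None
```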

Put simply, Kafka has a way to mark which offsets a group has already read. Go back over the demos above and think them through for a bit!

2.4) Delivery Semantics for consumers

  • Consumers choose when to commit offsets.
  • There are 3 delivery semantics:
  • At most once:
    • offsets are committed as soon as the message is received.
    • If the processing goes wrong, the message will be lost (it won’t be read again).
  • At least once (usually preferred):
    • offsets are committed after the message is processed.
    • If the processing goes wrong, the message will be read again.
    • This can result in duplicate processing of messages. Make sure your processing is idempotent (i.e. processing the same messages again won't impact your systems).
  • Exactly once:
    • Can be achieved for Kafka => Kafka workflows using the Kafka Streams API.
    • For Kafka => External System workflows, use an idempotent consumer.
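The first two semantics differ only in the order of "commit" and "process". This toy Python sketch models one partition with a hypothetical `consume` function and injects a crash between the two steps while handling offset 1; the restarted consumer resumes from the last committed offset. It shows the lost message and the duplicate the bullets describe, and it does not model exactly-once.

```python
def consume(messages, start, crash_at=None, at_most_once=False):
    """Toy consumer for one partition.

    Handling a message takes two steps, commit and process. At-most-once commits
    first; at-least-once processes first. If crash_at is set, the consumer dies
    between the two steps while handling that offset.
    Returns (processed values, committed offset)."""
    processed = []
    committed = start
    for offset in range(start, len(messages)):
        if at_most_once:
            committed = offset + 1               # step 1: commit
            if offset == crash_at:
                return processed, committed      # crash: committed, never processed
            processed.append(messages[offset])   # step 2: process
        else:
            processed.append(messages[offset])   # step 1: process
            if offset == crash_at:
                return processed, committed      # crash: processed, not committed
            committed = offset + 1               # step 2: commit
    return processed, committed


messages = ["m0", "m1", "m2", "m3"]
results = {}
for mode, at_most_once in (("at most once", True), ("at least once", False)):
    first, committed = consume(messages, 0, crash_at=1, at_most_once=at_most_once)
    # The restarted consumer resumes from the last committed offset.
    second, _ = consume(messages, committed, at_most_once=at_most_once)
    results[mode] = first + second

print(results["at most once"])   # ['m0', 'm2', 'm3']  (m1 was lost)
print(results["at least once"])  # ['m0', 'm1', 'm1', 'm2', 'm3']  (m1 was processed twice)
```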

2.5) Demo many consumers inside a Group with many partitions

Now I open 3 console consumers in the same group and 1 producer.

#### 3 console consumers in the same group (run this in 3 separate terminals)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application

#### producer
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic first-topic

I write 3 messages.

You can see that each message is written to a different partition,
and each consumer reads one message.

Now I stop one of the consumers.

Writing 3 messages with 3 partitions and 2 consumers,
one consumer receives 2 messages and the other receives 1, because after the rebalance one consumer owns 2 partitions and the other owns 1.

2.6) Resetting Offsets

Remember to stop every consumer in the group before resetting its offsets; otherwise you get:
Error: Assignments can only be reset if the group 'my-first-application' is inactive, but the current state is Stable.

2.6.1) to-earliest

This moves the group's committed offsets back to the earliest available offset in each partition.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
LAG is 0 0 0, so the group my-first-application has no unread messages.

So can we reset the offsets? Let's try:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first-topic
Describing the group again, LAG is now 6 6 6: every message in each partition counts as unread again.

Now start a consumer to read the topic's data again:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application

2.6.2) --shift-by

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first-topic

This one only produces a warning: LAG is 0 0 0, so the committed offsets are already at the end of each partition and cannot be moved 2 further; they stay at the latest offset.
A positive --shift-by is for when your topic has some unread messages that you want to skip: the group commits past them without reading them.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first-topic

A negative --shift-by means you want to re-read some messages whose offsets have already been committed.
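Both reset modes boil down to computing a new committed offset per partition. The Python sketch below is a simplified model of kafka-consumer-groups.sh --reset-offsets, not the tool's code: `plan_reset` is a hypothetical name, and clamping the result into [earliest, latest] is an assumption that matches the warning seen with --shift-by 2 above. The numbers mirror one partition of the demo (6 messages, LAG 0).

```python
def plan_reset(current: int, log_start: int, log_end: int,
               to_earliest: bool = False, shift_by: int = 0) -> int:
    """New committed offset for one partition (simplified model, illustrative only)."""
    target = log_start if to_earliest else current + shift_by
    # Keep the result inside [log_start, log_end], falling back to the nearest bound.
    return max(log_start, min(target, log_end))


# One partition as in the demo: 6 messages, all consumed (LAG 0).
LOG_START, LOG_END, CURRENT = 0, 6, 6

for label, new in [
    ("--to-earliest", plan_reset(CURRENT, LOG_START, LOG_END, to_earliest=True)),
    ("--shift-by 2", plan_reset(CURRENT, LOG_START, LOG_END, shift_by=2)),
    ("--shift-by -2", plan_reset(CURRENT, LOG_START, LOG_END, shift_by=-2)),
]:
    print(f"{label}: offset {new}, LAG {LOG_END - new}")
# --to-earliest: offset 0, LAG 6
# --shift-by 2: offset 6, LAG 0
# --shift-by -2: offset 4, LAG 2
```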

3) Others

The CLI has many options, but here are the others that are most commonly used:

Producer with keys

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value

Consumer with keys

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning --property print.key=true --property key.separator=,
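The two `key.separator` settings are two halves of a round trip: the producer splits each input line into key and value, and the consumer prints them joined by the separator. Here is a Python sketch of that round trip. It assumes the producer splits at the first separator, so a value may itself contain commas; `parse_line` and `format_record` are hypothetical helpers, not part of the Kafka tools.

```python
def parse_line(line, separator=","):
    """Split a console-producer input line into (key, value).

    Assumption for this sketch: the line is split at the FIRST separator."""
    key, sep, value = line.partition(separator)
    if not sep:
        raise ValueError(f"no key separator {separator!r} in line: {line!r}")
    return key, value


def format_record(key, value, separator=","):
    # print.key=true with key.separator=, prints: key<separator>value
    return f"{key}{separator}{value}"


print(parse_line("key,value"))                  # ('key', 'value')
print(parse_line("another key,another value"))  # ('another key', 'another value')
print(format_record(*parse_line("another key,another value")))  # another key,another value
```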

KafkaCat (https://github.com/edenhill/kafkacat) is an open-source alternative to the Kafka CLI, created by Magnus Edenhill. It has since been renamed kcat.

While KafkaCat is not used in this article, if you are interested in trying it out, I recommend reading: https://medium.com/@coderunner/debugging-with-kafkacat-df7851d21968

4) Consumer rebalancing on Kafka.

When a Kafka consumer group goes through a rebalance, it is normal for the consumers to pause processing for a short period while the rebalance is happening. During a rebalance, the partitions are reassigned among the consumers in the group: a broker acting as the group coordinator runs the protocol, and one consumer (the group leader) computes the new assignment. With the classic (eager) rebalance protocol, every consumer has to give up all of its current partitions and wait until the new assignment has been handed out.

While the rebalance is in progress, the group is in the PreparingRebalance or CompletingRebalance state (visible with kafka-consumer-groups.sh --describe --state), and its consumers cannot consume messages from the partitions being reassigned. Once the rebalance is complete, each consumer has its assigned partitions and resumes processing messages.

During the rebalance, the Kafka consumer client may appear to hang, but this is usually temporary and is part of the normal behavior of a consumer group. How long a rebalance takes depends on factors such as the number of partitions, the number of consumers and the configuration of the cluster and clients. In particular, the rebalance cannot complete until every member has rejoined, so a consumer that is slow to call poll() (up to max.poll.interval.ms) stretches it out. In general, a rebalance should complete within a few seconds to a few minutes.

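It's important to note that while a rebalance is happening, the Kafka cluster is still operational, and producers can continue to send messages to Kafka topics. Therefore, design the consumer group so that rebalances do not cause significant downtime or processing delays, for example by using static group membership (group.instance.id), so that a consumer restarting quickly does not trigger a rebalance, or the cooperative sticky assignor, which only revokes the partitions that actually move.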
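To see why the rebalance protocol matters, here is a simplified Python model of what each consumer has to give up when a third consumer joins. It assumes the classic eager protocol (every consumer revokes all of its partitions) versus incremental cooperative rebalancing (a consumer revokes only the partitions that move elsewhere). The `revoked_partitions` function and the assignments are hypothetical; the real cooperative protocol also needs a second rebalance round to hand the moved partition to its new owner, which is left out here.

```python
def revoked_partitions(old, new, cooperative):
    """Which partitions each existing consumer must give up (simplified model).

    old/new map consumer -> set of partitions.
    Eager: every consumer revokes everything it owned.
    Cooperative: a consumer revokes only partitions it no longer owns in `new`."""
    revoked = {}
    for consumer, owned in old.items():
        if cooperative:
            revoked[consumer] = owned - new.get(consumer, set())
        else:
            revoked[consumer] = set(owned)
    return revoked


# A third consumer joins a group where two consumers shared 3 partitions.
old = {"c1": {0, 1}, "c2": {2}}
new = {"c1": {0}, "c2": {2}, "c3": {1}}

print(revoked_partitions(old, new, cooperative=False))  # {'c1': {0, 1}, 'c2': {2}}
print(revoked_partitions(old, new, cooperative=True))   # {'c1': {1}, 'c2': set()}
```

In the cooperative case, c1 keeps consuming partition 0 and c2 keeps partition 2 throughout, so only partition 1 pauses.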

Happy learning!

Copyright © 2025 NimTechnology.