Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
    • Helm Chart
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
    • MetalLB
    • Kong Gateway
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Spinnaker
    • Jenkins
    • Harbor
    • TeamCity
    • Git
      • Bitbucket
  • Coding
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
    • Golang
    • Laravel
    • Python
    • Jquery & JavaScript
    • Selenium
  • Log & Monitor
    • DataDog
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
    • NextCloud
  • Toggle search form

[Kafka] Console Consumer CLI and Consumers in Group

Posted on January 31, 2022February 27, 2022 By nim No Comments on [Kafka] Console Consumer CLI and Consumers in Group

Giờ chúng ta tìm hiểu về việc read và write vào 1 topic trên kafka bằng CLI

Contents

  • 1) Overview
    • 1.1) Consumers
    • 1.3) Consumer Groups
  • 2) Practice
    • 2.1) Consumer CLI
    • 2.2) Consumer Groups CLI
    • 2.3) Consumer Offsets
    • 2.4) Delivery Semantics for consumers
    • 2.5) Demo many consumers inside a Group with many partitions
    • 2.6) Resetting Offsets
      • 2.6.1) to-earliest
      • 2.6.2) –shift-by
  • 3) Ot

1) Overview

1.1) Consumers

  • Consumers read data from a topic (identified by name)
  • Consumers know which broker to read from
  • In case of broker failures, consumers know how to recover
  • Data is read in order within each partitions

1.3) Consumer Groups

  • Consumers read data in consumer groups
  • Each consumer within a group reads from exclusive partitions
  • If you have more consumers than partitions, some consumers will be inactive

What if too many consumers?

  • If you have more consumers than partitions, some consumers will be inactive

giờ mình thực hiện mở 2 cửa sổ:
– 1 consumer để lắng nghe topic
– 1 producer để write data into topic

2) Practice

2.1) Consumer CLI

###consumer 
kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic first-topic

###producer
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic first-topic                                    
nếu bạn làm như ảnh trên
bạn sẽ thấy như kiểu nhắn tin messager

2.2) Consumer Groups CLI

kafka-consumer-groups.sh

chúng ta có thể list ra các consumer groups

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
chúng ta thấy đã có 1 group là “console-consumer-69371“

Theo mình là khi chúng ta tạo 1 consumer để lắng nghe 1 topic thì nó tự động tao 1 comsumer-group-<random>
Nếu các bạn có thời gian thì kiểm chứng thử giúp mình nhé.

### Consumer Group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application

###producer
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic first-topic 

mình sẽ thực hiện kiểm tra ngay consumer group

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
vì là chỉ có 1 consumer nên là nó lắng nghe cả 3 partitions
mình thực hiện control + C để thoát consumer
khí describe lại thì nó thông báo là không còn consumer nào đang active

khi này mình sẽ write topic sau đó thì describe group sem ntn?

chúng ta thấy có điều thú vị. Mỗi dòng mà chúng ta nhắn qua đó là 1 offset nhé!
Vậy chúng ta có 3 offset được sent vào topic và mỗi offset sẽ được lưu trữ(archive) vào 1 partition
bạn sẽ thấy là CURRENT-OFFSET 3 3 3 và LOG-END-OFFSET 4 4 4, LAG 1 1 1.

mình mở lại group my-first-application

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
bạn sẽ thấy các offset appear(xuất hiện)
bạn thấy đã có sử thay đổi ở
CURRENT-OFFSET, LOG-END-OFFSET, LAG

giờ mình sẽ tạo và lắng nghe trên 1 consumer-group mới.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application
mình sẽ không thấy được các offset cũ của topic này
vì nó đã được các comsumer của group first nhận rồi thì phải.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application --from-beginning

mình đã sử dụng flag là –from-beginning nhưng cũng chả thấy j?

nhưng nếu mình chạy group mới và thêm flag là –from-beginning
nó hiện tất cả các messages từ đầu trong topic đó

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-third-application --from-beginning

2.3) Consumer Offsets

  • Kafka stores the offsets at which a consumer group has been reading
  • The offsets committed live in a Kafka topic named __consumer_offsets
  • When a consumer in a group has processed data received from Kafka, it should be committing the offsets
  • If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets!

Chúng ta có thể hiểu đơn giản là Kafka sẽ có cách để đánh dấu các offset đã được đọc. Bạn có thể back lại các đề nghiềm ngẫm 1 chút nhé!

2.4) Delivery Semantics for consumers

  • Consumers choose when to commit offsets.
  • There are 3 delivery semantics:
  • At most once:
    • offsets are committed as soon as the message is received.
    • If the processing goes wrong, the message will be lost (it won’t be read again).
  • Atleast once (usually preferred):
    • offsets are committed after the message is processed.
    • If the processing goes wrong, the message will be read again.
    • This can result in duplicate processing of messages. Make sure your processing is idempotent(ie. processing again the messages won’t impact your systems)
  • Exactly once:
    • Can be achieved for Kafka => Kafka workflows using Kafka Streams API
    • For Kafka => External System workflows, use an idempotent consumer.

2.5) Demo many consumers inside a Group with many partitions

Giờ mình mở 3 console group consumers và 1 producer

#### 3 console group consumers
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

#### producer
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic first-topic

mình thực hiện write 3 message.

bạn thấy là mỗi message được write vào 1 partition
mỗi consumer sẽ đọc 1 message

giờ mình cho lost 1 consumer.

khi write 3 messages – 3 partitions – 2 consumers
thì sẽ có 1 consumer nhận 2 messages và 1 consumer nhận 1 message

2.6) Resetting Offsets

Bạn nhớ khi reset phải out hết tất cả các consumer nhé
Error: Assignments can only be reset if the group ‘my-first-application’ is inactive, but the current state is Stable.

2.6.1) to-earliest

Cái này là reset toàn bộ các offsets

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
LAG 0 0 0 thì group my-first-application ko có offset nào chưa được read

Vậy thì ta có thể reset offset được hem nhi?

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first-topic
giờ kiểm tra thì LAG trở lại thành 6 6 6

giờ start consumer read data of topic

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application

2.6.2) –shift-by

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first-topic

Chỗ này warning vì lag đang là 0 0 0 nên ko thể set –shift-by 2
shift-by > 0 thì kiểu trong topic của bạn đang có vài offset chưa được read và bạn cũng muốn commited các offset đó luôn, không cần đọc các offset này.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first-topic

shift-by < 0 thì là bạn muốn đọc lại 1 số offsets đã được commit

3) Ot

The CLI has many options, but here are the others that are most commonly used:

Producer with keys

kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value

Consumer with keys

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

KafkaCat (https://github.com/edenhill/kafkacat) is an open-source alternative to using the Kafka CLI, created by Magnus Edenhill.

While KafkaCat is not used in this course, if you have any interest in trying it out, I recommend reading: https://medium.com/@coderunner/debugging-with-kafkacat-df7851d21968

Happy learning!

Apache Kafka, Kafka

Post navigation

Previous Post: [web] Chuyển ảnh sang text
Next Post: [Golang] Chuẩn bị 1 project golang.

More Related Articles

[Kafka/Schema-registry] Installing Schema-registry to use for the Kafka and Kafka-connect model Apache Kafka
Wins from Effective Kafka Monitoring at Adobe: Stability, Performance, and Cost Savings Kafka
[Kafka] Install kafka and zookeeper cluster on kubernetes. Apache Kafka
[Kafka] Kafka Topics CLI Apache Kafka
[Lenses/kafka] Fix the problem “Cannot extract connector information from the configuration provided” when creating connector Kafka-connect Apache Kafka
[Kafka-connect] Single Message Transform: lesson 5 MaskField – Cover the sensitive data. Apache Kafka

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • Experiences for IP Addresses Shortage on EKS Clusters March 29, 2023
  • [Talisman] Discover the sensitive information in your code. March 28, 2023
  • [Prometheus/Grafana] Install Prometheus and Grafana on ubuntu. March 27, 2023
  • [Kong Gateway] WebSocket connection failed March 26, 2023
  • [Nextcloud] Can’t download files to have a size bigger than 2Gi on NextCloud – RaspBerry March 24, 2023

Archives

  • March 2023
  • February 2023
  • January 2023
  • December 2022
  • November 2022
  • October 2022
  • September 2022
  • August 2022
  • July 2022
  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
    • NextCloud
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Git
      • Bitbucket
    • Harbor
    • Jenkins
    • Spinnaker
    • TeamCity
  • Coding
    • Golang
    • Jquery & JavaScript
    • Laravel
    • Python
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kong Gateway
    • Kubernetes
      • Ingress
    • Longhorn – Storage
    • MetalLB
    • Vault
    • VictoriaMetrics
  • Log & Monitor
    • DataDog
    • ELK
      • Kibana
      • Logstash
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2023 NimTechnology.