Now let's look at how to read from and write to a Kafka topic using the CLI.
1) Overview
1.1) Consumers
- Consumers read data from a topic (identified by name)
- Consumers know which broker to read from
- In case of broker failures, consumers know how to recover
- Data is read in order within each partition
1.2) Consumer Groups
- Consumers read data in consumer groups
- Each consumer within a group reads from exclusive partitions
- If you have more consumers than partitions, some consumers will be inactive
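The rules above can be sketched with a tiny simulation. This is only an illustration: the function `assign_partitions` and the consumer names `c1`..`c4` are made up here, and Kafka's real assignors (range, sticky, and so on) are more involved than this simple round-robin deal.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to consumers one at a time (simplified round-robin).

    Each partition ends up with exactly one owner inside the group.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2]  # assumed: the topic has 3 partitions

# 2 consumers: one of them has to own two partitions.
two_consumers = assign_partitions(partitions, ["c1", "c2"])

# 4 consumers: only 3 partitions exist, so one consumer gets nothing.
four_consumers = assign_partitions(partitions, ["c1", "c2", "c3", "c4"])

print(two_consumers)
print(four_consumers)
```

In the second case `c4` owns no partition, which is what "inactive" means in the list above.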
Now open two terminal windows:
- one consumer to listen to the topic
- one producer to write data into the topic
2) Practice
2.1) Consumer CLI
### consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic
### producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
2.2) Consumer Groups CLI
The tool for this is kafka-consumer-groups.sh.
We can use it to list the consumer groups:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
As far as I can tell, when you start a console consumer without --group, it automatically creates a group named console-consumer-<random number>.
If you have time, please verify this yourself by checking the output of --list.
### Consumer Group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
### producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
Let's check the consumer group right away:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
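Now I write some messages to the topic and describe the group again to see what it looks like.
There is something interesting here: each line we send is one message, and each message gets its own offset in the partition it lands in.
So we sent 3 messages to the topic, and each message is stored in exactly one partition.
You will see CURRENT-OFFSET 3 3 3, LOG-END-OFFSET 4 4 4 and LAG 1 1 1 (one value per partition). LAG is LOG-END-OFFSET minus CURRENT-OFFSET, i.e. the number of messages in that partition the group has not committed yet.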
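The arithmetic behind the LAG column can be written down as a short sketch. The partition ids 0, 1, 2 and the helper name `consumer_lag` are assumptions for illustration; the offset values are the ones from the describe output above.

```python
def consumer_lag(current_offset, log_end_offset):
    """Messages in a partition that the group has not committed yet."""
    return log_end_offset - current_offset


# Values from the describe output: one entry per partition.
current_offsets = {0: 3, 1: 3, 2: 3}   # CURRENT-OFFSET
log_end_offsets = {0: 4, 1: 4, 2: 4}   # LOG-END-OFFSET

lag = {
    partition: consumer_lag(current_offsets[partition], log_end_offsets[partition])
    for partition in current_offsets
}
total_lag = sum(lag.values())

print(lag)        # per-partition LAG column
print(total_lag)  # messages the group still has to catch up on
```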
I start a consumer in the group my-first-application again:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
Now I create a new consumer group and listen with it.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application --from-beginning
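I used the --from-beginning flag on the second run but saw nothing. That is because the group my-second-application had already committed offsets during the first run, and --from-beginning only applies when a group has no committed offsets yet.
If I run a brand-new group with the --from-beginning flag instead,
it prints all the messages in the topic from the beginning: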
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-third-application --from-beginning
2.3) Consumer Offsets
- Kafka stores the offsets at which a consumer group has been reading
- The offsets committed live in a Kafka topic named __consumer_offsets
- When a consumer in a group has processed data received from Kafka, it should commit the offsets
- If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets!
Simply put, Kafka has a way to mark which offsets a group has already read. You can go back to the demos above and think them over for a bit!
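This also explains the earlier demo where --from-beginning printed nothing for an existing group. A minimal sketch of how the starting position is chosen, assuming the console consumer's default of starting at the end of the log when there is no committed offset (the function `starting_offset` is hypothetical, not a Kafka API):

```python
def starting_offset(committed, earliest, log_end, from_beginning):
    """Pick where a consumer group starts reading one partition.

    committed      -- offset stored in __consumer_offsets, or None for a new group
    earliest       -- first offset still available in the partition
    log_end        -- offset the next produced message will get
    from_beginning -- whether --from-beginning was passed
    """
    if committed is not None:
        # A committed offset always wins; the flag is ignored.
        return committed
    return earliest if from_beginning else log_end


# Existing group that already committed everything: nothing to replay.
existing_group = starting_offset(committed=4, earliest=0, log_end=4, from_beginning=True)

# Brand-new group with --from-beginning: replays the whole partition.
new_group_with_flag = starting_offset(committed=None, earliest=0, log_end=4, from_beginning=True)

# Brand-new group without the flag: only sees messages produced from now on.
new_group_no_flag = starting_offset(committed=None, earliest=0, log_end=4, from_beginning=False)

print(existing_group, new_group_with_flag, new_group_no_flag)
```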
2.4) Delivery Semantics for consumers
- Consumers choose when to commit offsets.
- There are 3 delivery semantics:
  - At most once:
    - offsets are committed as soon as the message is received.
    - If the processing goes wrong, the message will be lost (it won't be read again).
  - At least once (usually preferred):
    - offsets are committed after the message is processed.
    - If the processing goes wrong, the message will be read again.
    - This can result in duplicate processing of messages. Make sure your processing is idempotent (i.e. processing the messages again won't impact your systems).
  - Exactly once:
    - Can be achieved for Kafka => Kafka workflows using the Kafka Streams API.
    - For Kafka => External System workflows, use an idempotent consumer.
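The difference between the first two semantics comes down to whether the commit happens before or after processing. The simulation below is a rough sketch, not real consumer code: `run_consumer`, the message list and the single simulated failure are all invented for illustration.

```python
def run_consumer(messages, commit_first, fail_on):
    """Simulate a consumer whose processing fails once on `fail_on`.

    After the failure the consumer "restarts" from the last committed offset.
    Returns (deliveries, processed): every message handed to the processing
    step, and the messages whose processing succeeded.
    """
    committed = 0            # next offset to read after a restart
    deliveries = []
    processed = []
    already_failed = False
    while committed < len(messages):
        offset = committed
        message = messages[offset]
        if commit_first:
            committed = offset + 1       # at most once: commit on receipt
        deliveries.append(message)
        if message == fail_on and not already_failed:
            already_failed = True
            continue                     # simulated crash, restart at `committed`
        processed.append(message)
        if not commit_first:
            committed = offset + 1       # at least once: commit after processing
    return deliveries, processed


messages = ["a", "b", "c"]
at_most_once = run_consumer(messages, commit_first=True, fail_on="b")
at_least_once = run_consumer(messages, commit_first=False, fail_on="b")

print(at_most_once)   # "b" is delivered once and never processed
print(at_least_once)  # "b" is delivered twice, so processing must be idempotent
```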
2.5) Demo many consumers inside a Group with many partitions
Now I open 3 console consumers in the same group and 1 producer.
#### 3 console group consumers (run this in each of the 3 windows)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
#### producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
I write 3 messages.
Now I stop 1 consumer.
When I write 3 messages with 3 partitions and 2 consumers,
one consumer receives 2 messages and the other receives 1, because one consumer now owns 2 partitions and the other owns 1.
2.6) Resetting Offsets
Remember to stop all consumers in the group before resetting; otherwise you get this error:
Error: Assignments can only be reset if the group 'my-first-application' is inactive, but the current state is Stable.
2.6.1) to-earliest
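This resets all of the group's offsets for the topic back to the earliest available offset in each partition. First, check the current state of the group: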
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
So can we reset the offsets? Yes:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first-topic
Now start a consumer to read the topic's data again:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
2.6.2) shift-by
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first-topic
This prints a warning because the lag is currently 0 0 0, so the offsets cannot be shifted forward by 2; they stay at the end of the log.
Use shift-by > 0 when the topic has some messages that have not been read yet and you want to mark them as committed anyway, skipping them without reading them.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first-topic
Use shift-by < 0 when you want to re-read some messages whose offsets were already committed.
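The behaviour of both directions, including the warning case, can be sketched as a clamp. This assumes, as far as I understand the tool, that a shifted offset outside the valid range is pulled back to the earliest offset or the log end; `shift_offset` is a hypothetical helper, not part of Kafka.

```python
def shift_offset(current, shift_by, earliest, log_end):
    """Move a committed offset by `shift_by`, kept inside [earliest, log_end]."""
    target = current + shift_by
    return max(earliest, min(target, log_end))


# Lag is 0 (current == log end): shifting forward by 2 cannot go past the end.
forward_at_end = shift_offset(current=4, shift_by=2, earliest=0, log_end=4)

# Shifting back by 2: the last two messages will be read again.
backward = shift_offset(current=4, shift_by=-2, earliest=0, log_end=4)

# Two unread messages exist: shifting forward by 2 skips them.
skip_unread = shift_offset(current=2, shift_by=2, earliest=0, log_end=4)

# Shifting back further than the start of the partition stops at the earliest offset.
too_far_back = shift_offset(current=1, shift_by=-2, earliest=0, log_end=4)

print(forward_at_end, backward, skip_unread, too_far_back)
```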
3) Others
The CLI has many options, but here are the others that are most commonly used:
Producer with keys
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value
Consumer with keys
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning --property print.key=true --property key.separator=,
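To make the effect of parse.key and key.separator concrete, here is a small sketch of how a typed line is split into key and value. It assumes the split happens at the first occurrence of the separator; `parse_keyed_line` is an illustrative helper, not the producer's actual code.

```python
def parse_keyed_line(line, separator=","):
    """Split one console line into (key, value) at the first separator."""
    key, found, value = line.partition(separator)
    if not found:
        # With parse.key=true a line without the separator has no key.
        raise ValueError(f"no key separator {separator!r} in line: {line!r}")
    return key, value


first = parse_keyed_line("key,value")
second = parse_keyed_line("another key,another value")

# Only the first separator splits; later ones stay in the value.
extra_separator = parse_keyed_line("k,v1,v2")

print(first, second, extra_separator)
```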
KafkaCat (https://github.com/edenhill/kafkacat) is an open-source alternative to using the Kafka CLI, created by Magnus Edenhill.
While KafkaCat is not used in this course, if you have any interest in trying it out, I recommend reading: https://medium.com/@coderunner/debugging-with-kafkacat-df7851d21968
4) Consumer rebalancing on Kafka.
When a Kafka consumer group goes through a rebalance, it is normal for the consumers to pause processing for a short period of time while the rebalance is happening. This is because during a rebalance, the Kafka broker is reassigning the partitions to the consumers in the group, and the consumers need to stop processing their existing partitions and wait for the broker to finish the reassignment process.
While the consumers are waiting for the rebalance to complete, they are in a “rebalance in progress” state, and they cannot consume any messages from the Kafka topics. However, once the rebalance is complete, the consumers are assigned new partitions to process, and they can resume processing messages.
During the rebalance process, the Kafka consumer client may appear to hang, but this is usually temporary and is part of the normal behavior of the consumer group. The duration of the rebalance process depends on various factors such as the number of partitions, the number of consumers, and the configuration of the Kafka cluster. In general, the rebalance process should complete within a few seconds to a few minutes, depending on the complexity of the rebalance.
It’s important to note that while a rebalance is happening, the Kafka cluster is still operational, and producers can continue to send messages to Kafka topics. Therefore, it’s important to design the Kafka consumer group in such a way that rebalances do not cause significant downtime or processing delays.
Happy learning!