Giờ chúng ta tìm hiểu về việc read và write vào 1 topic trên kafka bằng CLI
1) Overview
1.1) Consumers
- Consumers read data from a topic (identified by name)
- Consumers know which broker to read from
- In case of broker failures, consumers know how to recover
- Data is read in order within each partitions

1.3) Consumer Groups
- Consumers read data in consumer groups
- Each consumer within a group reads from exclusive partitions
- If you have more consumers than partitions, some consumers will be inactive

What if too many consumers?
- If you have more consumers than partitions, some consumers will be inactive

giờ mình thực hiện mở 2 cửa sổ:
– 1 consumer để lắng nghe topic
– 1 producer để write data into topic
2) Practice
2.1) Consumer CLI
###consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic
###producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

bạn sẽ thấy như kiểu nhắn tin messager
2.2) Consumer Groups CLI
kafka-consumer-groups.sh

chúng ta có thể list ra các consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Theo mình là khi chúng ta tạo 1 consumer để lắng nghe 1 topic thì nó tự động tao 1 comsumer-group-<random>
Nếu các bạn có thời gian thì kiểm chứng thử giúp mình nhé.
### Consumer Group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application
###producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

mình sẽ thực hiện kiểm tra ngay consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application



khi này mình sẽ write topic sau đó thì describe group sem ntn?


chúng ta thấy có điều thú vị. Mỗi dòng mà chúng ta nhắn qua đó là 1 offset nhé!
Vậy chúng ta có 3 offset được sent vào topic và mỗi offset sẽ được lưu trữ(archive) vào 1 partition
bạn sẽ thấy là CURRENT-OFFSET 3 3 3 và LOG-END-OFFSET 4 4 4, LAG 1 1 1.
mình mở lại group my-first-application
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application


CURRENT-OFFSET, LOG-END-OFFSET, LAG
giờ mình sẽ tạo và lắng nghe trên 1 consumer-group mới.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application

vì nó đã được các comsumer của group first nhận rồi thì phải.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-second-application --from-beginning
mình đã sử dụng flag là –from-beginning nhưng cũng chả thấy j?

nhưng nếu mình chạy group mới và thêm flag là –from-beginning
nó hiện tất cả các messages từ đầu trong topic đó
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-third-application --from-beginning

2.3) Consumer Offsets
- Kafka stores the offsets at which a consumer group has been reading
- The offsets committed live in a Kafka topic named __consumer_offsets
- When a consumer in a group has processed data received from Kafka, it should be committing the offsets
- If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets!
Chúng ta có thể hiểu đơn giản là Kafka sẽ có cách để đánh dấu các offset đã được đọc. Bạn có thể back lại các đề nghiềm ngẫm 1 chút nhé!

2.4) Delivery Semantics for consumers
- Consumers choose when to commit offsets.
- There are 3 delivery semantics:
- At most once:
- offsets are committed as soon as the message is received.
- If the processing goes wrong, the message will be lost (it won’t be read again).
- Atleast once (usually preferred):
- offsets are committed after the message is processed.
- If the processing goes wrong, the message will be read again.
- This can result in duplicate processing of messages. Make sure your processing is idempotent(ie. processing again the messages won’t impact your systems)
- Exactly once:
- Can be achieved for Kafka => Kafka workflows using Kafka Streams API
- For Kafka => External System workflows, use an idempotent consumer.
2.5) Demo many consumers inside a Group with many partitions
Giờ mình mở 3 console group consumers và 1 producer
#### 3 console group consumers
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
#### producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

mình thực hiện write 3 message.

mỗi consumer sẽ đọc 1 message
giờ mình cho lost 1 consumer.

khi write 3 messages – 3 partitions – 2 consumers
thì sẽ có 1 consumer nhận 2 messages và 1 consumer nhận 1 message
2.6) Resetting Offsets
Bạn nhớ khi reset phải out hết tất cả các consumer nhé
Error: Assignments can only be reset if the group ‘my-first-application’ is inactive, but the current state is Stable.
2.6.1) to-earliest
Cái này là reset toàn bộ các offsets
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

Vậy thì ta có thể reset offset được hem nhi?
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first-topic


giờ start consumer read data of topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group my-first-application


2.6.2) –shift-by
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first-topic


Chỗ này warning vì lag đang là 0 0 0 nên ko thể set –shift-by 2
shift-by > 0 thì kiểu trong topic của bạn đang có vài offset chưa được read và bạn cũng muốn commited các offset đó luôn, không cần đọc các offset này.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first-topic


shift-by < 0 thì là bạn muốn đọc lại 1 số offsets đã được commit
3) Ot
The CLI has many options, but here are the others that are most commonly used:
Producer with keys
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value
Consumer with keys
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,
KafkaCat (https://github.com/edenhill/kafkacat) is an open-source alternative to using the Kafka CLI, created by Magnus Edenhill.
While KafkaCat is not used in this course, if you have any interest in trying it out, I recommend reading: https://medium.com/@coderunner/debugging-with-kafkacat-df7851d21968
Happy learning!