Co-author: Eugen Nicolae Cojan
As W. Edwards Deming, the father of quality management, put it:
“Without data, you are just another person with an opinion.”
This post combines real-world issues faced by our team and our approach to addressing them. It also shares the key metrics to start monitoring Kafka client implementations effectively.
1. What could go wrong with the Producer while scaling the Consumer?
One day we increased the number of partitions for a topic used by our client by 2.5x. It is a high-traffic topic, with a 3:1 consume-to-produce ratio and replication factor 3, peaking at 250K msg/s, 400 MB/s IN, and 1.2 GB/s OUT at that time. Although neither the traffic patterns nor the settings of the upstream service had changed, we noticed that the topic started growing in size, and the upstream producers' batch size and compression rate degraded. Batches were triggered before
linger.ms time expiration, but still at the lower size compared with
The increased number of partitions resulted in a larger number of batches, one for each topic-partition pair, kept in RecordAccumulator memory (see producer internals), and these batches were not filled as fully as before. To deal with that, we changed
partitioner.class in the producer settings to the DefaultPartitioner, which tries to batch records with null keys into the same partition and moves to a new partition once the current one is full.
Batch size increased, resulting in better compression and less incoming network traffic. Overall savings: ~200 MB/s of incoming traffic, ~600 MB/s OUT, ~400 MB/s on inter-broker replication (the topic has replication factor 3), and ~78 TB of storage.
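A minimal sketch of the producer settings involved, built with plain `java.util.Properties` so it runs without a Kafka dependency. Only `partitioner.class` and `linger.ms` are named in the text above; the values for `batch.size`, `linger.ms`, and `compression.type`, as well as the class name `StickyBatchingProducerConfig`, are illustrative assumptions and not the values we run in production. Recent client versions deprecate DefaultPartitioner in favour of built-in partitioning when `partitioner.class` is left unset, so check the documentation for your client version.

```java
import java.util.Properties;

class StickyBatchingProducerConfig {

    // Builds producer properties that favour fuller batches for records with null keys.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Partitioner named in the text: keeps null-key records on one partition
        // until the current batch is full, then moves to another partition.
        props.setProperty("partitioner.class",
                "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        // Assumed values; tune them against your own batch size and compression metrics.
        props.setProperty("batch.size", "262144");    // upper bound per batch, in bytes
        props.setProperty("linger.ms", "50");         // wait up to 50 ms for a batch to fill
        props.setProperty("compression.type", "lz4"); // compression is applied per batch
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object can be passed to a `KafkaProducer` constructor together with the serializer settings your service already uses.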
2. Strange upstream traffic
While checking metrics for one of the replication services, we observed that incoming traffic was significantly higher than outgoing traffic, a 2.6–3x difference. We plot the consumer incoming byte rate and producer outgoing byte rate on the same panel (listed below), which is how we spotted this unusual difference.
Since replication services are only moving traffic cross-region, this raised concerns about possible misconfigurations in upstream services.
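The comparison we made on the dashboard can be sketched as a simple ratio check. This is only an illustration: the class name `ReplicationTrafficCheck`, the method names, and the 1.5 threshold are hypothetical, and the inputs are assumed to be the consumer incoming byte rate and the producer outgoing byte rate sampled over the same window.

```java
class ReplicationTrafficCheck {

    // Ratio of bytes consumed to bytes produced; both rates must use the same unit.
    static double inOutRatio(double consumerIncomingByteRate, double producerOutgoingByteRate) {
        if (producerOutgoingByteRate <= 0.0) {
            // Nothing produced: an idle service counts as balanced, anything else as fully skewed.
            return consumerIncomingByteRate > 0.0 ? Double.POSITIVE_INFINITY : 1.0;
        }
        return consumerIncomingByteRate / producerOutgoingByteRate;
    }

    // A pass-through replicator should sit near 1.0; a much higher ratio suggests the
    // upstream producers send data that is batched or compressed worse than our own output.
    static boolean upstreamLooksSuspicious(double in, double out, double threshold) {
        return inOutRatio(in, out) > threshold;
    }

    public static void main(String[] args) {
        // Example rates in MB/s, chosen to match the 2.6x difference described above.
        System.out.println(inOutRatio(260.0, 100.0));
        System.out.println(upstreamLooksSuspicious(260.0, 100.0, 1.5));
    }
}
```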
We contacted the upstream service team, showed them our metrics, and asked them to review their producer settings. A few batching and compression settings were missing, and they were updated quickly.
Overall savings: ~130 MB/s of incoming and outgoing traffic, ~260 MB/s on inter-broker replication, and ~41 TB of storage. We also scaled down our Kafka cluster on edge by 50%, saving CPU and memory costs.
3. My service not consuming, help!
One of our teams came to us asking for help troubleshooting why their service stopped consumption from the Kafka cluster. They didn’t have any errors in logs, and we didn’t observe any visible changes in traffic patterns on the topic.
First, we asked the team to start collecting Kafka Consumer and Producer metrics and expose them to their Prometheus account. The team went with the approach of implementing a pluggable metrics reporter, as described below.
Using the consumer coordinator
join-rate metric, we quickly identified that the consumer group was constantly rebalancing, resulting in stopped consumption. Next, we looked at
last-poll-seconds-ago and saw that some members of the consumer group had a poll interval greater than their
max.poll.interval.ms, resulting in client-side failure detection and the consumer leaving the group. This led us to investigate downstream processing; the team noticed a significant increase in latencies, triggering backpressure and ultimately increasing the time between consumer poll() calls.
At this point, we focused on minimizing the effect of downstream latencies and switched the consumer group assignor to a
CooperativeSticky-based implementation. We also tuned
max.poll.interval.ms to accommodate changes in downstream processing times.
We stabilized the consumer group, which allowed continuous processing of messages without long interruptions. Meanwhile, the team got time to troubleshoot properly and fix downstream processing. Kafka Consumer and Producer metrics have been added to their dashboards and on-call runbooks.
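A minimal sketch of consumer settings along these lines, again using plain `java.util.Properties` so it runs without a Kafka dependency. The assignor class is the cooperative sticky assignor shipped with the Java client; every numeric value, the inclusion of `max.poll.records`, and the names `StableConsumerGroupConfig` and `pollOverdue` are assumptions made for illustration. Switching an existing group from an eager assignor to a cooperative one usually needs the rolling upgrade procedure described in the Kafka documentation.

```java
import java.util.Properties;

class StableConsumerGroupConfig {

    // Builds consumer properties aimed at fewer and cheaper rebalances.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Cooperative rebalancing: members keep the partitions that do not move.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Assumed values; size them from observed downstream processing times.
        props.setProperty("max.poll.interval.ms", "600000"); // allowed gap between poll() calls
        props.setProperty("max.poll.records", "100");        // smaller batches per poll()
        props.setProperty("session.timeout.ms", "45000");    // heartbeat-based liveness window
        return props;
    }

    // True when a last-poll-seconds-ago reading exceeds the configured max.poll.interval.ms,
    // i.e. the consumer would be considered failed and leave the group.
    static boolean pollOverdue(double lastPollSecondsAgo, Properties props) {
        long maxPollIntervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return lastPollSecondsAgo * 1000.0 > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = build("localhost:9092", "example-group");
        System.out.println(pollOverdue(30.0, props));
        System.out.println(pollOverdue(601.0, props));
    }
}
```

A check like `pollOverdue` is the kind of condition that can back an alert on the last-poll-seconds-ago metric before the group starts rebalancing.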
Metrics represent raw measurements collected from Kafka client internals, providing useful insights into the operational aspects of the system. There are a couple of ways to collect Kafka client metrics:
- scraping JMX MBeans
- using a pluggable metrics reporter
- using the metrics() method on the producer or consumer.
Kafka clients report metrics via the JMX reporter, which registers them as MBeans. Producer MBeans have the prefix “kafka.producer:type=producer-metrics,” and consumer MBeans have the prefix “kafka.consumer:type=consumer-metrics,”. If your team uses Prometheus for monitoring, there is a JMX Exporter project that can scrape and expose these metrics.
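To see what these MBeans contain without any extra tooling, they can be read from inside the same JVM with the standard `javax.management` API. The sketch below is an assumption-laden illustration: the class name `KafkaClientJmxDump` is hypothetical, it only sees clients running in the current process, and it returns nothing when no Kafka client has been created.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class KafkaClientJmxDump {

    // Patterns built from the prefixes above; the trailing "*" matches any client-id.
    static final String PRODUCER_PATTERN = "kafka.producer:type=producer-metrics,*";
    static final String CONSUMER_PATTERN = "kafka.consumer:type=consumer-metrics,*";

    // Reads every attribute of every MBean matching the pattern into a sorted map.
    static Map<String, Object> read(MBeanServer server, String pattern) {
        Map<String, Object> values = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                    values.put(name.getCanonicalName() + " " + attr.getName(),
                            server.getAttribute(name, attr.getName()));
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("Failed to read MBeans for " + pattern, e);
        }
        return values;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        read(server, PRODUCER_PATTERN).forEach((k, v) -> System.out.println(k + " = " + v));
        read(server, CONSUMER_PATTERN).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```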
Pluggable Metrics Reporter
Users can specify an implementation of the org.apache.kafka.common.metrics.MetricsReporter interface via the metric.reporters configuration parameter. The Kafka client accepts multiple implementations, separated by commas. The interface requires the implementation of four methods:
// called when the reporter is registered for the first time
init(List<KafkaMetric> metrics)
// called when a metric is added or updated
metricChange(KafkaMetric metric)
// called when a metric is removed
metricRemoval(KafkaMetric metric)
// called when metrics are closed
close()
The reporter can access Producer and Consumer configuration parameters by overriding the method
configure(Map<String, ?> configs). This can be useful for passing additional parameters, such as excluded or included metric names.
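As an illustration of the kind of logic a reporter might run inside configure, the sketch below parses an include list from the configuration map and uses it to filter metric names. It uses only the Java standard library; the key `example.reporter.include.prefixes` and the class `MetricNameFilter` are hypothetical, and we assume that custom keys set on the client reach the reporter's configuration map.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MetricNameFilter {

    // Hypothetical custom key holding a comma-separated list of metric name prefixes.
    static final String INCLUDE_KEY = "example.reporter.include.prefixes";

    private final List<String> prefixes;

    // Intended to be built from the map a reporter receives in configure(Map<String, ?>).
    MetricNameFilter(Map<String, ?> configs) {
        Object raw = configs.get(INCLUDE_KEY);
        this.prefixes = raw == null
                ? Collections.<String>emptyList()
                : Arrays.stream(raw.toString().split(","))
                        .map(String::trim)
                        .filter(s -> !s.isEmpty())
                        .collect(Collectors.toList());
    }

    // With no include list every metric passes; otherwise the name must start with a prefix.
    boolean accepts(String metricName) {
        return prefixes.isEmpty() || prefixes.stream().anyMatch(metricName::startsWith);
    }

    public static void main(String[] args) {
        MetricNameFilter filter = new MetricNameFilter(
                Collections.singletonMap(INCLUDE_KEY, "records-lag, join-"));
        System.out.println(filter.accepts("join-rate"));
        System.out.println(filter.accepts("io-ratio"));
    }
}
```

A reporter could then call `accepts(metric.metricName().name())` in its init and metricChange callbacks to decide which metrics to export.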
Another useful method to override is
contextChange(MetricContext metricsContext). The MetricContext object can be populated with additional labels via config parameters in the form of
metrics.context.<key>=<value>. This method will be called before
Using metrics() method
Producer and Consumer expose a method for querying metrics, with the following signature:
Map<MetricName, ? extends Metric> metrics(). Micrometer, for example, uses this approach. If you are already using Spring Boot, Micrometer is included via the actuator dependency. There is a dependency for Quarkus as well; an example can be seen in the following guide. Both frameworks try to detect Kafka Producer or Consumer instantiations and bind metrics accordingly.
Kafka Consumer also offers many metrics, covering the poll() loop, Group Coordinator, Fetcher, Network, and more. We recommend starting with some of the key metrics below.