The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Kafka - When to commit?

In a previous blog post, we have looked at failure strategies provided by the Reactive Messaging Kafka connector. But, imagine it’s our lucky day, and for once it worked. We should inform Kafka that the processing succeeded. In Kafka terminology, we call this: offset commit. This post covers the different strategies to commit offsets with the Reactive Messaging Kafka connector.

Kafka Consumer Group and Offsets

Kafka organizes records (i.e. messages) around topics. Each topic has a name, and applications send records to topics and poll records from topics. So far, nothing out of the ordinary.

Topics are divided into partitions. Each partition is an ordered, immutable sequence of records. Sending a message to a topic appends it to the selected partition. Each message from a partition gets a sequential id number called offset. It uniquely identifies each message within the partition. So, with Kafka, you can identify an individual record using a <topic, partition, offset> tuple.

topics partitions

When an application consumes messages from Kafka, it uses a Kafka consumer. With this consumer, it polls batches of messages from a specific topic, for example, movies or actors. Retrieved messages belong to partitions assigned to this consumer. And that aspect is essential.

Consumers belong to a consumer group, identified with a name (A and B in the picture above). A group contains one or more consumers. In general, when you scale up your application, it creates a consumer joining the same group.

consumer groups

Each consumer group receives each record from a topic once. To achieve this, it assigns each consumer from a group to a set of partitions. For example, in the above picture, the consumer from the application A1 receives the records from the partitions 0 and 1. A2 receives the records from the partition 2. App B is the only consumer from its consumer group. So, it gets the records from all three partitions. Consequently (ignore rebalance or other subtilities for now), each record from a topic is only received once per consumer group, by a specific consumer from that group.

To orchestrate each consumer group’s progress, each consumer periodically informs the broker of its current position - the last processed offset. It commits the offset, indicating that all the previous records from that partition have been processed. So, if a consumer stops and comes back later, it restarts from the last committed position (if assigned to that partition again). Note that this behavior is configurable.

What’s important to notice is the periodic aspect of the commit. Offset commit is expensive, and to enhance performance, we should not commit the offset after each processed record. In this regard, Kafka behaves differently from traditional messaging solutions, such as JMS, which acknowledges each message individually. Another important characteristic is the positional aspect of the commit. You commit the position indicating that all the records located before that position are processed. But is it really the case?

The Kafka default behavior

The Apache Kafka consumer uses an auto-commit approach by default. Applications using such a consumer are structured around a polling loop:

while(true) {
    ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
    processRetrievedRecords(records);
}

Such a program polls a batch of records, processes them, and then polls the next set. While calling the poll method, the consumer periodically commits the last offset of the previous batches transparently.

Quite nice, right? If the application fails to process a message, it throws an exception, which either interrupts the while loop or is handled gracefully (within the processRetrievedRecords method). In the first case, it means that it won’t commit anymore (as it happens in the poll method, not called anymore). If the application restarts, it resumes from the last committed offset (or apply the auto.offset.reset strategy, defaulting to latest, if there are no offsets for this group yet). It may re-process a set of messages (it’s the application’s responsibility to handle duplicates), but at least nothing is lost.

So, is there anything wrong with this? Looks wonderful…​ until you add a pinch of asynchrony.

What if the message’s processing is asynchronous

If the message processing is asynchronous (offloaded to another thread, use non-blocking I/O…​), failures may not interrupt the while loop from above. Failure happens asynchronously, outside the polling thread. If the poll method gets called again despite a failed processing, and auto-commit is still enabled, we may commit offsets while something wrong happened. If some processing of previously retrieved records is not completed yet, while this auto commit happens, it may consider the record as processed correctly, but the outcome is unknown at that point.

So to handle these case, we can disable the auto-commit and switch to manual commit. In this case, it’s the application’s responsibility to commit the offsets regularly. So, the application needs to track the polled records, their processing, failures, and periodically commits the offsets. It might not look too tricky, but actually, it can become quite challenging. Again, in asynchronous scenarios, you may complete the processing of messages in various orders. For example, if you call a remote service for each record, the responses may not come in the same orders as the records. You need to track messages individually and only commit the offsets if all the previous messages are processed successfully. Without this, you may commit offsets while there is processing from previous records still in progress or even failed processing.

What can we do about this?

Kafka connector commit strategies

When using Reactive Messaging and the Kafka connector, you entered an asynchronous world.

Message processing may not happen synchronously and sequentially. When a Reactive Messaging Message processing completes, it acknowledges the message. In the case of processing failures, it sends a negative acknowledgment. The Kafka connector receives these acknowledgments and can decide what needs to be done, basically: to commit or not to commit.

You can choose among three strategies:

  • throttled (default starting Quarkus 1.10)

  • latest (default before Quarkus 1.10)

  • ignore (default if enabled.auto.commit=true is set)

This is configured using the commit-strategy attribute:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=throttled

The throttled strategy

The throttled strategy can be seen as an asynchronous variant of the default "auto-commit" behavior described above. When enabled, the connector tracks each received message and monitors their acknowledgment. When the connector finds out that all messages before a position are processed successfully, it commits that position. This commit happens periodically to avoid committing too often.

This strategy provides very good throughput and can handle asynchronous processing. To enable this strategy configures the channel with:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=throttled

There is one detail to mention. If an old message is neither acked nor nacked, the strategy cannot commit the position anymore. It will enqueue messages forever, waiting for that missing ack to happen. It can lead to out of memory, as the connector would never be able to commit a position and to clear the queue. Fortunately, the strategy detects this situation and reports a failure to the connector, marking the application unhealthy. The throttled.unprocessed-record-max-age.ms attribute configures the deadline for each message to be acked or nacked before being considered as a poison pill (Default is 1 minute).

The Ignore strategy

The connector uses this strategy by default if you explicitly enabled Kafka’s auto-commit (with the enable.auto.commit attribute set to true). In this case, the connector ignores acknowledgment and won’t commit the offsets. The Kafka consumer commits the offset periodically when polling batches, as described above. This strategy works well if the message processing is synchronous and failures handled gracefully.

You can enable this strategy by configured enabled-auto-commit to true:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.enable.auto.commit=true
Be aware that starting Quarkus 1.9, auto commit is disabled by default. So you need to explicitly enable it.

If you don’t enable auto-commit, using this strategy is still possible but will never commit the offsets. In other words, you would restart from the oldest stored records every time. While there are use cases for this, double-check that’s what you want. In this case, enable this strategy with:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=ignore

The latest strategy

This strategy commits the offset every time a message is acknowledged. This strategy tends to commit often, and so decrease the throughput. However, it also reduces the risk of duplicates if the messages are processed synchronously.

Enable this strategy with:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=latest

解决方案

In general, use the throttled strategy. It provides high-throughput and handles the asynchronous use cases. This strategy is becoming the default strategy in Quarkus 1.10. You can also switch to the ignore strategy if the Kafka auto-commit is acceptable for you, or if you want to skip offset commit altogether.

That concludes this blog post. The next one will discuss how to receive and produce Cloud Events using the Kafka connector.