apache-kafka How to Commit Offsets


Example

A KafkaConsumer can commit offsets automatically in the background (configuration parameter enable.auto.commit = true), which is the default setting. These auto commits are performed within poll(), which is typically called in a loop. How frequently offsets are committed can be configured via auto.commit.interval.ms. Because auto commits are embedded in poll(), and poll() is called by user code, this parameter defines a lower bound for the interval between commits: commits cannot happen more often than configured, but they happen less often if poll() is called infrequently.
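As a minimal sketch, the auto commit settings above could be assembled with java.util.Properties as shown here. The class name AutoCommitConfig, the broker address, the group name, and the interval of 1000 ms are assumptions chosen for illustration, not values from this topic.

```java
import java.util.Properties;

// Hypothetical helper that collects consumer settings for auto commit.
final class AutoCommitConfig {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "example-group");           // assumed consumer group
        // Commit offsets in the background, from within poll().
        props.setProperty("enable.auto.commit", "true");
        // Lower bound for the time between two auto commits, in milliseconds.
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }
}
```

With the real client, such a Properties object is passed to the KafkaConsumer constructor; key.deserializer and value.deserializer must be set as well, which is omitted here for brevity.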

As an alternative to auto commit, offsets can also be managed manually. For this, auto commit should be disabled (enable.auto.commit = false). For manual committing, KafkaConsumer offers two methods: commitSync() and commitAsync(). As the names indicate, commitSync() is a blocking call that returns only after the offsets have been committed successfully, while commitAsync() returns immediately. If you want to know whether an asynchronous commit was successful, you can provide a callback handler (OffsetCommitCallback) as a method parameter.

Note that both commit calls, when invoked without an offset map, commit the offsets of the latest poll() call. For example, assume a single-partition topic with a single consumer, and the last call to poll() returned messages with offsets 4, 5, and 6. On commit, offset 7 will be committed: the consumer client tracks 6 as the latest offset it has returned, and the committed offset is always the offset of the next message to be read, that is, the latest returned offset plus one.

At the same time, both commitSync() and commitAsync() allow for more control over which offsets you want to commit: if you use the corresponding overloads that accept a Map<TopicPartition, OffsetAndMetadata>, the consumer commits only the specified offsets (i.e., the map can contain any subset of assigned partitions, and the specified offset can have any value).
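The following sketch shows one way to compute the offsets for such an explicit commit. It follows the Kafka convention that a committed offset names the next message to read, so the value per partition is the highest processed offset plus one. To stay runnable without the client library, it uses a hypothetical PolledRecord class and a plain Map<Integer, Long>; with the real API the keys would be TopicPartition objects and the values OffsetAndMetadata objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a consumed record: only partition and offset.
final class PolledRecord {
    final int partition;
    final long offset;

    PolledRecord(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

final class ExplicitOffsets {
    // For each partition in the batch, returns the offset to commit:
    // the highest offset seen plus one (the next message to read).
    static Map<Integer, Long> offsetsToCommit(PolledRecord[] batch) {
        Map<Integer, Long> result = new HashMap<>();
        for (PolledRecord r : batch) {
            long next = r.offset + 1;
            Long current = result.get(r.partition);
            if (current == null || next > current) {
                result.put(r.partition, next);
            }
        }
        return result;
    }
}
```

For a batch with offsets 4, 5, and 6 on partition 0, the resulting map contains the single entry 0 -> 7.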

Semantics of committed offsets

A committed offset indicates that all messages up to (but not including) this offset have already been processed. Thus, as offsets are consecutive numbers, committing offset X implicitly commits all offsets smaller than X. Therefore, it is not necessary to commit each offset individually; committing multiple offsets at once is done by committing just the largest offset.

Note that, by design, it is also possible to commit an offset smaller than the last committed offset. This is useful if messages should be read a second time.
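These semantics can be illustrated with a toy model of a single partition. This is not the Kafka client API: the class CommittedOffsetModel and its methods are hypothetical, and the model assumes that a restarted consumer resumes at the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the committed offset of one partition.
final class CommittedOffsetModel {
    private long committed = 0L; // offset at which a restarted consumer resumes

    // One number covers every smaller offset, so a single commit is enough.
    // The new value may also be smaller than the previous one.
    void commit(long offset) {
        committed = offset;
    }

    // Offsets a restarted consumer would read if the log ends at
    // logEndOffset (exclusive).
    List<Long> offsetsReadAfterRestart(long logEndOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = committed; o < logEndOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }
}
```

After commit(7) on a log that ends at offset 10, a restart reads offsets 7, 8, and 9. A later commit(5) rewinds the group, so a restart reads offsets 5 through 9 and messages 5 and 6 are delivered a second time.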

Processing guarantees

Using auto commit provides at-least-once processing semantics. The underlying assumption is that poll() is only called after all previously delivered messages have been processed successfully. This ensures that no message gets lost, because a commit happens only after processing. If a consumer fails before a commit, all messages after the last commit are fetched from Kafka and processed again. However, this retry might result in duplicates, as some messages from the last poll() call might already have been processed when the failure happened right before the auto commit.

If at-most-once processing semantics are required, auto commit must be disabled and commitSync() must be called manually directly after poll(). Only afterward are the messages processed. This ensures that offsets are committed before the messages are processed, and thus no message is ever read a second time. Of course, some messages might get lost in case of a failure.
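The difference between the two guarantees comes down to whether the commit happens before or after processing a batch. The following simulation is a sketch under simplifying assumptions, not client code: one partition, fixed-size batches standing in for poll(), a single crash, and a restart that resumes at the last committed offset. The class DeliverySimulation and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliverySimulation {
    // Returns the offsets in the order they were processed. The consumer
    // crashes once, right after processing the message at crashAfterOffset.
    // batchSize must be positive.
    static List<Long> run(boolean commitBeforeProcessing, long logEnd,
                          int batchSize, long crashAfterOffset) {
        List<Long> processed = new ArrayList<>();
        long committed = 0L;     // survives the crash
        long position = 0L;      // in-memory read position, lost on crash
        boolean crashed = false; // the crash is simulated only once
        while (position < logEnd) {
            long batchEnd = Math.min(position + batchSize, logEnd); // exclusive
            if (commitBeforeProcessing) {
                committed = batchEnd; // at-most-once: commit right after "poll"
            }
            boolean crashNow = false;
            for (long o = position; o < batchEnd; o++) {
                processed.add(o);
                if (!crashed && o == crashAfterOffset) {
                    crashed = true;
                    crashNow = true;
                    break;
                }
            }
            if (crashNow) {
                position = committed; // restart from the last committed offset
                continue;
            }
            if (!commitBeforeProcessing) {
                committed = batchEnd; // at-least-once: commit after processing
            }
            position = batchEnd;
        }
        return processed;
    }
}
```

With six messages (offsets 0 to 5), batches of three, and a crash after offset 4, committing after processing yields 0, 1, 2, 3, 4, 3, 4, 5 (offsets 3 and 4 are duplicated), while committing before processing yields 0, 1, 2, 3, 4 (offset 5 is lost).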