There are multiple strategies to read a topic from its beginning. To explain them, we first need to understand what happens when a consumer starts up:
(1) The consumer joins its configured consumer group, which triggers a rebalance that assigns partitions to it.
(2) It looks up the committed offsets for all partitions assigned to it.
(3) For every partition with a valid committed offset, it resumes from that offset.
(4) For every partition without a valid committed offset, it sets the start offset according to the auto.offset.reset configuration parameter.
If you want to process a topic from its beginning, you can simply start a new consumer group (i.e., choose an unused group.id) and set auto.offset.reset = earliest. Because there are no committed offsets for a new group, auto offset reset will trigger and the topic will be consumed from its beginning. Note that if you restart the consumer with the same group.id, it will not read the topic from the beginning again but will resume where it left off. Thus, for this strategy, you need to assign a new group.id each time you want to read a topic from its beginning.
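A minimal sketch of the consumer configuration for this strategy. It builds a java.util.Properties object using the standard Kafka config key strings, so it compiles without the Kafka client library; in a real application you would pass these properties to new KafkaConsumer<>(props). The broker address and the UUID-based group naming scheme are assumptions for illustration only.

```java
import java.util.Properties;
import java.util.UUID;

final class FreshGroupConfig {

    // Builds consumer properties that read a topic from its beginning by joining
    // a brand-new consumer group, which has no committed offsets yet.
    static Properties freshGroupProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Hypothetical naming scheme: a random suffix guarantees an unused group.id.
        props.put("group.id", "read-from-start-" + UUID.randomUUID());
        // No committed offsets -> auto offset reset applies -> start at the earliest offset.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address; each call yields a different group.id.
        Properties props = freshGroupProps("localhost:9092");
        props.list(System.out);
    }
}
```

Because every call generates a new group.id, each run starts a new group; the flip side is that old groups and their committed offsets accumulate on the broker until they expire.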
To avoid setting a new group.id each time you want to read a topic from its beginning, you can disable auto commit (via enable.auto.commit = false) before starting the consumer for the very first time (using an unused group.id and setting auto.offset.reset = earliest). Additionally, you must not commit any offsets manually. Because offsets are never committed with this strategy, the consumer will read the topic from its beginning again on every restart.
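A minimal sketch of the configuration for this "never commit" strategy, again as plain java.util.Properties with the standard Kafka config key strings. The broker address and the group name "dev-reader" are hypothetical. The configuration alone is not sufficient: the application must also never call commitSync() or commitAsync().

```java
import java.util.Properties;

final class NoCommitConfig {

    // Consumer properties for the development-only "never commit" strategy:
    // a fixed group.id that was unused before the first start, auto commit disabled,
    // and auto.offset.reset=earliest so every start falls back to the beginning.
    static Properties noCommitProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");   // never commit automatically
        props.put("auto.offset.reset", "earliest"); // no committed offsets -> start from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group name; the same group.id can be
        // reused on every restart as long as no offsets are ever committed.
        Properties props = noCommitProps("localhost:9092", "dev-reader");
        props.list(System.out);
    }
}
```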
However, this strategy has two disadvantages:
(1) Because offsets are never committed, a failed consumer and a stopped consumer are handled the same way on restart: in both cases, the topic is consumed from its beginning. (2) Because offsets are never committed, partitions newly assigned during a rebalance are consumed from the very beginning.
Therefore, this strategy only works for consumer groups with a single consumer and should only be used for development purposes.
If you want to be fault-tolerant and/or use multiple consumers in your consumer group, committing offsets is mandatory. Thus, if you want to read a topic from its beginning, you need to override the committed offsets at consumer startup: seeking changes the consumer's position, and that new position replaces the committed offset with the next commit. For this, KafkaConsumer provides three methods: seek(), seekToBeginning(), and seekToEnd(). While seek() can be used to set an arbitrary offset, seekToBeginning() and seekToEnd() move to the beginning or end of a partition, respectively. Thus, after a failure or a regular restart, you skip the seek and the consumer resumes where it left off. Only for a stop-and-restart-from-beginning do you call seekToBeginning() explicitly before entering your poll() loop. Note that the seekXXX() methods can only be applied to partitions that are currently assigned to the consumer, i.e., after the consumer has joined the group; thus, a "dummy poll" is required before calling seekXXX(). The overall code would be something like this:
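```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// restartFromBeginning is a hypothetical flag set by your application
if (restartFromBeginning) {
    // dummy poll(s) until partitions are assigned (assumes this consumer gets at least one);
    // records returned here are re-read after the seek, so they can be ignored
    while (consumer.assignment().isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    consumer.seekToBeginning(consumer.assignment());
}
// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        // process a record
    }
}
```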
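To make the startup decision explicit, here is a simplified, dependency-free model of how the start offset of a single partition is chosen, combining the startup steps listed at the top with the explicit seekToBeginning() override. All names are hypothetical and this is not the Kafka client API; it treats a committed offset outside the partition's current range as invalid and only models the "earliest" and "latest" reset values.

```java
import java.util.OptionalLong;

// Simplified model of per-partition start-offset selection at consumer startup.
// All names are hypothetical; this is not part of the Kafka client API.
final class StartOffsetModel {

    enum ResetPolicy { EARLIEST, LATEST } // two common auto.offset.reset values

    static long startOffset(OptionalLong committed, ResetPolicy policy,
                            boolean seekToBeginning, long beginningOffset, long endOffset) {
        // An explicit seekToBeginning() at startup overrides everything else.
        if (seekToBeginning) {
            return beginningOffset;
        }
        // A valid committed offset (still inside the log) means: resume where the group left off.
        if (committed.isPresent()
                && committed.getAsLong() >= beginningOffset
                && committed.getAsLong() <= endOffset) {
            return committed.getAsLong();
        }
        // No valid committed offset: fall back to auto.offset.reset.
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // Hypothetical partition holding offsets 0..99 (log end offset 100).
        System.out.println(startOffset(OptionalLong.of(42), ResetPolicy.EARLIEST, false, 0, 100)); // 42
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, false, 0, 100)); // 0
        System.out.println(startOffset(OptionalLong.of(42), ResetPolicy.EARLIEST, true, 0, 100));  // 0
    }
}
```

The three calls in main() correspond to the three cases discussed above: resuming a group with committed offsets, starting a new group with auto.offset.reset = earliest, and forcing a re-read with seekToBeginning().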