Skip to content

Commit 225a015

Browse files
authored
Merge pull request #443 from jturkel/feature/offset-retention
Commit offsets even if we don't read any new messages
2 parents c753516 + 983d7b8 commit 225a015

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

lib/kafka/consumer.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
216216

217217
# We may not have received any messages, but it's still a good idea to
218218
# commit offsets if we've processed messages in the last set of batches.
219+
# This also ensures the offsets are retained if we haven't read any messages
220+
# since the offset retention period has elapsed.
219221
@offset_manager.commit_offsets_if_necessary
220222
end
221223
end
@@ -279,6 +281,12 @@ def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed:
279281

280282
return if !@running
281283
end
284+
285+
# We may not have received any messages, but it's still a good idea to
286+
# commit offsets if we've processed messages in the last set of batches.
287+
# This also ensures the offsets are retained if we haven't read any messages
288+
# since the offset retention period has elapsed.
289+
@offset_manager.commit_offsets_if_necessary
282290
end
283291
end
284292

0 commit comments

Comments
 (0)