5.4 Consumer Offset Management
Master offset management for reliable Kafka consumers. Covers auto vs manual commits, consumer groups, partition assignment, and offset reset strategies with code examples.
Overview
Offsets are critical for tracking consumer progress in Kafka. Proper offset management prevents data loss and duplicate processing. This lesson covers how consumers handle offsets, especially across multi-partition topics.
What Are Offsets?
Definition
- Each message has a unique offset marking its position within its partition
- Consumers read messages sequentially using offsets
- Offsets ensure consumers don't lose track of processed messages
- Position tracking mechanism for message consumption
How Offsets Work
- Messages in each partition numbered sequentially (0, 1, 2, ...)
- Consumer tracks last processed offset per partition
- Offset increments as consumer progresses through messages
- Separate offset maintained for each partition
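A quick way to see this tracking in action: assign a single partition directly and ask the consumer for its position, i.e. the offset of the next record it will read. A minimal sketch, assuming a local broker and the payment topic used throughout this lesson:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition partition0 = new TopicPartition("payment", 0);
    consumer.assign(List.of(partition0));          // direct assignment, no consumer group needed
    consumer.seekToBeginning(List.of(partition0)); // start at offset 0
    consumer.poll(Duration.ofMillis(500));         // fetch a batch of records
    // position() returns the offset of the next record to be fetched
    System.out.println("Next offset: " + consumer.position(partition0));
}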
Offset Storage
__consumer_offsets Topic
- Internal Kafka topic storing committed offsets
- Records latest committed offset for each consumer group
- Automatically maintained by Kafka
- Ensures consumer can resume from last position after restart
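You normally never read __consumer_offsets directly; to inspect what it holds for a group, the Admin API exposes the committed offsets. A minimal sketch, assuming a group named payment-group on a local broker:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(props)) {
    // Reads the committed offsets Kafka stores in __consumer_offsets
    Map<TopicPartition, OffsetAndMetadata> offsets =
        admin.listConsumerGroupOffsets("payment-group")
             .partitionsToOffsetAndMetadata()
             .get(); // blocks; throws checked exceptions to handle or declare
    offsets.forEach((tp, om) ->
        System.out.println(tp + " -> committed offset " + om.offset()));
}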
Partition Distribution Example
Payment topic with 2 partitions:
- Partition 0: M1, M3, M5, M7, M9, M11, M13, M15, M17, M19
- Partition 1: M2, M4, M6, M8, M10, M12, M14, M16, M18, M20
Messages are distributed by key, so all messages with the same key land in the same partition.
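For reference, the producer side of that guarantee looks like this: records carrying the same key hash to the same partition. A sketch with illustrative keys:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Same key -> same partition, so per-key ordering is preserved
    producer.send(new ProducerRecord<>("payment", "user-1", "M1"));
    producer.send(new ProducerRecord<>("payment", "user-2", "M2"));
    producer.send(new ProducerRecord<>("payment", "user-1", "M3")); // lands with M1
}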
Auto Offset Configuration
auto.offset.reset Setting
Controls where a consumer starts reading when no committed offset exists for its group (or when the committed offset is no longer valid, e.g. expired by retention).
earliest
- Consumer starts from very first message in partition
- Processes all available historical messages
- Useful for reprocessing or initial data loads
latest
- Consumer starts from most recent message
- Skips older messages
- Useful when only new data matters
Configuration Example
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
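In context, the property sits alongside the usual consumer settings. A minimal sketch of a full configuration (broker address and group name are illustrative):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only takes effect when the group has no committed offset for a partition
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);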
Single Consumer Behavior
Example: One Consumer, Two Partitions
Consumer with auto.offset.reset=earliest:
consumer.subscribe(Collections.singletonList("payment"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Partition: %d, Value: %s%n",
            record.offset(), record.partition(), record.value());
    }
    consumer.commitSync();
}

Processing Order
Single consumer assigned both partitions:
- Reads from partition 0 and partition 1
- Order across partitions is non-deterministic
- Messages within single partition always ordered
- May process M1, M2, M3, M4... or M1, M3, M5, M2, M4, M6...
Why Non-Deterministic Order?
Depends on:
- Which partition data arrives first
- Network conditions
- Kafka's internal batch fetching
- Consumer polling behavior
Offset Commits
After processing all messages:
- Partition 0: committed offset = 10 (M19 sits at offset 9; the commit stores the next offset to read)
- Partition 1: committed offset = 10 (M20 sits at offset 9)
- New messages are processed from offset 10 onwards
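You can confirm these positions from the consumer itself: committed() fetches the last committed offsets from Kafka rather than from local state. A sketch, assuming the consumer already holds its assignment:

TopicPartition p0 = new TopicPartition("payment", 0);
TopicPartition p1 = new TopicPartition("payment", 1);

// Fetch the last committed offsets for both partitions from Kafka
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Set.of(p0, p1));
committed.forEach((tp, om) ->
    System.out.println(tp + " -> " + (om == null ? "no commit yet" : om.offset())));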
Multiple Consumers in Same Group
Two Consumers Example
Starting second consumer triggers rebalance:
Before Rebalance:
- Consumer 1: reads partition 0 and partition 1
After Rebalance:
- Consumer 1: assigned partition 0 only
- Consumer 2: assigned partition 1 only
Output Distribution
Consumer 1 (Window 1):
- Reads: M1, M3, M5, M7, M9, M11, M13, M15, M17, M19
- Commits offset 10 for partition 0
Consumer 2 (Window 2):
- Reads: M2, M4, M6, M8, M10, M12, M14, M16, M18, M20
- Commits offset 10 for partition 1
Benefits
- Load distributed across consumers
- Parallel processing improves throughput
- Each consumer handles subset of data
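To see which partitions each consumer ends up with after a rebalance, pass a ConsumerRebalanceListener when subscribing; Kafka invokes it on every rebalance. A minimal sketch:

consumer.subscribe(Collections.singletonList("payment"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away, e.g. when a new consumer joins
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the rebalance with this consumer's new assignment
        System.out.println("Assigned: " + partitions);
    }
});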
More Consumers Than Partitions
Three Consumers, Two Partitions
Kafka Assignment Rule:
- Each partition assigned to only one consumer
- Single consumer can handle multiple partitions
Result:
- Consumer 1: assigned partition 0
- Consumer 2: assigned partition 1
- Consumer 3: idle (no partition assigned)
Idle Consumer Purpose
Not useless - acts as standby:
- Ready to take over if active consumer fails
- Automatically assigned partition during rebalance
- Improves system resilience
- No manual intervention needed
Consumer Group Leadership
How Leadership Works
First Consumer Joins
- Automatically designated group leader - typically the first member to join
- The broker-side group coordinator picks the leader; the assignment logic itself runs on the consumers
Leader Responsibilities
- Computes partition assignments for all group members
- Coordinates rebalancing
- Manages partition distribution
Additional Consumers Join
- Leader reassigns partitions
- Distributes load among members
- Excess consumers remain idle
Leader Failure
- New leader automatically selected during the next rebalance
- Group reorganizes seamlessly
- Partitions reassigned
Key Points
- Leadership is not a permanent role
- Can shift between consumers across rebalances
- Assignment runs client-side; the broker-side coordinator tracks membership and relays assignments
Manual Offset Control
Auto-Commit vs Manual
Auto-Commit (enable.auto.commit=true)
Pros:
- Convenient - Kafka handles commits automatically
- No code changes needed
Cons:
- Risk of duplicates if consumer crashes before commit
- Less control over commit timing
- May commit before actual processing completes
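For completeness, enabling auto-commit explicitly looks like this; the interval controls how often offsets are committed in the background during poll() (5000 ms is the default):

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Offsets are committed in the background at this interval, during poll()
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");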
Manual Commit (enable.auto.commit=false)
Pros:
- Full control over commit timing
- Commit only after successful processing
- Prevents data loss scenarios
Cons:
- More code complexity
- Must handle commits explicitly
Configuration
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Commit Methods
Synchronous Commit
consumer.commitSync();

Characteristics:
- Blocks until Kafka confirms commit
- Safer - ensures commit completes
- Slower - waits for acknowledgment
- Recommended for critical data
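With no arguments, commitSync() commits the latest offsets returned by the last poll(). To commit as you go, an overload takes explicit offsets; note the + 1: the committed value is the next offset to read, not the last one processed. A sketch, where processRecord is a stand-in for your processing logic:

for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    // Commit exactly this record's position: last processed offset + 1
    consumer.commitSync(Map.of(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));
}

Committing after every record is the safest and slowest option; committing once per poll() batch is the usual compromise.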
Asynchronous Commit
consumer.commitAsync();

Characteristics:
- Sends commit request and continues immediately
- Faster - no blocking
- Risk of lost commits on failure
- Good for high-throughput scenarios
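Since commitAsync() does not retry failed commits, it accepts a callback so failures are at least visible. A minimal sketch that just logs them:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Commit failed; a later successful commit will supersede it,
        // but log the failure so lost commits are visible
        System.err.println("Commit failed for " + offsets + ": " + exception);
    }
});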
Best Practice: Combined Approach
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }

        // Fast async commits during normal operation
        consumer.commitAsync();
    }
    // A shutdown hook calling consumer.wakeup() breaks out of poll()
    // with a WakeupException, which lands us in the finally block
} finally {
    try {
        // Final sync commit on shutdown ensures the last offsets are saved
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

External Offset Storage
Why Use External Storage?
Instead of __consumer_offsets:
- Store offsets in PostgreSQL, Redis, or custom database
- More visibility into offset history
- Cross-system coordination
- Integration with external systems
- Custom exactly-once guarantees
When to Use
Use __consumer_offsets (default) when:
- Standard Kafka guarantees sufficient
- Simple architecture preferred
- No special tracking requirements
Use external storage when:
- Need detailed audit trail
- Multiple systems share offsets
- Custom processing guarantees required
- Fine-grained control needed
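A sketch of that pattern: with auto-commit disabled, the offset is persisted in the same database transaction as the processing result, and the consumer seeks back to it whenever partitions are assigned. loadOffsetFromDb and saveResultAndOffsetInDb are hypothetical helpers standing in for your own storage code:

consumer.subscribe(Collections.singletonList("payment"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do: offsets are already saved with each processed record
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Resume each partition from the offset recorded in the database
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, loadOffsetFromDb(tp)); // hypothetical helper
        }
    }
});

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Hypothetical helper: writes the result and offset + 1 in ONE transaction,
        // which is what gives exactly-once behavior on the storage side
        saveResultAndOffsetInDb(record, record.offset() + 1);
    }
}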
Resetting Offsets
Reset to Beginning
consumer.seekToBeginning(consumer.assignment());

Use Cases:
- Reprocess all messages due to bug fix
- Recover missed transactions
- Data migration or backfill
Cautions:
- May process large backlog
- Can overwhelm downstream systems
- Old messages may have expired (retention policy)
Reset to Specific Offset
TopicPartition partition = new TopicPartition("payment", 0);
consumer.seek(partition, 15);

Use Cases:
- Skip bad data between offsets 10-14
- Resume from known good offset
- Manual offset control after failure
Important:
- Only changes read position
- Doesn't commit offset automatically
- On restart, the consumer goes back to the last committed offset unless the new position is committed
Reset Based on Timestamp
// Go back 1 hour
long timestamp = System.currentTimeMillis() - (60 * 60 * 1000);

Map<TopicPartition, Long> timestampMap = new HashMap<>();
timestampMap.put(new TopicPartition("payment", 0), timestamp);

Map<TopicPartition, OffsetAndTimestamp> offsetMap =
    consumer.offsetsForTimes(timestampMap);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
    // offsetsForTimes returns null for a partition with no message at or after the timestamp
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}

Use Cases:
- Replay events after system failure
- Reprocess from specific time point
- Time-based data recovery
Best Practices
Reliability
- Use manual commits for critical data
- Commit after successful processing
- Handle commit failures appropriately
Performance
- Use commitAsync() for normal operation
- Reserve commitSync() for shutdown
- Batch commits when possible
Recovery
- Understand implications of offset resets
- Test recovery procedures
- Monitor consumer lag
Monitoring
- Track committed vs current offsets
- Alert on consumer lag
- Monitor rebalance frequency
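Per-partition lag is simply the log end offset minus the committed offset; both are available from the consumer. A sketch, assuming a non-empty assignment:

Set<TopicPartition> assignment = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);

for (TopicPartition tp : assignment) {
    long end = endOffsets.get(tp);
    OffsetAndMetadata om = committed.get(tp);
    long lag = end - (om == null ? 0 : om.offset()); // uncommitted partitions lag from 0
    System.out.println(tp + " lag = " + lag);
}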
Summary
Key Concepts:
- Offsets track progress - Unique position per partition
- Auto vs manual commits - Trade-off between convenience and reliability
- Multiple consumers - Load distribution through partition assignment
- Offset resets - Replay data from beginning, specific offset, or timestamp
- Manual control - commitSync() for safety, commitAsync() for speed

Mastering offset management ensures reliable, efficient Kafka consumers that handle failures gracefully and maintain data consistency.