Kafka Fundamentals for Beginners

5.4 Consumer Offset Management

Master offset management for reliable Kafka consumers. Covers auto vs manual commits, consumer groups, partition assignment, and offset reset strategies with code examples.


Overview

Offsets are critical for tracking consumer progress in Kafka. Proper offset management prevents data loss and duplicate processing. This lesson covers how consumers handle offsets, especially across multi-partition topics.

What Are Offsets?

Definition

  • Each message has unique offset marking its position in partition
  • Consumers read messages sequentially using offsets
  • Offsets ensure consumers don't lose track of processed messages
  • Position tracking mechanism for message consumption

How Offsets Work

  • Messages in each partition numbered sequentially (0, 1, 2, ...)
  • Consumer tracks last processed offset per partition
  • Offset increments as consumer progresses through messages
  • Separate offset maintained for each partition

Offset Storage

__consumer_offsets Topic

  • Internal Kafka topic storing committed offsets
  • Records latest committed offset for each consumer group
  • Automatically maintained by Kafka
  • Ensures consumer can resume from last position after restart
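
To see what __consumer_offsets currently holds for a group, the AdminClient can read the committed offsets back. A minimal sketch; the broker address and the group id "my-consumer-group" are placeholders:

java
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    // Fetches the committed offsets Kafka persisted in __consumer_offsets
    Map<TopicPartition, OffsetAndMetadata> offsets = admin
        .listConsumerGroupOffsets("my-consumer-group")
        .partitionsToOffsetAndMetadata()
        .get();

    offsets.forEach((tp, meta) ->
        System.out.printf("%s -> committed offset %d%n", tp, meta.offset()));
}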

Partition Distribution Example

Payment topic with 2 partitions:

  • Partition 0: M1, M3, M5, M7, M9, M11, M13, M15, M17, M19
  • Partition 1: M2, M4, M6, M8, M10, M12, M14, M16, M18, M20

Messages are distributed by key, so records with the same key always land in the same partition, as the producer sketch below shows.
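
A hedged producer sketch of this routing; producerProps and the two customer keys are illustrative:

java
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

for (int i = 1; i <= 20; i++) {
    // With the default partitioner, equal keys always hash to the same partition
    // (distinct keys may still share a partition, so two keys -> two partitions is not guaranteed)
    String key = (i % 2 == 1) ? "customer-A" : "customer-B";
    producer.send(new ProducerRecord<>("payment", key, "M" + i));
}
producer.flush();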

Auto Offset Configuration

auto.offset.reset Setting

Controls where new consumer starts reading when no previous offset exists.

earliest

  • Consumer starts from very first message in partition
  • Processes all available historical messages
  • Useful for reprocessing or initial data loads

latest

  • Consumer starts from most recent message
  • Skips older messages
  • Useful when only new data matters

Configuration Example

java
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Single Consumer Behavior

Example: One Consumer, Two Partitions

Consumer with auto.offset.reset=earliest:

java
consumer.subscribe(Collections.singletonList("payment"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Partition: %d, Value: %s%n",
            record.offset(), record.partition(), record.value());
    }
    consumer.commitSync();
}

Processing Order

Single consumer assigned both partitions:

  • Reads from partition 0 and partition 1
  • Order across partitions is non-deterministic
  • Messages within single partition always ordered
  • May process M1, M2, M3, M4... or M1, M3, M5, M2, M4, M6...

Why Non-Deterministic Order?

Depends on:

  • Which partition data arrives first
  • Network conditions
  • Kafka's internal batch fetching
  • Consumer polling behavior
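
When ordering matters, you can process a poll result partition by partition instead of interleaved. A sketch, assuming the consumer from the example above:

java
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// Group the batch by partition; ordering is guaranteed only within each partition
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        System.out.printf("Partition %d, Offset %d: %s%n",
            tp.partition(), record.offset(), record.value());
    }
}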

Offset Commits

After processing all messages:

  • Partition 0: committed offset = 10 (the position after M19, its last message at offset 9)
  • Partition 1: committed offset = 10 (the position after M20, its last message at offset 9)
  • New messages are processed from offset 10 onwards; the read-back sketch below confirms the stored positions
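
A minimal read-back sketch, assuming the consumer from the examples above:

java
Set<TopicPartition> partitions = Set.of(
    new TopicPartition("payment", 0),
    new TopicPartition("payment", 1));

// Reads the last committed offset per partition back from Kafka
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
committed.forEach((tp, meta) ->
    System.out.printf("%s -> %s%n", tp, meta == null ? "none" : meta.offset()));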

Multiple Consumers in Same Group

Two Consumers Example

Starting second consumer triggers rebalance:

Before Rebalance:

  • Consumer 1: reads partition 0 and partition 1

After Rebalance:

  • Consumer 1: assigned partition 0 only
  • Consumer 2: assigned partition 1 only

Output Distribution

Consumer 1 (Window 1):

  • Reads: M1, M3, M5, M7, M9, M11, M13, M15, M17, M19
  • Commits offset 10 for partition 0

Consumer 2 (Window 2):

  • Reads: M2, M4, M6, M8, M10, M12, M14, M16, M18, M20
  • Commits offset 10 for partition 1

Benefits

  • Load distributed across consumers
  • Parallel processing improves throughput
  • Each consumer handles subset of data
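
Consumers join the same group simply by sharing a group.id. A configuration sketch; the group name and broker address are placeholders:

java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processors"); // same id in every instance
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Run this same program twice: Kafka splits the two partitions between the instances
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("payment"));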

More Consumers Than Partitions

Three Consumers, Two Partitions

Kafka Assignment Rule:

  • Each partition assigned to only one consumer
  • Single consumer can handle multiple partitions

Result:

  • Consumer 1: assigned partition 0
  • Consumer 2: assigned partition 1
  • Consumer 3: idle (no partition assigned)

Idle Consumer Purpose

Not useless; an idle consumer acts as a standby:

  • Ready to take over if active consumer fails
  • Automatically assigned partition during rebalance
  • Improves system resilience
  • No manual intervention needed
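
You can watch these reassignments happen by subscribing with a ConsumerRebalanceListener. A sketch, assuming the consumer from the examples above:

java
consumer.subscribe(Collections.singletonList("payment"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away; a good place to commit
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after a rebalance; an idle consumer sees an empty collection
        System.out.println("Assigned: " + partitions);
    }
});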

Consumer Group Leadership

How Leadership Works

  1. First Consumer Joins

    - The group coordinator (a broker) designates the first member as group leader

    - The leader is an ordinary consumer given the extra job of assigning partitions

    - Assignment logic runs client-side in the leader, not on the broker

  2. Leader Responsibilities

    - Computes partition assignments for all group members

    - Sends the assignment plan to the coordinator, which distributes it

    - Drives partition distribution during rebalances

  3. Additional Consumers Join

    - A rebalance is triggered and the leader reassigns partitions

    - Load is redistributed among members

    - Excess consumers remain idle

  4. Leader Failure

    - The coordinator selects a new leader automatically

    - The group reorganizes seamlessly

    - Partitions are reassigned

Key Points

  • Leadership is not a permanent role
  • It can shift between consumers as membership changes
  • The broker coordinates group membership, while assignment decisions stay client-side

Manual Offset Control

Auto-Commit vs Manual

Auto-Commit (enable.auto.commit=true)

Pros:

  • Convenient - Kafka handles commits automatically
  • No code changes needed

Cons:

  • Risk of duplicate processing if the consumer crashes after handling records but before the next auto-commit
  • Less control over commit timing
  • May commit before processing actually completes, so a crash can silently lose messages
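
Auto-commit behavior is controlled by two settings. A sketch; 5000 ms is the client default interval:

java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Offsets are committed in the background roughly every 5 seconds during poll()
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");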

Manual Commit (enable.auto.commit=false)

Pros:

  • Full control over commit timing
  • Commit only after successful processing
  • Prevents data loss scenarios

Cons:

  • More code complexity
  • Must handle commits explicitly

Configuration

java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Commit Methods

Synchronous Commit

java
consumer.commitSync();

Characteristics:

  • Blocks until Kafka confirms commit
  • Safer - ensures commit completes
  • Slower - waits for acknowledgment
  • Recommended for critical data
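
Because commitSync() can still fail, for example when the group rebalances mid-batch, it is worth catching CommitFailedException explicitly. A sketch:

java
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Typically the group rebalanced because processing took too long;
    // records since the last successful commit may be redelivered
    System.err.println("Commit failed: " + e.getMessage());
}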

Asynchronous Commit

java
consumer.commitAsync();

Characteristics:

  • Sends commit request and continues immediately
  • Faster - no blocking
  • Risk of lost commits on failure
  • Good for high-throughput scenarios
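
commitAsync() accepts an optional callback, so failed commits are at least visible. A sketch:

java
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Async commits are not retried by the client; log and rely on a later commit
        System.err.printf("Async commit failed for %s: %s%n", offsets, exception.getMessage());
    }
});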

Best Practice: Combined Approach

java
// Register the shutdown hook first; wakeup() is the only KafkaConsumer
// method that is safe to call from another thread
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join(); // wait for the poll loop to finish its final commit
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }

        // Fast async commits during normal operation
        consumer.commitAsync();
    }
} catch (WakeupException e) {
    // Thrown by poll() when wakeup() is called; expected during shutdown
} finally {
    // Final sync commit on shutdown, then release resources
    consumer.commitSync();
    consumer.close();
}

External Offset Storage

Why Use External Storage?

Instead of __consumer_offsets:

  • Store offsets in PostgreSQL, Redis, or custom database
  • More visibility into offset history
  • Cross-system coordination
  • Integration with external systems
  • Custom exactly-once guarantees

When to Use

Use __consumer_offsets (default) when:

  • Standard Kafka guarantees sufficient
  • Simple architecture preferred
  • No special tracking requirements

Use external storage when:

  • Need detailed audit trail
  • Multiple systems share offsets
  • Custom processing guarantees required
  • Fine-grained control needed
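
A minimal sketch of the pattern, assuming a hypothetical offsetStore backed by your database; loadOffset and saveOffset are placeholders you would implement:

java
// enable.auto.commit must be false: offsets live entirely in the external store
consumer.subscribe(Collections.singletonList("payment"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // On (re)assignment, resume from the externally stored position
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsetStore.loadOffset(tp)); // hypothetical DAO call
        }
    }
});

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Ideally process the record and store the offset in one database transaction
        processRecord(record);
        offsetStore.saveOffset(new TopicPartition(record.topic(), record.partition()),
            record.offset() + 1); // store the NEXT offset to read, matching commit semantics
    }
}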

Resetting Offsets

Reset to Beginning

java
consumer.seekToBeginning(consumer.assignment());

Use Cases:

  • Reprocess all messages due to bug fix
  • Recover missed transactions
  • Data migration or backfill

Cautions:

  • May process large backlog
  • Can overwhelm downstream systems
  • Old messages may have expired (retention policy)

Reset to Specific Offset

java
TopicPartition partition = new TopicPartition("payment", 0);
consumer.seek(partition, 15);

Use Cases:

  • Skip bad data between offsets 10-14
  • Resume from known good offset
  • Manual offset control after failure

Important:

  • seek() only changes the in-memory read position
  • It does not commit the offset automatically
  • On restart, the consumer resumes from the last committed offset unless the new position is committed, as the sketch below shows
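
To make the new position durable across restarts, commit it explicitly after seeking. A sketch:

java
TopicPartition partition = new TopicPartition("payment", 0);
consumer.seek(partition, 15);

// Persist the new position so a restart also resumes at offset 15
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(15)));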

Reset Based on Timestamp

java
// Go back 1 hour
long timestamp = System.currentTimeMillis() - (60 * 60 * 1000);

Map<TopicPartition, Long> timestampMap = new HashMap<>();
timestampMap.put(new TopicPartition("payment", 0), timestamp);

Map<TopicPartition, OffsetAndTimestamp> offsetMap =
    consumer.offsetsForTimes(timestampMap);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
    // offsetsForTimes maps a partition to null when no message exists at or after the timestamp
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}

Use Cases:

  • Replay events after system failure
  • Reprocess from specific time point
  • Time-based data recovery

Best Practices

Reliability

  • Use manual commits for critical data
  • Commit after successful processing
  • Handle commit failures appropriately

Performance

  • Use commitAsync() for normal operation
  • Reserve commitSync() for shutdown
  • Batch commits when possible

Recovery

  • Understand implications of offset resets
  • Test recovery procedures
  • Monitor consumer lag

Monitoring

  • Track committed vs current offsets
  • Alert on consumer lag
  • Monitor rebalance frequency
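
Lag is the gap between a partition's log end offset and the group's committed offset. A monitoring sketch using the consumer's own APIs, assuming the payment topic from the examples:

java
Set<TopicPartition> partitions = Set.of(
    new TopicPartition("payment", 0),
    new TopicPartition("payment", 1));

Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);

for (TopicPartition tp : partitions) {
    OffsetAndMetadata meta = committed.get(tp);
    long lag = endOffsets.get(tp) - (meta == null ? 0 : meta.offset());
    System.out.printf("%s lag = %d%n", tp, lag); // alert when lag keeps growing
}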

Summary

Key Concepts:

  1. Offsets track progress - Unique position per partition
  2. Auto vs manual commits - Trade-off between convenience and reliability
  3. Multiple consumers - Load distribution through partition assignment
  4. Offset resets - Replay data from beginning, specific offset, or timestamp
  5. Manual control - commitSync() for safety, commitAsync() for speed

Mastering offset management ensures reliable, efficient Kafka consumers that handle failures gracefully and maintain data consistency.