
Kafka Fundamentals for Beginners

5.3 Consumer API

Learn to read data from Kafka using the Consumer API. Covers consumer groups, polling, offsets, and graceful shutdown with complete Java implementation examples.



Overview

The Consumer API allows applications to read data from Kafka topics. Unlike traditional message queues, where a message is removed once it has been consumed, Kafka retains messages for a configured period, so multiple consumers can read the same data independently, each at its own pace.

Consumer Fundamentals

Publish-Subscribe Model

  • Producers publish messages to topics
  • Consumers subscribe to topics to receive messages
  • Messages remain available for configured retention period
  • Multiple consumers can read same messages independently

Pull-Based Model

  • Consumers request data from Kafka (pull)
  • Kafka doesn't push data to consumers
  • Consumers control their own pace of consumption

Project Setup

Initialize Project

Create project directory and initialize Gradle:

bash
mkdir kafka-consumer-demo
cd kafka-consumer-demo
gradle init --type java-application

Project Structure

Generated files:

  • build.gradle - Gradle configuration file
  • gradlew / gradlew.bat - Gradle wrapper scripts
  • src/main/java/ - Main Java source folder
  • src/test/java/ - Test files folder

Add Kafka Dependencies

Update build.gradle:

groovy
plugins {
    id 'java'
    id 'application'
}

group = 'com.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.5.0'
}

application {
    // Lets ./gradlew run locate the main class
    mainClass = 'com.example.ConsumerDemo'
}

Build project:

bash
./gradlew build

Creating a Consumer

Basic Consumer Class

Create src/main/java/com/example/ConsumerDemo.java:

java
package com.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to topic
        consumer.subscribe(Collections.singletonList("payment"));

        // Shutdown hook for graceful exit. KafkaConsumer is not thread-safe,
        // so the hook calls wakeup() (which makes poll() throw WakeupException)
        // instead of closing the consumer from another thread.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down consumer...");
            consumer.wakeup();
            try {
                mainThread.join(); // wait for the poll loop to finish closing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: triggered by consumer.wakeup()
        } finally {
            consumer.close(); // commits offsets and leaves the group cleanly
        }
    }
}

Consumer Configuration

Essential Properties

BOOTSTRAP_SERVERS_CONFIG

  • Specifies Kafka broker addresses; accepts a comma-separated list so the client can still connect if one broker is down
  • Example: localhost:9092

GROUP_ID_CONFIG

  • Assigns consumer to consumer group
  • Example: payment-consumer-group
  • Groups enable work distribution among multiple consumers

KEY_DESERIALIZER_CLASS_CONFIG

  • Converts byte array keys back to objects
  • Example: StringDeserializer for string keys

VALUE_DESERIALIZER_CLASS_CONFIG

  • Converts byte array values back to objects
  • Example: StringDeserializer for string values

AUTO_OFFSET_RESET_CONFIG

  • Defines where to start reading when the group has no committed offset
  • earliest: Read from beginning of topic
  • latest: Read only new messages
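
For example, a brand-new consumer group that should skip historical data would set:

java
// Start from the newest messages when the group has no committed offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");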

Polling and Processing Messages

The Poll Loop

java
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // Process each record
        System.out.printf("Key: %s, Value: %s%n",
            record.key(), record.value());
    }
}

How Polling Works

  • poll() method requests new messages from Kafka
  • Waits up to specified duration (100ms) if no messages available
  • Returns ConsumerRecords collection containing messages
  • Loop through each ConsumerRecord to process messages

Record Information

Each ConsumerRecord contains:

  • Key: Message key
  • Value: Message payload
  • Partition: Partition message was read from
  • Offset: Message position in partition
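
Beyond these four fields, ConsumerRecord exposes further metadata through standard accessors, for example:

java
// Additional metadata available on every record
System.out.println("Topic: " + record.topic());         // source topic name
System.out.println("Timestamp: " + record.timestamp()); // epoch milliseconds
System.out.println("Headers: " + record.headers());     // optional key/value metadata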

Graceful Shutdown

Why Shutdown Hooks Matter

Without proper shutdown:

  • Kafka only notices the consumer is gone after its session times out, delaying rebalancing
  • Offsets might not commit properly
  • Can cause duplicate message processing
  • Network connections don't close cleanly

Implementing Shutdown Hook

java
// KafkaConsumer is not thread-safe: the hook must not call close() directly.
// wakeup() makes the blocked poll() throw WakeupException instead.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down consumer...");
    consumer.wakeup();
    try { mainThread.join(); } catch (InterruptedException e) { /* ignore */ }
}));
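
The hook only signals shutdown; the poll loop completes it by catching the WakeupException and closing the consumer, as in the full example above:

java
try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));
        // ... process records ...
    }
} catch (WakeupException e) {
    // Expected: thrown by poll() after consumer.wakeup()
} finally {
    consumer.close(); // commits offsets and leaves the group cleanly
}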

This ensures:

  • Consumer closes properly on CTRL+C
  • Offsets are committed before exit
  • Network resources are released
  • The group rebalances promptly, minimizing duplicate reprocessing on restart

Running the Consumer

Start Kafka

Using Docker:

bash
docker-compose up -d

Compile Project

bash
./gradlew build

Run Consumer

bash
./gradlew run

Expected Output

Consumer will:

  • Connect to Kafka broker
  • Subscribe to "payment" topic
  • Start polling for messages
  • Print message details (key, value, partition, offset), as shown below
  • Continue running until manually stopped
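
With messages on the payment topic, the console output takes roughly this shape (keys, values, partitions, and offsets are illustrative):

text
Key: user-42, Value: payment-created, Partition: 0, Offset: 128
Key: user-7, Value: payment-settled, Partition: 2, Offset: 91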

Key Concepts

Consumer Groups

  • Allow multiple consumers to share workload
  • Each partition is assigned to one consumer in the group (see the snippet below)
  • Enables parallel processing
  • Improves scalability and throughput
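
To watch a group share work, run two instances of ConsumerDemo with the same group.id against a multi-partition topic; printing the assignment inside the poll loop shows which partitions each instance owns (consumer.assignment() is part of the standard Consumer API):

java
// With two consumers in the same group, each instance
// prints a disjoint subset of the topic's partitions.
System.out.println("Assigned partitions: " + consumer.assignment());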

Offsets

  • Track the consumer's position in each partition
  • Committed periodically in the background by default (enable.auto.commit)
  • Consumer resumes from the last committed offset after a restart
  • Careful commit handling prevents message loss and limits duplicates (see the sketch below)
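
For tighter control, disable auto-commit and commit only after records have been processed. A minimal sketch using the client's commitSync(); the process() call here is a hypothetical placeholder for your own logic:

java
// Take over offset management from the client
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical: your processing logic
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // blocks until the broker confirms the commit
    }
}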

Configuration Settings

group.id

  • Identifies consumer group
  • Required for consumer coordination

auto.offset.reset

  • Controls starting position for new consumers
  • earliest or latest

Polling

  • Consumer continuously polls for new messages
  • Configurable wait duration
  • Pull-based consumption model

Best Practices

  • Always implement graceful shutdown
  • Use appropriate deserializers for your data
  • Configure consumer groups for parallel processing
  • Monitor consumer lag (see the command below)
  • Handle errors appropriately
  • Commit offsets strategically
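
Consumer lag (how far a group's committed offsets trail the end of each partition) can be checked with the kafka-consumer-groups tool that ships with Kafka:

bash
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group payment-consumer-group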

Summary

The Consumer API provides:

  • Flexible message consumption from topics
  • Support for consumer groups and parallel processing
  • Offset management for reliable message delivery
  • Pull-based model for consumer-controlled pacing

Understanding the Consumer API is essential for building robust data-consumption systems with Kafka.