5.3 Consumer API
Learn to read data from Kafka using the Consumer API. Covers consumer groups, polling, offsets, and graceful shutdown with complete Java implementation examples.
Overview
The Consumer API allows applications to read data from Kafka topics. Unlike traditional queues where messages disappear after reading, Kafka keeps messages available, allowing multiple consumers to read at their own pace.
Consumer Fundamentals
Publish-Subscribe Model
- Producers publish messages to topics
- Consumers subscribe to topics to receive messages
- Messages remain available for configured retention period
- Multiple consumers can read the same messages independently
Pull-Based Model
- Consumers request data from Kafka (pull)
- Kafka doesn't push data to consumers
- Consumers control their own pace of consumption
Project Setup
Initialize Project
Create project directory and initialize Gradle:
```
mkdir kafka-consumer-demo
cd kafka-consumer-demo
gradle init --type java-application
```
Project Structure
Generated files:
- build.gradle - Gradle configuration file
- gradlew / gradlew.bat - Gradle wrapper scripts
- src/main/java/ - Main Java source folder
- src/test/java/ - Test files folder
Add Kafka Dependencies
Update build.gradle:
```
plugins {
    id 'java'
    id 'application' // needed so ./gradlew run works later (Gradle 6.4+)
}

group = 'com.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.5.0'
}

application {
    // Entry point used by ./gradlew run
    mainClass = 'com.example.ConsumerDemo'
}
```
Build project:
```
./gradlew build
```
Creating a Consumer
Basic Consumer Class
Create src/main/java/com/example/ConsumerDemo.java:
```
package com.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to topic
        consumer.subscribe(Collections.singletonList("payment"));

        // Shutdown hook for graceful exit. KafkaConsumer is not thread-safe,
        // so the hook must not call close() directly while the main thread is
        // blocked in poll(); instead it calls wakeup(), which makes poll()
        // throw a WakeupException in the main thread.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down consumer...");
            consumer.wakeup();
            try {
                mainThread.join(); // wait for the poll loop to finish cleanly
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown, triggered by consumer.wakeup()
        } finally {
            consumer.close(); // commits offsets and releases resources
        }
    }
}
```
Consumer Configuration
Essential Properties
BOOTSTRAP_SERVERS_CONFIG
- Specifies Kafka broker addresses
- Example: localhost:9092
GROUP_ID_CONFIG
- Assigns consumer to consumer group
- Example: payment-consumer-group
- Groups enable work distribution among multiple consumers
KEY_DESERIALIZER_CLASS_CONFIG
- Converts byte array keys back to objects
- Example: StringDeserializer for string keys
VALUE_DESERIALIZER_CLASS_CONFIG
- Converts byte array values back to objects
- Example: StringDeserializer for string values
AUTO_OFFSET_RESET_CONFIG
- Defines behavior when no initial offset exists
- earliest: Read from beginning of topic
- latest: Read only new messages
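Note that this setting only takes effect when the group has no committed offset for a partition; once offsets exist, the consumer resumes from them. A minimal fragment, reusing the props object from the example above:
```
// Only consulted when the group has no committed offset for a partition
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // replay from the start
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // only new messages
```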
Polling and Processing Messages
The Poll Loop
```
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // Process each record
        System.out.printf("Key: %s, Value: %s%n",
            record.key(), record.value());
    }
}
```
How Polling Works
- The poll() method requests new messages from Kafka
- Waits up to the specified duration (100 ms) if no messages are available
- Returns a ConsumerRecords collection containing the messages
- Loop through each ConsumerRecord to process messages
Record Information
Each ConsumerRecord contains:
- Key: Message key
- Value: Message payload
- Partition: Partition message was read from
- Offset: Message position in partition
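Besides these four fields, ConsumerRecord exposes further metadata. A short illustrative fragment that could be dropped into the poll loop above:
```
for (ConsumerRecord<String, String> record : records) {
    String topic = record.topic();       // topic the record was read from
    long timestamp = record.timestamp(); // producer- or broker-assigned timestamp
    // Headers optionally attached by the producer
    record.headers().forEach(header ->
        System.out.println(header.key() + " = " + new String(header.value())));
}
```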
Graceful Shutdown
Why Shutdown Hooks Matter
Without proper shutdown:
- Kafka thinks consumer is still active
- Offsets might not commit properly
- Can cause duplicate message processing
- Network connections don't close cleanly
Implementing Shutdown Hook
Because KafkaConsumer is not thread-safe, the hook must not call close() while the main thread is still inside poll(). Instead it calls wakeup(), which makes poll() throw a WakeupException so the main thread can exit the loop and close the consumer itself (see the full listing above):
```
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down consumer...");
    consumer.wakeup(); // forces the blocked poll() to throw WakeupException
    try {
        mainThread.join(); // let the poll loop finish and close the consumer
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));
```
This ensures:
- Consumer closes properly on CTRL+C
- Offsets are committed
- Resources are released
- No data loss or duplication
Running the Consumer
Start Kafka
Using Docker:
```
docker-compose up -d
```
Compile Project
```
./gradlew build
```
Run Consumer
```
./gradlew run
```
Expected Output
Consumer will:
- Connect to Kafka broker
- Subscribe to "payment" topic
- Start polling for messages
- Print message details (key, value, partition, offset)
- Continue running until manually stopped
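If the topic is empty, the consumer simply keeps polling. You can send a few test messages with the console producer that ships with Kafka (script name and location vary by installation; with Docker you may need to run it via docker exec):
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic payment
```
With messages flowing, the output looks similar to this (values are illustrative; keys are null because the console producer does not set them by default):
```
Key: null, Value: order-1001, Partition: 0, Offset: 42
Key: null, Value: order-1002, Partition: 2, Offset: 17
```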
Key Concepts
Consumer Groups
- Allow multiple consumers to share workload
- Each partition assigned to one consumer in group
- Enables parallel processing
- Improves scalability and throughput
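To see group coordination in action, start a second instance of the application with the same group.id and watch partitions being redistributed. A hedged sketch of how you could log this, replacing the subscribe(...) call in ConsumerDemo (requires imports for java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener, and org.apache.kafka.common.TopicPartition):
```
// Log partition assignment so rebalances are visible on the console
consumer.subscribe(Collections.singletonList("payment"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }
});
```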
Offsets
- Track consumer's position in topic
- Committed periodically as messages are read (auto-commit by default)
- Consumer resumes from last committed offset after a restart
- Reduces message loss and duplicate processing
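By default the consumer auto-commits offsets periodically (enable.auto.commit=true). For tighter control you can commit manually after processing. A minimal sketch, where process() stands in for your own application logic (hypothetical helper):
```
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable periodic auto-commit

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical application-specific processing
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit only after the whole batch is processed
    }
}
```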
Configuration Settings
group.id
- Identifies consumer group
- Required for consumer coordination
auto.offset.reset
- Controls starting position for new consumers
- earliest or latest
Polling
- Consumer continuously polls for new messages
- Configurable wait duration
- Pull-based consumption model
Best Practices
- Always implement graceful shutdown
- Use appropriate deserializers for your data
- Configure consumer groups for parallel processing
- Monitor consumer lag
- Handle errors appropriately
- Commit offsets strategically
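For the "monitor consumer lag" item, one straightforward option is the kafka-consumer-groups tool that ships with Kafka (script name and path vary by installation):
```
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group payment-consumer-group
```
The output lists, per partition, the group's committed offset, the log-end offset, and the lag between them.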
Summary
The Consumer API provides:
- Flexible message consumption from topics
- Support for consumer groups and parallel processing
- Offset management for reliable message delivery
- Pull-based model for consumer-controlled pacing
Understanding the Consumer API is essential for building robust data consumption systems with Kafka.