6.4 Running Kafka Streams Locally
Build and run a Kafka Streams application locally using Java and Docker. Filter and transform transaction data in real time with hands-on examples.
Overview
This session sets up a local Kafka Streams environment to process real-time data efficiently using Docker, Java, and Gradle.
Objectives
- Run a Kafka instance (using Docker)
- Write a simple Kafka Streams application
- Verify stream processing with test messages
Exercise Overview
Processing Logic
Input Stream: Transactions with a user name and an amount

```json
{"user": "Alice", "amount": 100}
{"user": "Bob", "amount": 200}
```

Processing Steps:
- Filter out transactions where amount ≤ 100
- Transform the user name to uppercase

Output Stream:

```json
{"user": "BOB", "amount": 200}
```

Alice's transaction is filtered out (amount = 100), and Bob's name is converted to uppercase. For simplicity, the application built later in this session encodes each record as a comma-separated string (e.g. `Alice,100`) rather than JSON.
Prerequisites
Required Software
- Docker: Quickly spins up the Kafka environment
- Java 11+: Kafka Streams is a Java-based library
- Gradle: Manages dependencies and builds the project
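A quick way to confirm the tooling is installed and on your PATH:

```bash
docker --version
java -version
gradle --version
```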
Setting Up Kafka with Docker
Docker Compose Configuration
```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # Required on a single broker: internal topics default to replication factor 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
```

Configuration Details
#### Zookeeper
- Manages Kafka brokers
- Port: 2181 (default)
- Confluent image for Kafka compatibility
#### Kafka Broker
- ID: 1 (useful for multiple brokers)
- Port: 9092 (standard Kafka port)
- Advertised listeners: localhost:9092 for client connections
- Security: plaintext (no encryption or authentication)
- Depends on Zookeeper
Starting Kafka
```bash
# Start services in the background
docker-compose up -d

# Check running containers
docker ps
```
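Once both containers are up, one way to confirm the broker is accepting client connections is the kafka-broker-api-versions tool bundled in the cp-kafka image:

```bash
# Prints the broker's supported API versions if it is reachable
docker exec -it kafka1 kafka-broker-api-versions \
  --bootstrap-server localhost:9092
```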
Setting Up Kafka Streams Project
Initialize Project
```bash
# Create project directory
mkdir kafka-streams-app
cd kafka-streams-app

# Initialize a Gradle project
gradle init --type java-application
```

Project Structure
```text
kafka-streams-app/
├── build.gradle       # Build configuration and dependencies
├── src/
│   ├── main/
│   │   └── java/      # Application code
│   └── test/
│       └── java/      # Test code
├── gradlew            # Gradle wrapper (Unix)
├── gradlew.bat        # Gradle wrapper (Windows)
└── settings.gradle    # Project settings
```

Add Dependencies
build.gradle:
```gradle
dependencies {
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
}
```
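gradle init --type java-application also applies Gradle's application plugin; pointing the plugin at the entry point lets you start the app with ./gradlew run. A minimal sketch, assuming Gradle 6.4+ and that the main class is the StreamsApp class shown later (default package):

```gradle
plugins {
    id 'application'
}

application {
    // Assumption: the entry point is the StreamsApp class defined below
    mainClass = 'StreamsApp'
}
```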
Application Properties

```properties
bootstrap.servers=localhost:9092
application.id=streams-app
```

Running the Application
```bash
# During development, run the app through Gradle
./gradlew run

# Or package it first; note that the jar must bundle its dependencies
# (e.g. via a fat-jar plugin) for java -jar to work
java -jar my-streams-app.jar
```

Creating Kafka Topics
Create Input Topic
```bash
docker exec -it kafka1 kafka-topics \
  --create --topic input-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```

Note: the Confluent images expose the tools without the .sh suffix (kafka-topics, not kafka-topics.sh).

Create Output Topic
```bash
docker exec -it kafka1 kafka-topics \
  --create --topic output-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```

Command Breakdown
- docker exec -it kafka1: Run a command inside the Kafka container
- kafka-topics --create: Create a new topic
- --topic <name>: Topic name
- --bootstrap-server localhost:9092: Kafka broker address
- --partitions 1: Single partition (simple setup)
- --replication-factor 1: No replication (single broker)
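To double-check that both topics exist, list them with the same tool:

```bash
# Should print input-topic and output-topic
docker exec -it kafka1 kafka-topics --list \
  --bootstrap-server localhost:9092
```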
Writing the Kafka Streams Application
```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Properties;

public class StreamsApp {
    public static void main(String[] args) {
        // 1. Configure Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());

        // 2. Build topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // 3. Process data: records are comma-separated strings, e.g. "Alice,100"
        stream
            .filter((key, value) -> {
                String[] parts = value.split(",");
                int amount = Integer.parseInt(parts[1]);
                return amount > 100; // Filter: keep only amount > 100
            })
            .mapValues(value -> {
                String[] parts = value.split(",");
                String user = parts[0].toUpperCase(); // Transform: uppercase
                return user + "," + parts[1];
            })
            .to("output-topic"); // Write to output

        // 4. Start streams
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // 5. Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Configuration Details
- APPLICATION_ID_CONFIG: Unique identifier for the Streams app
- BOOTSTRAP_SERVERS_CONFIG: Kafka broker address
- DEFAULT_KEY_SERDE_CLASS_CONFIG: Key serialization (String)
- DEFAULT_VALUE_SERDE_CLASS_CONFIG: Value serialization (String)
Processing Logic
- Read: Stream from input-topic
- Filter: Keep transactions where amount > 100
- Transform: Convert the user name to uppercase
- Write: Send to output-topic
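Before producing real messages, the topology can also be verified in-process with TopologyTestDriver. This is a minimal sketch, assuming the extra test dependency org.apache.kafka:kafka-streams-test-utils:3.4.0 is added to build.gradle; the class name and record keys are illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsAppCheck {
    public static void main(String[] args) {
        // Rebuild the same filter + uppercase topology as StreamsApp
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        stream
            .filter((key, value) -> Integer.parseInt(value.split(",")[1]) > 100)
            .mapValues(value -> {
                String[] parts = value.split(",");
                return parts[0].toUpperCase() + "," + parts[1];
            })
            .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-check");
        // The test driver never opens a network connection; any address works
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            input.pipeInput("Alice", "Alice,100"); // filtered out (amount not > 100)
            input.pipeInput("Bob", "Bob,200");     // passes the filter, name uppercased

            System.out.println(output.readValuesToList()); // expected: [BOB,200]
        }
    }
}
```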
Sending Test Messages
```bash
docker exec -it kafka1 kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic input-topic
```

Type messages (press Enter after each):
```text
Alice,100
Bob,200
```

Command Breakdown
- docker exec -it kafka1: Interactive command in the container
- kafka-console-producer: Producer tool
- --bootstrap-server localhost:9092: Kafka broker (replaces the deprecated --broker-list)
- --topic input-topic: Target topic
Consuming Transformed Messages
```bash
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic output-topic \
  --from-beginning
```

Expected Output:
```text
BOB,200
```

Verification
- Alice's transaction (amount = 100) filtered out
- Bob's name converted to uppercase
- Confirms stream processing works correctly
Scalability in Kafka Streams
Horizontal Scaling
#### Increase Partitions
- More partitions → better data distribution
- Enables parallel processing across instances
#### Multiple Application Instances
- Behaves like a consumer group
- Automatic workload balancing
- Example: 3 servers → run the app on each
- Kafka dynamically assigns partitions
Scaling Limits
- Maximum instances = number of partitions
- Topic with 2 partitions → max 2 instances
- Additional instances remain idle (standby)
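As a concrete illustration of this ceiling, the single-partition input topic created earlier could be expanded so more instances can share the work (a sketch using the same container):

```bash
# Increase input-topic to 3 partitions; up to 3 instances can then run in parallel
docker exec -it kafka1 kafka-topics --alter \
  --topic input-topic \
  --partitions 3 \
  --bootstrap-server localhost:9092
```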
Key Requirement: Partition Alignment
- Same keying for KStream and KTable
- Ensures data for same key goes to same partition
- Enables local joins without data shuffling
- Example: User ID as key → Alice's data always on same partition
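A hedged Java sketch of what partition alignment enables, using hypothetical topic names: a KStream-KTable join that stays local only because both topics are keyed by user ID and co-partitioned (same partition count, same keying):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topics, both keyed by user ID and created with the
        // same partition count, so records for one user share a partition
        KStream<String, String> transactions = builder.stream("transactions-by-user");
        KTable<String, String> profiles = builder.table("profiles-by-user");

        // Local join: co-partitioning guarantees that matching keys are
        // handled by the same application instance, so no data shuffling
        transactions
                .join(profiles, (txn, profile) -> txn + " | " + profile)
                .to("enriched-transactions");

        System.out.println(builder.build().describe());
    }
}
```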
Summary
- Real-Time Processing: Kafka Streams transforms and filters data as it flows
- Example Operations:
- .filter(): Remove unwanted records
- .mapValues(): Transform data
- Standard Java Application: No external processing frameworks needed
- Horizontal Scaling: Like regular Kafka consumers
- High-Throughput: Suitable for demanding data pipelines