6.2 Running Kafka Connect Locally
Set up Kafka Connect using Docker to stream data from MySQL to a file. Learn the JDBC Source and File Sink connectors for real-time data integration.
Overview
This session demonstrates setting up Kafka Connect with Docker to stream data from MySQL into a file using the File Sink connector. Docker simplifies setup by providing self-contained, consistent environments across different systems.
Architecture Components
Source System
- MySQL Database: Stores transactional order data
- Widely used for business transactions
- Kafka Connect captures new rows in near real time
Kafka Connect
- Distributed Mode: Provides scalability and fault tolerance for production scenarios
- Exposes a REST API on port 8083
- Coordinates data movement between Kafka and external systems
Sink Destination
- File Sink Connector: Writes data to a local file for demo purposes
- Real-world alternatives: S3, HDFS, data warehouses
- Simplifies verification without cloud dependencies
Docker Compose Setup
Zookeeper Configuration
- Manages broker metadata and leader election
- Maintains cluster state
- Listens on standard port 2181
- Required for Kafka coordination
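Taken together, these settings map onto a compose service like the following sketch (the Confluent image and tag are assumptions, not from the session):

```yaml
# docker-compose.yml (sketch) -- Zookeeper service; image/tag assumed
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181   # the standard client port noted above
      ZOOKEEPER_TICK_TIME: 2000
```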
Kafka Broker Configuration
- Container name: kafka1
- Exposed port: 9092
- Key environment variables:
- KAFKA_BROKER_ID: 1 - Unique broker identifier
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - Zookeeper connection
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - Endpoint advertised to clients on the host; containers such as Connect cannot reach localhost, so a second in-network listener is added in the sketch below
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - No authentication/encryption
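A matching broker service might look like the sketch below. One deliberate adjustment: advertising only localhost:9092 would leave Connect unable to reach the broker from inside the Docker network, so the sketch uses the common two-listener layout, where containers connect via kafka1:29092 and host clients via localhost:9092 (image tag assumed):

```yaml
# Broker service (sketch, continuing the same docker-compose.yml)
  kafka1:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Containers (e.g. Connect) use kafka1:29092; the host uses localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # single broker, so RF must be 1
```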
MySQL Configuration
- Database: orders_db (created automatically)
- Root password: root (for demo only)
- Port: 3306
- Stores order data for streaming
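The corresponding service sketch (image tag assumed):

```yaml
# MySQL service (sketch)
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root   # demo only, as noted above
      MYSQL_DATABASE: orders_db   # created automatically on first start
```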
Kafka Connect Configuration
- Depends on the kafka1 broker
- REST API port: 8083
- Key settings:
- CONNECT_BOOTSTRAP_SERVERS: kafka1:9092 - Kafka connection; must point at a listener reachable from inside the Docker network (kafka1:29092 with the listener layout sketched above)
- CONNECT_GROUP_ID: "connect-cluster" - Cluster group identifier
- Internal topics: CONNECT_CONFIG_STORAGE_TOPIC, CONNECT_OFFSET_STORAGE_TOPIC, CONNECT_STATUS_STORAGE_TOPIC - store connector configs, source offsets, and task statuses
- Data format: JsonConverter for serialization
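A sketch of the Connect worker service completes the file. Note that recent cp-kafka-connect images do not bundle the JDBC connector; in practice it has to be installed into the image (for example via confluent-hub in a small custom Dockerfile), which this sketch omits. The internal topic names and single-broker replication factors are assumptions:

```yaml
# Kafka Connect service (sketch)
  connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    container_name: connect
    depends_on:
      - kafka1
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:29092   # the broker's in-network listener
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      # Single broker, so the internal topics need replication factor 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # JsonConverter for serialization, as noted above
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
```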
Starting Services
```bash
# Start all services in the background
docker-compose up -d

# Check container status
docker ps
```
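The Connect worker can take a minute to start. Before deploying connectors, it is worth confirming the REST API answers; listing the installed plugins also verifies that the connectors used below are available:

```bash
# Returns the installed connector plugins; the JDBC Source and
# FileStream Sink connectors should appear here if installed
curl -s http://localhost:8083/connector-plugins
```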
Setting Up Source Database
Create Orders Table
```bash
# Connect to the MySQL container
docker exec -it mysql mysql -u root -proot orders_db
```

```sql
-- Create the orders table
CREATE TABLE orders (
  id INT AUTO_INCREMENT PRIMARY KEY,
  customer_name VARCHAR(255),
  product_name VARCHAR(255),
  quantity INT
);
```
Insert Sample Data
```sql
INSERT INTO orders (customer_name, product_name, quantity) VALUES
  ('John Doe', 'Laptop', 1),
  ('Jane Smith', 'Mouse', 2),
  ('Bob Johnson', 'Keyboard', 1);
```
Verify Data
```sql
SELECT * FROM orders;
```
JDBC Source Connector Configuration
Configuration File
```json
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/orders_db",
    "connection.user": "root",
    "connection.password": "root",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "orders-",
    "table.whitelist": "orders"
  }
}
```
Key Configuration Parameters
- Connector Class: JDBC Source Connector for MySQL
- Connection Details: Database URL and credentials
- Tracking Mode: Uses the incrementing id column
- Topic Mapping: Publishes to the orders-orders topic (the orders- prefix plus the table name)
- Table Monitoring: Watches the orders table for new records
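Because the mode is incrementing, the connector remembers the highest id it has emitted and only picks up rows with a larger value; updates and deletes to existing rows are not captured. The worker can also check a config for missing or invalid fields before the connector is created. A sketch of that call (the validate endpoint takes the flat config map, not the name/config wrapper used above):

```bash
# Validate the source config without creating the connector
curl -s -X PUT http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/orders_db",
    "connection.user": "root",
    "connection.password": "root",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "orders-",
    "table.whitelist": "orders"
  }'
```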
Deploy Connector
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-source-config.json
```
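A successful POST echoes the connector's configuration back as JSON. It is still worth checking that the connector and its task are RUNNING; a FAILED task at this point usually means a missing MySQL JDBC driver or an unreachable broker:

```bash
# Connector and task state should both be RUNNING
curl -s http://localhost:8083/connectors/mysql-source/status
```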
Verify Data Flow to Kafka
```bash
# Topic name is topic.prefix + table name; assumes Kafka CLI tools on the host
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders-orders --from-beginning
```
Expected output: MySQL orders appearing as Kafka messages.
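With the JsonConverter, each row arrives as a JSON record. For the sample data inserted earlier, the first value would look roughly like this (shown without the schema envelope that JsonConverter wraps around the payload when schemas are enabled):

```json
{"id": 1, "customer_name": "John Doe", "product_name": "Laptop", "quantity": 1}
```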
File Sink Connector Configuration
Configuration File
```json
{
  "name": "file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "/tmp/orders.txt",
    "topics": "orders-orders",
    "value.converter.schemas.enable": "false"
  }
}
```
Key Parameters
- FileStreamSinkConnector: Writes Kafka records to a file
- Single Task: Simplified processing
- Output Location: /tmp/orders.txt (inside the Connect container)
- Source Topic: orders-orders
- Schema Handling: Disabled to store the JSON payload only
Deploy Sink Connector
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @file-sink-config.json
```
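The same status endpoint works for the sink:

```bash
curl -s http://localhost:8083/connectors/file-sink/status
```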
Verify Output
```bash
# The file is written inside the Connect container (container name assumed)
docker exec connect cat /tmp/orders.txt
```
Orders from Kafka should appear in the file.
End-to-End Validation
Insert New Records
```bash
# Connect to MySQL
docker exec -it mysql mysql -u root -proot orders_db
```

```sql
-- Insert new orders
INSERT INTO orders (customer_name, product_name, quantity) VALUES
  ('Alice Brown', 'Monitor', 1),
  ('Charlie Davis', 'Keyboard', 2);
```
Check Kafka Topic
```bash
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders-orders --from-beginning
```
The new records should appear in the stream.
Verify File Output
```bash
# Again run inside the Connect container (container name assumed)
docker exec connect cat /tmp/orders.txt
```
The latest orders should be written to the file.
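To watch the pipeline live while inserting rows, tail the output file instead of re-running cat (container name connect assumed, as in the sketches above):

```bash
docker exec connect tail -f /tmp/orders.txt
```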
Summary
- Kafka Connect bridges Kafka with external systems seamlessly
- Source Connectors pull data from databases like MySQL into Kafka
- Sink Connectors write Kafka records to external systems (files, cloud storage, databases)
- The same pattern applies to S3, HDFS, and other storage systems
- Enables real-time data integration without custom code