
Kafka Fundamentals for Beginners

6.2 Running Kafka Connect Locally

Set up Kafka Connect using Docker to stream data from MySQL to a file. Learn how the JDBC Source and File Sink connectors enable real-time data integration.



Overview

This session demonstrates setting up Kafka Connect with Docker to stream data from MySQL into a file using the File Sink Connector. Docker simplifies the setup by providing a self-contained, consistent environment across different systems.

Architecture Components

Source System

  • MySQL Database: Stores transactional order data
  • Widely used for business transactions
  • Kafka Connect picks up new rows in near real time (the incrementing mode used here captures inserts, not updates or deletes)

Kafka Connect

  • Distributed Mode: Provides scalability and fault tolerance for production scenarios
  • Exposes a REST API on port 8083 for registering and managing connectors
  • Coordinates data movement between Kafka and external systems

Sink Destination

  • File Sink Connector: Writes data to a local file for demo purposes
  • Real-world alternatives: S3, HDFS, data warehouses
  • Simplifies verification without cloud dependencies

Docker Compose Setup

Zookeeper Configuration

  • Manages broker metadata and leader election
  • Maintains cluster state
  • Listens on standard port 2181
  • Required for Kafka coordination

Kafka Broker Configuration

  • Container name: kafka1
  • Exposed port: 9092
  • Key environment variables:

  - KAFKA_BROKER_ID: 1 - Unique broker identifier
  - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - Zookeeper connection
  - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - Client connection endpoint
  - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - No authentication/encryption

MySQL Configuration

  • Database: orders_db (created automatically)
  • Root password: root (for demo only)
  • Port: 3306
  • Stores order data for streaming

Kafka Connect Configuration

  • Depends on kafka1 broker
  • REST API port: 8083
  • Key settings:

  - CONNECT_BOOTSTRAP_SERVERS: kafka1:9092 - Kafka connection
  - CONNECT_GROUP_ID: "connect-cluster" - Cluster group identifier
  - Internal topics: CONNECT_CONFIG_STORAGE_TOPIC, CONNECT_OFFSET_STORAGE_TOPIC, CONNECT_STATUS_STORAGE_TOPIC
  - Data format: JsonConverter for serialization
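
The bullets above describe the services, but the compose file itself is not shown. A minimal docker-compose.yml in that spirit might look like the sketch below; the image tags, the container names, the extra internal listener on kafka1:29092, and the replication-factor overrides are assumptions added so a single-broker setup works and the Connect container can reach the broker (with this two-listener layout, Connect bootstraps against kafka1:29092 rather than kafka1:9092). Depending on the Connect image, the JDBC source connector and a MySQL JDBC driver may also need to be installed into the image, for example via confluent-hub.

yaml
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0   # assumed image/tag
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181            # standard port 2181

  kafka1:
    image: confluentinc/cp-kafka:7.4.0       # assumed image/tag
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"                          # host-facing listener
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka1:29092 for other containers, localhost:9092 for the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # single-broker demo

  mysql:
    image: mysql:8.0                         # assumed image/tag
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root              # demo only
      MYSQL_DATABASE: orders_db              # created automatically

  connect:
    image: confluentinc/cp-kafka-connect:7.4.0   # assumed image/tag
    container_name: connect
    depends_on:
      - kafka1
    ports:
      - "8083:8083"                          # Connect REST API
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:29092    # internal listener (see note above)
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components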

Starting Services

bash
# Start all services in the background
docker-compose up -d

# Check container status
docker ps
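
Before deploying connectors, it is worth confirming that the Connect worker's REST API is reachable on port 8083 and that the JDBC source plugin is installed:

bash
# Root endpoint returns the worker version once Connect has finished starting
curl http://localhost:8083/

# The JDBC source connector class should appear in the installed plugins
curl http://localhost:8083/connector-plugins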

Setting Up Source Database

Create Orders Table

bash
# Connect to the MySQL container
docker exec -it mysql mysql -u root -proot orders_db

# Inside the MySQL shell, create the orders table
CREATE TABLE orders (
  id INT AUTO_INCREMENT PRIMARY KEY,
  customer_name VARCHAR(255),
  product_name VARCHAR(255),
  quantity INT
);

Insert Sample Data

sql
INSERT INTO orders (customer_name, product_name, quantity) VALUES
  ('John Doe', 'Laptop', 1),
  ('Jane Smith', 'Mouse', 2),
  ('Bob Johnson', 'Keyboard', 1);

Verify Data

sql
SELECT * FROM orders;

JDBC Source Connector Configuration

Configuration File

json
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/orders_db",
    "connection.user": "root",
    "connection.password": "root",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "orders-",
    "table.whitelist": "orders"
  }
}

Key Configuration Parameters

  • Connector Class: JDBC Source Connector for MySQL
  • Connection Details: Database URL, credentials
  • Tracking Mode: Uses incrementing id column
  • Topic Mapping: Publishes to a topic named topic.prefix plus the table name, so the orders table lands in the orders-orders topic
  • Table Monitoring: Watches orders table for new records

Deploy Connector

bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-source-config.json
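
After the POST, the Connect REST API can report whether the connector and its task actually started; both should show a RUNNING state:

bash
# Check connector and task state
curl http://localhost:8083/connectors/mysql-source/status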

Verify Data Flow to Kafka

bash
# Consume from the topic created by the source connector (topic.prefix + table name)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders-orders --from-beginning

Expected output: MySQL orders appearing as Kafka messages
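
If the Kafka CLI tools are not installed on the host, the same check can be run inside the broker container; on the Confluent images assumed in the compose sketch above, the scripts are on the PATH without the .sh suffix:

bash
# Run the console consumer inside the kafka1 container
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders-orders --from-beginning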

File Sink Connector Configuration

Configuration File

json
{
  "name": "file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "/tmp/orders.txt",
    "topics": "orders-orders",
    "value.converter.schemas.enable": "false"
  }
}

Key Parameters

  • FileStreamSinkConnector: Writes Kafka records to file
  • Single Task: Simplified processing
  • Output Location: /tmp/orders.txt
  • Source Topic: orders-orders (the topic produced by the JDBC source)
  • Schema Handling: Disabled to store JSON payload only

Deploy Sink Connector

bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @file-sink-config.json
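
Listing the registered connectors is a quick way to confirm that both the source and the sink are now deployed:

bash
# Both mysql-source and file-sink should be listed
curl http://localhost:8083/connectors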

Verify Output

bash
# Run inside the Kafka Connect container (the sink writes to that container's filesystem)
cat /tmp/orders.txt

Orders from Kafka should appear in the file.
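
One way to read the file from the host, assuming the Connect container is named connect as in the compose sketch above (adjust the name to whatever docker ps shows):

bash
# Read the sink output from inside the Kafka Connect container
docker exec -it connect cat /tmp/orders.txt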

End-to-End Validation

Insert New Records

bash
# Connect to MySQL
docker exec -it mysql mysql -u root -proot orders_db

# Insert new orders
INSERT INTO orders (customer_name, product_name, quantity) VALUES
  ('Alice Brown', 'Monitor', 1),
  ('Charlie Davis', 'Keyboard', 2);

Check Kafka Topic

bash
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders-orders --from-beginning

New records should appear in the stream

Verify File Output

bash
# Again, run inside the Kafka Connect container
cat /tmp/orders.txt

The latest orders should be appended to the file.
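
When the walkthrough is finished, the demo environment can be torn down; the -v flag also removes volumes so no demo data lingers:

bash
# Stop and remove the containers, networks, and volumes created by docker-compose up
docker-compose down -v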

Summary

  • Kafka Connect bridges Kafka with external systems seamlessly
  • Source Connectors pull data from databases like MySQL into Kafka
  • Sink Connectors write Kafka records to external systems (files, cloud storage, databases)
  • Same pattern applies to S3, HDFS, and other cloud storage solutions
  • Enables real-time data integration without custom code