6.1 Kafka Connect
Integrating external systems with Kafka Connect.
Introduction
Kafka Connect is a tool designed to integrate Kafka with external systems without writing custom code. It acts as a bridge between Kafka and various data sources and sinks, enabling seamless data movement.
Connector Types
Source Connectors
- Pull data into Kafka from external systems
- Continuously read from source systems
- Write data to Kafka topics for real-time processing
Sink Connectors
- Push data out of Kafka to external systems
- Read messages from Kafka topics
- Write to external destinations
Operational Modes
Standalone Mode
- Great for simple, local tasks
- Ideal for development environments
- Single process handles all connectors
- Easy to set up and test
Distributed Mode
- Production-ready deployment
- Scales efficiently with multiple workers
- Manages multiple connectors across cluster
- Automatic load balancing and fault tolerance
Why Use Kafka Connect?
Scalability
- Easy to scale by adding worker nodes
- Automatic load balancing in distributed mode
- No manual handling of distribution
- Grows with your data volume
Fault Tolerance
- Continues running if worker node crashes
- Automatically redistributes tasks
- Resumes processing without data loss
- Ensures pipeline reliability
Prebuilt Connectors
- Huge ecosystem of ready-to-use connectors
- Supports databases, cloud services, and more
- No need to write custom integration code
- Just configure and deploy
Configuration-Based Setup
- Define everything in simple configuration files
- No complex coding required
- Easy to manage and update
- Quick to modify as system evolves
Source Connector Examples
JDBC Source Connector
Use Case: Pull data from relational databases
- Continuously reads from PostgreSQL, MySQL, Oracle
- Captures new records as they appear
- Pushes data into Kafka topics
- Enables real-time database change capture
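For a concrete feel, a JDBC source configuration might look roughly like the sketch below. Property names follow Confluent's JDBC connector and can vary by version; the database URL, credentials, table name, and connector name are placeholders.
{
  "name": "orders-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/shop",
    "connection.user": "connect",
    "connection.password": "secret",
    "table.whitelist": "orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db-"
  }
}
With mode set to incrementing, the connector repeatedly polls for rows whose id column is higher than the last value it has seen, which is how new records are captured without rereading the whole table.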
S3 Source Connector
Use Case: Ingest data from cloud storage
- Reads files from AWS S3 buckets
- Streams contents into Kafka topics
- Handles structured and unstructured data
- Perfect for cloud-based data lakes
MQTT Source Connector
Use Case: Stream IoT device data
- Captures data from MQTT brokers
- Ingests sensor and device messages
- Streams IoT data directly into Kafka
- Enables real-time IoT analytics
Sink Connector Examples
JDBC Sink Connector
Use Case: Store Kafka messages in databases
- Writes data to PostgreSQL, MySQL, Oracle tables
- Persists processed events for reporting
- Enables integration with existing applications
- Supports analytics and historical queries
Elasticsearch Sink Connector
Use Case: Real-time search and analytics
- Streams Kafka data into Elasticsearch
- Makes data instantly searchable
- Perfect for log analysis and monitoring
- Enables search-driven applications
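A minimal Elasticsearch sink configuration could look roughly like this, again using Confluent's connector class; the topic name and Elasticsearch URL are illustrative, and exact properties depend on the connector version.
{
  "name": "orders-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "db-orders",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
Each topic listed in topics is written to an Elasticsearch index, so records become searchable almost as soon as they are produced.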
HDFS Sink Connector
Use Case: Long-term storage for big data
- Writes Kafka messages to Hadoop HDFS
- Enables scalable storage
- Supports big data processing
- Ideal for data lakes and machine learning
Kafka Connect Architecture
Workers
- Engines that run Kafka Connect
- Execute connectors and manage tasks
- In distributed mode, multiple workers share load
- Ensure high availability and reliability
Connectors
- Define data source or destination
- Wrapper around external system
- Examples: JDBC, S3, Elasticsearch
- Configurable for specific use cases
Tasks
- Units of work assigned by connectors
- Can run in parallel for performance
- Distribute workload across workers
- More tasks enable better parallelization
Converters
- Translate messages between formats
- Handle different system data formats
- Common types: JSON, Avro, Protobuf
- Determine serialization and deserialization
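Converters are normally set in the worker configuration. The properties below are a sketch assuming the built-in JsonConverter without embedded schemas; Avro via Confluent's AvroConverter and a Schema Registry is another common choice.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false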
Example: Online Store Data Pipeline
Scenario
- Customer orders stored in MySQL database
- Need real-time search in Elasticsearch
- Kafka Connect automates data movement
Architecture
Components:
- Two worker nodes running Kafka Connect
- JDBC Source Connector reads from MySQL
- Elasticsearch Sink Connector writes to Elasticsearch
- Kafka topic bridges source and sink
Data Flow:
- Orders placed in MySQL database
- JDBC Source Connector publishes to Kafka topic
- Elasticsearch Sink Connector reads from topic
- Data written to Elasticsearch for search
Task Parallelization
- Each connector spins up multiple tasks
- Tasks divide work for parallel processing
- Faster data movement at scale
- Both source and sink benefit from parallelization
Format Conversion
- MySQL stores structured data
- Records on the Kafka topic are serialized as JSON, Avro, or Protobuf
- Converters handle that translation in both directions
- Format preserved through pipeline
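If one pipeline needs a different format than the worker default, converters can also be overridden per connector. As a hedged example, adding lines like the following to the JDBC source connector's config would switch its values to Avro, assuming a Schema Registry is running at the given URL:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"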
Running Kafka Connect
Standalone Mode
Start Command:
connect-standalone.sh config/connect-standalone.properties connector-config.properties
Characteristics:
- Single process runs everything
- Great for local testing
- Simple small jobs
- No high availability
Distributed Mode
Start Command:
connect-distributed.sh config/connect-distributed.properties
Characteristics:
- Multiple workers share workload
- Production-ready
- Scalable and fault-tolerant
- Workers pick up tasks if one fails
Configuration Differences
Standalone Mode:
offset.storage.file.filename=/tmp/connect.offsets
- Offsets stored locally in a file
- Single machine storage
- No coordination needed
Distributed Mode:
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
- Offsets stored in the connect-offsets Kafka topic
- Configurations stored in connect-configs
- Status tracking in connect-status
- Workers coordinate as a group
- Built for scale
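Putting those pieces together, a minimal connect-distributed.properties might look something like the sketch below. It assumes a single local broker; the replication factors of 1 are only suitable for development, and plugin.path should point at wherever your connector plugins are installed.
# broker connection and worker group
bootstrap.servers=localhost:9092
group.id=connect-cluster
# default converters for all connectors on this worker
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# internal topics used for coordination
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
# where connector plugins are installed
plugin.path=/usr/share/java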
Configuring a Connector
Example: JDBC Source Connector
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "topic.prefix": "db-"
  }
}
Configuration Elements:
- name: Unique connector identifier
- config: Connector-specific settings, wrapped as expected by the REST API
- connector.class: Specific connector implementation
- tasks.max: Maximum parallel tasks
- connection.url: Database connection string
- topic.prefix: Prefix for destination topics
Finding Connector Settings
Documentation Sources:
- Connector documentation (Confluent, etc.)
- Example configuration files
- The /connector-plugins REST API endpoint - lists available connector plugins and their config options
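As a sketch of that last option (the REST API is covered in more detail below), the plugin list and a config validation check can be queried with curl. The worker address is the default port 8083, and the plugin name in the URL may need to be the simple or fully qualified class name depending on version:
# list the connector plugins installed on this worker
curl -X GET http://localhost:8083/connector-plugins

# ask a plugin to validate a candidate configuration and report per-field errors
curl -X PUT -H "Content-Type: application/json" \
  --data '{"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://localhost:5432/mydb", "topic.prefix": "db-"}' \
  http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate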
Kafka Connect REST API
List Connectors
curl -X GET http://localhost:8083/connectors
Returns a list of all active connectors.
Create New Connector
curl -X POST -H "Content-Type: application/json" \
  --data @connector-config.json \
  http://localhost:8083/connectors
Deploys a connector with the specified configuration.
Check Connector Status
curl -X GET http://localhost:8083/connectors/jdbc-source/status
Shows whether the connector is running, paused, or has errors.
Additional Operations
- Update connectors
- Pause connectors
- Delete connectors
- Full API documentation available online
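A rough sketch of those operations against the jdbc-source connector from earlier; all endpoints are part of the standard Connect REST API, with host and connector name adjusted to your setup:
# update the connector's configuration in place (flat config map, no "name"/"config" wrapper)
curl -X PUT -H "Content-Type: application/json" \
  --data '{"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "4", "connection.url": "jdbc:postgresql://localhost:5432/mydb", "topic.prefix": "db-"}' \
  http://localhost:8083/connectors/jdbc-source/config

# pause and later resume the connector
curl -X PUT http://localhost:8083/connectors/jdbc-source/pause
curl -X PUT http://localhost:8083/connectors/jdbc-source/resume

# remove the connector entirely
curl -X DELETE http://localhost:8083/connectors/jdbc-source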
Monitoring Kafka Connect
Logs
tail -f /var/log/kafka-connect/connect.log
- Real-time log monitoring
- Identifies stuck or failing connectors
- Shows task failures and errors
Internal Kafka Topics
Three special topics:
- connect-configs - stores connector configurations
- connect-offsets - lets source connectors resume where they left off
- connect-status - records connector and task statuses
Checking these topics provides insights into behind-the-scenes operations.
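One way to peek at them is with the console consumer, assuming the default topic names from the distributed configuration above and a broker on localhost:
# watch connector and task status records as they are written
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-status --from-beginning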
Advanced Monitoring Tools
- Confluent Control Center
- Prometheus
- Grafana
- Deeper visibility into performance and health
Summary
Key Concepts:
- Integration Bridge
- Kafka Connect bridges Kafka and external systems
- Source connectors bring data in
- Sink connectors send data out
- Scalability and Reliability
- Built for production use
- Fault-tolerant architecture
- Easy to scale
- Simple Configuration
- Configuration-based setup
- No complex coding required
- Easy to manage
- REST API Management
- Manage connectors remotely
- No config file editing
- No service restarts needed
Kafka Connect enables efficient data movement, scales with your needs, and keeps everything running smoothly.