MongoDB Kafka Connector
Apache Kafka is an open-source publish/subscribe messaging system. Kafka Connect, a component of Apache Kafka, addresses the challenge of linking Apache Kafka with various datastores, including MongoDB. Kafka Connect offers:
- A fault-tolerant runtime for transferring data to and from datastores
- A framework that enables the Apache Kafka community to share solutions for connecting Apache Kafka to different datastores
In this post, we’ll focus on using MongoDB as a data lake. The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes it to MongoDB. The official MongoDB Kafka Connector can be found here.
Start the Kafka Environment
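Download a Kafka release from here. The commands below use version 3.2.0, built for Scala 2.13. If that release is no longer on the download mirror, older releases are kept at https://archive.apache.org/dist/kafka/.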
curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
Run the following commands to start all the services in the correct order. Begin with the ZooKeeper service.
bin/zookeeper-server-start.sh config/zookeeper.properties
In another terminal session, start the Kafka broker service:
bin/kafka-server-start.sh config/server.properties
Once all the services have successfully launched, you will have a basic Kafka environment up and running.
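One way to confirm that the broker is actually accepting requests, rather than watching the logs, is to retry a cheap client command until it succeeds. The sketch below is a minimal, hypothetical retry helper (it is not part of the Kafka distribution); the attempt count and delay are arbitrary values chosen for illustration.

```shell
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it exits with status 0 or ATTEMPTS runs have failed.
# Hypothetical helper for this tutorial, not a Kafka tool.
retry() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    i=$((i + 1))
    # Sleep only if another attempt remains.
    if [ "$i" -le "$attempts" ]; then
      sleep "$delay"
    fi
  done
  return 1
}

# Example, run from the Kafka directory after starting both services:
#   retry 10 3 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

Listing topics is used here only because it requires a working broker connection; any other client command that fails when the broker is unreachable would serve the same purpose.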
Install the Plugin
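Download the connector JAR from here and save it in the libs directory of your Kafka installation, which is the location that plugin.path refers to in the next step. Quoting the URL keeps the shell from interpreting the ? character.
curl -L "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar" -o libs/mongo-kafka-connect-1.7.0-all.jar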
Edit config/connect-standalone.properties and update plugin.path to point to the downloaded JAR file.
plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
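If you prefer to script this edit instead of opening the file by hand, something like the following could work. It is a sketch under assumptions: set_property is a hypothetical helper name, and it only handles simple key=value lines (no line continuations or keys containing backslashes).

```shell
# set_property FILE KEY VALUE
# Removes any active (uncommented) "KEY=..." line from FILE and appends
# "KEY=VALUE". Commented-out examples are left untouched.
# Hypothetical helper for this tutorial.
set_property() {
  file=$1
  key=$2
  value=$3
  # Escape dots so "plugin.path" is matched literally by grep.
  key_re=$(printf '%s' "$key" | sed 's/\./\\./g')
  tmp=$(mktemp)
  # grep -v exits non-zero when it prints nothing; that is not an error here.
  grep -v -E "^[[:space:]]*${key_re}[[:space:]]*=" "$file" > "$tmp" || true
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back through cat so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Example, run from the Kafka directory:
#   set_property config/connect-standalone.properties plugin.path \
#     "$PWD/libs/mongo-kafka-connect-1.7.0-all.jar"
```

Appending the new line at the end, rather than rewriting it in place, keeps the helper short; the order of properties in the file does not matter to Kafka Connect.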
Create Configuration Properties
In the /config folder, create a file named MongoSinkConnector.properties with the following contents:
name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
# Message Types
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Specific MongoDB Sink Connector Configuration
connection.uri=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
In the /config folder, create a file named MongoSourceConnector.properties with the following contents:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
# Connection and Source Configuration
connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
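A mistyped property name in these files is easy to miss, because the connector may silently fall back to a default value. A quick pre-flight check can list the keys that are absent. The sketch below assumes a hypothetical helper, missing_keys, and the list of keys passed to it is my own choice for this tutorial rather than an official list of required settings.

```shell
# missing_keys FILE KEY [KEY...]
# Prints each KEY that has no line starting with "KEY=" in FILE.
# Returns 0 if every key is present, 1 otherwise.
# Hypothetical helper; not part of Kafka or the MongoDB connector.
missing_keys() {
  file=$1
  shift
  rc=0
  for key in "$@"; do
    # index() compares literally, so dots in property names are not
    # treated as regular-expression wildcards.
    if ! awk -v k="$key=" 'index($0, k) == 1 { found = 1 } END { exit !found }' "$file"; then
      echo "$key"
      rc=1
    fi
  done
  return "$rc"
}

# Example, run from the Kafka directory:
#   missing_keys config/MongoSinkConnector.properties \
#     name connector.class topics connection.uri database collection
```

The helper only checks that a key is present; it does not validate the value, so a wrong host or database name would still pass.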
Install MongoDB
Import the MongoDB public GPG Key by running the following command:
wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -
Create the MongoDB Source List
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
Update the Local Package Database
sudo apt-get update
Install MongoDB Packages
sudo apt-get install -y mongodb-org
If you encounter any errors related to unmet dependencies, fix them with the following commands:
echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list
sudo apt-get update
sudo apt-get install libssl1.1
Verify MongoDB Status
Check that MongoDB has started successfully:
sudo systemctl status mongod
If the service is inactive, start it with:
sudo systemctl restart mongod
Start Kafka Connect
To start Kafka Connect, execute the following command:
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties
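Once the worker is up, its REST interface (port 8083 by default) reports the state of each connector at /connectors/<name>/status. The sketch below pulls the connector-level state out of that response with sed. The helper name connector_state is hypothetical, and the pattern assumes the response contains a "connector" object with a "state" field, as the status endpoint returns; if jq is installed, jq -r .connector.state is a more robust alternative.

```shell
# connector_state
# Reads a /connectors/<name>/status JSON response on stdin and prints the
# connector-level state (for example RUNNING or FAILED). Task states, which
# appear later in the response, are ignored.
# Hypothetical helper for this tutorial.
connector_state() {
  tr -d '\n' |
    sed -n 's/.*"connector"[[:space:]]*:[[:space:]]*{[^}]*"state"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Example, assuming the worker listens on the default port:
#   curl -s http://localhost:8083/connectors/mongo-sink/status | connector_state
#   curl -s http://localhost:8083/connectors/mongo-source/status | connector_state
```

A connector can report RUNNING while one of its tasks has failed, so if data does not arrive it is worth reading the full status response as well.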
Write Data to the Topic
Run the console producer client to write a few events into a topic. Each line you enter is written to the topic as a separate event. The connect-test topic used here is not read by either connector, so this step only confirms that the broker accepts messages.
bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event
Send Document Contents Through Your Connectors
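To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data. The source connector publishes the resulting change event to the quickstart.sampleData topic (named after the database and collection), and the sink connector, which subscribes to that topic, writes the document to the topicData collection. Use the following MongoDB shell commands: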
use quickstart
db.sampleData.insertOne({"hello":"world"})
After inserting the document, verify that your connectors have processed the change by checking the topicData collection.
db.topicData.find()
You should see output similar to the following:
[
{
"_id": ObjectId(...),
"hello": "world"
}
]
Reference
For more information, visit the MongoDB Kafka Connector documentation.