Apache Kafka is an open-source publish/subscribe messaging system. Kafka Connect, a component of Apache Kafka, addresses the challenge of linking Apache Kafka with various datastores, including MongoDB. Kafka Connect offers:

  • A fault-tolerant runtime for transferring data to and from datastores
  • A framework that enables the Apache Kafka community to share solutions for connecting Apache Kafka to different datastores

In this post, we’ll focus on using MongoDB as a data lake. The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes it to MongoDB. The official MongoDB Kafka Connector can be found here.

Start the Kafka Environment

Download Kafka from the Apache download site (this post uses version 3.2.0):

curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0

Run the following commands to start all the services in the correct order. Begin with the ZooKeeper service.

bin/zookeeper-server-start.sh config/zookeeper.properties

In another terminal session, start the Kafka broker service:

bin/kafka-server-start.sh config/server.properties

Once all the services have successfully launched, you will have a basic Kafka environment up and running.

Install the Plugin

Download the connector JAR file from here into the libs directory of your Kafka installation, so it matches the plugin.path you will configure next.

curl -L "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar" -o libs/mongo-kafka-connect-1.7.0-all.jar

Edit config/connect-standalone.properties and update the plugin.path to point to the downloaded JAR file.

plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
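Kafka Connect worker files like connect-standalone.properties use the Java .properties format: one key=value pair per line, no quotes, with # starting a comment. As a rough illustration (a simplified parser, not a full .properties implementation — it skips escapes and line continuations):

```python
# Minimal sketch of reading a Java-style .properties file, the format
# used by Kafka Connect worker configs. Simplified for illustration:
# no escape sequences or line continuations.
def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

config = parse_properties("""
# worker config excerpt
plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
""")
print(config["plugin.path"])
```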

Create Configuration Properties

In the /config folder, create a file named MongoSinkConnector.properties.

name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector

Message Types

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Specific MongoDB Sink Connector Configuration

connection.uri=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
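Standalone mode reads these settings from the .properties file; in distributed mode the same settings would be submitted as a JSON body to the Kafka Connect REST API (POST /connectors). A sketch of that equivalent JSON, mirroring the file above:

```python
import json

# The same sink settings, expressed as the JSON body you would POST to
# a Kafka Connect REST API in distributed mode. Only the transport
# differs from the .properties file; the keys are identical.
sink_connector = {
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "quickstart.sampleData",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connection.uri": "mongodb://localhost:27017",
        "database": "quickstart",
        "collection": "topicData",
        "change.data.capture.handler":
            "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    },
}
print(json.dumps(sink_connector, indent=2))
```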

In the /config folder, create a file named MongoSourceConnector.properties.

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

Connection and Source Configuration

connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
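By default, the source connector publishes change events for a collection to a topic named after the database and collection, optionally prefixed via topic.prefix. That is why the sink's topics setting is quickstart.sampleData. A sketch of the naming rule:

```python
# Sketch of the MongoDB source connector's default topic naming:
# change events for a collection go to
# "<topic.prefix>.<database>.<collection>", or just
# "<database>.<collection>" when no prefix is configured.
def source_topic(database, collection, prefix=""):
    parts = [p for p in (prefix, database, collection) if p]
    return ".".join(parts)

print(source_topic("quickstart", "sampleData"))
```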

Install MongoDB

Import the MongoDB public GPG Key by running the following command:

wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -

Create the MongoDB Source List

echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list

Update the Local Package Database

sudo apt-get update

Install MongoDB Packages

sudo apt-get install -y mongodb-org

If the installation fails with unmet dependency errors (on newer Ubuntu releases, libssl1.1 is no longer available in the default repositories), fix them with the following commands:

echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list
sudo apt-get update
sudo apt-get install libssl1.1

Verify MongoDB Status

Check that MongoDB has started successfully:

sudo systemctl status mongod

If it is inactive, restart it with:

sudo systemctl restart mongod

Start Kafka Connect

To start Kafka Connect, execute the following command:

bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties

Write Data to the Topic

Run the console producer client to write a few events into a topic. Each line you enter is written to the topic as a separate event. Note that this step writes plain-text events to the connect-test topic as a quick check that the broker is working; the sink connector itself only reads from the quickstart.sampleData topic.

$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event
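Because the sink's value converter is the JsonConverter with schemas disabled, any message destined for the sink's topic must parse as JSON; a bare sentence like the events above would fail conversion. A sketch of the distinction:

```python
import json

# Messages for the sink connector must parse as JSON, because
# value.converter is JsonConverter with schemas disabled. A JSON
# document like this is accepted:
event = json.dumps({"hello": "world"})
assert json.loads(event) == {"hello": "world"}

# A bare sentence is not valid JSON and would fail conversion:
try:
    json.loads("This is my first event")
    parsed = True
except json.JSONDecodeError:
    parsed = False
print(parsed)
```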

Send Document Contents Through Your Connectors

To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data. Use the following MongoDB shell commands:

use quickstart
db.sampleData.insertOne({"hello":"world"})

After inserting the document, verify that your connectors have processed the change by checking the topicData collection.

db.topicData.find()

You should see output similar to the following:

[
  {
    "_id": ObjectId(...),
    "hello": "world"
  }
]
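Conceptually, the round trip works because the source connector publishes MongoDB change stream events and the ChangeStreamHandler replays each event against the sink collection. A simplified sketch of that replay logic (not the connector's actual code; a Python dict keyed by _id stands in for the topicData collection):

```python
# Conceptual sketch of CDC replay: the source connector publishes
# change stream events, and on the sink side the ChangeStreamHandler
# applies each one to the target collection. Here a dict keyed by _id
# stands in for the MongoDB collection.
def apply_change_event(collection, event):
    op = event["operationType"]
    if op == "insert":
        doc = event["fullDocument"]
        collection[doc["_id"]] = doc
    elif op == "delete":
        collection.pop(event["documentKey"]["_id"], None)
    return collection

topic_data = {}
insert_event = {
    "operationType": "insert",
    "fullDocument": {"_id": 1, "hello": "world"},
    "documentKey": {"_id": 1},
}
apply_change_event(topic_data, insert_event)
print(topic_data)
```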

Reference

For more information, visit the MongoDB Kafka Connector documentation.