MongoDB Kafka 連接器
Apache Kafka 是一種開源的發布/訂閱消息系統。Kafka Connect,Apache Kafka的一個元件,面對將Apache Kafka與各種數據存儲連接的挑戰,包括 MongoDB。Kafka Connect 提供:
- 傳輸數據到數據存儲的容錯運行時
- Apache Kafka社區共享連接 Apache Kafka 到不同數據存儲解決方案的框架
在這篇文章中,我們將重點介紹如何將 MongoDB 作為數據湖。 MongoDB Kafka 接收連接器是從 Apache Kafka 讀取數據並將其寫入 MongoDB 的 Kafka Connect 連接器。官方的 MongoDB Kafka 連接器可以在這裏找到。
開始 Kafka 環境
從這裡下載最新版的 Kafka。
curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
按照正確的順序運行以下命令來開始所有的服務。首先開始 ZooKeeper 服務。
bin/zookeeper-server-start.sh config/zookeeper.properties
在另一個終端會話中,開始 Kafka 代理服務:
bin/kafka-server-start.sh config/server.properties
所有服務成功啟動後,您將會擁有一個運行中的 Kafka 基礎環境。
安裝插件
從這裡下載 JAR 文件,並導航至 /libs
目錄。
curl -L https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar -o plugin/mongo-kafka-connect-1.7.0-all.jar
編輯 config/connect-standalone.properties
,並將 plugin.path
指向下載的 JAR 文件。
plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
創建配置屬性
在 /config
文件夾中,創建一個名為 MongoSinkConnector.properties
的文件。
name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
消息類型
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
關於 MongoDB Sink 連接器的具體配置
connection.url=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
在 /config
文件夾中,創建一個名為 MongoSourceConnector.properties
的文件。
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
連接和源配置
connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
安裝 MongoDB
運行以下命令導入 MongoDB 的公開 GPG 鑰匙:
wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -
創建 MongoDB 源列表
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
更新本地軟件包數據庫
sudo apt-get update
安裝 MongoDB 套件
sudo apt-get install -y mongodb-org
如果遇到任何與未滿足的依賴項相關的錯誤,使用以下命令修復它們:
echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list
sudo apt-get update
sudo apt-get install libssl1.1
驗證 MongoDB 狀態
檢查 MongoDB 是否已成功啟動:
sudo systemctl status mongod
如果它是非活動的並需要重新啟動,運行:
sudo systemctl restart mongod
開始 Kafka Connect
要開始 Kafka Connect,執行以下命令:
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties
將數據寫入 Topic
運行控制台生產者客戶端以將幾個事件寫入您的 Topic。您輸入的每行將導致一個單獨的事件被寫入 Topic。
$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event
通過您的連接器發送文件內容
要將文檔的內容通過您的連接器發送,插入一個文檔到您的源連接器從中讀取數據的 MongoDB 集合。使用以下 MongoDB shell 命令:
use quickstart
db.sampleData.insertOne({"hello":"world"})
插入文檔後,通過檢查 topicData
集合來驗證您的連接器是否已將變更處理。
db.topicData.find()
您應該會看到以下類似的輸出:
[
{
"_id": ObjectId(...),
"hello": "world",
"travel": "MongoDB Kafka Connector"
}
]
參考
欲了解更多訊息,請參觀 MongoDB Kafka 連接器文檔。