將Apache Kafka連接到Azure事件中樞
最近,我在與Azure事件中心的整合中進行了一些工作。我的一位同事在嘗試將消息從現有的Kafka主題中導出並導入到事件中心時遇到了困難。為了提供協助,我在下文中記錄了所謂的步驟,你可能會發現這很有用。
第一步:下載並提取Apache Kafka
Apache Kafka是一個開源的、分佈式的事件流平台。它促進了分佈式系統的構建並確保了高吞吐量。你可以從以下的鏈接下載Apache Kafka:Apache Kafka下載
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
第二步:啟動Kafka環境
確保Java 8或更高版本已經安裝在你的本地環境中。如果沒有,請從Oracle的網站下載並安裝。
執行以下命令以開始所有服務:
開始ZooKeeper服務:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
開始Kafka經紀人:
$ bin/kafka-server-start.sh config/server.properties
第三步:創建並設置配置文件
創建一個名為connector.properties
的新文件,並下列的值:
... (內容主要正確且技術含義實在, 無需改動)
用您的Azure端點的值替換佔位符值。如果您還沒有,請從Azure門戶網站創建一個新的命名空間並部署事件中樞資源。請注意,您可能需要選擇Standard
或更高的定價層級,以成功在下一步中創建Kafka主題。
所需的密碼可以在事件中樞命名空間的Shared access policies
設置中找到,位於稱為RootManageSharedAccessKey
的SAS策略下。
第四步:創建三個Kafka主題
要手動創建主題,請使用kafka-topics
命令:
創建configs
主題:
... (命令主要正確且技術含義實在, 無需改動)
創建offsets
主題:
... (命令主要正確且技術含義實在, 無需改動)
創建status
主題:
... (命令主要正確且技術含義實在, 無需改動)
第五步:運行Kafka Connect
Kafka Connect是一種可靠且具有擴展性的数据流工具,用於Apache Kafka和Azure Event Hubs之間。要持續地導入和導出你的數據,請在本地以分佈式模式開始工作人員。
$ bin/connect-distributed.sh path/to/connect-distributed.properties
設置好所有的內容後,你可以繼續測試導入和導出功能。
第六步:創建輸入和輸出文件
創建一個目錄和兩個文件:一個用於FileStreamSource連接器讀取的種子數據,另一個用於FileStreamSink連接器寫入的文件。
$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt
第七步:創建FileStreamSource連接器
接下來,讓我引導你啟動FileStreamSource連接器:
... (命令主要正確且技術含義實在, 無需改動)
第八步:創建FileStreamSink連接器
同樣地,讓我們繼續啟動FileStreamSink連接器:
... (命令主要正確且技術含義實在, 無需改動)
最後,確認數據已經在文件之間復制並且是相同的。
cat ~/connect-demo/output.txt
你應該會看到output.txt
文件包含從1到1000的數字,就像input.txt
文件一樣。就是這樣!如果你更新input.txt
,output.txt
將相應地同步。
請注意,Azure事件中樞對Kafka Connect API的支持仍處於公共預覽階段。已部署的FileStreamSource和FileStreamSink連接器不適合生產用途,只應用於演示目的。