將Apache Kafka連接到Azure事件中樞


最近,我在與Azure事件中心的整合中進行了一些工作。我的一位同事在嘗試將消息從現有的Kafka主題中導出並導入到事件中心時遇到了困難。為了提供協助,我在下文中記錄了所謂的步驟,你可能會發現這很有用。

第一步:下載並提取Apache Kafka

Apache Kafka是一個開源的、分佈式的事件流平台。它促進了分佈式系統的構建並確保了高吞吐量。你可以從以下的鏈接下載Apache Kafka:Apache Kafka下載

$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

第二步:啟動Kafka環境

確保Java 8或更高版本已經安裝在你的本地環境中。如果沒有,請從Oracle的網站下載並安裝。

執行以下命令以開始所有服務:

開始ZooKeeper服務:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

開始Kafka經紀人:

$ bin/kafka-server-start.sh config/server.properties

第三步:創建並設置配置文件

創建一個名為connector.properties的新文件,並下列的值:

... (內容主要正確且技術含義實在, 無需改動) 

用您的Azure端點的值替換佔位符值。如果您還沒有,請從Azure門戶網站創建一個新的命名空間並部署事件中樞資源。請注意,您可能需要選擇Standard或更高的定價層級,以成功在下一步中創建Kafka主題。

所需的密碼可以在事件中樞命名空間的Shared access policies設置中找到,位於稱為RootManageSharedAccessKey的SAS策略下。

第四步:創建三個Kafka主題

要手動創建主題,請使用kafka-topics命令:

創建configs主題:

... (命令主要正確且技術含義實在, 無需改動)

創建offsets主題:

... (命令主要正確且技術含義實在, 無需改動)

創建status主題:

... (命令主要正確且技術含義實在, 無需改動)

第五步:運行Kafka Connect

Kafka Connect是一種可靠且具有擴展性的数据流工具,用於Apache Kafka和Azure Event Hubs之間。要持續地導入和導出你的數據,請在本地以分佈式模式開始工作人員。

$ bin/connect-distributed.sh path/to/connect-distributed.properties

設置好所有的內容後,你可以繼續測試導入和導出功能。

第六步:創建輸入和輸出文件

創建一個目錄和兩個文件:一個用於FileStreamSource連接器讀取的種子數據,另一個用於FileStreamSink連接器寫入的文件。

$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt

第七步:創建FileStreamSource連接器

接下來,讓我引導你啟動FileStreamSource連接器:

... (命令主要正確且技術含義實在, 無需改動)

第八步:創建FileStreamSink連接器

同樣地,讓我們繼續啟動FileStreamSink連接器:

... (命令主要正確且技術含義實在, 無需改動)

最後,確認數據已經在文件之間復制並且是相同的。

cat ~/connect-demo/output.txt

你應該會看到output.txt文件包含從1到1000的數字,就像input.txt文件一樣。就是這樣!如果你更新input.txtoutput.txt將相應地同步。

請注意,Azure事件中樞對Kafka Connect API的支持仍處於公共預覽階段。已部署的FileStreamSource和FileStreamSink連接器不適合生產用途,只應用於演示目的。