1. 程式人生 > >kafka系列(七)使用Kafka-Connect匯入匯出資料

kafka系列(七)使用Kafka-Connect匯入匯出資料

摘要

本文主要內容是介紹如何使用kafka-connect進行匯入匯出資料,文章內容來自於kafka官方文件,對官方文件中一些內容作了簡要補充。

簡介

向console中寫入資料然後再寫回到console是非常方便的,但是你可能想從其他的資料來源寫入資料,然後將資料匯出到kafka以外的其他系統,kafka connect 為很多系統提供匯入匯出資料功能,而不用寫任何程式碼

1、準備資料來源

>mkdir /data
> echo -e "foo\nbar" > /data/test.txt

(此處需要注意test.txt檔案所在的路徑,如果沒有/data 資料夾需要先建立他)

2、修改配置檔案

之後我們需要修改kafka配置檔案 connect-file-source.properties,檔案內容相當簡單

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/data/test.txt
topic=connect-test

注意修改上述程式碼中的file欄位的值為我們剛才建立的test.txt的檔案路徑。

修改connect-file-sink.properties配置檔案

name=local-file-sink
connector.class=FileStreamSink
tasks.
max=1 file=/data/test.sink.txt topics=connect-test

上述程式碼中file=/data/test.sink.txt指定匯出檔案的路徑和檔名,topics=connect-test指定topic名稱

3、執行Kafka-Connect

配置完成後我們可以執行下面命令進行

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

在輸出一系列資訊後我們可以檢視輸出檔案

> cat /data/test.sink.txt
foo
bar

檢視topic中內容

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

我們向test.txt中新加入一行

> echo "Another line" >> test.txt

你可以看到輸出檔案以及topic中也增加了新的一行。