1. 程式人生 > >flume+kafka+zookeeper+storm實時計算環境搭建(二)

flume+kafka+zookeeper+storm實時計算環境搭建(二)

搭建flume+kafka+storm環境

這裡,我的flume是採集mysql的資料再存入kafka,我用mysql作為我的source,記憶體memory作為channels,kafka作為sink,這個要藉助一個外掛source-ng-sql手機mysql的資料

環境準備:
1.flume1.8.0
2.kafka2.0.0
3.storm1.2.2
4.zookeeper3.4.13

flume
下載:
http://flume.apache.org/download.html
下載
apache-flume-1.8.0-bin.tar.gz
解壓
tar -zxvf ~/Desktop/apache-flume-1.8.0-bin.tar.gz
sudo mv apache-flume-1.8.0-bin flume1.8.0
mkdir -p bigdata
sudo mv flume1.8.0 /usr/local/bigdata
改一下擁有者
sudo chown bigdata -R storm:storm
修改flume的conf配置檔案
sudo cp flume-conf.properties.template flume.conf
sudo vim flume.conf

a1.sources=sql
a1.channels=mem
a1.sinks=kafka

1.sources.sql.type=org.keedio.flume.source.SQLSource
a1.sources.sql.hibernate.connection.url=jdbc:mysql://資料庫地址

a1.sources.sql.hibernate.connection.user=root
a1.sources.sql.hibernate.connection.password=123456
a1.sources.sql.hibernate.connection.autocommit=
true a1.sources.sql.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect a1.sources.sql.hibernate.connection.driver_class=com.mysql.jdbc.Driver a1.sources.sql.run.query.delay=5000 a1.sources.sql.status.file.path=sql狀態檔案儲存路徑 a1.sources.sql.status.file.name=sql狀態檔名 # Custom query a1.sources.sql.start.from
=0 a1.sources.sql.custom.query=資料庫查詢語句 a1.sources.sql.batch.size=1000 a1.sources.sql.max.rows=1000 a1.sources.sql.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider a1.sources.sql.hibernate.c3p0.min_size=1 a1.sources.sql.hibernate.c3p0.max_size=10 # use memory as the channels a1.channels.mem.type=memory a1.channels.mem.capacity=1000 a1.channels.mem.transactionCapacity=1000 a1.channels.mem.byteCapacityBufferPercentage=20 a1.channels.mem.byteCapacity=800000 a1.sinks.kafka.type=org.apache.flume.sink.kafka.KafkaSink a1.sinks.kafka.topic=建立的kafka topic名 a1.sinks.kafka.brokerList=master:9092,slave1:9092,slave2:9092 a1.sinks.kafka.requireAcks=1 a1.sinks.kafka.batchSize=20 a1.sinks.kafka.partitionIdHeader=0 a1.sources.sql.channels=mem a1.sinks.kafka.channel=mem

配置環境變數
sudo vim /etc/profile
在這裡插入圖片描述

flume-ng-sql-source外掛
下載
https://github.com/xiaomoo/oeasy
在flume的目錄下建立目錄plugins.d
在這裡插入圖片描述
目錄結構如上
lib放編譯好的flume-ng-sql-source.jar
libext放mysql的jar

zookeeper
下載:
http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/
解壓:
tar -zxvf zookeeper-3.4.13.tar.gz
sudo mv zookeeper-3.4.13 zookeeper3.4.13
sudo zookeeper3.4.13 /usr/local/bigdata
修改配置檔案
sudo vim zoo.cfg
單節點配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataLogDir=/usr/local/bigdata/zookeeper3.4.13/logs
dataDir=/usr/local/bigdata/zookeeper3.4.13/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

然後新建data目錄,單節點為standalone模式

zookeeper叢集配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataLogDir=/usr/local/bigdata/zookeeper3.4.13/logs
dataDir=/usr/local/bigdata/zookeeper3.4.13/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=0.0.0.0:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

新建data目錄
mkdir -p data
cat 1 > myid
通過ssh將zookeeper目錄傳送給slave1,slave2
scp -t zookeeper3.4.13 [email protected]:/usr/local/bigdata/
scp -t zookeeper3.4.13 [email protected]:/usr/local/bigdata/
分別將slave1,slave2新建data目錄
slave1:
cat 2 > myid
修改zoo.cfg

server.1=master:2888:3888
server.2=0.0.0.0:2888:3888
server.3=slave2:2888:3888

slave2:
cat 3 > myid
修改zoo.cfg

server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=0.0.0.0:2888:3888

zookeeper叢集會選擇一個leader,其他為follower
配置環境變數
在這裡插入圖片描述

通過bin/zkServer.sh start啟動zookeeper
bin/zkServer.sh stop關閉zookeeper
bin/zkServer.sh status檢視叢集狀態

kafka
下載:
http://kafka.apache.org/downloads
解壓:
tar -zxvf kafka_2.11-2.0.0.tgz
sudo mv kafka_2.11-2.0.0 kafka2.0.0
sudo mv kafka2.0.0 /usr/local/bigdata
cd /usr/local/bigdata/kafka2.0.0/config
修改kafka配置server.properties
sudo vim server.properties

broker.id=0
host.name=本機的ip地址

listeners=PLAINTEXT://192.168.5.211:9092
log.dirs=/usr/local/bigdata/kafka2.0.0/logs

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600
delete.topic.enable=true

log.dirs=/usr/local/bigdata/kafka2.0.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=你的zookeeper地址,可以使單節點也可以是叢集
zookeeper.connection.timeout.ms=6000
roup.initial.rebalance.delay.ms=0

開啟zookeeper.properties
sudo vim zookeeper.properties

dataDir=你的zookeeper的data目錄路徑
clientPort=2181

將kafka傳送到slave1,slave2
scp -t kafka2.0.0 [email protected]:/usr/local/bigdata/
scp -t kafka2.0.0 [email protected]:/usr/local/bigdata/
分別修改slave1,slave2的kafka中config的server.properties
slave1:
將broker.id改為1
slave2:
將broker.id改為2

配置環境變數
在這裡片描述

storm
下載
http://storm.apache.org/downloads.html
解壓
tar -zxvf apache-storm-1.2.2.tar.gz
sudo mv apache-storm-1.2.2 storm1.2.2
sudo mv storm1.2.2 /usr/local/bigdata

修改配置檔案,在conf目錄下
sudo vim storm.yaml

storm.local.dir: "/usr/local/bigdata/storm1.2.2/localdir"
storm.zookeeper.servers:
     - zookeeper地址
     
nimbus.seeds: ["master"]
ui.host: 0.0.0.0
ui.port: 8093
supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

drpc.servers:
     - "master"
     - "slave1"
     - "slave2"

將storm傳送給slave1,slave2
scp -t storm1.2.2 [email protected]:/usr/local/bigdata/
scp -t storm1.2.2 [email protected]:/usr/local/bigdata/

配置環境變數
在這裡插入圖片描述

測試環境
首先啟動zookeeper
$ZK_HOME/bin/zkServer.sh start(我用的單節點)
$ZK_HOME/bin/zkServer.sh status檢視狀態
在這裡插入圖片描述

啟動kafka叢集
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

啟動storm
master:
storm nimbus &
storm ui &

slave1:
storm supervisor &

slave2:
storm supervisor &

jps命令檢視啟動程序
master:
在這裡插入圖片描述

slave1:
在這裡插入圖片描述

slave2:
在這裡插入圖片描述

至此,環境就搭建完成了。

相關推薦

flume+kafka+zookeeper+storm實時計算環境搭建()

搭建flume+kafka+storm環境 這裡,我的flume是採集mysql的資料再存入kafka,我用mysql作為我的source,記憶體memory作為channels,kafka作為sink,這個要藉助一個外掛source-ng-sql手機mysql

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇)

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇) 2018年03月04日 23:05:29 冰 河 閱讀數:1602更多 所屬專欄: Hadoop生態 版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https:/

Hadoop+Flume+Kafka+Zookeeper叢集環境搭建(一)

Hadoop+Flume+Kafka+Zookeeper叢集環境搭建 1.部署基礎條件 1.1 硬體條件 IP hostname 192.168.100.103 mater 192.168.100.104 flumekafka1 192.168.1

zookeeperkafka安裝部署及java環境搭建

3.4 項目目錄 tin bytes result zxvf util ise cat 1. ZooKeeper安裝部署 本文在一臺機器上模擬3個zk server的集群安裝。 1.1. 創建目錄、解壓 cd /usr/ #創建項目目錄 mkdir zookeepe

Flume+Kafka+Zookeeper搭建大數據日誌采集框架

flume+kafka+zookeeper1. JDK的安裝 參考jdk的安裝,此處略。2. 安裝Zookeeper 參考我的Zookeeper安裝教程中的“完全分布式”部分。3. 安裝Kafka 參考我的Kafka安裝教程中的“完全分布式搭建”部分。4. 安裝Flume 參考

ZooKeeper偽集群環境搭建

val exp war spa statistic style cal post server 1.從官網下載程序包。 2.解壓。 [dev@localhost

Flink+kafka實現Wordcount實時計算

lis AS -c 安裝包 pos localhost 行動 private 配置信息 1. Flink Flink介紹: Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所

Kafka+Zookeeper+Storm的docker化

docker kafka storm Kafka+Zookeeper+Storm的docker化 因為項目需要,需要把kafka、zookeeper、storm等服務組件docker化,在借鑒相關的開源dockerfile文件後,終於完成,以此記錄。 1.首先是建立基礎的Linux鏡像,針對本

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(三)安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(九)安裝kafka_2.11-1.1.0

itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。

centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十三)定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析

finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十七)待整理

lan post -a 客戶端 客戶 struct bsp www get redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark Structur

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十九)待整理

set dstream 搭建 details 編程指南 .com .cn csdn read redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark St

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十三)Structured Streaming遇到問題:Set(TopicName-0) are gone. Some data may have been missed

ack loss set div top 過程 pan check use 事情經過:之前該topic(M_A)已經存在,而且正常消費了一段時間,後來刪除了topic(M_A),重新創建了topic(M-B),程序使用新創建的topic(M-B)進行實時統計操作,執行過程中

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十五)Structured Streaming:同一個topic中包含一組數據的多個部分,按照key它們拼接為一條記錄(以及遇到的問題)。

eas array 記錄 splay span ack timestamp b- each 需求: 目前kafka的topic上有一批數據,這些數據被分配到9個不同的partition中(就是發布時key:{m1,m2,m3,m4...m9},value:{records

Kafka訊息佇列介紹、環境搭建及應用:C#實現消費者-生產者訂閱

一:kafka介紹 kafka(官網地址:http://kafka.apache.org)是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,具有高效能和高吞吐率。 1.1 術語介紹 Broker Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker

Storm 偽分散式環境搭建

前提:安裝ZooKeeper     tar -zxvf apache-storm-1.0.3.tar.gz -C ~/training/     設定環境變數:vi ~/.bash_profile  

Storm實時計算網站的UV

(1)建立帶IP地址的資料來源GenerateData package storm.uv; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.u

Storm實時計算網站pv

PVBolt1進行多併發區域性彙總,PVSumbolt單執行緒進行全域性彙總 (1)建立資料輸入源PVSpout package storm.test; import java.io.BufferedReader; import java.io.FileInputStream;