1. 程式人生 > >實時計算實踐(spark streaming+kafka+hdfs)

實時計算實踐(spark streaming+kafka+hdfs)

一直在研究大資料處理方向的各種技術和工具,但沒有真正用於實踐中,恰好目前風控措施轉向,需要實施“線上+離線”的雙線防控措施,遂在調研查閱相關文件後,決定從零開始構造(資料探勘轉工程開發,思維轉變是關鍵),這裡面涉及的幾個階段慢慢說來:

  1. 專案開發環境選擇(scala2.10.4 IDE +maven3.3)
    最開始的選擇是直接在eclipse環境上安裝scala的IDE外掛,但實踐證明此種方式很不穩定,受限於網路的限制基本沒能成功,索性直接利用scala提供的IDE開發環境,這視為邁出第一步,雖然IDE提供了構建scala project的模板,但是後期證明沒有合適的版本管理工具,很難在包依賴及部署上做到得心應手,雖然網上普遍推薦SBT在該scala專案管理上的作用,但是有過java開發經驗的還是首推maven工具,為編譯打包提供極大的方便,同時實現在windows環境下編譯執行spark程式碼,不過裡面涉及很多坑後面再術。

  2. 實際計算平臺(spark 1.5.2)
    網上關於storm與spark ,誰在流式計算方式更具有優勢的討論甚多,這裡不做比較,本人結合已有的平臺環境加開發工具選擇了 spark streaming作為實時計算的計算引擎,另外一個原因在於spark在於機器學習支撐上的強有力地位,方便後日擴充套件,此外spark streaming也提供了針對各種資料來源的高階API,方便從不同資料來源中獲取DStearm,同時支援寫入各種儲存介質中。

  3. 資料來源(kafka)
    kafka作為一個分散式釋出-訂閱訊息系統,可以利用已有的API實時的從topic中pull資料,滿足實時計算的要求(前期嘗試從hdfs中讀取資料做計算並非有效),同時streaming-kafka的API友好的提供了資料來源無縫的讀取方式,一切都為達到實時計算的目的

  4. 資料去向(HDFS)

利用上述提到的各種平臺工具等, 舉個簡單案例,程式碼如下:

package com.kafka.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object KafkaWordCounts {
    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    return Some(newValues.sum
+runningCount.getOrElse(0)) } def main(args: Array[String]) { //System.setProperty("hadoop.home.dir", "E:\\software\\hadoop-2.5.2"); val zkQuorum = "101.271.251.161:2181" val group = "zjz-test-waf8" val topics = "monitor_firewall_accesslog_pre" val numThreads="2" if (args.length >= 4){ val zkQuorum = args(0) val group = args(1) val topics = args(2) val numThreads=args(3) } val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split("\\s+")) val wordCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunction(_,_)) wordCounts.print() wordCounts.saveAsTextFiles("sstest/kafka_waf_streaming","txt") ssc.start() ssc.awaitTermination() } }

相關推薦

實時計算實踐spark streaming+kafka+hdfs

一直在研究大資料處理方向的各種技術和工具,但沒有真正用於實踐中,恰好目前風控措施轉向,需要實施“線上+離線”的雙線防控措施,遂在調研查閱相關文件後,決定從零開始構造(資料探勘轉工程開發,思維轉變是關鍵),這裡面涉及的幾個階段慢慢說來: 專案開發環境選擇(sc

圖平行計算實踐spark streaming+graphx+kafka

上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin

大資料學習之路97-kafka直連方式spark streaming 整合kafka 0.10版本

我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東

基於Python的Spark Streaming+Kafka程式設計實踐及調優總結

說明Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明演示環境Spark:1.6Kafka:kafka_2.11-0.9.0.1實現語言:Python程式設計模型目前Spark S

基於Python的Spark Streaming+Kafka程式設計實踐

說明 Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明 演示環境 Spark:1.6 Kafka:kafka_2.11-0.9.0.1 實現語言:P

Spark Streaming+kafka訂單實時統計實現

package com.lm.sparkLearning.orderexmaple; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.

spark----基於Python的Spark Streaming+Kafka程式設計實踐

來源:http://blog.csdn.net/eric_sunah/article/details/54096057?utm_source=tuicool&utm_medium=referral 說明 Spark Streaming的原理說明的文章很多,這裡不

【六】Spark Streaming接入HDFS的資料Local模式使用Scala語言

Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。 專案目錄 pom.xml <project xmlns="http://maven.apache.org/POM/4.

Spark Streaming+Kafka實現訂單數和GMV的實時更新

前言 在雙十一這樣的節日,很多電商都會在大螢幕上顯示實時的訂單總量和GMV總額。由於訂單數量巨大,不可能每隔一秒就到資料庫裡進行一次SQL的資料統計,這時候就需要用到流式計算。本文將介紹一個簡單的Demo,講解如何通過Spark Stream消費來自Kafka中訂單資訊,

Spark Streaming + Kafka + Opencv + Face Recognizer + HDFS Sequence File + Mysql

<pre name="code" class="java">/** * Created by lwc on 6/17/16. */ import java.io.*; import java.sql.*; import java.util.*; impo

簡單實時計算方案kafka+flink+druid/es

    最近在從事實時方面的工作,主要涉及到資料處理、加工及視覺化,在採坑的過程中總結出一套比較簡單的實時計算方案,供大家參考。主要涉及到幾個元件,kafka,flink,redis,druid和es。相信大家對以上幾個元件都比較熟悉了,這裡就不細說了。我們從一個簡單的需求,

下載基於大數據技術推薦系統實戰教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)

大數據技術推薦系統 推薦系統實戰 地址:http://pan.baidu.com/s/1c2tOtwc 密碼:yn2r82課高清完整版,轉一播放碼。互聯網行業是大數據應用最前沿的陣地,目前主流的大數據技術,包括 hadoop,spark等,全部來自於一線互聯網公司。從應用角度講,大數據在互聯網領域主

流式大資料計算實踐2----Hadoop叢集和Zookeeper

一、前言 1、上一文搭建好了Hadoop單機模式,這一文繼續搭建Hadoop叢集 二、搭建Hadoop叢集 1、根據上文的流程得到兩臺單機模式的機器,並保證兩臺單機模式正常啟動,記得第二臺機器core-site.xml內的fs.defaultFS引數值要改成本機的來啟動,啟動完畢後再改回來 2、清空資

流式大數據計算實踐2----Hadoop集群和Zookeeper

nts 環境變量 技術 文件創建 con mon orm rm2 sam 一、前言 1、上一文搭建好了Hadoop單機模式,這一文繼續搭建Hadoop集群 二、搭建Hadoop集群 1、根據上文的流程得到兩臺單機模式的機器,並保證兩臺單機模式正常啟動,記得第二臺機器c

流式大資料計算實踐3----高可用的Hadoop叢集

流式大資料計算實踐(3)----高可用的Hadoop叢集 一、前言 1、上文中我們已經搭建好了Hadoop和Zookeeper的叢集,這一文來將Hadoop叢集變得高可用 2、由於Hadoop叢集是主從節點的模式,如果叢集中的namenode主節點掛掉,那麼叢集就會癱瘓,所以我們要改造成

Spark Streaming+Kafka spark 寫入 kafka

目錄 前言 在WeTest輿情專案中,需要對每天千萬級的遊戲評論資訊進行詞頻統計,在生產者一端,我們將資料按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取資料進行詞頻統計。本文首先對spark stre

Spark踩坑記——Spark Streaming+Kafka

目錄     前言     Spark streaming接收Kafka資料         基於Receiver的方式         直接讀取方式     Spark向kafka中寫入資料     Spark streaming+Kafka應用     Spark str

Spark Streaming+Kafka

前言 在WeTest輿情專案中,需要對每天千萬級的遊戲評論資訊進行詞頻統計,在生產者一端,我們將資料按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取資料進行詞頻統計。本文首先對spark streamin

Spark-Streaming+kafka實現零丟失

原文連結 kafka和sparkstreaming是兩種適配很好的技術,兩者都是分散式系統適用於處理大量資料,兩者對於實現資料的零丟失並沒有提供現成的解決方案,所以這篇文章就是希望可以幫助你完成這個目標 注:使用Spark Streaming的Direct St

流式大資料計算實踐7----Hive安裝

一、前言 1、這一文學習使用Hive 二、Hive介紹與安裝 Hive介紹:Hive是基於Hadoop的一個數據倉庫工具,可以通過HQL語句(類似SQL)來操作HDFS上面的資料,其原理就是將使用者寫的HQL語句轉換成MapReduce任務去執行,這樣不用開發者去寫繁瑣的MapReduce程式,直接編寫