
Using a text-analysis model inside Spark Streaming (2.0.1)

For how the model itself is built, please refer to my other blog post on building the model.
Function: receive data from Kafka, where each message is one article; classify the article's type; save the classification result together with the article to HBase; and build an index for the article (the saving and indexing parts are only empty stubs with no code, you can implement them yourself, and I may fill them in later if I get the chance).
Code implementation:
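
The code below uses the old receiver-based Kafka stream (KafkaUtils.createStream) together with spark.ml; for reference, the sbt dependencies should look roughly like the following (the exact artifact versions are my assumption based on Spark 2.0.1):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.0.1",
  "org.apache.spark" %% "spark-sql"                 % "2.0.1",
  "org.apache.spark" %% "spark-streaming"           % "2.0.1",
  "org.apache.spark" %% "spark-mllib"               % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)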

package spark.mllib

import java.io.FileOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.feature.{HashingTF, IDF, LabeledPoint, Tokenizer}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream.fromReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

class UseModel1 {
}

object UseModel1 {

  // main flow
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = Array("192.168.10.199:2181", "order", "order", "2")
    val conf = new SparkConf().setAppName("useModel").setMaster("local[4]")
    val ssc = getStreamingContext(conf, 10)
    val dstreams = getKafkaDstream(ssc, topics, zkQuorum, group, numThreads)
    val dstream = dstreams.inputDStream.map(_._2)
    dstream.persist()
    // for testing
    dstream.print()
    // it would be better to check that the batch is not empty first
    dstream.foreachRDD(rdd => everyRDD(rdd))
    ssc.start()
    ssc.awaitTermination()
  }

  // build the StreamingContext
  def getStreamingContext(conf: SparkConf, secend: Int): StreamingContext = {
    new StreamingContext(conf, Seconds(secend))
  }

  // build the SparkSession
  def getSparkSession(conf: SparkConf): SparkSession = {
    SparkSession.builder()
      .config(conf)
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()
  }

  // build the receiver-based Kafka DStream
  def getKafkaDstream(ssc: StreamingContext, topics: String, zkQuorum: String, group: String,
                      numThreads: String): JavaPairReceiverInputDStream[String, String] = {
    ssc.checkpoint("directory")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  }

  // write a string to a local file, for testing only
  def savaString(str: String): Unit = {
    val out = new FileOutputStream("D:\\decstop\\file.txt", true)
    out.write(str.getBytes)
    out.flush()
    out.close()
  }

  // what to do with each RDD of the stream
  def everyRDD(rdd: RDD[String]): Unit = {
    val sameModel = NaiveBayesModel.load("resoult")
    val spark = getSparkSession(rdd.context.getConf)
    import spark.implicits._

    val rddDF = rdd.map { line => (1, line) }.toDF("label", "text").persist()
    //rddDF.show()

    // the same Tokenizer / HashingTF / IDF steps that were used when the model was trained
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val tokenizerRDD = tokenizer.transform(rddDF)
    //tokenizerRDD.show(false)

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
    val hashingTFRDD = hashingTF.transform(tokenizerRDD)

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel = idf.fit(hashingTFRDD)
    val rescaledData = idfModel.transform(hashingTFRDD)
    //rescaledData.show(false)

    // convert to the format Naive Bayes expects
    val useDataRdd = rescaledData.select($"label", $"features").map {
      case Row(label: Int, features: Vector) =>
        LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
    }

    val predictions = sameModel.transform(useDataRdd)
    predictions.persist()
    //predictions.show(false)

    // following this pattern you can plug in whatever logic you need,
    // e.g. the HBase save and the index building below
    predictions.select($"label", $"prediction").foreach { x =>
      savaString("" + x.getAs("label") + " " + x.getAs("prediction") + "\n\r")
    }

    // for testing
    predictions.createOrReplaceTempView("prediction")
    rddDF.createOrReplaceTempView("atical")
    //spark.sql("select p.label,p.prediction,a.text from prediction p,atical a where p.label=a.label")
  }

  // build the index; the main indexed fields are hbase_rowKey(time), author, title, article
  def buiderIndex() {}

  // save to HBase
  def savaToHbase() {}

  // send to the next Kafka topic: time, positive-sentiment count, negative-sentiment count, percentage, whether to alarm
  def sendToKafka() {}
}