spark的JDBC連線池(Scala版)
阿新 • • 發佈:2018-12-21
一個Scala版的連線池,並在使用Spark Streaming進行Word Count時,把每批資料都存到mySql中
import java.sql.{Connection, DriverManager}
import java.util
object JDBCConnectePools02 {
private val max = 10 //設定連線最大數
private val ConnectionNum = 10 //設定 每次可以獲取幾個Connection
private var conNum = 0//連線數
private val pool = new util.LinkedList [Connection]() //連線池
def getDriver() : Unit = { //載入Driver
//載入
if(conNum < max && pool.isEmpty){
Class.forName("com.mysql.jdbc.Driver")
}else if(conNum>=max && pool.isEmpty){
print("當前暫無可用Connection")
Thread.sleep(2000)
getDriver()
}
}
def getConn (): Connection ={
if(pool.isEmpty){
getDriver()
for(i <- 1 to ConnectionNum){ //建立10個連線
val conn = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/updatewordcount","root","root")
pool.push(conn) // 把連線放到連線池中,push是LinkedList中的方法
conNum += 1
}
}
val conn: Connection = pool.pop()//從執行緒池所在LinkedList中彈出一個Connection,pop 是LinkedList的方法
conn //返回一個Connection
}
def returnConn( conn :Connection): Unit ={ //還連線
pool.push(conn)
}
}
一個簡單的使用,使用sparkStreaming,盡心wordCount,每次把結果放到MySql 中
import java.sql.{Connection, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, StreamingContext}
object JDBCWordCont02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
//新建一個StreamingContext,每個5s是一個批次
val ssc = new StreamingContext(conf,new Duration(5000))
//接受hadoop01主機的 8888埠的資料
val data: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01",8888)
//進行切分壓平
val split: DStream[String] = data.flatMap(_.split(" "))
//單詞和1組合
val wordAndOne: DStream[(String, Int)] = split.map((_,1))
//對rdd進行遍歷,想要使用foreachPartition 需要foreachRDD ,
//對裡面的rdd進行操作,DStream中沒有foreachpartition方法,
//如果直接使用foreach方法不好,會大量的去連線,還連線,對效能有影響
wordAndOne.foreachRDD(rdd=>{
//對RDD中的資料進行聚合
val reduced: RDD[(String, Int)] = rdd.reduceByKey(_+_)
reduced.foreachPartition(item =>{
//獲取連線
val conn: Connection = JDBCConnectePools02.getConn()
for(one <- item){ //把聚合後的資料存到mysql 中
val pstm: PreparedStatement = conn.prepareStatement("insert into wordcount(word,count) values(?,?)")
pstm.setString(1,one._1)
pstm.setInt(2,one._2)
pstm.executeUpdate()
}
//還連線
JDBCConnectePools02.returnConn(conn)
})
})
ssc.start()
ssc.awaitTermination()
}
}