Reading and Writing MySQL Data from Scala for a Spark Job
阿新 · Published 2019-01-08
As a Spark beginner, I needed to read data from a database, hand it to a Spark job for processing, and then write the results back to the database.
Since Spark is built on Scala, I was completely lost at first. My initial plan was to use Java to dump the database rows into a file and have Spark read and process that file. Then it occurred to me: the Spark program is written in Scala anyway, so why not read the database directly from Scala, feed the data to the Spark job, and write the results back, instead of taking such a detour?
I can't write Scala, but I can write Java, so I copied the familiar pattern: connect over JDBC, load the rows into a collection, and compute on that.
I started with List, but Scala's List is immutable. Its Scaladoc describes it as "A class for immutable linked lists representing ordered collections of elements of type A."
So I looked for a mutable collection and found scala.collection.mutable.ArrayBuffer, which works.
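To make the difference concrete, here is a small standalone sketch (the values are illustrative): appending to a List returns a new list, while an ArrayBuffer is modified in place.

import scala.collection.mutable.ArrayBuffer

val xs = List("a", "b")
// xs += "c"            // does not compile: immutable List has no += method
val ys = xs :+ "c"      // :+ builds a NEW list; xs itself is unchanged

val buf = ArrayBuffer("a", "b")
buf += "c"              // mutates buf in place: ArrayBuffer(a, b, c)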
package wordcount

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * Created by kay on 2017/12/13.
 */
object WordCountLocal {

  // Change to your database config
  val conn_str = "jdbc:mysql://localhost:3306/mydb?user=root&password=root"

  def main(args: Array[String]): Unit = {
    // Reference the MySQL driver class; JDBC 4.0+ drivers auto-register
    // via ServiceLoader as long as the jar is on the classpath
    classOf[com.mysql.jdbc.Driver]
    // Set up the connection
    val conn = DriverManager.getConnection(conn_str)
    val arrayBuffer = ArrayBuffer[String]()
    try {
      // A read-only, forward-only cursor is enough for a simple scan
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      try {
        // Execute the query and collect each row into the buffer
        val rs = statement.executeQuery("SELECT name FROM table1")
        while (rs.next) {
          arrayBuffer += rs.getString("name")
        }
      } finally {
        statement.close()
      }
    } finally {
      conn.close()
    }
    execute(arrayBuffer)
  }

  def execute(arrayBuffer: ArrayBuffer[String]): Unit = {
    val conf = new SparkConf().setAppName("testWord").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Turn the in-memory collection into an RDD and run the word count
    val stringRDD = sc.parallelize(arrayBuffer)
    val counts = stringRDD.map((_, 1)).reduceByKey(_ + _).collect()
    sc.stop()

    // Write the results back to MySQL, reusing one PreparedStatement
    val conn1 = DriverManager.getConnection(conn_str)
    try {
      val prep = conn1.prepareStatement("INSERT INTO result (word, count) VALUES (?, ?)")
      try {
        for ((key, count) <- counts) {
          prep.setString(1, key)
          prep.setInt(2, count)
          prep.executeUpdate()
        }
      } finally {
        prep.close()
      }
    } finally {
      conn1.close()
    }
  }
}
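For comparison, Spark also ships a built-in JDBC data source that reads a table into a DataFrame and writes one back, without hand-rolling the JDBC plumbing. Below is a minimal sketch, assuming the same mydb database, table1/result tables, and root credentials as above (the MySQL driver jar still has to be on the classpath):

import org.apache.spark.sql.SparkSession

object WordCountJdbc {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("testWordJdbc")
      .master("local[4]")
      .getOrCreate()

    // Read the source table straight into a DataFrame
    val names = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("dbtable", "table1")
      .option("user", "root")
      .option("password", "root")
      .load()

    // The same word count, expressed on the DataFrame
    val counts = names.groupBy("name").count()
      .withColumnRenamed("name", "word")

    // Append the results to the result table
    counts.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("dbtable", "result")
      .option("user", "root")
      .option("password", "root")
      .mode("append")
      .save()

    spark.stop()
  }
}

For a small table the hand-rolled version above is perfectly fine; the data source route starts to matter once the table no longer fits comfortably in driver memory, since Spark can then read it in partitions instead of pulling every row through a single JDBC connection.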