Using Scala to Read and Write MySQL Data for a Spark Job

I'm new to Spark and needed to read data from a database, run a Spark job on it, and then write the results back to the database.

Since Spark is built on Scala, I was completely lost at first. My original plan was to use Java to dump the database rows to a file and have Spark read that file, but then it occurred to me: if the Spark program is written in Scala anyway, why not use Scala to read the database directly, hand the data to the Spark job, and write the results back, instead of taking such a detour?

I can't write Scala, but I can write Java, so I copied the familiar pattern: connect over JDBC, load the rows into a collection, and run the computation on that.

I started out with List, but Scala's List type is immutable. The Scaladoc describes it as:
"A class for immutable linked lists representing ordered collections of elements of type A."
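For instance (a minimal illustration, not from the original post), appending to a List never modifies it; it returns a new List, so you cannot grow one in place while looping over a ResultSet:

val names = List("alice", "bob")
val more = names :+ "carol"   // :+ returns a NEW List
// names is still List(alice, bob); more is List(alice, bob, carol)
// names += "carol" does not compile: a val List has no in-place append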

So I looked for a mutable collection and found scala.collection.mutable.ArrayBuffer, which does the job.
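ArrayBuffer, by contrast, grows in place with +=, which is exactly what the ResultSet loop in the code below relies on. A minimal sketch:

import scala.collection.mutable.ArrayBuffer

val buffer = ArrayBuffer[String]()
buffer += "alice"   // appends in place, same instance
buffer += "bob"
// buffer is now ArrayBuffer(alice, bob)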

package wordcount

import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Connection, DriverManager, ResultSet}
import scala.collection.mutable.ArrayBuffer

/**
  * Created by kay on 2017/12/13.
  */
object WordCountLocal {

  // Change to Your Database Config
  val conn_str = "jdbc:mysql://localhost:3306/mydb?user=root&password=root"

  def main(args: Array[String]): Unit = {
    // Load the driver
    classOf[com.mysql.jdbc.Driver]
    // Setup the connection
    val conn = DriverManager.getConnection(conn_str)
    val arrayBuffer = ArrayBuffer[String]()
    try {
      // Configure to be Read Only
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      // Execute Query
      val rs = statement.executeQuery("SELECT name FROM table1")
      // Iterate Over ResultSet
      while (rs.next) {
        arrayBuffer += rs.getString("name")
      }
    } finally {
      conn.close()
    }
    execute(arrayBuffer)
  }

  def execute(arrayBuffer: ArrayBuffer[String]): Unit = {
    // Load the driver
    classOf[com.mysql.jdbc.Driver]
    val conf = new SparkConf().setAppName("testWord").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val stringRDD = sc.parallelize(arrayBuffer)
    val conn1 = DriverManager.getConnection(conn_str)
    try {
      // Word count: pair each name with 1, sum by key, then insert each result row
      for ((key, count) <- stringRDD.map((_, 1)).reduceByKey(_ + _).collect()) {
        val prep = conn1.prepareStatement("INSERT INTO result (word, count) VALUES (?, ?)")
        prep.setString(1, key)
        prep.setInt(2, count)
        prep.executeUpdate()
      }
    } finally {
      conn1.close()
    }
  }
}
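The program above collects all (word, count) pairs back to the driver and writes them over a single connection, which is fine for small result sets; just make sure mysql-connector-java is on the classpath so the com.mysql.jdbc.Driver reference resolves. For larger results, one possible variation (a sketch I haven't tested, reusing the same conn_str and result table) is to let each partition write its own share of the output with foreachPartition instead of collect:

// Sketch: write results from the executors rather than the driver.
// Assumes the same conn_str and result(word, count) table as above.
stringRDD.map((_, 1)).reduceByKey(_ + _).foreachPartition { partition =>
  val conn = DriverManager.getConnection(conn_str)
  val prep = conn.prepareStatement("INSERT INTO result (word, count) VALUES (?, ?)")
  try {
    for ((word, count) <- partition) {
      prep.setString(1, word)
      prep.setInt(2, count)
      prep.executeUpdate()
    }
  } finally {
    prep.close()
    conn.close()
  }
}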