1. 程式人生 > >scala實戰之spark讀取mysql資料表並存放到mysql庫中程式設計例項

scala實戰之spark讀取mysql資料表並存放到mysql庫中程式設計例項

今天簡單講解一下應用spark1.5.2相關讀取mysql資料到DataFrame的介面以及將DF資料存放到mysql中介面實現例項

同樣我們的程式設計開發環境是不需要安裝spark的,但是需要一臺安裝了mysql的伺服器,我這裡直接在本機安裝了一個mysql,還有就是scala的程式設計環境。

注意本次使用的spark版本是1.5.2,相關引用的包請參考下圖:

先看程式碼吧

package JDBC_MySql

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by zhoubh on 2016/7/20.
  */
object mysqlDB {

  case class zbh_test(day_id:String, prvnce_id:String,pv_cnts:Int)

  def main(args: Array[String]) {


    val conf = new SparkConf().setAppName("mysql").setMaster("local[4]")
    val sc = new SparkContext(conf)
    //sc.addJar("D:\\workspace\\sparkApp\\lib\\mysql-connector-java-5.0.8-bin.jar")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)



     //定義mysql資訊
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url"->"jdbc:mysql://localhost:3306/db_ldjs",
    "dbtable"->"(select imei,region,city,company,name from tb_user_imei) as some_alias",
    "driver"->"com.mysql.jdbc.Driver",
    "user"-> "root",
    //"partitionColumn"->"day_id",
    "lowerBound"->"0",
    "upperBound"-> "1000",
    //"numPartitions"->"2",
    "fetchSize"->"100",
    "password"->"123456")).load()


    jdbcDF.collect().take(20).foreach(println) //終端列印DF中的資料。
    //jdbcDF.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/abi_sum")
    val url="jdbc:mysql://localhost:3306/db_ldjs"
    val prop=new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"zfs_test",prop) //寫入資料庫db_ldjs的表 zfs_test 中
    //jdbcDF.write.mode(SaveMode.Append).jdbc(url,"zbh_test",prop)  //你會發現SaveMode改成Append依然無濟於事,表依然會被重建,為了解決這個問題,後期會另開部落格講解

     //org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(jdbcDF,url,"zbh_test",prop)
////    #然後進行groupby 操作,獲取資料集合
//    val abi_sum_area = abi_sum.groupBy("date_time", "area_name")
//
////    #計算數目,並根據數目進行降序排序
//    val sorted = abi_sum_area.count().orderBy("count")
//
////    #顯示前10條
//    sorted.show(10)
//
////    #儲存到檔案(這裡會有很多分片檔案。。。)
//    sorted.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/sparktest/flight_top")
//
//
////    #儲存到mysql表裡
//    //sorted.write.jdbc(url,"table_name",prop)

  }
}

下面來看看執行結果啥樣:


資料庫結果如下:

通過這段程式碼可以實現從mysql關係型資料庫中直接讀取資料轉化成DataFrame參與到sparksql的分析當中這個意義是非常重大的,因為我們日常應用sparksql進行資料分析時經常會用到一些配置表,而這些配置定義表都是存在關係型資料庫中,所以以後不用擔心了。

另外這裡還實現了DataFrame結果回寫到mysql資料庫中,雖然官方的spark原始碼的寫入有些奇葩,設定的寫死模式overwriter,也就是說你確定寫入的表,他會重新建立,然後匯入資料,這個用起來很不爽,後面部落格將講解如何改寫原始碼,我要怎麼寫入就怎麼寫入。(這個意義也很重大,以後分析的結果就可以直接放mysql中,直接對外提供報表,哇 贊

最後感覺華哥的程式碼和講解(一個個默默耕耘大資料多年的人)

相關推薦

scala實戰spark讀取mysql資料並存放到mysql程式設計例項

今天簡單講解一下應用spark1.5.2相關讀取mysql資料到DataFrame的介面以及將DF資料存放到mysql中介面實現例項 同樣我們的程式設計開發環境是不需要安裝spark的,但是需要一臺安裝了mysql的伺服器,我這裡直接在本機安裝了一個mysql,還有就是sc

scala實戰spark使用者線上時長和登入次數統計例項

接觸spark後就開始學習scala語言了,因為有一點python和java的基礎學習起來還行,今天在這裡把我工作中應用scala程式設計統計分析使用者行為日誌的例項和大家分析一下,我這裡主要講一下使用者的線上時長統計和登入次數統計演算法實現過程。 第一步 程式設計環境:首

Navicat工具匯出Mysql資料結構到Excel檔案

------------------------------------------------------------------------ 前言     專案中資料庫設計已經完成,現在到了程式碼實現的階段,資料庫中沒有資料,測試看不出效果,領導要求添點資料,單個

spark sql 查詢hive寫入到PG

clas sel append nec pro 增加 word postgres erro import java.sql.DriverManager import java.util.Properties import com.zhaopin.tools.{DateU

Python查詢MySQL資料提取mysql欄位名轉化成DataFrame

今天覆習一下,用python操作mysql以及excel,並且作為橋樑,連線mysql,excel. 那麼既然用到了python操作資料就不免需要用到dataframe做資料分析,本文主要一個麻煩點在於從mysql 中獲取到的資料沒有欄位名, 下面直接上程式碼: import pymys

python讀取sqlserver資料儲存到csv

# -*- coding: utf-8 -*- """ @use:查詢17.11-18.1,18.6-18.8的PM2.5資料,匯出到csv """ import pymssql import xlwt import datetime from xml.dom.minidom import

Python爬蟲實戰 :批量採集股票資料儲存到Excel

小編說:通過本文,讀者可以掌握分析網頁的技巧、Python編寫網路程式的方法、Excel的操作,以及正則表示式的使用。這些都是爬蟲專案中必備的知識和技能。本文選自《Python帶我起飛》。 例項描述:通過編寫爬蟲,將指定日期時段內的全部上市公司股票資料爬取下來,並按照股

Mysql約束以及修改資料

unsigned:這裡約束的意思是沒符號,不能為負數,只能是整數 建立外來鍵約束的表格 提示: 給表新增外來鍵約束的時候,外來鍵列和參照列必須的型別必須一致,就好比上圖的中的兩個表格的型別都是int,但是當我們的父表的型別是smallint,而子表的型別是bigint,這樣也是會

Spark RDD 操作實戰檔案讀取

/1、本地檔案讀取 val local_file_1 = sc.textFile("/home/hadoop/sp.txt") val local_file_2 = sc.textFile("file://home/hadoop/sp.txt") //2、當前目錄下的檔案 val file1 = sc

資料分析技術與實戰 Spark Streaming

Spark是基於記憶體的大資料綜合處理引擎,具有優秀的作業排程機制和快速的分散式計算能力,使其能夠更加高效地進行迭代計算,因此Spark能夠在一定程度上實現大資料的流式處理。 隨著資訊科技的迅猛發展,資料量呈現出爆炸式增長趨勢,資料的種類與變化速度也遠遠超出人們的想象,因此人們對大資料處理提出了

程式碼 | Spark讀取mongoDB資料寫入Hive普通和分割槽

版本: spark 2.2.0  hive 1.1.0  scala 2.11.8  hadoop-2.6.0-cdh5.7.0  jdk 1.8  MongoDB 3.6.4 一 原始資料及Hive表  MongoDB資

資料分析技術與實戰Spark Streaming(內含福利)

↑ 點選上方藍字關注我們,和小夥伴一起聊技術! 隨著資訊科技的迅猛發展,資料量呈現出爆炸式增長趨勢,資料的種類與變化速度也遠遠超出人們的想象,因此人們對大資料處理提出了更高的要求,越來越多的領域迫切需要大資料技術來解決領域內的關鍵問題。在一些特定的領域中(例如金融、災害預警等),時間就是金錢、時間可能就

spark讀取redis資料(互動式,scala單機版,java單機版)

互動式 第一步:向redis中新增資料 第二步:將jedis jar包放入~/lib目錄下,開啟spark服務 第三步:通過spark-shell讀取redis資料,並做相應處理

spark streaming小實戰kafka讀取與儲存

本次小實戰主要介紹一下spark streaming如何讀取kafka資料涉及理論部分在這就不多說了,自己也剛入門先說下需求待處理日誌格式為ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/11/29 13:46,上海,上海,210.2.2

mongo-spark-讀取不同的資料和寫入不同的

mongo-spark-讀取不同的庫資料和寫入不同的庫中 package com.example.app import com.mongodb.spark.config.{ReadConfig, WriteConfig} import com.mongodb.spark.sql._ object

spark讀取hive資料-java

需求:將hive中的資料讀取出來,寫入es中。 環境:spark 2.0.2 1. SparkSession裡設定enableHiveSupport() SparkConf conf = new SparkConf().setAppName("appName").setMast

java匯出mysql資料的結構生成word文件

使用sql查詢表的結構是比較簡單,其實這裡難就是難在匯出結構到word文件中。。。,使用poi-tl程式碼也簡單 一、首先jdbc工具類,這個不多說了 public class SqlUtils { private static String url = "jdbc:mysql://lo

[Scala]學習筆記六——讀取外部資料

1.讀取檔案及網路資料 object ReadFileApp extends App { val file=scala.io.Source.fromFile("E:\\data\\hello.txt") //讀取指定檔案 //一行一行讀取檔案 def readLine

spark讀取es資料

spark-2.0.2 scala-2.11.8 <!-- https://mvnrepository.com/artifact/org.webjars.npm/spark-md5 --> <dependency> <groupId>org.apa

wamp mysql資料儲存中文資料查詢後變成 ’???’

這個問題弄了好長時間,總是會變成??? 每次一查詢就是下面這樣,改了好多東西也不對; 網上查了很多辦法,現在給大家總結一下; 開啟wamp中mysql的配置檔案my.ini 找到下面的位置進行更改 由於網上很多辦法都是在【mysqld】下新增charac