利用Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用
阿新 • • 發佈:2019-02-07
需求說明:
對熱門商品進行統計
根據商品的點選資料,統計出各個區域的銷量排行TOPK 產品
輸入:開始時間與結束時間
輸出:每個城市的銷量排行TOP K 產品
地區級別 | 地區名稱 | 產品名稱 | 點選量 | 產品型別 |
A | 西南片區 | 霧霾口罩 | 1000000 | 第三方 |
A | 西南片區 | 霧霾口罩 | 1000000 | 第三方 |
A | 西南片區 | 霧霾口罩 | 1000000 | 第三方 |
B | 華中地區 | 蘋果 | 1000 | 自營 |
B | 華中地區 | 蘋果 | 1000 | 自營 |
B | 華中地區 | 蘋果 | 1000 | 自營 |
涉及表:
使用者行為表(檔案日誌) city_id , product_id,點選量
地區表(mysql)格式如下:
產品表(mysql)格式如下:
使用 spark core 與spark sql實現
主要技術點: Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用,dataFrame的使用
關於spark讀取mysql(地區表,產品表)程式碼如下:
/** *獲取mysql表資料,並註冊為spark臨時表 */ def loadMysqlData(): Unit = { //建立連線mysql連線 val jdbcOptions = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "areas") val reader = sqlContext.read.format("jdbc") val jdbcOptions2 = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "product") val reader2 = sqlContext.read.format("jdbc") //把查詢出來的表註冊為臨時表 reader.options(jdbcOptions).load().registerTempTable("spark_areas") reader2.options(jdbcOptions2).load().registerTempTable("spark_product") }
關於spark統計地區、點選量程式碼如下:
package com.hxh import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} object UserAnalysis { val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sparkContext = new SparkContext(conf) val sqlContext = new HiveContext(sparkContext) def main(args: Array[String]): Unit = { sqlContext.sql("use bigdata") sqlContext.sql("select * from t_pages_click ").registerTempTable("tPagesClick") loadMysqlData() areaNameCount() areaRowCount() sqlContext.sql("select areaLevel,areaName,productName,sumClick,extendName " + "from click_row_count " + "where numSum<=3 " + "order by areaLevel asc,sumClick desc" ).show(50) } def areaRowCount(): Unit ={ sqlContext.sql("select " + "CASE WHEN areaName IN ( '華北地區', '東北地區') THEN 'A' "+ " WHEN areaName IN ( '華東地區', '華中地區') THEN 'B' "+ " WHEN areaName IN ( '華南地區', '西南地區') THEN 'C' "+ "WHEN areaName IN ('西北地區') THEN 'D' "+ "ELSE'資料錯誤' END as areaLevel,areaName,productName," + "sumClick," + "Row_Number() OVER (PARTITION BY areaName order by sumClick DESC) AS numSum," + "if(extendInfo='1','自營','第三方') extendName "+ "from areaNameCount ").registerTempTable("click_row_count") } /** * 按地區統計點選量 */ def areaNameCount(): Unit ={ sqlContext.sql("select areas.area_name areaName," + "product.product_name productName,count(1) sumClick," + "product.extend_info extendInfo from tPagesClick " + "join spark_areas areas " + "on tPagesClick.city_id=areas.city_id " + "join spark_product product " + "on product.product_id=tPagesClick.click_product_id " + "group by areas.area_name,product.product_name,product.extend_info").registerTempTable("areaNameCount") }
結果執行結果如下: