使用SparkSQL實現根據ip地址計算歸屬地一
阿新 • • 發佈:2018-12-16
之前使用過RDD實現過這個案例,如果不知道可以去參考我寫的博文,這裡要實現的就是在之前那個基礎上進行修改的,具體實現思路就是將ip地址規則和訪問日誌檔案中的資料進行整理然後轉換成DataFrame之後註冊成表,然後寫Sql語句進行Join操作
具體程式碼實現:
package cn.ysjh0014.SparkSql import cn.ysjh0014.TestIp import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object IpLocationSQL { def main(args: Array[String]): Unit = { val session: SparkSession = SparkSession.builder().appName("IpLocationSQL").master("local[4]").getOrCreate() //取到HDFS中的ip規則 import session.implicits._ val rulesLines: Dataset[String] = session.read.textFile(args(0)) //整理ip規則資料 val Dept: Dataset[(Long, Long, String)] = rulesLines.map(line => { val fields = line.split("[|]") val startNum = fields(2).toLong val endNum = fields(3).toLong val province = fields(6) (startNum, endNum, province) }) val ipRulesRdd: DataFrame = Dept.toDF("startNum","endNum","province") //建立RDD,讀取訪問日誌 val accessLines: Dataset[String] = session.read.textFile(args(1)) //整理資料 val result : Dataset[Long] = accessLines.map(log => { //將log日誌的每一行進行切分 val fields = log.split("[|]") val ip = fields(1) //將ip轉換成十進位制 val ipNum = TestIp.ip2Long(ip) ipNum }) val DFResult: DataFrame = result.toDF("ipNum") //建立檢視 val table1: Unit = ipRulesRdd.createTempView("table1") val table2: Unit = DFResult.createTempView("table2") //寫SQL val ys: DataFrame = session.sql("SELECT province,count(*) counts FROM table1 JOIN table2 ON (ipNum>=startNum AND ipNum<=endNum) GROUP BY province ORDER BY counts DESC") ys.show() session.stop() } }
執行結果:
你執行上邊程式碼的時候會發現很慢,資料量大的時候會更慢,這是因為進行查詢的時候是一條一條資料進行比較的,而沒有使用之前的二分查詢,所以效率不高