1. 程式人生 > >使用SparkSQL實現根據ip地址計算歸屬地一

使用SparkSQL實現根據ip地址計算歸屬地一

之前使用過RDD實現過這個案例,如果不知道可以去參考我寫的博文,這裡要實現的就是在之前那個基礎上進行修改的,具體實現思路就是將ip地址規則和訪問日誌檔案中的資料進行整理然後轉換成DataFrame之後註冊成表,然後寫Sql語句進行Join操作

具體程式碼實現:

package cn.ysjh0014.SparkSql


import cn.ysjh0014.TestIp
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object IpLocationSQL {

  def main(args: Array[String]): Unit = {

    val session: SparkSession = SparkSession.builder().appName("IpLocationSQL").master("local[4]").getOrCreate()

    //取到HDFS中的ip規則
    import session.implicits._
    val rulesLines: Dataset[String] = session.read.textFile(args(0))
    //整理ip規則資料
    val Dept: Dataset[(Long, Long, String)] = rulesLines.map(line => {
      val fields = line.split("[|]")
      val startNum = fields(2).toLong
      val endNum = fields(3).toLong
      val province = fields(6)
      (startNum, endNum, province)
    })
    val ipRulesRdd: DataFrame = Dept.toDF("startNum","endNum","province")


    //建立RDD,讀取訪問日誌
     val accessLines: Dataset[String] = session.read.textFile(args(1))

    //整理資料
    val result : Dataset[Long] = accessLines.map(log => {
      //將log日誌的每一行進行切分
      val fields = log.split("[|]")
      val ip = fields(1)
      //將ip轉換成十進位制
      val ipNum = TestIp.ip2Long(ip)
      ipNum
    })

    val DFResult: DataFrame = result.toDF("ipNum")

    //建立檢視
    val table1: Unit = ipRulesRdd.createTempView("table1")
    val table2: Unit = DFResult.createTempView("table2")

    //寫SQL
    val ys: DataFrame = session.sql("SELECT province,count(*) counts FROM table1 JOIN table2 ON (ipNum>=startNum AND ipNum<=endNum) GROUP BY province ORDER BY counts DESC")

    ys.show()

    session.stop()

  }
}

執行結果:

你執行上邊程式碼的時候會發現很慢,資料量大的時候會更慢,這是因為進行查詢的時候是一條一條資料進行比較的,而沒有使用之前的二分查詢,所以效率不高