基於SparkSQL的網站日誌分析實戰
基於SparkSQL的網站日誌分析實戰
使用者行為日誌概述
使用者行為日誌:使用者每次訪問網站時所有的行為資料(訪問、瀏覽、搜尋、點選...)
使用者行為軌跡、流量日誌
為什麼要記錄使用者訪問行為日誌
網站頁面的訪問量
網站的黏性
推薦
2.使用者行為日誌生成渠道
Nginx
Ajax
3.使用者行為日誌內容
日誌資料內容:
1)訪問的系統屬性: 作業系統、瀏覽器等等
2)訪問特徵:點選的url、從哪個url跳轉過來的(referer)、頁面上的停留時間等
3)訪問資訊:session_id、訪問ip(訪問城市)等
日誌資訊如下所示:
2013-05-19 13:00:00 http://www.taobao.com/17/?tracker_u=1624169&type=1 B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1 http://hao.360.cn/ 1.196.34.243
4.使用者行為日誌分析的意義
網站的眼睛(訪問者來自什麼地方,找什麼東西,那些頁面最受歡迎,訪問者的入口地址是什麼等)
網站的神經(網站佈局是否合理,導航層次是否清晰,功能是否存在問題,轉換路徑是否靠譜)
網站的大腦(如何分析目標,如何分配廣告預算(廣告推廣))
離線資料處理架構
資料處理流程
1)資料採集
Flume: web日誌寫入到HDFS
2)資料清洗
髒資料
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架
清洗完之後的資料可以存放在HDFS(Hive/Spark SQL)
3)資料處理
按照我們的需要進行相應業務的統計和分析
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架
4)處理結果入庫
結果可以存放到RDBMS、NoSQL
5)資料的視覺化
通過圖形化展示的方式展現出來:餅圖、柱狀圖、地圖、折線圖
ECharts、HUE、Zeppelin
離線資料處理架構
專案需求
需求一:統計imooc主站最受歡迎的課程/手記的Top N訪問次數
需求二:按地市統計imooc主站最受歡迎的Top N課程
(1).根據IP地址提取出城市資訊
(2).視窗函式在Spark SQL中的使用
按流量統計imooc主站最受歡迎的Top N課程
功能實現
網站主站日誌介紹
訪問時間
訪問URL
訪問過程耗費流量
訪問IP地址
資料清洗
原始日誌
10.100.0.1 - - [10/Nov/2016:00:01:02 +0800] "HEAD / HTTP/1.1" 301 0 "117.121.101.40" "-" - "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" "-" - - - 0.000
第一次清洗後(訪問時間 主站URL耗費的流量ip地址)
2017-05-11 15:07:17 http://www.imooc.com/video/14322 245 202.96.134.133
2017-05-11 06:52:31 http://www.imooc.com/article/17891 535 222.129.235.182
2017-05-11 18:46:43 http://www.imooc.com/article/17898 807 218.75.35.226
程式碼實現參見gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatFormatJob.scala】
第二次資料清洗
輸入:訪問時間、訪問URL、耗費的流量、訪問IP地址資訊
輸出:URL、cmsType(video/article)、cmsId(編號)、流量、ip、城市資訊、訪問時間、天
程式碼實現參見gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】
根據ip地址解析城市資訊
使用github上已有的開源專案
1)git clone https://github.com/wzhe06/ipdatabase.git
2)編譯下載的專案:mvn clean package -DskipTests
3)安裝jar包到自己的maven倉庫
mvn install:install-file -Dfile=~/source/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
資料清洗儲存到目標地址()
【https://gitee.com/robei/SparkSQLProject/blob/e60788c9e0f3c2ffc446d9aa8acaa5a66ac006fc/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】
需求一的實現:統計主站最受歡迎的課程/手記的Top N訪問次數
/**
* 需求一:主站最受歡迎的TopN課程統計
*
* @param spark
* @param cleanDF
*/
def videoAccessTopNStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
//------------------使用DataFrame API完成統計操作--------------------------------------------
/* import spark.implicits._
val videoAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)*/
// videoAccessTopNDF.printSchema()
/**
* root
* |-- day: string (nullable = true)
* |-- cmsId: long (nullable = true)
* |-- times: long (nullable = false)
*/
// videoAccessTopNDF.show(10,false)
/**
* +--------+-----+------+
* |day |cmsId|times |
* +--------+-----+------+
* |20170511|14540|111027|
* |20170511|4000 |55734 |
* |20170511|14704|55701 |
* |20170511|14390|55683 |
* |20170511|14623|55621 |
* |20170511|4600 |55501 |
* |20170511|4500 |55366 |
* |20170511|14322|55102 |
* +--------+-----+------+
*/
//-------------------------使用SQL API完成操作-------------------------
cleanDF.createOrReplaceTempView("access_logs")
//建立臨時表 access_logs
val videoAccessTopNDF = spark.sql("select day,cmsId,count(1) as times from access_logs" +
" where day="+day+" and cmsType='video'" +
" group by day,cmsId order by times desc")
videoAccessTopNDF.show(10, false)
/**
* +--------+-----+------+
* |day |cmsId|times |
* +--------+-----+------+
* |20170511|14540|111027|
* |20170511|4000 |55734 |
* |20170511|14704|55701 |
* |20170511|14390|55683 |
* |20170511|14623|55621 |
* |20170511|4600 |55501 |
* |20170511|4500 |55366 |
* |20170511|14322|55102 |
* +--------+-----+------+
*/
//-------------------將統計結果寫入資料庫-------------------
try {
videoAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val times = info.getAs[Long]("times")
list.append(DayVideoAccessStat(day, cmsId, times))
})
StatDAO.insertDayVideoAccessTopN(list)
})
}catch {
case e:Exception => e.printStackTrace()
}
/**
* 在mysql中建立day_video_access_topn_stat,主站最受歡迎的Top N課程
* create table day_video_access_topn_stat (
* day varchar(8) not null,
* cms_id bigint(10) not null,
* times bigint(10) not null,
* primary key (day, cms_id)
* );
*/
}
需求二:按地市統計主站最受歡迎的Top N課程
/**
* 需求二:按地市統計主站最受歡迎的Top N課程
* @param spark
* @param cleanDF
*/
def cityAccessTopSata(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
//------------------使用DataFrame API完成統計操作--------------------------------------------
import spark.implicits._
val cityAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","city","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
// cityAccessTopNDF.printSchema()
/**
* root
* |-- day: string (nullable = true)
* |-- city: string (nullable = true)
* |-- cmsId: long (nullable = true)
* |-- times: long (nullable = false)
*/
// cityAccessTopNDF.show(false)
/**
* +--------+----+-----+-----+
* |day |city|cmsId|times|
* +--------+----+-----+-----+
* |20170511|浙江省 |14540|22435|
* |20170511|北京市 |14540|22270|
* |20170511|安徽省 |14540|22149|
* |20170511|廣東省 |14540|22115|
* |20170511|上海市 |14540|22058|
* |20170511|北京市 |4600 |11271|
* |20170511|安徽省 |14390|11229|
* |20170511|廣東省 |14623|11226|
* |20170511|上海市 |14704|11219|
* |20170511|廣東省 |14704|11216|
* |20170511|廣東省 |4600 |11215|
* |20170511|上海市 |4000 |11182|
* |20170511|北京市 |14390|11175|
* |20170511|廣東省 |4000 |11169|
* |20170511|上海市 |4500 |11167|
* |20170511|安徽省 |14704|11162|
* |20170511|北京市 |4000 |11156|
* |20170511|浙江省 |14322|11151|
* |20170511|上海市 |14623|11149|
* |20170511|廣東省 |4500 |11136|
* +--------+----+-----+-----+
*/
//-----------Window函式在Spark SQL中的使用--------------------
val cityTop3DF = cityAccessTopNDF.select(
cityAccessTopNDF("day"),
cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"),
cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
.orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
).filter("times_rank <= 3").orderBy($"city".desc,$"times_rank".asc)
cityTop3DF.show(false)//展示每個地市的Top3
/**
* +--------+----+-----+-----+----------+
* |day |city|cmsId|times|times_rank|
* +--------+----+-----+-----+----------+
* |20170511|浙江省 |14540|22435|1 |
* |20170511|浙江省 |14322|11151|2 |
* |20170511|浙江省 |14390|11110|3 |
* |20170511|廣東省 |14540|22115|1 |
* |20170511|廣東省 |14623|11226|2 |
* |20170511|廣東省 |14704|11216|3 |
* |20170511|安徽省 |14540|22149|1 |
* |20170511|安徽省 |14390|11229|2 |
* |20170511|安徽省 |14704|11162|3 |
* |20170511|北京市 |14540|22270|1 |
* |20170511|北京市 |4600 |11271|2 |
* |20170511|北京市 |14390|11175|3 |
* |20170511|上海市 |14540|22058|1 |
* |20170511|上海市 |14704|11219|2 |
* |20170511|上海市 |4000 |11182|3 |
* +--------+----+-----+-----+----------+
*/
//-------------------將統計結果寫入資料庫-------------------
try {
cityTop3DF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayCityVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val city = info.getAs[String]("city")
val times = info.getAs[Long]("times")
val timesRank = info.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day, cmsId,city, times,timesRank))
})
StatDAO.insertDayCityVideoAccessTopN(list)
})
}catch {
case e:Exception => e.printStackTrace()
}
/**
* create table day_video_city_access_topn_stat (
* day varchar(8) not null,
* cms_id bigint(10) not null,
* city varchar(20) not null,
* times bigint(10) not null,
* times_rank int not null,
* primary key (day, cms_id, city)
* );
*/
}
需求三:按照流量統計主站最受歡迎的Top N課程
/**
* 需求三:按照流量統計主站最受歡迎的Top N課程
* @param spark
* @param cleanDF
*/
def videoTraffsTopStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
//------------------使用DataFrame API完成統計操作--------------------------------------------
import spark.implicits._
val trafficsTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","cmsId").agg(sum("traffic").as("traffics")).orderBy($"traffics".desc)
trafficsTopNDF.show()
/**
* +--------+-----+--------+
* | day|cmsId|traffics|
* +--------+-----+--------+
* |20170511|14540|55454898|
* |20170511|14390|27895139|
* |20170511| 4500|27877433|
* |20170511| 4000|27847261|
* |20170511|14623|27822312|
* |20170511| 4600|27777838|
* |20170511|14704|27737876|
* |20170511|14322|27592386|
* +--------+-----+--------+
*/
//-------------------將統計結果寫入資料庫-------------------
try {
trafficsTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoTrafficsStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val traffics = info.getAs[Long]("traffics")
list.append(DayVideoTrafficsStat(day, cmsId,traffics))
})
StatDAO.insertDayVideoTrafficsTopN(list)
})
}catch {
case e:Exception => e.printStackTrace()
}
/**
* create table day_video_traffics_topn_stat (
* day varchar(8) not null,
* cms_id bigint(10) not null,
* traffics bigint(20) not null,
* primary key (day, cms_id)
* );
*/
}
【https://gitee.com/robei/SparkSQLProject】