1. 程式人生 > 實用技巧 > Spark專案實戰從0到1之(10)Spark讀取HDFS寫入Hive

Spark專案實戰從0到1之(10)Spark讀取HDFS寫入Hive

package com.xxxx.report.service;

import com.google.common.collect.Lists;
import com.xx.report.config.Constants;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.List; /** * @author huanghanyu */ public class BicycleLog2hive implements Serializable{ // Log日誌 private static final Logger LOG = LoggerFactory.getLogger(BicycleLog2hive.class
); // 日期格式化 private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd"); private static final String TMP_TABLE_NAME = "tableNameTemp"; private static final String TABLE_NAME = "tableName1"; private static final String APP_NAME = "xxxxx@yangxin"; private
EngineLockLog handleLine(String line) { EngineLockLog engineLockLog = new EngineLockLog(); try { System.out.println("handleLine Function -> : " + line); xxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxx xxxxxxxxxxxx }catch (Exception error) { System.out.println(error.getMessage() + " | " + line); error.printStackTrace(); } return engineLockLog; } public void run(String master, String startTime, String endTime) { long startTimsStamp = System.currentTimeMillis(); startTime = startTime.replace("-", ""); startTime = startTime.replace("_", ""); endTime = endTime.replace("-", ""); endTime = endTime.replace("_", ""); SparkSession spark = SparkSession.builder().appName(APP_NAME).enableHiveSupport().getOrCreate(); List<String> list = Lists.newArrayList(); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.YEAR, Integer.valueOf(startTime.substring(0, 4))); calendar.set(Calendar.MONTH, Integer.valueOf(startTime.substring(4, 6)) - 1); calendar.set(Calendar.DATE, Integer.valueOf(startTime.substring(6, 8))); String date = startTime; while (!date.equals(endTime)) { list.add(date); calendar.add(Calendar.DATE, 1); date = simpleDateFormat.format(calendar.getTime()); } list.add(endTime); for (String day : list) { LOG.info("日期:-> " + day); StringBuilder path = new StringBuilder(); path.append(Constants.PREFIX_BICYCLE_LOG_PATH_YangXin).append(day).append("/*/*"); LOG.info("路徑:-> " + path); JavaRDD<EngineLockLog> mapRDD = spark.read().textFile(path.toString()). javaRDD(). 
map(line -> { return handleLine(line); }).filter(new Function<EngineLockLog, Boolean>() { @Override public Boolean call(EngineLockLog engineLockLog) throws Exception { return engineLockLog.getUser_id() != null; } }); if (!mapRDD.isEmpty()) { Dataset<Row> mapDF = spark.createDataFrame(mapRDD, EngineLockLog.class); mapDF.createOrReplaceTempView(TMP_TABLE_NAME); String dayTemp = day.substring(0, 4) + "-" + (day.substring(4,6)) + "-" + day.substring(6, 8); String insertSQL = "insert into table " + TABLE_NAME + " partition(dt=\'" + dayTemp + "\') " + "select xxxx,xxxxx,xxxxx from " + TMP_TABLE_NAME; spark.sql(insertSQL); } } long endTimeStamp = System.currentTimeMillis(); System.out.println("總耗時: -> " + (endTimeStamp - startTimsStamp) + "ms"); } public static void main(String[] args) { String master = args[0]; String startTime = args[1]; String endTime = args[2]; BicycleLog2hive bicycleLog2hive = new BicycleLog2hive(); bicycleLog2hive.run(master, startTime, endTime); } }