Big Data with Spark (8) --- Spark closure handling, deploy modes and cluster modes, Spark on YARN, high availability, fixing class-loading errors when Spark integrates Hive to access HBase, and querying through Spark's thriftserver
Published: 2018-11-19
I. Spark closure handling
------------------------------------------------------------
    RDD: resilient distributed dataset, a fault-tolerant distributed collection.
    An RDD is made up of a partition list, a compute function, dependencies, an optional partitioner (for Pair[Key,Value] RDDs) and preferred locations.
    When a job runs, Spark breaks the RDD transformations into tasks, and each task is executed by one executor. Before execution, Spark computes the task's closure: the variables and methods that must be visible to the executor, for example inside rdd.foreach. The closure is serialized and shipped to every executor.
    In local mode the whole application runs in a single JVM and objects are shared, so a driver-side counter can be accumulated: every executor points at the same reference.
    In cluster mode it cannot: the counter is captured in the closure, so each node only sees its own deserialized copy and the updates never reach the driver's counter. (A minimal sketch of this pitfall and the Accumulator fix follows at the end of section II, just before the DeployModeTest listing.)

II. Application deploy modes [client mode and cluster mode]
--------------------------------------------------------------------
    a. spark-submit --class xxx xx.jar --deploy-mode (client | cluster)
       --deploy-mode decides whether the driver program runs on a worker node or on the client host.
    b. [client] The driver runs on the client host; the client does not have to be part of the cluster.
    c. [cluster] The driver is handed to one of the Spark cluster's worker nodes to run; that worker is a member of the cluster.
       The exported jar must be placed somewhere visible to every worker node (e.g. HDFS).
    d. In either mode, the RDD computations themselves run on the workers.
    f. Verifying the deploy modes
       1) Start the Spark cluster
       2) Write the test program
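    A minimal sketch of the counter pitfall described in section I, plus the Accumulator-based fix. The object name and master URL below are illustrative assumptions, not part of the original project:

    import org.apache.spark.{SparkConf, SparkContext}

    object ClosureCounterSketch {
      def main(args: Array[String]): Unit = {
        // Assumed master URL; any of the masters used elsewhere in this article would do.
        val conf = new SparkConf().setAppName("ClosureCounterSketch").setMaster("spark://s100:7077")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 10, 3)

        // Wrong on a cluster: each executor increments its own deserialized copy of counter.
        var counter = 0
        rdd.foreach(e => counter += e)
        println("driver-side counter (stays 0 on a cluster): " + counter)

        // Correct: a LongAccumulator is merged back to the driver.
        val acc = sc.longAccumulator("sum")
        rdd.foreach(e => acc.add(e))
        println("accumulator value: " + acc.value)

        sc.stop()
      }
    }

    The DeployModeTest program for step f.2 of section II: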
package com.test.spark.scala

import java.net.{InetAddress, Socket}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Sends trace messages to a remote nc listener so we can see on which host each piece of code runs.
  */
object DeployModeTest {

  def printInfo(str: String): Unit = {
    val ip = InetAddress.getLocalHost.getHostAddress
    val sock = new Socket("192.168.231.205", 8888)
    val out = sock.getOutputStream
    out.write((ip + " : " + str + "\r\n").getBytes())
    out.flush()
    sock.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("DeployModeTest")
    conf.setMaster("spark://s201:7077")
    val sc = new SparkContext(conf)

    printInfo("hello world")

    val rdd1 = sc.parallelize(1 to 10, 3)
    val rdd2 = rdd1.map(e => {
      printInfo(" map : " + e)
      e * 2
    })
    val rdd3 = rdd2.repartition(2)
    val rdd4 = rdd3.map(e => {
      printInfo(" map2 : " + e)
      e
    })
    val res = rdd4.reduce((a, b) => {
      printInfo("reduce : " + a + "," + b)
      a + b
    })
    printInfo("driver : " + res + "")
  }
}
       3) Package the jar.
          For cluster deploy mode, the jar must be placed where every worker can see it, e.g. HDFS.
       4) Copy the jar to s100 and distribute it to the same directory on every node.
       5) Submit the job to the Spark cluster
          $s100> spark-submit --class com.test.spark.scala.DeployModeTest --master spark://s100:7077 --deploy-mode client TestSpark-2-1.0-SNAPSHOT.jar
          // upload the jar to hdfs first, then:
          $> spark-submit --class com.test.spark.scala.DeployModeTest --master spark://s100:7077 --deploy-mode cluster hdfs://s500:8020/data/spark/TestSpark-2-1.0-SNAPSHOT.jar

III. Spark cluster modes
-----------------------------------------------------
    1. The cluster modes differ mainly in what serves as the cluster master:
       Spark's own Master    -> local / standalone mode
       Mesos master          -> Mesos mode
       YARN ResourceManager  -> YARN mode
    2. Selecting the mode:
       yarn:       --master yarn        (needs yarn-site.xml)
       standalone: --master spark://s100:7077
       mesos:      --master mesos://xxx:xxx
    3. [local]/[standalone] Use the Spark Master process as the management node.
    4. [mesos] Use the Mesos master as the management node.
    5. [yarn]
       a. Hadoop's ResourceManager acts as the master; Spark's own Master is not used.
       b. There is no need to start a Spark master node, nor the standalone workers.
       c. Make sure the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables point to the directory containing the Hadoop configuration files; they ensure data is written to HDFS and that the application connects to YARN's ResourceManager.
          --> cp core-site.xml hdfs-site.xml yarn-site.xml into spark/conf
          --> distribute to all nodes
          --> set HADOOP_CONF_DIR and YARN_CONF_DIR; edit /soft/spark/conf/spark-env.sh:
              export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
              export SPARK_EXECUTOR_INSTANCES=3
              export SPARK_EXECUTOR_CORES=1
              export SPARK_EXECUTOR_MEMORY=500M
              export SPARK_DRIVER_MEMORY=500M
          --> distribute to all nodes
       d. Distribute this configuration to every node of the YARN cluster and keep it identical everywhere, so that every configured property can be resolved on every node.
       f. A Spark application on YARN can be deployed in two modes:
          a. cluster deploy mode: the driver runs inside the YARN ApplicationMaster process.
          b. client deploy mode: the driver runs in the client process; the YARN ApplicationMaster is only used to request resources.

IV. Running Spark on YARN
-------------------------------------------------------------
1. Modify the code and repackage it
package com.test.spark.scala

import java.net.{InetAddress, Socket}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Same test program as before, but with the master set to "yarn";
  * printInfo reports the host each task runs on to a remote nc listener.
  */
object DeployModeTest {

  def printInfo(str: String): Unit = {
    val ip = InetAddress.getLocalHost.getHostAddress
    val sock = new Socket("192.168.231.205", 8888)
    val out = sock.getOutputStream
    out.write((ip + " : " + str + "\r\n").getBytes())
    out.flush()
    sock.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("DeployModeTest")
    conf.setMaster("yarn")
    val sc = new SparkContext(conf)

    printInfo("hello world")

    val rdd1 = sc.parallelize(1 to 10, 3)
    val rdd2 = rdd1.map(e => {
      printInfo(" map : " + e)
      e * 2
    })
    val rdd3 = rdd2.repartition(2)
    val rdd4 = rdd3.map(e => {
      printInfo(" map2 : " + e)
      e
    })
    val res = rdd4.reduce((a, b) => {
      printInfo("reduce : " + a + "," + b)
      a + b
    })
    printInfo("driver : " + res + "")
  }
}
2. Copy the Hadoop configuration files and set the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables
    a. Copy core-site.xml, hdfs-site.xml and yarn-site.xml into spark/conf and distribute them to all nodes
    b. Edit /soft/spark/conf/spark-env.sh and distribute it:
export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
export SPARK_EXECUTOR_INSTANCES=3
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=500M
export SPARK_DRIVER_MEMORY=500M
3. Put Spark's jars onto HDFS. [The Hadoop cluster does not ship with Spark's jars; upload them once by hand, otherwise every submission packages the jars and uploads them to a temporary directory.]
    a. Upload the /soft/spark/jars directory to HDFS
$> hdfs dfs -put /soft/spark/jars /data/spark
4. Configure the Spark properties file and distribute it to all nodes
    [spark/conf/spark-defaults.conf]
spark.yarn.jars hdfs://mycluster/data/spark/jars/*.jar
spark.yarn.am.memory=512M
spark.driver.memory=512M
spark.executor.memory=512M
5. Start an nc listener on s500
$> nc -lk 8888
6. Submit the job
//yarn + cluster
spark-submit --class com.test.spark.scala.DeployModeTest --master yarn --deploy-mode cluster hdfs://mycluster/data/spark/TestSpark-2-1.0-SNAPSHOT.jar
//yarn + client
spark-submit --class com.test.spark.scala.DeployModeTest --master yarn --deploy-mode client TestSpark-2-1.0-SNAPSHOT.jar
7. In yarn mode there is no need to start the standalone Spark cluster.
V. Spark Master HA
---------------------------------------------------------------
1. [Description]
    This only applies to standalone and mesos deployments; yarn mode already has HA.
    ZooKeeper connects the multiple masters and stores the recovery state.
    The master is mainly responsible for scheduling.
2. [Edit the configuration file and distribute it to all nodes]
[spark/conf/spark-env.sh]
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s100:2181,s200:2181,s300:2181 -Dspark.deploy.zookeeper.dir=/spark"
    # i.e. the daemon options above set:
    #   spark.deploy.recoveryMode=ZOOKEEPER
    #   spark.deploy.zookeeper.url=s100:2181,s200:2181,s300:2181
    #   spark.deploy.zookeeper.dir=/spark      (any ZooKeeper path works, e.g. /spark/ha)
3. [Starting the masters]
    a. Simply start the master process on several nodes; HA adds and removes master nodes through ZooKeeper automatically.
$s100> ./start-all.sh
$s500> ./start-master.sh
    b. Clients can also list multiple master addresses: spark://host1:port1,host2:port2.
       Using Spark HA in application code (a fuller sketch follows this line):
conf.setMaster("spark://s100:7077,s500:7077")
VI. Refactoring the call-log project: aggregating the Hive call logs with Spark
-----------------------------------------------------------------
1. Original code --- Hive + Hadoop MR (queried through the Hive JDBC driver)
    /**
     * Count the calls per month for a given caller in a given year.
     */
    public List<CalllogStat> statCalllogsCount_1(String caller, String year) {
        List<CalllogStat> list = new ArrayList<CalllogStat>();
        try {
            Connection conn = DriverManager.getConnection(url);
            Statement st = conn.createStatement();
            // Build the query, e.g.:
            // select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase
            // where caller = '15338597777' and substr(calltime,1,4) == '2018' group by substr(calltime,1,6);
            String sql = "select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase " +
                    "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
                    + "' group by substr(calltime,1,6)";
            ResultSet rs = st.executeQuery(sql);
            while (rs.next()) {
                CalllogStat logSt = new CalllogStat();
                logSt.setCount(rs.getInt(1));
                logSt.setYearMonth(rs.getString(2));
                list.add(logSt);
            }
            rs.close();
            st.close();
            conn.close();
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
2. Refactored code --- Hive + Spark
    /**
     * Count the calls per month for a given caller in a given year.
     */
    public List<CalllogStat> statCalllogsCount(String caller, String year) {
        List<CalllogStat> list = new ArrayList<CalllogStat>();
        SparkConf conf = new SparkConf();
        conf.setAppName("SparkHive");
        conf.setMaster("spark://s100:7077,s500:7077");          // HA master list
        SparkSession sess = SparkSession.builder()
                .config(conf)
                .enableHiveSupport()                            // required so sess.sql() can see the Hive tables
                .getOrCreate();
        // Build the query, e.g.:
        // select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase
        // where caller = '15338597777' and substr(calltime,1,4) == '2018' group by substr(calltime,1,6);
        String sql = "select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase " +
                "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
                + "' group by substr(calltime,1,6)";
        Dataset<Row> df = sess.sql(sql);
        List<Row> lst = df.collectAsList();
        for (Row r : lst) {
            CalllogStat logSt = new CalllogStat();
            // Unlike a JDBC ResultSet, Spark's Row is 0-indexed, and count(*) comes back as a long.
            logSt.setCount((int) r.getLong(0));
            logSt.setYearMonth(r.getString(1));
            list.add(logSt);
        }
        return list;
    }
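    The SQL above is still assembled by string concatenation. As an alternative sketch (assuming the same table ext_calllogs_in_hbase and the masters used in this section), the same monthly count can be expressed in Scala with the DataFrame API, which avoids hand-built SQL:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, count, substring}

    object CalllogStatSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("CalllogStatSketch")
          .master("spark://s100:7077,s500:7077")
          .enableHiveSupport()
          .getOrCreate()

        val caller = "15338597777"   // example values from this section
        val year = "2018"

        // Filter by caller and year, then count calls per year-month.
        val rows = spark.table("ext_calllogs_in_hbase")
          .where(col("caller") === caller && substring(col("calltime"), 1, 4) === year)
          .groupBy(substring(col("calltime"), 1, 6).as("yearMonth"))
          .agg(count("*").as("cnt"))
          .orderBy("yearMonth")
          .collect()

        rows.foreach(r => println(r.getString(0) + " -> " + r.getLong(1)))
        spark.stop()
      }
    }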
VII. Fixing class-not-found errors when Spark integrates Hive to access HBase
----------------------------------------------------------
1. Spark local[*] mode
    a. Copy Hive's hive-hbase-handler jar into spark/jars [on all nodes]
$> xcall.sh "cp /soft/hive/lib/hive-hbase-handler-2.1.1.jar /soft/spark/jars/"
    b. Copy the metrics jars under hive/lib into spark/jars [on all nodes].
$>cd /soft/hive/lib
$>ls /soft/hive/lib | grep metrics | cp `xargs` /soft/spark/jars
    c. Start spark-shell in local mode and test
$>spark-shell --master local[4]
$scala>spark.sql("select * from mydb.ext_calllogs_in_hbase").show();
$scala>spark.sql("select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase where caller = '15778423030' and substr(calltime,1,4) == '2017' group by substr(calltime,1,6)").show();
2. Spark standalone mode
    a. Copy Hive's hive-hbase-handler jar into spark/jars [on all nodes]
$> xcall.sh "cp /soft/hive/lib/hive-hbase-handler-2.1.1.jar /soft/spark/jars/"
    b. Copy the metrics jars under hive/lib into spark/jars [on all nodes].
$>cd /soft/hive/lib
$>ls /soft/hive/lib | grep metrics | cp `xargs` /soft/spark/jars
    c. Copy all the jars under spark/jars onto the HDFS cluster
$> hdfs dfs -put /soft/spark/jars /data/spark/
    d. Start the Spark cluster
$> ./start-all.sh
    e. Start spark-shell against the standalone cluster and test
       $>spark-shell --master spark://s100:7077,s500:7077      // both masters, since HA is enabled
$scala>spark.sql("select * from mydb.ext_calllogs_in_hbase").show();
$scala>spark.sql("select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase where caller = '15778423030' and substr(calltime,1,4) == '2017' group by substr(calltime,1,6)").show();
3. Accessing HBase + Hive programmatically from IDEA
    a. Add the dependencies [make sure the versions match]
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-hbase-handler -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-hbase-handler</artifactId>
            <version>2.1.0</version>
        </dependency>
    b. Test program
    @Test
    public void test1() {
        String caller = "13341109505";
        String year = "2017";
        SparkSession sess = SparkSession.builder()
                .enableHiveSupport()
                .appName("SparkHive")
                .master("spark://s201:7077")
                .getOrCreate();
        String sql = "select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase " +
                "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
                + "' group by substr(calltime,1,6) order by substr(calltime,1,6)";
        Dataset<Row> df = sess.sql(sql);
        List<Row> rows = df.collectAsList();
        List<CallLogStat> list = new ArrayList<CallLogStat>();
        for (Row row : rows) {
            System.out.println(row.getString(1));
            list.add(new CallLogStat(row.getString(1), (int) row.getLong(0)));
        }
    }
VIII. Spark SQL statistics: query HBase through Spark's thriftserver2
--------------------------------------------------------------------------------
1. Start the thriftserver2 that ships with Spark
    $>./start-thriftserver.sh --master spark://s100:7077
2. Integrate the web application through the hive-jdbc driver
    Add to pom.xml:
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
3. Code
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    Connection conn = DriverManager.getConnection("jdbc:hive2://s201:10000");
    String sql = "select count(*) ,substr(calltime,1,6) from mydb.ext_calllogs_in_hbase " +
            "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
            + "' group by substr(calltime,1,6) order by substr(calltime,1,6) desc";
    Statement st = conn.createStatement();
    ResultSet rs = st.executeQuery(sql);
    List<CallLogStat> list = new ArrayList<CallLogStat>();
    while (rs.next()) {
        long count = rs.getLong(1);
        String ym = rs.getString(2);
        list.add(new CallLogStat(ym, (int) count));
    }
    rs.close();
    st.close();
    conn.close();
    return list;
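    The query above is again built by string concatenation. A minimal Scala sketch, assuming the same thriftserver at s201:10000 and the table mydb.ext_calllogs_in_hbase, shows the same lookup through a JDBC PreparedStatement so that the caller and year are bound as parameters instead of spliced into the SQL text:

    import java.sql.DriverManager

    object ThriftServerSketch {
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val conn = DriverManager.getConnection("jdbc:hive2://s201:10000")
        // Bind caller and year as parameters rather than concatenating them into the SQL.
        val ps = conn.prepareStatement(
          "select count(*), substr(calltime,1,6) from mydb.ext_calllogs_in_hbase " +
            "where caller = ? and substr(calltime,1,4) = ? " +
            "group by substr(calltime,1,6) order by substr(calltime,1,6) desc")
        ps.setString(1, "15778423030")   // example caller from this article
        ps.setString(2, "2017")
        val rs = ps.executeQuery()
        while (rs.next()) {
          println(rs.getString(2) + " -> " + rs.getLong(1))
        }
        rs.close(); ps.close(); conn.close()
      }
    }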