Submitting and Monitoring Spark Jobs via the REST API
阿新 · Published 2018-12-15
Scenario
Sometimes we want to build our own web platform to start, monitor, and manage Spark jobs. Spark does expose a RESTful API for monitoring jobs, but it says nothing about how to submit a job from an external client.
1. Submitting a Job
We call the RESTful API from a Java backend. A submission request has the following format:
curl -X POST http://spark-cluster-ip:6066/v1/submissions/create \
  --header "Content-Type:application/json;charset=UTF-8" \
  --data '{
    "action" : "CreateSubmissionRequest",
    "appArgs" : [ "args1, args2,..." ],
    "appResource" : "file:/myfilepath/spark-job-1.0.jar",
    "clientSparkVersion" : "2.1.0",
    "environmentVariables" : { "SPARK_ENV_LOADED" : "1" },
    "mainClass" : "com.mycompany.MyJob",
    "sparkProperties" : {
      "spark.jars" : "file:/myfilepath/spark-job-1.0.jar",
      "spark.driver.supervise" : "false",
      "spark.app.name" : "MyJob",
      "spark.eventLog.enabled" : "true",
      "spark.submit.deployMode" : "cluster",
      "spark.master" : "spark://spark-cluster-ip:6066"
    }
  }'
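If the cluster accepts the submission, the endpoint answers with a CreateSubmissionResponse JSON document along these lines (the field values shown here are illustrative, not from a real run):

```
{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20151008145126-0000",
  "serverSparkVersion" : "2.1.0",
  "submissionId" : "driver-20151008145126-0000",
  "success" : true
}
```

The submissionId is what we will later use to monitor the job.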
To submit from the backend, the request has to be serialized to JSON, so first we build a JavaBean:
package com.dci.log.logsearch;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
class JobSubmitRequest {

    private Action action;
    private String appResource;
    private List<String> appArgs;
    private String clientSparkVersion;
    private String mainClass;
    private Map<String, String> environmentVariables;
    private SparkProperties sparkProperties;

    public Action getAction() { return action; }
    public void setAction(Action action) { this.action = action; }

    public String getAppResource() { return appResource; }
    public void setAppResource(String appResource) { this.appResource = appResource; }

    public List<String> getAppArgs() { return appArgs; }
    public void setAppArgs(List<String> appArgs) { this.appArgs = appArgs; }

    public String getClientSparkVersion() { return clientSparkVersion; }
    public void setClientSparkVersion(String clientSparkVersion) { this.clientSparkVersion = clientSparkVersion; }

    public String getMainClass() { return mainClass; }
    public void setMainClass(String mainClass) { this.mainClass = mainClass; }

    public Map<String, String> getEnvironmentVariables() { return environmentVariables; }
    public void setEnvironmentVariables(Map<String, String> environmentVariables) { this.environmentVariables = environmentVariables; }

    public SparkProperties getSparkProperties() { return sparkProperties; }
    public void setSparkProperties(SparkProperties sparkProperties) { this.sparkProperties = sparkProperties; }

    @JsonInclude(JsonInclude.Include.NON_NULL)
    static class SparkProperties {

        @JsonProperty(value = "spark.jars")
        private String jars;

        @JsonProperty(value = "spark.app.name")
        private String appName;

        @JsonProperty(value = "spark.master")
        private String master;

        private Map<String, String> otherProperties = new HashMap<>();

        public String getJars() { return jars; }
        public void setJars(String jars) { this.jars = jars; }

        public String getAppName() { return appName; }
        public void setAppName(String appName) { this.appName = appName; }

        public String getMaster() { return master; }
        public void setMaster(String master) { this.master = master; }

        public void setOtherProperties(Map<String, String> otherProperties) { this.otherProperties = otherProperties; }

        // Any extra property (e.g. spark.submit.deployMode) goes into this map;
        // @JsonAnyGetter flattens it into the sparkProperties JSON object.
        void setOtherProperties(String key, String value) { this.otherProperties.put(key, value); }

        @JsonAnyGetter
        Map<String, String> getOtherProperties() { return this.otherProperties; }
    }
}
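The snippets in this post also reference an Action enum, a SparkResponse bean, and a MapperWrapper holder that are never shown. A minimal sketch of what they might look like: only Action.CreateSubmissionRequest, SparkResponse.getSubmissionId() and MapperWrapper.MAPPER actually appear in the original code; every other name and field here is an assumption, modeled on the JSON the endpoint returns.

```java
// The "action" discriminator used by the submission protocol.
// Only CreateSubmissionRequest is used by the submit code; the other
// constants are assumed from the request/response naming scheme.
enum Action {
    CreateSubmissionRequest,
    CreateSubmissionResponse,
    SubmissionStatusResponse
}

// Bean for the JSON the REST endpoint returns. Jackson can bind it without
// annotations as long as the property names match the JSON fields.
class SparkResponse {
    private String action;
    private String message;
    private String serverSparkVersion;
    private String submissionId;
    private boolean success;

    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    public String getServerSparkVersion() { return serverSparkVersion; }
    public void setServerSparkVersion(String serverSparkVersion) { this.serverSparkVersion = serverSparkVersion; }

    public String getSubmissionId() { return submissionId; }
    public void setSubmissionId(String submissionId) { this.submissionId = submissionId; }

    public boolean isSuccess() { return success; }
    public void setSuccess(boolean success) { this.success = success; }
}

// MapperWrapper is presumably just a holder for one shared Jackson ObjectMapper:
// class MapperWrapper { static final ObjectMapper MAPPER = new ObjectMapper(); }
```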
Then we fill in the parameters and execute. The variables below (starttime, endtime, jarpath, mainclass, master, and so on) come from the caller; on success the method returns the submission ID:
final JobSubmitRequest jobSubmitRequest = new JobSubmitRequest();
jobSubmitRequest.setAction(Action.CreateSubmissionRequest);

List<String> appArgs = new ArrayList<>();
appArgs.add(starttime);
appArgs.add(endtime);
appArgs.add(servicemaxnum);
appArgs.add(key);
jobSubmitRequest.setAppArgs(appArgs);
jobSubmitRequest.setAppResource(jarpath);
jobSubmitRequest.setClientSparkVersion("2.3.1");
jobSubmitRequest.setMainClass(mainclass);

Map<String, String> environmentVariables = new HashMap<>();
environmentVariables.put("SPARK_ENV_LOADED", "1");
jobSubmitRequest.setEnvironmentVariables(environmentVariables);

JobSubmitRequest.SparkProperties sparkProperties = new JobSubmitRequest.SparkProperties();
sparkProperties.setJars(jarpath);
sparkProperties.setAppName("SubmitScalaJobToSpark");
sparkProperties.setOtherProperties("spark.submit.deployMode", "cluster");
sparkProperties.setMaster("spark://" + master);
jobSubmitRequest.setSparkProperties(sparkProperties);

// master is the host:port of the standalone REST endpoint, e.g. spark-cluster-ip:6066
HttpClient client = HttpClients.createDefault();
final String url = "http://" + master + "/v1/submissions/create";
final HttpPost post = new HttpPost(url);
post.setHeader(HTTP.CONTENT_TYPE, "application/json;charset=UTF-8");
try {
    final String message = MapperWrapper.MAPPER.writeValueAsString(jobSubmitRequest);
    post.setEntity(new StringEntity(message));
    final String stringResponse = client.execute(post, new BasicResponseHandler());
    if (stringResponse != null) {
        SparkResponse response = MapperWrapper.MAPPER.readValue(stringResponse, SparkResponse.class);
        return response.getSubmissionId();
    } else {
        return "FAILED";
    }
} catch (Exception e) {
    System.out.println(e);
    return "FAILED";
}
2. Monitoring Job Status
Monitoring the job status is straightforward: just pass the submission ID returned at submit time:
curl http://spark-cluster-ip:6066/v1/submissions/status/driver-20151008145126-0000
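The same status check can be done from the Java backend. A minimal sketch using the JDK 11 java.net.http client so it needs no extra dependencies (the class and method names here are my own, and the host and submission ID are the placeholder values from the curl example):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SparkStatusCheck {

    // Builds the status URL from the REST endpoint (host:port) and a submission ID,
    // mirroring the curl example above.
    static String statusUrl(String master, String submissionId) {
        return "http://" + master + "/v1/submissions/status/" + submissionId;
    }

    // Performs the GET and returns the raw JSON body; parse it with Jackson
    // (e.g. into the SparkResponse bean) to read fields such as driverState.
    static String fetchStatus(String master, String submissionId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(statusUrl(master, submissionId)))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

For example, fetchStatus("spark-cluster-ip:6066", "driver-20151008145126-0000") issues the same request as the curl command above.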