Java调用oozie提交spark on yarn任务

2024-08-24 19:58

本文主要是介绍Java调用oozie提交spark on yarn任务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.需要在oozie-site.xml设置如下属性:

<property> <name>hadoop.proxyuser.cenyuhai.hosts</name> <value>*</value> 
</property> 
<property> <name>hadoop.proxyuser.cenyuhai.groups</name> <value>*</value> 
</property>

如果Oozie报错ClassNotFoundException: Class org.apache.oozie.action.hadoop.SparkMain 再添加一个属性

<property> <name>oozie.use.system.libpath</name> <value>true</value> 
</property>

3.Java调用代码

maven的jar吧

<dependency><groupId>org.apache.oozie</groupId><artifactId>oozie-client</artifactId><version>4.1.0-cdh5.9.0</version></dependency><dependency><groupId>org.apache.oozie</groupId><artifactId>oozie-core</artifactId><version>4.1.0-cdh5.9.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.oozie</groupId><artifactId>oozie-hadoop</artifactId><version>2.6.0-cdh5.9.2.oozie-4.1.0-cdh5.9.2</version><scope>provided</scope></dependency>


import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;import java.util.LinkedList;
import java.util.List;
import java.util.Properties;/*** Created by root on 7/19/17.*/
public class WorkFlowClient {private static String OOZIE_URL = "http://master01:11000/oozie/";private static String JOB_PATH = "hdfs://master01:8020/bobo/in/flow/workflow.xml";private static String JOB_Tracker = "master01:8032";private static String NAMENode = "hdfs://master01:8020";OozieClient wc = null;public WorkFlowClient(String url){wc = new OozieClient(url);}public String startJob(String wfDefinition, List<WorkflowParameter> wfParameters)throws OozieClientException {// create a workflow job configuration and set the workflow application pathProperties conf = wc.createConfiguration();conf.setProperty(OozieClient.APP_PATH, wfDefinition);// setting workflow parametersconf.setProperty("jobTracker", JOB_Tracker);conf.setProperty("nameNode", NAMENode);if((wfParameters != null) && (wfParameters.size() > 0)){for(WorkflowParameter parameter : wfParameters)conf.setProperty(parameter.getName(), parameter.getValue());}// submit and start the workflow jobreturn wc.run(conf);}public Status getJobStatus(String jobID) throws OozieClientException{WorkflowJob job = wc.getJobInfo(jobID);return job.getStatus();}public static void main(String[] args) throws OozieClientException, InterruptedException{// Create clientWorkFlowClient client = new WorkFlowClient(OOZIE_URL);// Create parametersList<WorkflowParameter> wfParameters = new LinkedList<WorkflowParameter>();WorkflowParameter jobmaster = new WorkflowParameter("jobmaster","yarn-client");WorkflowParameter jobmode = new WorkflowParameter("jobmode","client");WorkflowParameter jobname = new WorkflowParameter("jobname","SparkOozieAction");WorkflowParameter jarclass = new WorkflowParameter("jarclass","com.kafkaspark.sparkstreaming.SparkstreamingData");WorkflowParameter jarpath = new WorkflowParameter("jarpath","hdfs://master01:8020/bobo/in/flow/kafkaspark.jar");WorkflowParameter sparkopts = new WorkflowParameter("sparkopts","--num-executors 3 --executor-memory 1G --executor-cores 3 --driver-memory 2G --files config.properties  --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" +" --conf spark.yarn.jar=hdfs://master01:8020/bobo/in/flow/spark-assembly-1.6.0-cdh5.9.0-hadoop2.6.0-cdh5.9.0.jar");WorkflowParameter jararg1 = new WorkflowParameter("jararg1","slave01:9092,slave02:9092,slave03:9092");WorkflowParameter jararg2 = new WorkflowParameter("jararg2","DATA-TOPIC");wfParameters.add(jobmaster);wfParameters.add(jobmode);wfParameters.add(jobname);wfParameters.add(jarclass);wfParameters.add(jarpath);wfParameters.add(sparkopts);wfParameters.add(jararg1);wfParameters.add(jararg2);// Start OozingString jobId = client.startJob(JOB_PATH, wfParameters);Status status = client.getJobStatus(jobId);if(status == Status.RUNNING)System.out.println("Workflow job running");elseSystem.out.println("Problem starting Workflow job");}
}
parameter类就一个name和value属性

4.workflow.xml的配置

<workflow-app name="Spark_Workflow" xmlns="uri:oozie:workflow:0.1">  <start to="spark-SparkOozieAction"/>  <kill name="Kill">  <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>  </kill>  <action name="spark-SparkOozieAction">  <spark xmlns="uri:oozie:spark-action:0.1">  <job-tracker>${jobTracker}</job-tracker>  <name-node>${nameNode}</name-node>  <master>${jobmaster}</master>  <mode>${jobmode}</mode>  <name>${jobname}</name>  <class>${jarclass}</class>  <jar>${jarpath}</jar>  <spark-opts>${sparkopts}</spark-opts>  <arg>${jararg1}</arg>  <arg>${jararg2}</arg>  </spark>  <ok to="End"/>  <error to="Kill"/>  </action>  <end name="End"/>  
</workflow-app>  
5.你就可以去oozie的web界面查看提交的job




这篇关于Java调用oozie提交spark on yarn任务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1103454

相关文章

Java中StopWatch的使用示例详解

《Java中StopWatch的使用示例详解》stopWatch是org.springframework.util包下的一个工具类,使用它可直观的输出代码执行耗时,以及执行时间百分比,这篇文章主要介绍... 目录stopWatch 是org.springframework.util 包下的一个工具类,使用它

Java进行文件格式校验的方案详解

《Java进行文件格式校验的方案详解》这篇文章主要为大家详细介绍了Java中进行文件格式校验的相关方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、背景异常现象原因排查用户的无心之过二、解决方案Magandroidic Number判断主流检测库对比Tika的使用区分zip

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

springboot security使用jwt认证方式

《springbootsecurity使用jwt认证方式》:本文主要介绍springbootsecurity使用jwt认证方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录前言代码示例依赖定义mapper定义用户信息的实体beansecurity相关的类提供登录接口测试提供一

Spring Boot 3.4.3 基于 Spring WebFlux 实现 SSE 功能(代码示例)

《SpringBoot3.4.3基于SpringWebFlux实现SSE功能(代码示例)》SpringBoot3.4.3结合SpringWebFlux实现SSE功能,为实时数据推送提供... 目录1. SSE 简介1.1 什么是 SSE?1.2 SSE 的优点1.3 适用场景2. Spring WebFlu

基于SpringBoot实现文件秒传功能

《基于SpringBoot实现文件秒传功能》在开发Web应用时,文件上传是一个常见需求,然而,当用户需要上传大文件或相同文件多次时,会造成带宽浪费和服务器存储冗余,此时可以使用文件秒传技术通过识别重复... 目录前言文件秒传原理代码实现1. 创建项目基础结构2. 创建上传存储代码3. 创建Result类4.

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Tomcat版本与Java版本的关系及说明

《Tomcat版本与Java版本的关系及说明》:本文主要介绍Tomcat版本与Java版本的关系及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Tomcat版本与Java版本的关系Tomcat历史版本对应的Java版本Tomcat支持哪些版本的pythonJ