oozie应用介绍

2023-12-10 18:32
文章标签 应用 介绍 oozie

本文主要是介绍oozie应用介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在前一篇文章《Oozie简介》中,我们已经描述了Oozie工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对Oozie的工作流的部署和配置,以及用来启动、停止和监控Oozie工作流的工具。

在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多Oozie特性,并演示如何来使用它们。

定义过程

我们在此描述的工作流会实现汽车GPS探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的HDFS目录中[1],其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的24个文件完成的。如果文件的数量是24,那么获取过程就会启动。否则:

  • 当天什么都不做
  • 对前一天——最多到7天,发送剩下的内容到探测数据提供程序
  • 如果目录的存在时间已达到7天,那么就获取所有可用的探测数据文件。

过程的总体实现请见图1

(点击可以查看大图)。

图1: 过程图

在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join步骤就会把控制权交给end状态。

子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。

Directory子过程实现

以下代码负责实现的是directory子过程(代码1)。

<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'><start to='getDirInfo' /><!-- STEP ONE --><action name='getDirInfo'><!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,otherwise returns age of dir in days --><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.GetDirInfo</main-class><arg>${inputDir}</arg><capture-output /></java><ok to="makeIngestDecision" /><error to="fail" /></action><!-- STEP TWO --><decision name="makeIngestDecision"><switch><!-- empty or doesn't exist --><case to="end">${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||(wf:actionData('getDirInfo')['dir.age'] lt 1 andwf:actionData('getDirInfo')['dir.num-files'] lt 24)}</case><!-- # of files >= 24 --><case to="ingest">${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||wf:actionData('getDirInfo')['dir.age'] gt 6}</case><default to="sendEmail"/></switch></decision><!--EMAIL--><action name="sendEmail"><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.StandaloneMailer</main-class><arg>probedata2@navteq.com</arg><arg>gregory.titievsky@navteq.com</arg><arg>${inputDir}</arg><arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg><arg>${wf:actionData('getDirInfo')['dir.age']}</arg></java><ok to="end" /><error to="fail" /></action><!--INGESTION --><action name="ingest"><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><prepare><delete path="${outputDir}" /></prepare><configuration><property><name>mapred.reduce.tasks</name><value>300</value></property></configuration><main-class>com.navteq.probedata.drivers.ProbeIngest</main-class><arg>-conf</arg><arg>action.xml</arg><arg>${inputDir}</arg><arg>${outputDir}</arg></java><ok to=" archive-data" /><error to="ingest-fail" /></action><!—Archive Data --><action name="archive-data"><fs><move source='${inputDir}' target='/probe/backup/${dirName}' /><delete path = '${inputDir}' /></fs><ok to="end" /><error to="ingest-fail" /></action><kill name="ingest-fail"><message>Ingestion failed, errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><kill name="fail"><message>Java failed, errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><end name='end' />
</workflow-app>

代码1: Directory子过程

这个子过程的start节点会触发自定义的java节点,这个节点会获得目录信息(代码2)。

package com.navteq.oozie;import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;public class GetDirInfo {private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";public static void main(String[] args) throws Exception {String dirPath = args[0];String propKey0 = "dir.num-files";String propVal0 = "-1";String propKey1 = "dir.age";String propVal1 = "-1";System.out.println("Directory path: '"+dirPath+"'");Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Path hadoopDir = new Path(dirPath);if (fs.exists(hadoopDir)){FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);int numFilesInDir = files.length;propVal0 = Integer.toString(numFilesInDir);long timePassed, daysPassedLong;int daysPassed;String dirName = hadoopDir.getName();String[] dirNameArray = dirName.split("-");if (dirNameArray.length == 3) {int year = Integer.valueOf(dirNameArray[0]);int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 basedint date = Integer.valueOf(dirNameArray[2]);GregorianCalendar dirCreationDate = new GregorianCalendar(year,month, date);timePassed = (new GregorianCalendar()).getTimeInMillis()- dirCreationDate.getTimeInMillis();daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;propVal1 = Integer.toString(daysPassed);}}String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);if (oozieProp != null) {File propFile = new File(oozieProp);Properties props = new Properties();props.setProperty(propKey0, propVal0);props.setProperty(propKey1, propVal1);OutputStream os = new FileOutputStream(propFile);props.store(os, "");os.close();} elsethrow new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES+ " System property not defined");}
}

代码2: 获得目录信息的节点

这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回-1,否则,这两个值就会返回给子过程。

子过程的下一步是一个switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:

  • 使用现存的Map/reduce程序[2]获取数据
  • 目录会备份在数据归档中,然后删除

对action节点的其它配置
获取动作向你展示了另外一些Oozie配置参数,包括:
  • Prepare——如果出现了prepare参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在fs.default.name文件系统中执行。
  • Configuration——如果出现了configuration元素,它其中就会包含针对Map/Reduce 作业的JobConf属性。它不仅可以用于map/reduce动作, 而且还可以用于启动map/reduce作业的java动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个java主类实现的(代码3)。

package com.navteq.oozie;import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;public class StandaloneMailer {private static String _mServer = "imailchi.navtech.com";private static Properties _props = null;private StandaloneMailer(){}public static void init(String mServer){_mServer = mServer;_props = new Properties();_props.setProperty("mail.smtp.host", _mServer);}public static void SendMail(String subject, String message, String from, String to) throws Exception {// create some properties and get the default SessionSession session = Session.getDefaultInstance(_props, null);// create a messageMessage msg = new MimeMessage(session);// set the from and to addressInternetAddress addressFrom = new InternetAddress(from);msg.setFrom(addressFrom);String [] recipients = new String[] {to};InternetAddress[] addressTo = new InternetAddress[recipients.length];for (int i = 0; i < recipients.length; i++){addressTo[i] = new InternetAddress(recipients[i]);}msg.setRecipients(Message.RecipientType.TO, addressTo);// Setting the Subject and Content Typemsg.setSubject(subject);msg.setContent(message, "text/plain");Transport.send(msg);}public static void main (String[] args) throws Exception {if (args.length ==5){init(_mServer);StringBuilder subject = new StringBuilder();StringBuilder body = new StringBuilder();subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files.");body.append("Directory ").append(args[2]).append(" is ").append(args[4]).append(" days old and contains only ").append(args[3]).append(" files instead of 24.");SendMail(subject.toString(), body.toString(), args[0], args[1]);}else throw new Exception("Invalid number of parameters provided for email");}
}

列表3: 发送提醒邮件

这是使用了javax.mail API的简单实现,用于发送邮件。

主过程的实现

我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表4)[3]

<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'><start to='getDirs2Process' /><!-- STEP ONE --><action name='getDirs2Process'><!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,otherwise returns age of dir in days --><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.GenerateLookupDirs</main-class><capture-output /></java><ok to="forkSubWorkflows" /><error to="fail" /></action><fork name="forkSubWorkflows"><path start="processDir0"/><path start="processDir1"/><path start="processDir2"/><path start="processDir3"/><path start="processDir4"/><path start="processDir5"/><path start="processDir6"/><path start="processDir7"/></fork><action name="processDir0"><sub-workflow><app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path><configuration><property><name>inputDir</name><value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value></property><property><name>outputDir</name><value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value></property><property><name>jobTracker</name><value>${jobTracker}</value></property><property><name>nameNode</name><value>${nameNode}</value></property><property><name>activeDir</name><value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value></property><property><name>dirName</name><value>${wf:actionData('getDirs2Process')['dir0']}</value></property></configuration></sub-workflow><ok to="joining"/><error to="fail"/></action>
….<action name="processDir7"><sub-workflow><app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path><configuration><property><name>inputDir</name><value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value></property><property><name>outputDir</name><value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value></property><property><name>dirName</name><value>${wf:actionData('getDirs2Process')['dir7']}</value></property></configuration></sub-workflow><ok to="joining"/><error to="fail"/></action><join name="joining" to="end"/><kill name="fail"><message>Java failed, errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><end name='end' />
</workflow-app>

代码4: 数据获取主过程

这个过程首先会触发java节点,计算需要处理的目录列表(列表5),然后对每个目录执行子过程,从而处理给定的目录。

package com.navteq.oozie;import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;public class GenerateLookupDirs {public static final long dayMillis = 1000 * 60 * 60 * 24;private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";public static void main(String[] args) throws Exception {Calendar curDate = new GregorianCalendar();int year, month, date;String propKey, propVal;String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);if (oozieProp != null) {File propFile = new File(oozieProp);Properties props = new Properties();for (int i = 0; i<8; ++i){year = curDate.get(Calendar.YEAR);month = curDate.get(Calendar.MONTH) + 1;date = curDate.get(Calendar.DATE);propKey = "dir"+i;propVal = year + "-" +(month < 10 ? "0" + month : month) + "-" +(date < 10 ? "0" + date : date);props.setProperty(propKey, propVal);curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);}OutputStream os = new FileOutputStream(propFile);props.store(os, "");os.close();} elsethrow new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES+ " System property not defined");}
}

代码5: 目录计算程序

结论

在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的Oozie特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的Oozie组件库,并使用自定义的节点扩展Oozie。

致谢

非常感谢我们在Navteq的同事Gregory Titievsky,他为我们实现了大部分代码。

关于作者

Boris Lublinsky是NAVTEQ公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA以及实现各种NAVTEQ的项目定义架构的愿景。他还是InfoQ的SOA编辑,以及OASIS的SOA RA工作组的参与者。Boris是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael拥有俄亥俄州立大学的软件工程学位。

参考信息

1. Boris Lublinsky,Mike Segel 《Oozie简介》

这篇关于oozie应用介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477960

相关文章

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Android Kotlin 高阶函数详解及其在协程中的应用小结

《AndroidKotlin高阶函数详解及其在协程中的应用小结》高阶函数是Kotlin中的一个重要特性,它能够将函数作为一等公民(First-ClassCitizen),使得代码更加简洁、灵活和可... 目录1. 引言2. 什么是高阶函数?3. 高阶函数的基础用法3.1 传递函数作为参数3.2 Lambda

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

Java中&和&&以及|和||的区别、应用场景和代码示例

《Java中&和&&以及|和||的区别、应用场景和代码示例》:本文主要介绍Java中的逻辑运算符&、&&、|和||的区别,包括它们在布尔和整数类型上的应用,文中通过代码介绍的非常详细,需要的朋友可... 目录前言1. & 和 &&代码示例2. | 和 ||代码示例3. 为什么要使用 & 和 | 而不是总是使

Python循环缓冲区的应用详解

《Python循环缓冲区的应用详解》循环缓冲区是一个线性缓冲区,逻辑上被视为一个循环的结构,本文主要为大家介绍了Python中循环缓冲区的相关应用,有兴趣的小伙伴可以了解一下... 目录什么是循环缓冲区循环缓冲区的结构python中的循环缓冲区实现运行循环缓冲区循环缓冲区的优势应用案例Python中的实现库

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和

C++中函数模板与类模板的简单使用及区别介绍

《C++中函数模板与类模板的简单使用及区别介绍》这篇文章介绍了C++中的模板机制,包括函数模板和类模板的概念、语法和实际应用,函数模板通过类型参数实现泛型操作,而类模板允许创建可处理多种数据类型的类,... 目录一、函数模板定义语法真实示例二、类模板三、关键区别四、注意事项 ‌在C++中,模板是实现泛型编程

Python实现html转png的完美方案介绍

《Python实现html转png的完美方案介绍》这篇文章主要为大家详细介绍了如何使用Python实现html转png功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 1.增强稳定性与错误处理建议使用三层异常捕获结构:try: with sync_playwright(

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir