DataX教程(08)- 监控与汇报

2024-03-10 11:59
文章标签 教程 监控 08 汇报 datax

本文主要是介绍DataX教程(08)- 监控与汇报,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 01 引言
  • 02 监控功能
    • 2.1 ErrorRecordChecker
    • 2.2 ErrorRecordChecker源码
    • 2.3 ErrorRecordChecker检查时机
  • 03 汇报功能
    • 3.1 汇报运行流程
    • 3.2 汇报的运行流程
      • 3.2.1 汇报的几个角色
      • 3.2.2 汇报的流程
    • 3.3 什么时候写信息内容
    • 3.4 Channel通讯信息接收
  • 04 文末

01 引言

通过前面的博文,我们对DataX有了一定的深入了解了:

  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
  • 《DataX教程(03)- 源码解读(超详细版)
  • 《DataX教程(04)- 配置完整解读》
  • 《DataX教程(05)- DataX Web项目实践》
  • 《DataX教程(06)- DataX调优》
  • 《DataX教程(07)- 图解DataX任务分配及执行流程》

本文主要讲解DataX的监控与汇报功能。

02 监控功能

2.1 ErrorRecordChecker

JobContainer类里面,可以看到引用了一个类ErrorRecordChecker,它在JobContainer初始化的时候做了初始操作。
在这里插入图片描述
ErrorChecker是一个监控类,主要用来检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式:

  1. errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据;
  2. errorPercentage表示出错比例,在任务结束时校验;
  3. errorRecord优先级高于errorPercentage

2.2 ErrorRecordChecker源码

Control+O可以看到ErrorRecordChecker,有如下几个方法:
在这里插入图片描述
这里主要做简要描述,

① 构造函数ErrorRecordChecker(Configuration configuration):主要就是从任务配置文件job.json里面获取errorLimit.record错误记录数限制及errorLimit.percentage错误记录百分比的值:

public ErrorRecordChecker(Configuration configuration) {this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));}

② 检查错误记录数限制checkRecordLimit(Communication communication):主要就是从communication里获取总共的错误记录数,然后判断是否超出配置的值,如果是,则抛出异常

 public void checkRecordLimit(Communication communication) {if (recordLimit == null) {return;}long errorNumber = CommunicationTool.getTotalErrorRecords(communication);if (recordLimit < errorNumber) {LOG.debug(String.format("Error-limit set to %d, error count check.",recordLimit));throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_DIRTY_DATA_LIMIT_EXCEED,String.format("脏数据条数检查不通过,限制是[%d]条,但实际上捕获了[%d]条.",recordLimit, errorNumber));}}

③ 检查错误记录百分比checkPercentageLimit(Communication communication):主要就是从communication里获取总共的错误记录数与总数的百分比值,然后判断是否超出配置的值,如果是,则抛出异常:

public void checkPercentageLimit(Communication communication) {if (percentageLimit == null) {return;}LOG.debug(String.format("Error-limit set to %f, error percent check.", percentageLimit));long total = CommunicationTool.getTotalReadRecords(communication);long error = CommunicationTool.getTotalErrorRecords(communication);if (total > 0 && ((double) error / (double) total) > percentageLimit) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_DIRTY_DATA_LIMIT_EXCEED,String.format("脏数据百分比检查不通过,限制是[%f],但实际上捕获到[%f].",percentageLimit, ((double) error / (double) total)));}}

好了,这里就讲完了ErrorRecordChecker的功能了,注意check方法里面有一个Communication类,这是一个通讯类,主要用来保存当前任务的状态信息的,接下来也会讲解。

2.3 ErrorRecordChecker检查时机

Control点击可以看到ErrorRecordCheckerJobContainer调用(初始化,前面已讲),以及在AbstractScheduler任务任务调度schedule方法执行的时候调用了。
在这里插入图片描述
再看看check方法在哪里调用了,经过追踪,可以分析得出:

  • JobContainerschedule方法结束后会调用,检查整个任务的错误记录数
  • AbstractSchedulerschedule方法,里面开了一个while死循环,不断去采集任务的状态,检查的时间间隔配置(core.container.job.sleepInterval)在core.json里面的job.sleepInterval里配置。

最后贴下,AbstractScheduler的schedule方法实现实时采集的代码:

 while (true) {/*** step 1: collect job stat* step 2: getReport info, then report it* step 3: errorLimit do check* step 4: dealSucceedStat();* step 5: dealKillingStat();* step 6: dealFailedStat();* step 7: refresh last job stat, and then sleep for next while** above steps, some ones should report info to DS**/Communication nowJobContainerCommunication = this.containerCommunicator.collect();nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());LOG.debug(nowJobContainerCommunication.toString());//汇报周期long now = System.currentTimeMillis();if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {Communication reportCommunication = CommunicationTool.getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);this.containerCommunicator.report(reportCommunication);lastReportTimeStamp = now;lastJobContainerCommunication = nowJobContainerCommunication;}errorLimit.checkRecordLimit(nowJobContainerCommunication);if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {LOG.info("Scheduler accomplished all tasks.");break;}if (isJobKilling(this.getJobId())) {dealKillingStat(this.containerCommunicator, totalTasks);} else if (nowJobContainerCommunication.getState() == State.FAILED) {dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());}Thread.sleep(jobSleepIntervalInMillSec);
}

03 汇报功能

3.1 汇报运行流程

友情提示:可能图片较大,建议下载下来使用图片编辑器查看

首先贴上一张图,里面描述的是Scheduler调度器与ErrorRecordChecker错误检查器及Communicator通讯者的整个调用关系,从上往下看:
在这里插入图片描述

3.2 汇报的运行流程

3.2.1 汇报的几个角色

汇报主要有几个重要的角色:

  • AbstractCommunicator通讯者抽象类:主要用来做通讯的协调;
  • Communication通讯的信息载体:主要用来存放通讯过程中产生的信息,为单例;
  • LocalTGCommunicationManager通讯信息载体工厂:根据任务id来获取通讯信息载体单例的工厂;
  • CommunicationTool信息载体工具类:此工具类是通讯业务层的处理,主要用来收集当前信息,并写入到Communication通讯的信息载体;
  • AbstractReporter信息上报:用来上报通讯信息。

3.2.2 汇报的流程

简要的流程描述:

  1. 首先根据配置new一个通讯者对象,有两种,分别为“StandAloneJobContainerCommunicator”、“StandAloneTGContainerCommunicator”,生成后,注入进Scheduler调度者,此时,Scheduler就有了一个Communicator工具了;
  2. 通讯者Communicator使用collect方法生成通讯的载体,也就是Communication,用来存放任务的相关信息,ErrorRecorder就是从这个Communication里获取当前任务的信息的;
  3. Scheduler调度器类里面,使用Communicator通讯工具的collect方法来获取communication通讯载体单例(获取单例方法在LocalTGCommunicationManager类,里面定义了Mapkey为任务idvalueCommunication通讯载体);
  4. Scheduler获取到Communication通讯载体后,使用CommunicationTool工具类把当前任务的状态信息写入;
  5. 最后使用reporter来上报Communication信息。

3.3 什么时候写信息内容

前面的3.13.2只做到了通讯类Communicator和通讯信息载体Communication的初始化,以及上报的流程,但是没有针对到哪里写入内容到Communication?这里直接看写入信息到Communication的地方,核心内容在TaskGroupContainer里面,下面来看看:

①首先根据任务id获取Communication的代码地方,在内部类TaskExecutor构造函数的地方:
在这里插入图片描述
②把Communication注入进Channel通道类,Channel通道类主要做内容的记录(核心:统计和限速都在这里):
在这里插入图片描述
Channel注入进了BufferedRecordExchangerBufferedRecordTransformerExchanger
而这连个Exchanger主要是为了记录RecordSender记录发送者、RecordReceiver记录接收者、TransformerExchanger的内容,就是记录ETL这3个模块里面的内容
在这里插入图片描述

根据流程,可以看到Channel类使用来收集ETL的信息的,那么看看Channel这个类的一些核心方法。

3.4 Channel通讯信息接收

Channel类有很多的方法,Control+O可以看到:
在这里插入图片描述
举个例子,可以看看Channelpush(final Record r)方法:

public void push(final Record r) {Validate.notNull(r, "record不能为空.");this.doPush(r);this.statPush(1L, r.getByteSize());}

进入statPush方法:

private void statPush(long recordSize, long byteSize) {currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,recordSize);currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,byteSize);//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {return;}long lastTimestamp = lastCommunication.getTimestamp();long nowTimestamp = System.currentTimeMillis();long interval = nowTimestamp - lastTimestamp;if (interval - this.flowControlInterval >= 0) {long byteLimitSleepTime = 0;long recordLimitSleepTime = 0;if (isChannelByteSpeedLimit) {long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;if (currentByteSpeed > this.byteSpeed) {// 计算根据byteLimit得到的休眠时间byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed- interval;}}if (isChannelRecordSpeedLimit) {long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;if (currentRecordSpeed > this.recordSpeed) {// 计算根据recordLimit得到的休眠时间recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed- interval;}}// 休眠时间取较大值long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?recordLimitSleepTime : byteLimitSleepTime;if (sleepTime > 0) {try {Thread.sleep(sleepTime);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));lastCommunication.setTimestamp(nowTimestamp);}}

可以看到把内容都设置进Communication信息载体了,这里还有其它的方法如pushAll等。大家Control鼠标点一下就能trace整个调用链了,其实就是不同的插件调用触发Exchanger方法,然后在Exchanger里面调用Channel的方法来记录到Communication信息载体。

04 文末

好了,到此把DataX的监控与汇报功能讲解完毕了,有疑问的童鞋欢迎留言,谢谢大家的阅读,本文完!

这篇关于DataX教程(08)- 监控与汇报的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/794181

相关文章

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

Ubuntu固定虚拟机ip地址的方法教程

《Ubuntu固定虚拟机ip地址的方法教程》本文详细介绍了如何在Ubuntu虚拟机中固定IP地址,包括检查和编辑`/etc/apt/sources.list`文件、更新网络配置文件以及使用Networ... 1、由于虚拟机网络是桥接,所以ip地址会不停地变化,接下来我们就讲述ip如何固定 2、如果apt安

PyCharm 接入 DeepSeek最新完整教程

《PyCharm接入DeepSeek最新完整教程》文章介绍了DeepSeek-V3模型的性能提升以及如何在PyCharm中接入和使用DeepSeek进行代码开发,本文通过图文并茂的形式给大家介绍的... 目录DeepSeek-V3效果演示创建API Key在PyCharm中下载Continue插件配置Con

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

在不同系统间迁移Python程序的方法与教程

《在不同系统间迁移Python程序的方法与教程》本文介绍了几种将Windows上编写的Python程序迁移到Linux服务器上的方法,包括使用虚拟环境和依赖冻结、容器化技术(如Docker)、使用An... 目录使用虚拟环境和依赖冻结1. 创建虚拟环境2. 冻结依赖使用容器化技术(如 docker)1. 创

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

MySQL8.2.0安装教程分享

《MySQL8.2.0安装教程分享》这篇文章详细介绍了如何在Windows系统上安装MySQL数据库软件,包括下载、安装、配置和设置环境变量的步骤... 目录mysql的安装图文1.python访问网址2javascript.点击3.进入Downloads向下滑动4.选择Community Server5.

CentOS系统Maven安装教程分享

《CentOS系统Maven安装教程分享》本文介绍了如何在CentOS系统中安装Maven,并提供了一个简单的实际应用案例,安装Maven需要先安装Java和设置环境变量,Maven可以自动管理项目的... 目录准备工作下载并安装Maven常见问题及解决方法实际应用案例总结Maven是一个流行的项目管理工具

本地私有化部署DeepSeek模型的详细教程

《本地私有化部署DeepSeek模型的详细教程》DeepSeek模型是一种强大的语言模型,本地私有化部署可以让用户在自己的环境中安全、高效地使用该模型,避免数据传输到外部带来的安全风险,同时也能根据自... 目录一、引言二、环境准备(一)硬件要求(二)软件要求(三)创建虚拟环境三、安装依赖库四、获取 Dee

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装