DataX实战应用

2024-05-14 15:18
文章标签 实战 应用 datax

本文主要是介绍DataX实战应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

前言

系统架构

关键实现

系统目前使用现状

DataX使用心得


前言

DataX是阿里开源数据同步工具,实现异构数据源的数据同步,Github地址:https://github.com/alibaba/DataX,企业存储离线数据到数仓,但是没办法对接业务,本次实践主要是运用DataX实现数据从数仓导入到MySQL,从而对接业务,另外,对数仓数据的流出进行管理。

一般从数仓数据导入到MySQL中,可以从hive查询存储到一个文件里面,如果是数据量比较大的情况下先将文件按一定行数切分为多个文件,然后遍历文件往MySQL中导入,这种方式虽然简单,缺点在于对于每一个导入需求,都需要写一个job,并且每次都会产生临时文件,mysql load会比较占用资源,之所以选择了DataX,因为它能实现hdfs导入MySQL,速度快,能实现增量全量,可以分表,能减少很多技术的实现成本。

系统架构

本次应用主要需实现一个Client和Server,依托于Hadoop集群,通过Schedule基础平台定时调度调用client实现数据的同步。Server是一个管理项目,后端使用Spring MVC工程,对接了元数据系统,直接从前端页面可以配置从Hive表导入到具体的MySQL表,client是一个jar包,通过HTTP和MQ和Server通信。

运行流程

1.Schedule通过java -jar client.jar启动client

2.client通过http接口调用到server,获取到本次要同步的一个job,通过job的信息生成一个json串,分配一个线程,client中通过python datax.py xxx.json调用DataX完成数据同步

3.client完成json文件的输出,输出到指定的文件夹下,文件名采用时间戳命名。

4. client中通过执行shell命令完成对DataX的调用

发布流程:

1.DataX部署,这里只使用到了mysqlwriter和hdfsreader组件,所以通过clone DataX源码,删除掉了其余不用的组件代码,手动编译进行部署,编译后大约占用200M空间,部署到hadoop集群的各个slave机器,正常情况下DataX只需要一次部署。

2.server发布,server通过jenkins发布为一个web工程,后端提供http接口,并监听MQ。

3.client发布,client通过jenkins发布到hadoop集群的slave机器。

关键实现

1.日志监控,schedule调起client任务一次对应的日志应该都要全部输出到schedule中,并且应该要捕捉到DataX的运行日志。关键代码:

//commond是运行DataX的命令:python datax/bin/datax.py xxx.json 
Process process = RUNTIME.exec(command);
try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()))) {String line;while ((line = input.readLine()) != null) {System.out.println(line);}
} catch (IOException e) {LOG.error("读取DataX中job运行日志异常", e);
}

2.作业告警,采用了Schedule调度系统的告警,DataX导入失败会触发调度系统的告警,然后通过电话和邮件对负责人进行通知。

3.Hive分区表,hive中对单个表进行分区存储,一般按照日期进行分区,在schedule调起client时可以传入一个日期参数,然后根据日期生成一个日期分区,在hdfsreader中的分区中带入即可:

 "path": "/user/hive/warehouse/order/20180831","defaultFS": "hdfs://localhost:2181",

4.MySQL增量和全量,DataX很贴心,mysqlwriter可以选择导入的模式。

"writer":{"name": "mysqlwriter","parameter": {"writeMode": "insert",  // 写入模式可以选择insert/replace/update"username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test" // 预执行SQL,可以选择一个delete语句],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}
}

其中parameter中的writeMode可以选择insert,update,replace。并且有preSql选项,也就是在导入之前可以执行一些SQL,对我们全量数据和增量数据提供了有效的方案。

全量数据导入:通过preSql传入一条delete语句,然后writerMode选择insert执行导入(甚至如果有比较高的权限,可以直接用truncate语句,但是一般dba才有的这样权限),在delete的时候需要注意,如果数据量太大的情况下,一条delete就足以让dba找上门拉,所以添加索引就有必要拉,根据索引删除就会快一些。

增量数据导入:选择writeMode使用update,这里dataX是通过on duplicate key update语句来实现数据的增量实现,也就是说表里面有唯一键或者主键冲突时,覆盖原来的数据,这就很容易理解了,比如订单数据,订单号肯定是一个唯一键,如果导入一个已存在的订单,数据肯定会覆盖原来的数据。

5.为什么要用MQ,这里client在调用DataX执行完成之后,应该要通知到server,当前的任务执行完毕,server会更新当前任务的状态,对于简单检查任务的状态时,就不用去查看schedule中的日志了。

系统目前使用现状

1.稳定性强,系统上线2个月以来,有大约20个作业在运行,其中有5个小时运行的作业,从来没有出现过问题。

2.速度快,目前同步数据量最大的一个表大概是1.4亿的数据量,同步的时间在1100s左右,也就是18分钟到19分钟这样一个时间,每秒最高写入量达到14w+,平均每秒写入量大约在13w左右的速度,速度保持一个平均的趋势,下面是一个运行结果的截图

DataX使用心得

1.DataX是一个高可用的数据同步工具,稳定性强,速度快,上手快(不知道二次开发会不会困难,有机会可以试试,但是目前的功能已经能满足很大一部分需求)。

2.事务的支持不足,在github上看到的DataX支持的一个线程中的事务,在测试过程中没有体会到这种小事务能带来多大的收益,在有脏数据的情况下,如果当前线程的数据出现问题执行回滚,其实对于整体来说数据是不完整的,还是有一部分数据已经导入到MySQL 了,所以单个线程的事务其实对于数据量较大的数据来说收益不大,除非作业是通过一个线程来跑的。

但是,另一方面,如果在大量数据导入执行一个事务,对MySQL来说无疑是一个灾难,所以最好的办法是保证系统结构稳定。

这里其实有一个办法,如果是全量的数据,在job跑失败之后重新跑,能证数据的准确性,但是跑失败的那段时间数据是有缺失的;如果是增量数据导入,重新跑一下job也能保证数据没有问题。但是在那段小时间内还是有问题的。

确实是没有什么好的办法去支持完整的事务。

欢迎交流。

这篇关于DataX实战应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/989099

相关文章

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Android Kotlin 高阶函数详解及其在协程中的应用小结

《AndroidKotlin高阶函数详解及其在协程中的应用小结》高阶函数是Kotlin中的一个重要特性,它能够将函数作为一等公民(First-ClassCitizen),使得代码更加简洁、灵活和可... 目录1. 引言2. 什么是高阶函数?3. 高阶函数的基础用法3.1 传递函数作为参数3.2 Lambda

Java中&和&&以及|和||的区别、应用场景和代码示例

《Java中&和&&以及|和||的区别、应用场景和代码示例》:本文主要介绍Java中的逻辑运算符&、&&、|和||的区别,包括它们在布尔和整数类型上的应用,文中通过代码介绍的非常详细,需要的朋友可... 目录前言1. & 和 &&代码示例2. | 和 ||代码示例3. 为什么要使用 & 和 | 而不是总是使

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

Python循环缓冲区的应用详解

《Python循环缓冲区的应用详解》循环缓冲区是一个线性缓冲区,逻辑上被视为一个循环的结构,本文主要为大家介绍了Python中循环缓冲区的相关应用,有兴趣的小伙伴可以了解一下... 目录什么是循环缓冲区循环缓冲区的结构python中的循环缓冲区实现运行循环缓冲区循环缓冲区的优势应用案例Python中的实现库

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和