Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

2024-06-12 19:58

本文主要是介绍Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、概述

    Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。

    这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。

    这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。


二、datax简介和datax在hadoop2.x上的兼容部署

1、datax简介

    DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:

wKiom1PtpMqgr7e2AAGmnn3LYHc088.jpg

各个组件的作用:

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job: 数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX

  • Storage: Reader和Writer通过Storage交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地


Datax内置插件:
    DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:


Reader插件

  • hdfsreader : 支持从hdfs文件系统获取数据。

  • mysqlreader: 支持从mysql数据库获取数据。

  • sqlserverreader: 支持从sqlserver数据库获取数据。

  • oraclereader : 支持从oracle数据库获取数据。

  • streamreader: 支持从stream流获取数据(常用于测试)

  • httpreader : 支持从http URL获取数据。


Writer插件

  • hdfswriter:支持向hdbf写入数据。

  • mysqlwriter:支持向mysql写入数据。

  • oraclewriter:支持向oracle写入数据。

  • streamwriter:支持向stream流写入数据。(常用于测试)


2、datax在hadoop2.x上的兼容部署

    Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:

wKiom1PtqoqDx-HSAAJAkufP5EQ380.jpg

进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:

wKiom1Ptq5DAmRCLAAMw-fhcS7w743.jpg

另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。

    还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:

wKioL1PtrxOBC2nQAAQmgVRCwRk456.jpg

下面以t_dp_datax_engine.spec为例子:

wKioL1Ptr1TzudZgAAHquH2uYQQ529.jpg

上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:

具体执行过程如下:

1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录

2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示

 wKiom1Ptr57CQUYbAACvCg12hUI694.jpg

Rpm制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。

如下图:

 wKiom1Ptr3OwEWI7AACu95LGP0k854.jpg

安装完成后,在/home/taobao/datax/目录下会存在如下文件:

wKiom1PtsGLBAbt1AAIZeoIrKOM474.jpg


其他的插件按照这种方式按照好就ok了。


三、datax的实际应用记录分享

    在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。

  • 具体业务场景:

    需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。

  • 实现步骤:

步骤1:

执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:

wKiom1PttJzDrEFhAADPYH21Bp8574.jpg

接着选择export的目标源,这个我们选择0:

wKioL1PttffjMsX8AACscgOOhhg820.jpg

步骤2:

根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
<?xml version="1.0" encoding="UTF-8"?>
<jobs>
   <job id="hdfsreader_to_mysqlwriter_job">
     <reader>
       <plugin>hdfsreader</plugin>
       <!--
description:HDFS login account, e.g. 'username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
       <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
       <!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
       <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
       <!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
       <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
       <!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
       <param key="field_split" value=","/>
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
       <param key="buffer_size" value="4096"/>
       <!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
       <param key="nullstring" value="\N"/>
       <!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
       <param key="ignore_key" value="true"/>
       <!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
       <param key="col_filter" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
       <param key="concurrency" value="${reader_concurrency}"/>
     </reader>
     <writer>
       <plugin>mysqlwriter</plugin>
       <!--
description:Mysql database ip address
mandatory:true
name:ip
-->
       <param key="ip" value="jl-master"/>
       <!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
       <param key="port" value="3306"/>
       <!--
description:Mysql database name
mandatory:true
name:dbname
-->
       <param key="dbname" value="newidigg_jilin"/>
       <!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
       <!--
description:Mysql database login password
mandatory:true
name:password
-->
       <param key="password" value="jpkjcluster"/>
       <!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
       <param key="table" value="${mysql_table}"/>
       <!--
range:
description:order of columns
mandatory:false
name:colorder
       <param key="colorder" value="?"/>
-->
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
description:execute sql before dumping data
mandatory:false
name:pre
       <param key="pre" value="${preSql}"/>
-->
    <!--
description:execute sql after dumping data
mandatory:false
name:post
       <param key="post" value="${postSql}"/>
-->
       <!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
       <param key="limit" value="0"/>
       <!--
mandatory:false
name:set
       <param key="set" value="?"/>
-->
       <!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
       <param key="replace" value="false"/>
       <!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
       <param key="mysql.params" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
       <param key="concurrency" value="${writer_concurrency}"/>
     </writer>
   </job>
</jobs>


步骤3:

    编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=` dirname  $0`
dataxHome= '/data/hadoop/datax'
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser= 'hadoop'
mysqlPassword= 'jpkjcluster'
mysqlServerHost= 'jl-master'
currentDatabase= 'newidigg_jilin'
preSql= ''
postSql= ''
importTable=( 'tb_userview_domain_noMdn'  'tb_fact_app_v2'  'tb_fact_domain'  'tb_fact_tag'  'tb_fact_top5_www'  'tb_fact_upwww_time'  \
'tb_fact_search'  'tb_userview_domain'  'tb_userview_kpi_order'  'tb_userview_search'  'tb_userview_time'  'tb_userview_tag' );
#function which is used to DDL or DML msyql
function  mysqlController(){
     #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
     local  sqlString=$*
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "执行:${sqlString}"
     mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
         use ${currentDatabase};
         ${sqlString};
     "
}
#通用表导入模块
function  commonImport(){
     local  current_table=$1
     #create temporary table before importing data into mysql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......进入处理${current_table}表入mysql库环节......"
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "入库前创建临时表"
     preSql= "drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
     mysqlController ${preSql}
     
     #import data from hdfs into msyql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "将hdfs的${current_table}表导入mysql...."
     #调整mysql的导入线程数
     writer_concurrency=2
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
     
     #Updata Data Relationship after importing data into mysql.
     if  [ ${current_table} ==  "tb_userview_search"  ]
         then
           postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          CREATE INDEX mdn_index ON ${current_table}(mdn);
           "
     else
          postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          Alter table ${current_table} add primary key(mdn);
           "
     fi
     mysqlController ${postSql}
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......完成处理${current_table}表入mysql操作......"
}
for  tableItem  in  ${importTable[*]}
do
if  [ ${tableItem} ==  "tb_userview_domain"  -o ${tableItem} ==  "tb_userview_kpi_order"  -o ${tableItem} ==  "tb_userview_search"  -o ${tableItem} ==  "tb_userview_time"  -o ${tableItem} ==  "tb_userview_tag"  ]
    then
     commonImport ${tableItem}
else
     #delete dirty data
     preSql= "delete from ${tableItem} where day_id=${export_day};"
     mysqlController ${preSql}
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"
# >> ${workspace}/../logs/exportData.log
fi
done

简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤4:

    执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。


三、总结

    本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。


这篇关于Datax与hadoop2.x兼容部署与实际项目应用工作记录分享的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055185

相关文章

Spring Boot 配置文件之类型、加载顺序与最佳实践记录

《SpringBoot配置文件之类型、加载顺序与最佳实践记录》SpringBoot的配置文件是灵活且强大的工具,通过合理的配置管理,可以让应用开发和部署更加高效,无论是简单的属性配置,还是复杂... 目录Spring Boot 配置文件详解一、Spring Boot 配置文件类型1.1 applicatio

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

一文教你如何将maven项目转成web项目

《一文教你如何将maven项目转成web项目》在软件开发过程中,有时我们需要将一个普通的Maven项目转换为Web项目,以便能够部署到Web容器中运行,本文将详细介绍如何通过简单的步骤完成这一转换过程... 目录准备工作步骤一:修改​​pom.XML​​1.1 添加​​packaging​​标签1.2 添加

tomcat多实例部署的项目实践

《tomcat多实例部署的项目实践》Tomcat多实例是指在一台设备上运行多个Tomcat服务,这些Tomcat相互独立,本文主要介绍了tomcat多实例部署的项目实践,具有一定的参考价值,感兴趣的可... 目录1.创建项目目录,测试文China编程件2js.创建实例的安装目录3.准备实例的配置文件4.编辑实例的

MySQL INSERT语句实现当记录不存在时插入的几种方法

《MySQLINSERT语句实现当记录不存在时插入的几种方法》MySQL的INSERT语句是用于向数据库表中插入新记录的关键命令,下面:本文主要介绍MySQLINSERT语句实现当记录不存在时... 目录使用 INSERT IGNORE使用 ON DUPLICATE KEY UPDATE使用 REPLACE

SpringBoot配置Ollama实现本地部署DeepSeek

《SpringBoot配置Ollama实现本地部署DeepSeek》本文主要介绍了在本地环境中使用Ollama配置DeepSeek模型,并在IntelliJIDEA中创建一个Sprin... 目录前言详细步骤一、本地配置DeepSeek二、SpringBoot项目调用本地DeepSeek前言随着人工智能技

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Android Kotlin 高阶函数详解及其在协程中的应用小结

《AndroidKotlin高阶函数详解及其在协程中的应用小结》高阶函数是Kotlin中的一个重要特性,它能够将函数作为一等公民(First-ClassCitizen),使得代码更加简洁、灵活和可... 目录1. 引言2. 什么是高阶函数?3. 高阶函数的基础用法3.1 传递函数作为参数3.2 Lambda

通过Docker Compose部署MySQL的详细教程

《通过DockerCompose部署MySQL的详细教程》DockerCompose作为Docker官方的容器编排工具,为MySQL数据库部署带来了显著优势,下面小编就来为大家详细介绍一... 目录一、docker Compose 部署 mysql 的优势二、环境准备与基础配置2.1 项目目录结构2.2 基