Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

2024-06-12 19:58

本文主要是介绍Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、概述

    Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。

    这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。

    这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。


二、datax简介和datax在hadoop2.x上的兼容部署

1、datax简介

    DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:

wKiom1PtpMqgr7e2AAGmnn3LYHc088.jpg

各个组件的作用:

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job: 数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX

  • Storage: Reader和Writer通过Storage交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地


Datax内置插件:
    DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:


Reader插件

  • hdfsreader : 支持从hdfs文件系统获取数据。

  • mysqlreader: 支持从mysql数据库获取数据。

  • sqlserverreader: 支持从sqlserver数据库获取数据。

  • oraclereader : 支持从oracle数据库获取数据。

  • streamreader: 支持从stream流获取数据(常用于测试)

  • httpreader : 支持从http URL获取数据。


Writer插件

  • hdfswriter:支持向hdbf写入数据。

  • mysqlwriter:支持向mysql写入数据。

  • oraclewriter:支持向oracle写入数据。

  • streamwriter:支持向stream流写入数据。(常用于测试)


2、datax在hadoop2.x上的兼容部署

    Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:

wKiom1PtqoqDx-HSAAJAkufP5EQ380.jpg

进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:

wKiom1Ptq5DAmRCLAAMw-fhcS7w743.jpg

另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。

    还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:

wKioL1PtrxOBC2nQAAQmgVRCwRk456.jpg

下面以t_dp_datax_engine.spec为例子:

wKioL1Ptr1TzudZgAAHquH2uYQQ529.jpg

上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:

具体执行过程如下:

1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录

2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示

 wKiom1Ptr57CQUYbAACvCg12hUI694.jpg

Rpm制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。

如下图:

 wKiom1Ptr3OwEWI7AACu95LGP0k854.jpg

安装完成后,在/home/taobao/datax/目录下会存在如下文件:

wKiom1PtsGLBAbt1AAIZeoIrKOM474.jpg


其他的插件按照这种方式按照好就ok了。


三、datax的实际应用记录分享

    在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。

  • 具体业务场景:

    需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。

  • 实现步骤:

步骤1:

执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:

wKiom1PttJzDrEFhAADPYH21Bp8574.jpg

接着选择export的目标源,这个我们选择0:

wKioL1PttffjMsX8AACscgOOhhg820.jpg

步骤2:

根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
<?xml version="1.0" encoding="UTF-8"?>
<jobs>
   <job id="hdfsreader_to_mysqlwriter_job">
     <reader>
       <plugin>hdfsreader</plugin>
       <!--
description:HDFS login account, e.g. 'username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
       <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
       <!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
       <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
       <!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
       <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
       <!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
       <param key="field_split" value=","/>
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
       <param key="buffer_size" value="4096"/>
       <!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
       <param key="nullstring" value="\N"/>
       <!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
       <param key="ignore_key" value="true"/>
       <!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
       <param key="col_filter" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
       <param key="concurrency" value="${reader_concurrency}"/>
     </reader>
     <writer>
       <plugin>mysqlwriter</plugin>
       <!--
description:Mysql database ip address
mandatory:true
name:ip
-->
       <param key="ip" value="jl-master"/>
       <!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
       <param key="port" value="3306"/>
       <!--
description:Mysql database name
mandatory:true
name:dbname
-->
       <param key="dbname" value="newidigg_jilin"/>
       <!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
       <!--
description:Mysql database login password
mandatory:true
name:password
-->
       <param key="password" value="jpkjcluster"/>
       <!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
       <param key="table" value="${mysql_table}"/>
       <!--
range:
description:order of columns
mandatory:false
name:colorder
       <param key="colorder" value="?"/>
-->
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
description:execute sql before dumping data
mandatory:false
name:pre
       <param key="pre" value="${preSql}"/>
-->
    <!--
description:execute sql after dumping data
mandatory:false
name:post
       <param key="post" value="${postSql}"/>
-->
       <!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
       <param key="limit" value="0"/>
       <!--
mandatory:false
name:set
       <param key="set" value="?"/>
-->
       <!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
       <param key="replace" value="false"/>
       <!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
       <param key="mysql.params" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
       <param key="concurrency" value="${writer_concurrency}"/>
     </writer>
   </job>
</jobs>


步骤3:

    编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=` dirname  $0`
dataxHome= '/data/hadoop/datax'
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser= 'hadoop'
mysqlPassword= 'jpkjcluster'
mysqlServerHost= 'jl-master'
currentDatabase= 'newidigg_jilin'
preSql= ''
postSql= ''
importTable=( 'tb_userview_domain_noMdn'  'tb_fact_app_v2'  'tb_fact_domain'  'tb_fact_tag'  'tb_fact_top5_www'  'tb_fact_upwww_time'  \
'tb_fact_search'  'tb_userview_domain'  'tb_userview_kpi_order'  'tb_userview_search'  'tb_userview_time'  'tb_userview_tag' );
#function which is used to DDL or DML msyql
function  mysqlController(){
     #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
     local  sqlString=$*
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "执行:${sqlString}"
     mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
         use ${currentDatabase};
         ${sqlString};
     "
}
#通用表导入模块
function  commonImport(){
     local  current_table=$1
     #create temporary table before importing data into mysql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......进入处理${current_table}表入mysql库环节......"
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "入库前创建临时表"
     preSql= "drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
     mysqlController ${preSql}
     
     #import data from hdfs into msyql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "将hdfs的${current_table}表导入mysql...."
     #调整mysql的导入线程数
     writer_concurrency=2
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
     
     #Updata Data Relationship after importing data into mysql.
     if  [ ${current_table} ==  "tb_userview_search"  ]
         then
           postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          CREATE INDEX mdn_index ON ${current_table}(mdn);
           "
     else
          postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          Alter table ${current_table} add primary key(mdn);
           "
     fi
     mysqlController ${postSql}
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......完成处理${current_table}表入mysql操作......"
}
for  tableItem  in  ${importTable[*]}
do
if  [ ${tableItem} ==  "tb_userview_domain"  -o ${tableItem} ==  "tb_userview_kpi_order"  -o ${tableItem} ==  "tb_userview_search"  -o ${tableItem} ==  "tb_userview_time"  -o ${tableItem} ==  "tb_userview_tag"  ]
    then
     commonImport ${tableItem}
else
     #delete dirty data
     preSql= "delete from ${tableItem} where day_id=${export_day};"
     mysqlController ${preSql}
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"
# >> ${workspace}/../logs/exportData.log
fi
done

简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤4:

    执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。


三、总结

    本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。


这篇关于Datax与hadoop2.x兼容部署与实际项目应用工作记录分享的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055185

相关文章

找完工作该补充的东西

首先: 锻炼身体,包括乒乓球,羽毛球,都必须练习,学习,锻炼身体等是一个很重要的与人交际沟通的方式; 打牌,娱乐:会玩是一个人很重要的交际沟通的法宝; 摄影:这个是一个兴趣爱好,也是提高自己的审美,生活品质,当然也是与人沟通的重要途径; 做饭:这个的话就是对自己,对朋友非常有益的一件事情;

51单片机学习记录———定时器

文章目录 前言一、定时器介绍二、STC89C52定时器资源三、定时器框图四、定时器模式五、定时器相关寄存器六、定时器练习 前言 一个学习嵌入式的小白~ 有问题评论区或私信指出~ 提示:以下是本篇文章正文内容,下面案例可供参考 一、定时器介绍 定时器介绍:51单片机的定时器属于单片机的内部资源,其电路的连接和运转均在单片机内部完成。 定时器作用: 1.用于计数系统,可

[职场] 护理专业简历怎么写 #经验分享#微信

护理专业简历怎么写   很多想成为一名护理方面的从业者,但是又不知道应该怎么制作一份简历,现在这里分享了一份护理方面的简历模板供大家参考。   蓝山山   年龄:24   号码:12345678910   地址:上海市 邮箱:jianli@jianli.com   教育背景   时间:2011-09到2015-06   学校:蓝山大学   专业:护理学   学历:本科

Javascript高级程序设计(第四版)--学习记录之变量、内存

原始值与引用值 原始值:简单的数据即基础数据类型,按值访问。 引用值:由多个值构成的对象即复杂数据类型,按引用访问。 动态属性 对于引用值而言,可以随时添加、修改和删除其属性和方法。 let person = new Object();person.name = 'Jason';person.age = 42;console.log(person.name,person.age);//'J

大学湖北中医药大学法医学试题及答案,分享几个实用搜题和学习工具 #微信#学习方法#职场发展

今天分享拥有拍照搜题、文字搜题、语音搜题、多重搜题等搜题模式,可以快速查找问题解析,加深对题目答案的理解。 1.快练题 这是一个网站 找题的网站海量题库,在线搜题,快速刷题~为您提供百万优质题库,直接搜索题库名称,支持多种刷题模式:顺序练习、语音听题、本地搜题、顺序阅读、模拟考试、组卷考试、赶快下载吧! 2.彩虹搜题 这是个老公众号了 支持手写输入,截图搜题,详细步骤,解题必备

UnrealScriptIDE调试环境部署

先安装vs2010   再安装VSIsoShell.exe, 下载地址 https://pan.baidu.com/s/10kPNUuDGTbWXbz7Nos-1WA       fd3t   最后安装unside,下载地址 https://archive.codeplex.com/?p=uside  安装中间有一步选择Binary文件夹要选对路径。   安装好以后,启动 UDKDe

用Microsoft.Extensions.Hosting 管理WPF项目.

首先引入必要的包: <ItemGroup><PackageReference Include="CommunityToolkit.Mvvm" Version="8.2.2" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /><PackageReference Include="Serilog

eclipse运行springboot项目,找不到主类

解决办法尝试了很多种,下载sts压缩包行不通。最后解决办法如图: help--->Eclipse Marketplace--->Popular--->找到Spring Tools 3---->Installed。

vcpkg安装opencv中的特殊问题记录(无法找到opencv_corexd.dll)

我是按照网上的vcpkg安装opencv方法进行的(比如这篇:从0开始在visual studio上安装opencv(超详细,针对小白)),但是中间出现了一些别人没有遇到的问题,虽然原因没有找到,但是本人给出一些暂时的解决办法: 问题1: 我在安装库命令行使用的是 .\vcpkg.exe install opencv 我的电脑是x64,vcpkg在这条命令后默认下载的也是opencv2:x6

[职场] 公务员的利弊分析 #知识分享#经验分享#其他

公务员的利弊分析     公务员作为一种稳定的职业选择,一直备受人们的关注。然而,就像任何其他职业一样,公务员职位也有其利与弊。本文将对公务员的利弊进行分析,帮助读者更好地了解这一职业的特点。 利: 1. 稳定的职业:公务员职位通常具有较高的稳定性,一旦进入公务员队伍,往往可以享受到稳定的工作环境和薪资待遇。这对于那些追求稳定的人来说,是一个很大的优势。 2. 薪资福利优厚:公务员的薪资和