Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

2024-06-12 19:58

本文主要是介绍Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、概述

    Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。

    这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。

    这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。


二、datax简介和datax在hadoop2.x上的兼容部署

1、datax简介

    DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:

wKiom1PtpMqgr7e2AAGmnn3LYHc088.jpg

各个组件的作用:

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job: 数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX

  • Storage: Reader和Writer通过Storage交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地


Datax内置插件:
    DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:


Reader插件

  • hdfsreader : 支持从hdfs文件系统获取数据。

  • mysqlreader: 支持从mysql数据库获取数据。

  • sqlserverreader: 支持从sqlserver数据库获取数据。

  • oraclereader : 支持从oracle数据库获取数据。

  • streamreader: 支持从stream流获取数据(常用于测试)

  • httpreader : 支持从http URL获取数据。


Writer插件

  • hdfswriter:支持向hdbf写入数据。

  • mysqlwriter:支持向mysql写入数据。

  • oraclewriter:支持向oracle写入数据。

  • streamwriter:支持向stream流写入数据。(常用于测试)


2、datax在hadoop2.x上的兼容部署

    Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:

wKiom1PtqoqDx-HSAAJAkufP5EQ380.jpg

进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:

wKiom1Ptq5DAmRCLAAMw-fhcS7w743.jpg

另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。

    还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:

wKioL1PtrxOBC2nQAAQmgVRCwRk456.jpg

下面以t_dp_datax_engine.spec为例子:

wKioL1Ptr1TzudZgAAHquH2uYQQ529.jpg

上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:

具体执行过程如下:

1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录

2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示

 wKiom1Ptr57CQUYbAACvCg12hUI694.jpg

Rpm制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。

如下图:

 wKiom1Ptr3OwEWI7AACu95LGP0k854.jpg

安装完成后,在/home/taobao/datax/目录下会存在如下文件:

wKiom1PtsGLBAbt1AAIZeoIrKOM474.jpg


其他的插件按照这种方式按照好就ok了。


三、datax的实际应用记录分享

    在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。

  • 具体业务场景:

    需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。

  • 实现步骤:

步骤1:

执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:

wKiom1PttJzDrEFhAADPYH21Bp8574.jpg

接着选择export的目标源,这个我们选择0:

wKioL1PttffjMsX8AACscgOOhhg820.jpg

步骤2:

根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
<?xml version="1.0" encoding="UTF-8"?>
<jobs>
   <job id="hdfsreader_to_mysqlwriter_job">
     <reader>
       <plugin>hdfsreader</plugin>
       <!--
description:HDFS login account, e.g. 'username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
       <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
       <!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
       <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
       <!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
       <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
       <!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
       <param key="field_split" value=","/>
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
       <param key="buffer_size" value="4096"/>
       <!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
       <param key="nullstring" value="\N"/>
       <!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
       <param key="ignore_key" value="true"/>
       <!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
       <param key="col_filter" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
       <param key="concurrency" value="${reader_concurrency}"/>
     </reader>
     <writer>
       <plugin>mysqlwriter</plugin>
       <!--
description:Mysql database ip address
mandatory:true
name:ip
-->
       <param key="ip" value="jl-master"/>
       <!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
       <param key="port" value="3306"/>
       <!--
description:Mysql database name
mandatory:true
name:dbname
-->
       <param key="dbname" value="newidigg_jilin"/>
       <!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
       <!--
description:Mysql database login password
mandatory:true
name:password
-->
       <param key="password" value="jpkjcluster"/>
       <!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
       <param key="table" value="${mysql_table}"/>
       <!--
range:
description:order of columns
mandatory:false
name:colorder
       <param key="colorder" value="?"/>
-->
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
description:execute sql before dumping data
mandatory:false
name:pre
       <param key="pre" value="${preSql}"/>
-->
    <!--
description:execute sql after dumping data
mandatory:false
name:post
       <param key="post" value="${postSql}"/>
-->
       <!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
       <param key="limit" value="0"/>
       <!--
mandatory:false
name:set
       <param key="set" value="?"/>
-->
       <!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
       <param key="replace" value="false"/>
       <!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
       <param key="mysql.params" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
       <param key="concurrency" value="${writer_concurrency}"/>
     </writer>
   </job>
</jobs>


步骤3:

    编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=` dirname  $0`
dataxHome= '/data/hadoop/datax'
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser= 'hadoop'
mysqlPassword= 'jpkjcluster'
mysqlServerHost= 'jl-master'
currentDatabase= 'newidigg_jilin'
preSql= ''
postSql= ''
importTable=( 'tb_userview_domain_noMdn'  'tb_fact_app_v2'  'tb_fact_domain'  'tb_fact_tag'  'tb_fact_top5_www'  'tb_fact_upwww_time'  \
'tb_fact_search'  'tb_userview_domain'  'tb_userview_kpi_order'  'tb_userview_search'  'tb_userview_time'  'tb_userview_tag' );
#function which is used to DDL or DML msyql
function  mysqlController(){
     #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
     local  sqlString=$*
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "执行:${sqlString}"
     mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
         use ${currentDatabase};
         ${sqlString};
     "
}
#通用表导入模块
function  commonImport(){
     local  current_table=$1
     #create temporary table before importing data into mysql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......进入处理${current_table}表入mysql库环节......"
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "入库前创建临时表"
     preSql= "drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
     mysqlController ${preSql}
     
     #import data from hdfs into msyql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "将hdfs的${current_table}表导入mysql...."
     #调整mysql的导入线程数
     writer_concurrency=2
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
     
     #Updata Data Relationship after importing data into mysql.
     if  [ ${current_table} ==  "tb_userview_search"  ]
         then
           postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          CREATE INDEX mdn_index ON ${current_table}(mdn);
           "
     else
          postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          Alter table ${current_table} add primary key(mdn);
           "
     fi
     mysqlController ${postSql}
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......完成处理${current_table}表入mysql操作......"
}
for  tableItem  in  ${importTable[*]}
do
if  [ ${tableItem} ==  "tb_userview_domain"  -o ${tableItem} ==  "tb_userview_kpi_order"  -o ${tableItem} ==  "tb_userview_search"  -o ${tableItem} ==  "tb_userview_time"  -o ${tableItem} ==  "tb_userview_tag"  ]
    then
     commonImport ${tableItem}
else
     #delete dirty data
     preSql= "delete from ${tableItem} where day_id=${export_day};"
     mysqlController ${preSql}
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"
# >> ${workspace}/../logs/exportData.log
fi
done

简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤4:

    执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。


三、总结

    本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。


这篇关于Datax与hadoop2.x兼容部署与实际项目应用工作记录分享的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055185

相关文章

C语言中位操作的实际应用举例

《C语言中位操作的实际应用举例》:本文主要介绍C语言中位操作的实际应用,总结了位操作的使用场景,并指出了需要注意的问题,如可读性、平台依赖性和溢出风险,文中通过代码介绍的非常详细,需要的朋友可以参... 目录1. 嵌入式系统与硬件寄存器操作2. 网络协议解析3. 图像处理与颜色编码4. 高效处理布尔标志集合

SpringBoot请求参数接收控制指南分享

《SpringBoot请求参数接收控制指南分享》:本文主要介绍SpringBoot请求参数接收控制指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring Boot 请求参数接收控制指南1. 概述2. 有注解时参数接收方式对比3. 无注解时接收参数默认位置

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

Java中的Lambda表达式及其应用小结

《Java中的Lambda表达式及其应用小结》Java中的Lambda表达式是一项极具创新性的特性,它使得Java代码更加简洁和高效,尤其是在集合操作和并行处理方面,:本文主要介绍Java中的La... 目录前言1. 什么是Lambda表达式?2. Lambda表达式的基本语法例子1:最简单的Lambda表

springboot项目如何开启https服务

《springboot项目如何开启https服务》:本文主要介绍springboot项目如何开启https服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录springboot项目开启https服务1. 生成SSL证书密钥库使用keytool生成自签名证书将

Java使用SLF4J记录不同级别日志的示例详解

《Java使用SLF4J记录不同级别日志的示例详解》SLF4J是一个简单的日志门面,它允许在运行时选择不同的日志实现,这篇文章主要为大家详细介绍了如何使用SLF4J记录不同级别日志,感兴趣的可以了解下... 目录一、SLF4J简介二、添加依赖三、配置Logback四、记录不同级别的日志五、总结一、SLF4J

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

Python结合PyWebView库打造跨平台桌面应用

《Python结合PyWebView库打造跨平台桌面应用》随着Web技术的发展,将HTML/CSS/JavaScript与Python结合构建桌面应用成为可能,本文将系统讲解如何使用PyWebView... 目录一、技术原理与优势分析1.1 架构原理1.2 核心优势二、开发环境搭建2.1 安装依赖2.2 验

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字