Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

2024-06-12 19:58

本文主要是介绍Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、概述

    Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。

    这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。

    这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。


二、datax简介和datax在hadoop2.x上的兼容部署

1、datax简介

    DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:

wKiom1PtpMqgr7e2AAGmnn3LYHc088.jpg

各个组件的作用:

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job: 数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX

  • Storage: Reader和Writer通过Storage交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地


Datax内置插件:
    DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:


Reader插件

  • hdfsreader : 支持从hdfs文件系统获取数据。

  • mysqlreader: 支持从mysql数据库获取数据。

  • sqlserverreader: 支持从sqlserver数据库获取数据。

  • oraclereader : 支持从oracle数据库获取数据。

  • streamreader: 支持从stream流获取数据(常用于测试)

  • httpreader : 支持从http URL获取数据。


Writer插件

  • hdfswriter:支持向hdbf写入数据。

  • mysqlwriter:支持向mysql写入数据。

  • oraclewriter:支持向oracle写入数据。

  • streamwriter:支持向stream流写入数据。(常用于测试)


2、datax在hadoop2.x上的兼容部署

    Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:

wKiom1PtqoqDx-HSAAJAkufP5EQ380.jpg

进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:

wKiom1Ptq5DAmRCLAAMw-fhcS7w743.jpg

另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。

    还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:

wKioL1PtrxOBC2nQAAQmgVRCwRk456.jpg

下面以t_dp_datax_engine.spec为例子:

wKioL1Ptr1TzudZgAAHquH2uYQQ529.jpg

上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:

具体执行过程如下:

1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录

2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示

 wKiom1Ptr57CQUYbAACvCg12hUI694.jpg

Rpm制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。

如下图:

 wKiom1Ptr3OwEWI7AACu95LGP0k854.jpg

安装完成后,在/home/taobao/datax/目录下会存在如下文件:

wKiom1PtsGLBAbt1AAIZeoIrKOM474.jpg


其他的插件按照这种方式按照好就ok了。


三、datax的实际应用记录分享

    在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。

  • 具体业务场景:

    需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。

  • 实现步骤:

步骤1:

执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:

wKiom1PttJzDrEFhAADPYH21Bp8574.jpg

接着选择export的目标源,这个我们选择0:

wKioL1PttffjMsX8AACscgOOhhg820.jpg

步骤2:

根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
<?xml version="1.0" encoding="UTF-8"?>
<jobs>
   <job id="hdfsreader_to_mysqlwriter_job">
     <reader>
       <plugin>hdfsreader</plugin>
       <!--
description:HDFS login account, e.g. 'username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
       <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
       <!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
       <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
       <!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
       <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
       <!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
       <param key="field_split" value=","/>
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
       <param key="buffer_size" value="4096"/>
       <!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
       <param key="nullstring" value="\N"/>
       <!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
       <param key="ignore_key" value="true"/>
       <!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
       <param key="col_filter" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
       <param key="concurrency" value="${reader_concurrency}"/>
     </reader>
     <writer>
       <plugin>mysqlwriter</plugin>
       <!--
description:Mysql database ip address
mandatory:true
name:ip
-->
       <param key="ip" value="jl-master"/>
       <!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
       <param key="port" value="3306"/>
       <!--
description:Mysql database name
mandatory:true
name:dbname
-->
       <param key="dbname" value="newidigg_jilin"/>
       <!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
       <!--
description:Mysql database login password
mandatory:true
name:password
-->
       <param key="password" value="jpkjcluster"/>
       <!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
       <param key="table" value="${mysql_table}"/>
       <!--
range:
description:order of columns
mandatory:false
name:colorder
       <param key="colorder" value="?"/>
-->
       <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
       <param key="encoding" value="UTF-8"/>
       <!--
description:execute sql before dumping data
mandatory:false
name:pre
       <param key="pre" value="${preSql}"/>
-->
    <!--
description:execute sql after dumping data
mandatory:false
name:post
       <param key="post" value="${postSql}"/>
-->
       <!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
       <param key="limit" value="0"/>
       <!--
mandatory:false
name:set
       <param key="set" value="?"/>
-->
       <!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
       <param key="replace" value="false"/>
       <!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
       <param key="mysql.params" value="?"/>
-->
       <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
       <param key="concurrency" value="${writer_concurrency}"/>
     </writer>
   </job>
</jobs>


步骤3:

    编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=` dirname  $0`
dataxHome= '/data/hadoop/datax'
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser= 'hadoop'
mysqlPassword= 'jpkjcluster'
mysqlServerHost= 'jl-master'
currentDatabase= 'newidigg_jilin'
preSql= ''
postSql= ''
importTable=( 'tb_userview_domain_noMdn'  'tb_fact_app_v2'  'tb_fact_domain'  'tb_fact_tag'  'tb_fact_top5_www'  'tb_fact_upwww_time'  \
'tb_fact_search'  'tb_userview_domain'  'tb_userview_kpi_order'  'tb_userview_search'  'tb_userview_time'  'tb_userview_tag' );
#function which is used to DDL or DML msyql
function  mysqlController(){
     #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
     local  sqlString=$*
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "执行:${sqlString}"
     mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
         use ${currentDatabase};
         ${sqlString};
     "
}
#通用表导入模块
function  commonImport(){
     local  current_table=$1
     #create temporary table before importing data into mysql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......进入处理${current_table}表入mysql库环节......"
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "入库前创建临时表"
     preSql= "drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
     mysqlController ${preSql}
     
     #import data from hdfs into msyql.
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "将hdfs的${current_table}表导入mysql...."
     #调整mysql的导入线程数
     writer_concurrency=2
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
     
     #Updata Data Relationship after importing data into mysql.
     if  [ ${current_table} ==  "tb_userview_search"  ]
         then
           postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          CREATE INDEX mdn_index ON ${current_table}(mdn);
           "
     else
          postSql="drop table  if  exists ${current_table}; \
          rename table ${current_table}_${export_day} to ${current_table}; \
          Alter table ${current_table} add primary key(mdn);
           "
     fi
     mysqlController ${postSql}
     echo  ` date  +%Y-%m-%d " " %H:%M:%S`  "......完成处理${current_table}表入mysql操作......"
}
for  tableItem  in  ${importTable[*]}
do
if  [ ${tableItem} ==  "tb_userview_domain"  -o ${tableItem} ==  "tb_userview_kpi_order"  -o ${tableItem} ==  "tb_userview_search"  -o ${tableItem} ==  "tb_userview_time"  -o ${tableItem} ==  "tb_userview_tag"  ]
    then
     commonImport ${tableItem}
else
     #delete dirty data
     preSql= "delete from ${tableItem} where day_id=${export_day};"
     mysqlController ${preSql}
     #调用Datax将hdfs文件导入mysql
     python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"
# >> ${workspace}/../logs/exportData.log
fi
done

简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤4:

    执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。


三、总结

    本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。


这篇关于Datax与hadoop2.x兼容部署与实际项目应用工作记录分享的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055185

相关文章

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

闲置电脑也能活出第二春?鲁大师AiNAS让你动动手指就能轻松部署

对于大多数人而言,在这个“数据爆炸”的时代或多或少都遇到过存储告急的情况,这使得“存储焦虑”不再是个别现象,而将会是随着软件的不断臃肿而越来越普遍的情况。从不少手机厂商都开始将存储上限提升至1TB可以见得,我们似乎正处在互联网信息飞速增长的阶段,对于存储的需求也将会不断扩大。对于苹果用户而言,这一问题愈发严峻,毕竟512GB和1TB版本的iPhone可不是人人都消费得起的,因此成熟的外置存储方案开

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

阿里开源语音识别SenseVoiceWindows环境部署

SenseVoice介绍 SenseVoice 专注于高精度多语言语音识别、情感辨识和音频事件检测多语言识别: 采用超过 40 万小时数据训练,支持超过 50 种语言,识别效果上优于 Whisper 模型。富文本识别:具备优秀的情感识别,能够在测试数据上达到和超过目前最佳情感识别模型的效果。支持声音事件检测能力,支持音乐、掌声、笑声、哭声、咳嗽、喷嚏等多种常见人机交互事件进行检测。高效推

【专题】2024飞行汽车技术全景报告合集PDF分享(附原数据表)

原文链接: https://tecdat.cn/?p=37628 6月16日,小鹏汇天旅航者X2在北京大兴国际机场临空经济区完成首飞,这也是小鹏汇天的产品在京津冀地区进行的首次飞行。小鹏汇天方面还表示,公司准备量产,并计划今年四季度开启预售小鹏汇天分体式飞行汽车,探索分体式飞行汽车城际通勤。阅读原文,获取专题报告合集全文,解锁文末271份飞行汽车相关行业研究报告。 据悉,业内人士对飞行汽车行业