本文主要是介绍Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、概述
Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。
这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。
这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。
二、datax简介和datax在hadoop2.x上的兼容部署
1、datax简介
DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:
各个组件的作用:
-
Job: 一道数据同步作业
-
Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.
-
Sub-job: 数据同步作业切分后的小任务
-
Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX
-
Storage: Reader和Writer通过Storage交换数据
-
Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地
Datax内置插件:
DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:
Reader插件
-
hdfsreader : 支持从hdfs文件系统获取数据。
-
mysqlreader: 支持从mysql数据库获取数据。
-
sqlserverreader: 支持从sqlserver数据库获取数据。
-
oraclereader : 支持从oracle数据库获取数据。
-
streamreader: 支持从stream流获取数据(常用于测试)
-
httpreader : 支持从http URL获取数据。
Writer插件
-
hdfswriter:支持向hdbf写入数据。
-
mysqlwriter:支持向mysql写入数据。
-
oraclewriter:支持向oracle写入数据。
-
streamwriter:支持向stream流写入数据。(常用于测试)
2、datax在hadoop2.x上的兼容部署
Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:
进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:
另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。
还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:
下面以t_dp_datax_engine.spec为例子:
上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。
下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:
具体执行过程如下:
1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录
2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示
Rpm制作完成后,即可分发、安装,例如使用
rpm -ivh t_dp_datax_engine.rpm
即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。
如下图:
安装完成后,在/home/taobao/datax/目录下会存在如下文件:
其他的插件按照这种方式按照好就ok了。
三、datax的实际应用记录分享
在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。
-
具体业务场景:
需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。
-
实现步骤:
步骤1:
执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:
接着选择export的目标源,这个我们选择0:
步骤2:
根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 | <?xml version="1.0" encoding="UTF-8"?> <jobs> <job id="hdfsreader_to_mysqlwriter_job"> <reader> <plugin>hdfsreader</plugin> <!-- description:HDFS login account, e.g. 'username, groupname(groupname...),#password mandatory:true name:ugi --> <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/> <!-- description:hadoop-site.xml path mandatory:false name:hadoop_conf --> <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/> <!-- description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/ mandatory:true name:dir --> <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/> <!-- default:\t description:how to sperate a line mandatory:false name:fieldSplit --> <param key="field_split" value=","/> <!-- default:UTF-8 range:UTF-8|GBK|GB2312 description:hdfs encode mandatory:false name:encoding --> <param key="encoding" value="UTF-8"/> <!-- default:4096 range:[1024-4194304] description:how large the buffer mandatory:false name:bufferSize --> <param key="buffer_size" value="4096"/> <!-- default:\N range: description:replace the nullstring to null mandatory:false name:nullString --> <param key="nullstring" value="\N"/> <!-- default:true range:true|false description:ingore key mandatory:false name:ignoreKey --> <param key="ignore_key" value="true"/> <!-- default: range: description:how to filter column mandatory:false name:colFilter <param key="col_filter" value="?"/> --> <!-- default:1 range:1-100 description:concurrency of the job mandatory:false name:concurrency --> <param key="concurrency" value="${reader_concurrency}"/> </reader> <writer> <plugin>mysqlwriter</plugin> <!-- description:Mysql database ip address mandatory:true name:ip --> <param key="ip" value="jl-master"/> <!-- default:3306 description:Mysql database port mandatory:true name:port --> <param key="port" value="3306"/> <!-- description:Mysql database name mandatory:true name:dbname --> <param key="dbname" value="newidigg_jilin"/> <!-- description:Mysql database login username mandatory:true name:username --> <param key="username" value="hadoop"/> <!-- description:Mysql database login password mandatory:true name:password --> <param key="password" value="jpkjcluster"/> <!-- default: range: description:table to be dumped data into mandatory:true name:table --> <param key="table" value="${mysql_table}"/> <!-- range: description:order of columns mandatory:false name:colorder <param key="colorder" value="?"/> --> <!-- default:UTF-8 range:UTF-8|GBK|GB2312 description: mandatory:false name:encoding --> <param key="encoding" value="UTF-8"/> <!-- description:execute sql before dumping data mandatory:false name:pre <param key="pre" value="${preSql}"/> --> <!-- description:execute sql after dumping data mandatory:false name:post <param key="post" value="${postSql}"/> --> <!-- default:0 range:[0-65535] description:error limit mandatory:false name:limit --> <param key="limit" value="0"/> <!-- mandatory:false name:set <param key="set" value="?"/> --> <!-- default:false range:[true/false] mandatory:false name:replace --> <param key="replace" value="false"/> <!-- range:params1|params2|... description:mysql driver params mandatory:false name:mysql.params <param key="mysql.params" value="?"/> --> <!-- default:1 range:1-100 description:concurrency of the job mandatory:false --> <param key="concurrency" value="${writer_concurrency}"/> </writer> </job> </jobs> |
步骤3:
编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | #!/bin/bash #author:曾昭正 #create time:2014-08-14 workspace=` dirname $0` dataxHome= '/data/hadoop/datax' export_day=$1 reader_concurrency=1 writer_concurrency=1 mysqlUser= 'hadoop' mysqlPassword= 'jpkjcluster' mysqlServerHost= 'jl-master' currentDatabase= 'newidigg_jilin' preSql= '' postSql= '' importTable=( 'tb_userview_domain_noMdn' 'tb_fact_app_v2' 'tb_fact_domain' 'tb_fact_tag' 'tb_fact_top5_www' 'tb_fact_upwww_time' \ 'tb_fact_search' 'tb_userview_domain' 'tb_userview_kpi_order' 'tb_userview_search' 'tb_userview_time' 'tb_userview_tag' ); #function which is used to DDL or DML msyql function mysqlController(){ #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数 local sqlString=$* echo ` date +%Y-%m-%d " " %H:%M:%S` "执行:${sqlString}" mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e " use ${currentDatabase}; ${sqlString}; " } #通用表导入模块 function commonImport(){ local current_table=$1 #create temporary table before importing data into mysql. echo ` date +%Y-%m-%d " " %H:%M:%S` "......进入处理${current_table}表入mysql库环节......" echo ` date +%Y-%m-%d " " %H:%M:%S` "入库前创建临时表" preSql= "drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}" mysqlController ${preSql} #import data from hdfs into msyql. echo ` date +%Y-%m-%d " " %H:%M:%S` "将hdfs的${current_table}表导入mysql...." #调整mysql的导入线程数 writer_concurrency=2 #调用Datax将hdfs文件导入mysql python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}" #Updata Data Relationship after importing data into mysql. if [ ${current_table} == "tb_userview_search" ] then postSql="drop table if exists ${current_table}; \ rename table ${current_table}_${export_day} to ${current_table}; \ CREATE INDEX mdn_index ON ${current_table}(mdn); " else postSql="drop table if exists ${current_table}; \ rename table ${current_table}_${export_day} to ${current_table}; \ Alter table ${current_table} add primary key(mdn); " fi mysqlController ${postSql} echo ` date +%Y-%m-%d " " %H:%M:%S` "......完成处理${current_table}表入mysql操作......" } for tableItem in ${importTable[*]} do if [ ${tableItem} == "tb_userview_domain" -o ${tableItem} == "tb_userview_kpi_order" -o ${tableItem} == "tb_userview_search" -o ${tableItem} == "tb_userview_time" -o ${tableItem} == "tb_userview_tag" ] then commonImport ${tableItem} else #delete dirty data preSql= "delete from ${tableItem} where day_id=${export_day};" mysqlController ${preSql} #调用Datax将hdfs文件导入mysql python ${dataxHome} /bin/datax .py ${dataxHome} /jobs/hdfsreader_to_mysqlwriter_1407525566122 .xml -p "-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}" # >> ${workspace}/../logs/exportData.log fi done |
简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。
步骤4:
执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log & 大功告成。
三、总结
本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。
这篇关于Datax与hadoop2.x兼容部署与实际项目应用工作记录分享的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!