Hadoop确实是处理海量离线数据的利器

2024-03-18 20:32

本文主要是介绍Hadoop确实是处理海量离线数据的利器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。

hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。


下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。


对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:



map里面的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.    * 统计词频的map端 
  3.    * 代码 
  4.    *  
  5.    * **/  
  6.   public void map(Object key, Text value, Context context  
  7.                   ) throws IOException, InterruptedException {  
  8.    
  9.     String [] data=value.toString().split(";");//按每行的分号拆分短语  
  10.     for(String s:data){  
  11.         if(s.trim().length()>0){//忽略空字符  
  12.         word.set(s);  
  13.         context.write(word, one);  
  14.         }  
  15.     }  
  16.   
  17.   }  
  /*** 统计词频的map端* 代码* * **/public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String [] data=value.toString().split(";");//按每行的分号拆分短语for(String s:data){if(s.trim().length()>0){//忽略空字符word.set(s);context.write(word, one);}}}



reduce端的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.      * reduce端的 
  3.      * 代码 
  4.      * **/  
  5.     public void reduce(Text key, Iterable<IntWritable> values,   
  6.                        Context context  
  7.                        ) throws IOException, InterruptedException {  
  8.       int sum = 0;  
  9.       for (IntWritable val : values) {  
  10.         sum += val.get();//累加词频  
  11.       }  
  12.       result.set(sum);  
  13.       context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔  
  14.     }  
  15.   }  
/*** reduce端的* 代码* **/public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();//累加词频}result.set(sum);context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔}}


main函数里面的代码如下:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception {  
  2.   Configuration conf = new Configuration();  
  3.   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  4.   if (otherArgs.length != 2) {  
  5.     System.err.println("Usage: wordcount <in> <out>");  
  6.     System.exit(2);  
  7.   }  
  8.   Job job = new Job(conf, "word count");  
  9.   job.setJarByClass(WordCount.class);  
  10.   job.setMapperClass(TokenizerMapper.class);  
  11.   job.setCombinerClass(IntSumReducer.class);  
  12.   job.setReducerClass(IntSumReducer.class);  
  13.   job.setOutputKeyClass(Text.class);  
  14.   job.setOutputValueClass(IntWritable.class);  
  15.   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  16.   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  17.   System.exit(job.waitForCompletion(true) ? 0 : 1);  
  18. }  
  public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}


运行结果,如下所示:

Java代码 复制代码  收藏代码
  1. a good student::    1  
  2. good student::  3  
  3. patient::   2  
  4. patient a:: 1  
a good student::	1
good student::	3
patient::	2
patient a::	1



下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.          * 排序作业 
  3.          * map的实现 
  4.          *  
  5.          * **/  
  6.         @Override  
  7.         protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {  
  8.             String s[]=value.toString().split("::");//按两个冒号拆分每行数据  
  9.             word.set(s[0]);//  
  10.             one.set(Integer.parseInt(s[1].trim()));//  
  11.             context.write(one, word);//注意,此部分,需要反转K,V顺序  
  12.         }  
/*** 排序作业* map的实现* * **/@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String s[]=value.toString().split("::");//按两个冒号拆分每行数据word.set(s[0]);//one.set(Integer.parseInt(s[1].trim()));//context.write(one, word);//注意,此部分,需要反转K,V顺序}


reduce端代码如下:

Java代码 复制代码  收藏代码
  1. /*** 
  2.   *  
  3.   * 排序作业的 
  4.   * reduce代码 
  5.   * **/       
  6.         @Override  
  7.         protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)  
  8.                 throws IOException, InterruptedException {  
  9.             for(Text t:arg1){  
  10.                 result.set(t.toString());  
  11.                  arg2.write(result, arg0);  
  12.             }  
  13.         }  
/**** * 排序作业的* reduce代码* **/		@Overrideprotected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)throws IOException, InterruptedException {for(Text t:arg1){result.set(t.toString());arg2.write(result, arg0);}}




下面,我们再来看下排序组件的代码:

Java代码 复制代码  收藏代码
  1. /*** 
  2.  * 按词频降序排序 
  3.  * 的类 
  4.  *  
  5.  * **/  
  6.     public static class DescSort extends  WritableComparator{  
  7.   
  8.          public DescSort() {  
  9.              super(IntWritable.class,true);//注册排序组件  
  10.         }  
  11.          @Override  
  12.         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,  
  13.                 int arg4, int arg5) {  
  14.             return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序  
  15.         }  
  16.            
  17.          @Override  
  18.         public int compare(Object a, Object b) {  
  19.        
  20.             return   -super.compare(a, b);//注意使用负号来完成降序  
  21.         }  
  22.           
  23.     }  
/**** 按词频降序排序* 的类* * **/public static class DescSort extends  WritableComparator{public DescSort() {super(IntWritable.class,true);//注册排序组件}@Overridepublic int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,int arg4, int arg5) {return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序}@Overridepublic int compare(Object a, Object b) {return   -super.compare(a, b);//注意使用负号来完成降序}}



main方法里面的实现代码如下所示:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception{  
  2.           
  3.           
  4.           
  5.           Configuration conf = new Configuration();  
  6.             String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  7.             if (otherArgs.length != 2) {  
  8.                 System.err.println("Usage: wordcount <in> <out>");  
  9.                 System.exit(2);  
  10.               }  
  11.            Job job=new Job(conf, "sort");  
  12.            job.setOutputKeyClass(IntWritable.class);  
  13.            job.setOutputValueClass(Text.class);  
  14.            job.setMapperClass(SortIntValueMapper.class);  
  15.            job.setReducerClass(SortIntValueReducer.class)   ;  
  16.            job.setSortComparatorClass(DescSort.class);//加入排序组件  
  17.            job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);  
  18.            job.setOutputFormatClass(TextOutputFormat.class);  
  19.            FileInputFormat.setInputPaths(job, new Path(args[0]));  
  20.            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  21.            
  22.            System.exit(job.waitForCompletion(true) ? 0 : 1);  
  23.     }  
public static void main(String[] args) throws Exception{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job=new Job(conf, "sort");job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);job.setMapperClass(SortIntValueMapper.class);job.setReducerClass(SortIntValueReducer.class)	;job.setSortComparatorClass(DescSort.class);//加入排序组件job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}



输出结果,如下所示:

Java代码 复制代码  收藏代码
  1. good student    3  
  2. patient 2  
  3. a good student  1  
  4. patient a   1  
good student	3
patient	2
a good student	1
patient a	1



至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。

最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。

这篇关于Hadoop确实是处理海量离线数据的利器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823555

相关文章

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

电脑提示xlstat4.dll丢失怎么修复? xlstat4.dll文件丢失处理办法

《电脑提示xlstat4.dll丢失怎么修复?xlstat4.dll文件丢失处理办法》长时间使用电脑,大家多少都会遇到类似dll文件丢失的情况,不过,解决这一问题其实并不复杂,下面我们就来看看xls... 在Windows操作系统中,xlstat4.dll是一个重要的动态链接库文件,通常用于支持各种应用程序

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名