Hadoop确实是处理海量离线数据的利器

2024-03-18 20:32

本文主要是介绍Hadoop确实是处理海量离线数据的利器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。

hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。


下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。


对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:



map里面的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.    * 统计词频的map端 
  3.    * 代码 
  4.    *  
  5.    * **/  
  6.   public void map(Object key, Text value, Context context  
  7.                   ) throws IOException, InterruptedException {  
  8.    
  9.     String [] data=value.toString().split(";");//按每行的分号拆分短语  
  10.     for(String s:data){  
  11.         if(s.trim().length()>0){//忽略空字符  
  12.         word.set(s);  
  13.         context.write(word, one);  
  14.         }  
  15.     }  
  16.   
  17.   }  
  /*** 统计词频的map端* 代码* * **/public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String [] data=value.toString().split(";");//按每行的分号拆分短语for(String s:data){if(s.trim().length()>0){//忽略空字符word.set(s);context.write(word, one);}}}



reduce端的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.      * reduce端的 
  3.      * 代码 
  4.      * **/  
  5.     public void reduce(Text key, Iterable<IntWritable> values,   
  6.                        Context context  
  7.                        ) throws IOException, InterruptedException {  
  8.       int sum = 0;  
  9.       for (IntWritable val : values) {  
  10.         sum += val.get();//累加词频  
  11.       }  
  12.       result.set(sum);  
  13.       context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔  
  14.     }  
  15.   }  
/*** reduce端的* 代码* **/public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();//累加词频}result.set(sum);context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔}}


main函数里面的代码如下:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception {  
  2.   Configuration conf = new Configuration();  
  3.   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  4.   if (otherArgs.length != 2) {  
  5.     System.err.println("Usage: wordcount <in> <out>");  
  6.     System.exit(2);  
  7.   }  
  8.   Job job = new Job(conf, "word count");  
  9.   job.setJarByClass(WordCount.class);  
  10.   job.setMapperClass(TokenizerMapper.class);  
  11.   job.setCombinerClass(IntSumReducer.class);  
  12.   job.setReducerClass(IntSumReducer.class);  
  13.   job.setOutputKeyClass(Text.class);  
  14.   job.setOutputValueClass(IntWritable.class);  
  15.   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  16.   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  17.   System.exit(job.waitForCompletion(true) ? 0 : 1);  
  18. }  
  public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}


运行结果,如下所示:

Java代码 复制代码  收藏代码
  1. a good student::    1  
  2. good student::  3  
  3. patient::   2  
  4. patient a:: 1  
a good student::	1
good student::	3
patient::	2
patient a::	1



下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.          * 排序作业 
  3.          * map的实现 
  4.          *  
  5.          * **/  
  6.         @Override  
  7.         protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {  
  8.             String s[]=value.toString().split("::");//按两个冒号拆分每行数据  
  9.             word.set(s[0]);//  
  10.             one.set(Integer.parseInt(s[1].trim()));//  
  11.             context.write(one, word);//注意,此部分,需要反转K,V顺序  
  12.         }  
/*** 排序作业* map的实现* * **/@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String s[]=value.toString().split("::");//按两个冒号拆分每行数据word.set(s[0]);//one.set(Integer.parseInt(s[1].trim()));//context.write(one, word);//注意,此部分,需要反转K,V顺序}


reduce端代码如下:

Java代码 复制代码  收藏代码
  1. /*** 
  2.   *  
  3.   * 排序作业的 
  4.   * reduce代码 
  5.   * **/       
  6.         @Override  
  7.         protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)  
  8.                 throws IOException, InterruptedException {  
  9.             for(Text t:arg1){  
  10.                 result.set(t.toString());  
  11.                  arg2.write(result, arg0);  
  12.             }  
  13.         }  
/**** * 排序作业的* reduce代码* **/		@Overrideprotected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)throws IOException, InterruptedException {for(Text t:arg1){result.set(t.toString());arg2.write(result, arg0);}}




下面,我们再来看下排序组件的代码:

Java代码 复制代码  收藏代码
  1. /*** 
  2.  * 按词频降序排序 
  3.  * 的类 
  4.  *  
  5.  * **/  
  6.     public static class DescSort extends  WritableComparator{  
  7.   
  8.          public DescSort() {  
  9.              super(IntWritable.class,true);//注册排序组件  
  10.         }  
  11.          @Override  
  12.         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,  
  13.                 int arg4, int arg5) {  
  14.             return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序  
  15.         }  
  16.            
  17.          @Override  
  18.         public int compare(Object a, Object b) {  
  19.        
  20.             return   -super.compare(a, b);//注意使用负号来完成降序  
  21.         }  
  22.           
  23.     }  
/**** 按词频降序排序* 的类* * **/public static class DescSort extends  WritableComparator{public DescSort() {super(IntWritable.class,true);//注册排序组件}@Overridepublic int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,int arg4, int arg5) {return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序}@Overridepublic int compare(Object a, Object b) {return   -super.compare(a, b);//注意使用负号来完成降序}}



main方法里面的实现代码如下所示:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception{  
  2.           
  3.           
  4.           
  5.           Configuration conf = new Configuration();  
  6.             String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  7.             if (otherArgs.length != 2) {  
  8.                 System.err.println("Usage: wordcount <in> <out>");  
  9.                 System.exit(2);  
  10.               }  
  11.            Job job=new Job(conf, "sort");  
  12.            job.setOutputKeyClass(IntWritable.class);  
  13.            job.setOutputValueClass(Text.class);  
  14.            job.setMapperClass(SortIntValueMapper.class);  
  15.            job.setReducerClass(SortIntValueReducer.class)   ;  
  16.            job.setSortComparatorClass(DescSort.class);//加入排序组件  
  17.            job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);  
  18.            job.setOutputFormatClass(TextOutputFormat.class);  
  19.            FileInputFormat.setInputPaths(job, new Path(args[0]));  
  20.            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  21.            
  22.            System.exit(job.waitForCompletion(true) ? 0 : 1);  
  23.     }  
public static void main(String[] args) throws Exception{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job=new Job(conf, "sort");job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);job.setMapperClass(SortIntValueMapper.class);job.setReducerClass(SortIntValueReducer.class)	;job.setSortComparatorClass(DescSort.class);//加入排序组件job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}



输出结果,如下所示:

Java代码 复制代码  收藏代码
  1. good student    3  
  2. patient 2  
  3. a good student  1  
  4. patient a   1  
good student	3
patient	2
a good student	1
patient a	1



至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。

最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。

这篇关于Hadoop确实是处理海量离线数据的利器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823555

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下