Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

2024-06-22 03:52

本文主要是介绍Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// http://www.baidu.com//http://www.google.com// (http://www.google.com, NullWritable)// 不做任何处理context.write(value, NullWritable.get());}
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// http://www.baidu.com// http://www.baidu.com// 防止有相同数据,丢数据for (NullWritable value : values) {context.write(key, NullWritable.get());}}
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private  FSDataOutputStream atguiguOut;private  FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {// 创建两条流try {FileSystem fs = FileSystem.get(job.getConfiguration());atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));otherOut = fs.create(new Path("D:\\hadoop\\other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();// 具体写if (log.contains("atguigu")){atguiguOut.writeBytes(log+"\n");}else {otherOut.writeBytes(log+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {LogRecordWriter lrw = new LogRecordWriter(job);return lrw;}
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);

这篇关于Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083214

相关文章

ROS话题通信流程自定义数据格式

ROS话题通信流程自定义数据格式 需求流程实现步骤定义msg文件编辑配置文件编译 在 ROS 通信协议中,数据载体是一个较为重要组成部分,ROS 中通过 std_msgs 封装了一些原生的数据类型,比如:String、Int32、Int64、Char、Bool、Empty… 但是,这些数据一般只包含一个 data 字段,结构的单一意味着功能上的局限性,当传输一些复杂的数据,比如:

添加自定义的CALayer

iOS开发UI篇—CAlayer(创建图层) 一、添加一个图层 添加图层的步骤: 1.创建layer 2.设置layer的属性(设置了颜色,bounds才能显示出来) 3.将layer添加到界面上(控制器view的layer上)  1 // 2 // YYViewController.m 3 // 01-创建一个简单的图层 4 // 5 //

android自定义View的和FramgentActivity的一个小坑

对于自定义View //加载样式TypedArray typedArray = context.obtainStyledAttributes(attrs, R.styleable.TitleBarView, defStyleAttr, 0);setTitle(typedArray.getString(R.styleable.TitleBarView_main_title));//不能写成

大型网站架构演化(一)——初始阶段的网站架构

大型网站的技术挑战主要来自于庞大的用户,高并发的访问和海量的数据,任何简单的业务一旦需要处理数以P计的数据和面对数以亿计的用户,问题就会变得很棘手。大型网站架构主要是解决这类问题。         大型网站都是从小型网站发展而来,网站架构也是一样,是从小型网站架构逐步演化而来。小型网站最开始时没有太多人访问,只需要一台服务器就绰绰有余,这时的网站架构如图所示。

第三十七章 添加和使用自定义标题元素 - 自定义标头的继承

文章目录 第三十七章 添加和使用自定义标题元素 - 自定义标头的继承自定义标头的继承示例 在 `SOAPHEADERS` 参数中指定支持的标头元素自定义标头的继承 第三十七章 添加和使用自定义标题元素 - 自定义标头的继承 自定义标头的继承 如果创建此Web 服务的子类,该子类将继承不特定于方法的标头信息 — 包含在 <request> 或 <response> 元素中的标头信

女人的逻辑

段子一: “今晚妹子无意间在我电脑的某个文件夹内发现了前女友的照片,死活要和我分手。原因是觉得我前女友巨丑,进而怀疑我眼光,为了避嫌,她决定分手。” 段子二: “一妹子的钥匙弄不见了就去配钥匙,她问师傅:“可以配钥匙吗?”师傅说:“可以可以。”然后师傅看着她,她也看着师傅。良久,师傅忍不住问她:“钥匙呢?”那妹子说:“我有钥匙干嘛还找你

爬虫阶段思考

内容:写这篇文章是因为最近帮同学改了很多的爬虫代码,感触良多。 我用豆瓣为例,并不是不会用别的,而是这个我个人感觉最经典。然后还会写我遇到的一些问题以及解决方法。 首先,我们得先知道怎样爬取。我用的scrapy框架爬取。 我对此图的理解就是: 从spiders中获得一个请求(REQUEST),通过引擎传递给调度器,之后再返回给引擎,引擎把url封装好后传递给下载器,下载器将资源下载好后

自定义recyclerView实现时光轴效果

时光轴效果在很多app上都有出现,例如淘宝中快递的跟踪,本文将使用recyclerView实现时光轴效果,我们会到自定义控件,首先先看一下效果图: 接下来是步骤分析 1自定义属性 这个大家应该都了解了,根据我们之前的分析,直接在attrs.xml中进行声明 <declare-styleable name="TimeLine"><attr name="beginLine" f

Android自定义系列——9.Path详细用法

rXxx方法 rXxx方法的坐标使用的是相对位置(基于当前点的位移),而之前方法的坐标是绝对位置(基于当前坐标系的坐标)。 Path path = new Path();path.moveTo(100,100);path.lineTo(100,200);canvas.drawPath(path,mDeafultPaint); 在这个例子中,先移动点到坐标(100,100)处,之后再连接

Android自定义系列——8.Path之贝塞尔曲线

贝塞尔曲线能干什么 贝塞尔曲线作用十分广泛,简单举几个的栗子: QQ小红点拖拽效果一些炫酷的下拉刷新控件阅读软件的翻书效果一些平滑的折线图的制作很多炫酷的动画效果 理解贝塞尔曲线的原理 一阶曲线原理: 一阶曲线是没有控制点的,仅有两个数据点(A 和 B),最终动态过程如下: (本文中贝塞尔曲线相关的动态演示图片来自维基百科)。一阶曲线其实就是前面讲解过的lineTo。 二阶曲线