【Hadoop】9.MapReduce框架原理-OutputFormat数据输出

2023-11-09 13:32

本文主要是介绍【Hadoop】9.MapReduce框架原理-OutputFormat数据输出,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在前面,我们知道了多种输入模式,输出也一样。OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。

OutputFormat 接口实现类
  1. 文本输出TextOutputFormat
    默认的输出格式是TextOutputFormat它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString() 方法把它们转换为字符串
  2. SequenceFileOutputFormat
    将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为他的格式紧凑,很容易被压缩
  3. 自定义OutputFormat
    根据自己的需求,自定义实现。
自定义OutputFormat实现过程

步骤:

  1. 自定义一个类继承FileOutputFormat
  2. 改写RecordWriter,具体改写输出数据的方法write()

示例:
在这里插入图片描述

CustomOutputDriver

package com.xing.MapReduce.CustomOutputFormat;import com.xing.MapReduce.Flowsum.FlowBean;
import com.xing.MapReduce.Flowsum.FlowBeanMapper;
import com.xing.MapReduce.Flowsum.FlowDriver;
import com.xing.MapReduce.Flowsum.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CustomOutputDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");String in = "E:\\hdfs\\data\\customout\\input\\demo.txt";String out = "E:\\hdfs\\data\\customout\\out";Path inPath = new Path(in);Path outPath = new Path(out);Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(configuration);if (fs.exists(outPath)) {if (fs.delete(outPath, true)){System.out.println("success delete outfile");}}Job job = Job.getInstance(configuration);job.setJobName("CustomOutput");job.setJarByClass(CustomOutputDriver.class);job.setMapperClass(CustomOutputMapper.class);job.setReducerClass(CustomOutputReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置自定义FileOutputFormat类job.setOutputFormatClass(CustomFileOutputFormat.class);FileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);boolean rel = job.waitForCompletion(true);if (rel) {System.out.println("success");}}}

CustomOutputMapper

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CustomOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value,NullWritable.get() );}
}

CustomOutputReduce

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CustomOutputReduce extends Reducer<Text,NullWritable,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {for (NullWritable value : values) {context.write(key,value );}}
}

CustomFileOutputFormat

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CustomFileOutputFormat extends FileOutputFormat<Text,NullWritable> {/***  返回自定义的Writer* @param context* @return* @throws IOException* @throws InterruptedException*/public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {return new CustomRecordWriter(context);}
}

CustomRecordWriter

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class CustomRecordWriter extends RecordWriter<Text, NullWritable> {private FileSystem fs;private Configuration conf;private FSDataOutputStream fos1;private FSDataOutputStream fos2;CustomRecordWriter(){}CustomRecordWriter(TaskAttemptContext context) {// 初始化一些属性try {conf = context.getConfiguration();fs = FileSystem.get(conf);fos1 = fs.create(new Path("E:\\hdfs\\data\\customout\\output\\http.txt"));fos2 = fs.create(new Path("E:\\hdfs\\data\\customout\\output\\other.txt"));} catch (IOException e) {e.printStackTrace();}}/***  判断key值是否含有HTTP,有则输出到http.txt 其他都输出到other.txt文件* @param text* @param nullWritable* @throws IOException* @throws InterruptedException*/public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {if (text.toString().toUpperCase().contains("HTTP")){System.out.println("yes");// \r\n 为windows的换行 fos1.write(text.toString().concat("\r\n").getBytes());}else {System.out.println("no");fos2.write(text.toString().concat("\r\n").getBytes());}}/***  关闭流* @param taskAttemptContext* @throws IOException* @throws InterruptedException*/public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {IOUtils.closeStream(fos1);IOUtils.closeStream(fos2);}
}

输出结果
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这篇关于【Hadoop】9.MapReduce框架原理-OutputFormat数据输出的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/376425

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档