自定义InputFormat和OutputFormat案例

2023-12-14 06:18

本文主要是介绍自定义InputFormat和OutputFormat案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、自定义InputFormat

  InputFormat是输入流,在前面的例子中使用的是文件输入输出流FileInputFormat和FileOutputFormat,而FileInputFormat和FileOutputFormat它们默认使用的是继承它们的子类TextInputFormat和TextOutputFormat,以Text的方式去读取数据。

  当我们遇到许多小文件,要将他们整理合成为一个文件SequenceFile(存储了多个小文件),且文件内的存储格式为:文件路径+文件内容,这时我们可以通过封装自定义的InputFormat输入流来实现需求。

  思路如下:

    1.自定义FuncFileInputFormat类继承FileInputFormat(参数类型为NullWritable和BytesWritable),并重写isSplitable和createRecordReader方法;

    2.isSplitable方法中return false即可表示不切割,createRecordReader方法中要返回一个RecordReader类,这是我们要自定义的对输入文件的业务逻辑,所以创建FuncRecordReader类;

    3.FuncRecordReader类继承RecordReader类,参数类型同为NullWritable和BytesWritable,重写initialize、nextKeyValue、getCurrentKey、getCurrentValue、getProcess、close方法;

    4.Mapper:初始化setup方法,通过context拿到切片、获取路径、将路径写入定义的全局变量Text t,然后在map阶段将t和value输出到reducer;

    5.Reducer:遍历values,输出key,value;

    6.Driver:在设置完Mapper和Reducer类后,添加设置setInputFormatClass为FuncFileInputFormat、设置setOutputFormatClass为SequenceFileOutputFormat。

  代码如下:

/*** @author: PrincessHug* @date: 2019/3/29, 20:49* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {FuncRecordReader recordReader = new FuncRecordReader();return recordReader;}
}public class FuncRecordReader  extends RecordReader<NullWritable, BytesWritable> {boolean isProcess = false;FileSplit split;Configuration conf;BytesWritable value = new BytesWritable();//初始化@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {//初始化切片文件this.split = (FileSplit) inputSplit;//初始化配置信息conf = taskAttemptContext.getConfiguration();}//获取下一个文件@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!isProcess){//根据切片的长度来创建缓冲区byte[] buf = new byte[(int) split.getLength()];FSDataInputStream fis = null;FileSystem fs = null;try {//获取路径Path path = split.getPath();//根据路径获取文件系统fs = path.getFileSystem(conf);//拿到输入流fis = fs.open(path);//数据拷贝IOUtils.readFully(fis,buf,0,buf.length);//拷贝缓存到最终的输出value.set(buf,0,buf.length);} catch (IOException e) {e.printStackTrace();} finally {IOUtils.closeStream(fis);IOUtils.closeStream(fs);}isProcess = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}public class SequencceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable> {Text t = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//拿到切片信息FileSplit split = (FileSplit) context.getInputSplit();//路径Path path = split.getPath();//即带路径有待名称t.set(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(t,value);}
}public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {for (BytesWritable v:values){context.write(key,v);}}
}public class SequenceFileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1.获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2.获取Jar包job.setJarByClass(SequenceFileDriver.class);//3.获取Mapper、Redcuer类job.setMapperClass(SequencceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);//4.设置自定义读取方法job.setInputFormatClass(FuncFileInputFormat.class);//5.设置默认的输出方式job.setOutputFormatClass(SequenceFileOutputFormat.class);//6.获取Mapper输出数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);//7.获取Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//8.设置输入存在的路径与处理后的结果路径FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\inputformat\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\inputformat\\out"));//9.提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

 

  

二、自定义OutputFormat

  需求:目前我们有一个网站ip的文件,每行都有一个网站的ip地址,要求我们将含有“www.baidu.com”的ip地址取出放入一个结果文件,其他的地址放入另一个结果文件。

  思路:1.首先Mapper、Reduer就是简单的读取数据、写出数据;

    2.自定义FuncFileOutputFormat,重写它的getRecordWriter方法,返回一个FIleRecordWriter对象,这里我们再定义一个FileRecordWriter,重写FileRecordWriter、write、close方法;

    3.Driver:再设置Reducer输出后添加设置setOutputFormatClass为我们自定义的FuncFileOutputFormat即可;

  代码如下:

/*** @author: PrincessHug* @date: 2019/3/30, 14:44* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(new IntWritable(1),new value);}
}public class FileReducer extends Reducer<IntWritable, Text,Text,NullWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text k:values){String s = k.toString() + "\n";context.write(new Text(s),NullWritable.get());}}
}public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {return new FileRecordWriter(taskAttemptContext);}
}public class FileRecordWriter extends RecordWriter<Text, NullWritable> {Configuration conf = null;FSDataOutputStream baidulog = null;FSDataOutputStream otherlog = null;//定义数据输出路径public FileRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {//获取配置信息和文件系统conf = taskAttemptContext.getConfiguration();FileSystem fs = FileSystem.get(conf);//定义输出路径itstarlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\itstart\\baidu.logs"));otherlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\other\\other.logs"));}//数据输出@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {if (key.toString().contains("baidu")){baidulog.write(key.getBytes());}else {otherlog.write(key.getBytes());}}//关闭资源@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {if (itstarlog != null){itstarlog.close();}if (otherlog != null){otherlog.close();}}
}public class FileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//配置、jobConfiguration conf = new Configuration();Job job = Job.getInstance(conf);//jar包job.setJarByClass(FileDriver.class);//Mapper、Reducerjob.setMapperClass(FileMapper.class);job.setReducerClass(FileReducer.class);//Mapper输出job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);//Reudcer输出job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//自定义输出类job.setOutputFormatClass(FuncFileOutputFormat.class);//文件输入输出流FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\outputformat\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\outputformat\\out"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10638866.html

这篇关于自定义InputFormat和OutputFormat案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491432

相关文章

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

客户案例:安全海外中继助力知名家电企业化解海外通邮困境

1、客户背景 广东格兰仕集团有限公司(以下简称“格兰仕”),成立于1978年,是中国家电行业的领军企业之一。作为全球最大的微波炉生产基地,格兰仕拥有多项国际领先的家电制造技术,连续多年位列中国家电出口前列。格兰仕不仅注重业务的全球拓展,更重视业务流程的高效与顺畅,以确保在国际舞台上的竞争力。 2、需求痛点 随着格兰仕全球化战略的深入实施,其海外业务快速增长,电子邮件成为了关键的沟通工具。

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。