Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat)

2023-11-01 17:40

本文主要是介绍Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

过滤日志自定义日志输出路径(自定义OutputFormat

1.需求分析

过滤输入log日志是否包含xyg

1)包含xyg的网站输出e:/xyg.log

2)不包含xyg的网站输出到e:/other.log

2.数据准备

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.xyg.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
log.txt

输出预期:

http://www.xyg.com
xyg.txt
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
other.txt

3.代码实现

(1)自定义一个outputformat

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);} }

(2)具体的写数据RecordWriter

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {FSDataOutputStream atguiguOut = null;FSDataOutputStream otherOut = null;public FilterRecordWriter(TaskAttemptContext job) {// 1 获取文件系统 FileSystem fs;try {fs = FileSystem.get(job.getConfiguration());// 2 创建输出文件路径Path atguiguPath = new Path("e:/xyg.log");Path otherPath = new Path("e:/other.log");// 3 创建输出流atguiguOut = fs.create(atguiguPath);otherOut = fs.create(otherPath);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {// 判断是否包含“xyg”输出到不同文件if (key.toString().contains("xyg")) {atguiguOut.write(key.toString().getBytes());} else {otherOut.write(key.toString().getBytes());}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关闭资源if (atguiguOut != null) {atguiguOut.close();}if (otherOut != null) {otherOut.close();}} }

(3)编写FilterMapper

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行String line = value.toString();k.set(line);// 3 写出context.write(k, NullWritable.get());} }

(4)编写FilterReducer

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {String k = key.toString();k = k + "\r\n";context.write(new Text(k), NullWritable.get());} }

(5)编写FilterDriver

package com.xyg.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver {public static void main(String[] args) throws Exception {args = new String[] { "e:/inputoutputformat", "e:/output2" };Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FilterDriver.class);job.setMapperClass(FilterMapper.class);job.setReducerClass(FilterReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 要将自定义的输出格式组件设置到job中job.setOutputFormatClass(FilterOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);} }

 

转载于:https://www.cnblogs.com/frankdeng/p/9256215.html

这篇关于Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/324645

相关文章

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

C++ Log4cpp跨平台日志库的使用小结

《C++Log4cpp跨平台日志库的使用小结》Log4cpp是c++类库,本文详细介绍了C++日志库log4cpp的使用方法,及设置日志输出格式和优先级,具有一定的参考价值,感兴趣的可以了解一下... 目录一、介绍1. log4cpp的日志方式2.设置日志输出的格式3. 设置日志的输出优先级二、Window

在Linux中改变echo输出颜色的实现方法

《在Linux中改变echo输出颜色的实现方法》在Linux系统的命令行环境下,为了使输出信息更加清晰、突出,便于用户快速识别和区分不同类型的信息,常常需要改变echo命令的输出颜色,所以本文给大家介... 目python录在linux中改变echo输出颜色的方法技术背景实现步骤使用ANSI转义码使用tpu

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

解读GC日志中的各项指标用法

《解读GC日志中的各项指标用法》:本文主要介绍GC日志中的各项指标用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、基础 GC 日志格式(以 G1 为例)1. Minor GC 日志2. Full GC 日志二、关键指标解析1. GC 类型与触发原因2. 堆

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

Python get()函数用法案例详解

《Pythonget()函数用法案例详解》在Python中,get()是字典(dict)类型的内置方法,用于安全地获取字典中指定键对应的值,它的核心作用是避免因访问不存在的键而引发KeyError错... 目录简介基本语法一、用法二、案例:安全访问未知键三、案例:配置参数默认值简介python是一种高级编

MySQL中的索引结构和分类实战案例详解

《MySQL中的索引结构和分类实战案例详解》本文详解MySQL索引结构与分类,涵盖B树、B+树、哈希及全文索引,分析其原理与优劣势,并结合实战案例探讨创建、管理及优化技巧,助力提升查询性能,感兴趣的朋... 目录一、索引概述1.1 索引的定义与作用1.2 索引的基本原理二、索引结构详解2.1 B树索引2.2