Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat)

2023-11-01 17:40

本文主要是介绍Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

过滤日志自定义日志输出路径(自定义OutputFormat

1.需求分析

过滤输入log日志是否包含xyg

1)包含xyg的网站输出e:/xyg.log

2)不包含xyg的网站输出到e:/other.log

2.数据准备

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.xyg.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
log.txt

输出预期:

http://www.xyg.com
xyg.txt
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
other.txt

3.代码实现

(1)自定义一个outputformat

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);} }

(2)具体的写数据RecordWriter

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {FSDataOutputStream atguiguOut = null;FSDataOutputStream otherOut = null;public FilterRecordWriter(TaskAttemptContext job) {// 1 获取文件系统 FileSystem fs;try {fs = FileSystem.get(job.getConfiguration());// 2 创建输出文件路径Path atguiguPath = new Path("e:/xyg.log");Path otherPath = new Path("e:/other.log");// 3 创建输出流atguiguOut = fs.create(atguiguPath);otherOut = fs.create(otherPath);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {// 判断是否包含“xyg”输出到不同文件if (key.toString().contains("xyg")) {atguiguOut.write(key.toString().getBytes());} else {otherOut.write(key.toString().getBytes());}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关闭资源if (atguiguOut != null) {atguiguOut.close();}if (otherOut != null) {otherOut.close();}} }

(3)编写FilterMapper

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行String line = value.toString();k.set(line);// 3 写出context.write(k, NullWritable.get());} }

(4)编写FilterReducer

package com.xyg.mapreduce.outputformat;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {String k = key.toString();k = k + "\r\n";context.write(new Text(k), NullWritable.get());} }

(5)编写FilterDriver

package com.xyg.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver {public static void main(String[] args) throws Exception {args = new String[] { "e:/inputoutputformat", "e:/output2" };Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FilterDriver.class);job.setMapperClass(FilterMapper.class);job.setReducerClass(FilterReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 要将自定义的输出格式组件设置到job中job.setOutputFormatClass(FilterOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);} }

 

转载于:https://www.cnblogs.com/frankdeng/p/9256215.html

这篇关于Hadoop案例(五)过滤日志及自定义日志输出路径(自定义OutputFormat)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/324645

相关文章

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

SpringBoot如何使用TraceId日志链路追踪

《SpringBoot如何使用TraceId日志链路追踪》文章介绍了如何使用TraceId进行日志链路追踪,通过在日志中添加TraceId关键字,可以将同一次业务调用链上的日志串起来,本文通过实例代码... 目录项目场景:实现步骤1、pom.XML 依赖2、整合logback,打印日志,logback-sp

Python使用Colorama库美化终端输出的操作示例

《Python使用Colorama库美化终端输出的操作示例》在开发命令行工具或调试程序时,我们可能会希望通过颜色来区分重要信息,比如警告、错误、提示等,而Colorama是一个简单易用的Python库... 目录python Colorama 库详解:终端输出美化的神器1. Colorama 是什么?2.

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

python获取当前文件和目录路径的方法详解

《python获取当前文件和目录路径的方法详解》:本文主要介绍Python中获取当前文件路径和目录的方法,包括使用__file__关键字、os.path.abspath、os.path.realp... 目录1、获取当前文件路径2、获取当前文件所在目录3、os.path.abspath和os.path.re

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd