CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|

2023-11-08 18:10

本文主要是介绍CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、MapReduce综合案例:MR综合案例
### --- 需求~~~     现在有一些订单的评论数据,需求,将订单按照好评与差评区分开来,
~~~     将数据输出到不同的文件目录下,数据内容如下,其中数据第九个字段表示好评,
~~~     中评,差评。0:好评,1:中评,2:差评。
~~~     现需要根据好评,中评,差评把数据分类并输出到不同的目录中,并且要求按照时间顺序降序排列。
~~~     # 备注:现在有大量类似上面的小文件!
300 东西很不错,物流也很快 \N 1 106 131******33 0 2019-02-06 19:10:13
301 还行,洗完有点干,不知道怎么回事 \N 1 106 136******44 0 2019-03-2214:16:41
302 还可以吧,保质期短,感觉貌似更天然些 \N 1 106 134******34 0 2019-04-1013:40:06
303 还可以吧,保质期短,感觉貌似更天然些 \N 1 105 134******33 0 2019-01-1514:40:21
304 还没用,,不知道效果怎么样 \N 1 105 137******66 0 2019-02-28 18:55:43
305 刚收到,还没用,用后再追评!不过,听朋友说好用,才买的! \N 1 105 138******600 2019-03-13 19:10:09
306 一般,感觉用着不是很好,可能我头发太干了 \N 1 105 132******44 0 2019-04-09 10:35:49
307 非常好用,之前买了10支,这次又买了10支,不错,会继续支持! \N 1 103 131******330 2019-01-15 13:10:46
308 喜欢茶树油的 \N 1 103 135******33 0 2019-02-08 14:35:09
309 好像比其他的强一些,继续使用中 \N 1 103 133******99 0 2019-03-1419:55:36
310 感觉洗后头发很干净,头皮有一定改善。 \N 1 103 138******44 0 2019-04-0922:55:59
311 从出生到现在一直都是惠氏 现在宝宝两周半了 \N 1 157 那***情 0 2017-12-01 06:05:30
312 口感不错,孩子很喜欢。推荐。 \N 1 157 w***4 0 2017-12-12 08:35:06
313 价格优惠,日期新鲜,包装完好!发货速度快,非常喜欢!还有赠品! \N 1 157 j***00 2019-01-09 22:55:41
二、分析
### --- 分析~~~     自定义InputFormat合并小文件
~~~     自定义分区根据评论等级把数据分区
~~~     自定义OutputFormat把数据输出到多个目录
三、开发步骤
### --- 合并小文件
~~~     创建项目:comment.step1
~~~     Mapperpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//text:代表的是一个文件的path+名称,BytesWritable:一个文件的内容
public class MergeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}
### --- 自定义InputFormat
### --- MergeInputFormatpackage com.yanqi.mr.comment.step1;
//自定义inputformat读取多个小文件合并为一个SequenceFile文件//SequenceFile文件中以kv形式存储文件,key--》文件路径+文件名称,value-->文件的整个内容import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;
import java.util.List;//TextInputFormat中泛型是LongWritable:文本的偏移量, Text:一行文本内容;指明当前inputformat的输出数据类型
//自定义inputformat:key-->文件路径+名称,value-->整个文件内容
public class MergeInputFormat extends FileInputFormat<Text, BytesWritable> {//重写是否可切分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {//对于当前需求,不需要把文件切分,保证一个切片就是一个文件return false;}@Overridepublic List<InputSplit> getSplits(JobContext job) throws IOException {//分片逻辑依然是原始的分片逻辑,一个文件一个maptask,jvm重用优化,uber模式,小文件任务优化?return super.getSplits(job);}//recordReader就是用来读取数据的对象@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {MergeRecordReader recordReader = new MergeRecordReader();//调用recordReader的初始化方法recordReader.initialize(split, context);return recordReader;}
}
### --- MergeRecordReaderpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;//负责读取数据,一次读取整个文件内容,封装成kv输出
public class MergeRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit split;//hadoop配置文件对象private Configuration conf;//定义key,value的成员变量private Text key = new Text();private BytesWritable value = new BytesWritable();//初始化方法,把切片以及上下文提升为全局@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;conf = context.getConfiguration();}private Boolean flag = true;//用来读取数据的方法@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//对于当前split来说只需要读取一次即可,因为一次就把整个文件全部读取了。if (flag) {//准备一个数组存放读取到的数据,数据大小是多少?byte[] content = new byte[(int) split.getLength()];final Path path = split.getPath();//获取切片的path信息final FileSystem fs = path.getFileSystem(conf);//获取到文件系统对象final FSDataInputStream fis = fs.open(path); //获取到输入流IOUtils.readFully(fis, content, 0, content.length); //读取数据并把数据放入byte[]//封装key和valuekey.set(path.toString());value.set(content, 0, content.length);IOUtils.closeStream(fis);//把再次读取的开关置为falseflag = false;return true;}return false;}//获取到key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}//获取到value@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}//获取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}//关闭资源@Overridepublic void close() throws IOException {}
}
### --- Reducerpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MergeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {//输出value值(文件内容),只获取其中第一个即可(只有一个)context.write(key, values.iterator().next());}
}
### --- Driverpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class MergeDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//        1. 获取配置文件对象,获取job对象实例final Configuration conf = new Configuration();final Job job = Job.getInstance(conf, "MergeDriver");
//        2. 指定程序jar的本地路径job.setJarByClass(MergeDriver.class);
//        3. 指定Mapper/Reducer类job.setMapperClass(MergeMapper.class);
//        job.setReducerClass(MergeReducer.class);
//        4. 指定Mapper输出的kv数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);
//        5. 指定最终输出的kv数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//设置使用自定义InputFormat读取数据job.setInputFormatClass(MergeInputFormat.class);FileInputFormat.setInputPaths(job, new Path("E:\\merge\\merge-out")); //指定读取数据的原始路径//指定输出使用的outputformatjob.setOutputFormatClass(SequenceFileOutputFormat.class);//尽可能降低数据的量,减少磁盘空间的占用,网络间通信时数据量小可以节省时间//针对Sequencefile的压缩SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);//压缩类型:record压缩SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
//        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
//        7. 指定job输出结果路径FileOutputFormat.setOutputPath(job, new Path("E:\\merge\\merge-output")); //指定结果数据输出路径
//        8. 提交作业final boolean flag = job.waitForCompletion(true);//jvm退出:正常退出0,非0值则是错误退出System.exit(flag ? 0 : 1);}
}
二、编译打印输出
### --- 编译打印输出~~~     配置输入输出参数
~~~     编译打印
~~~     将多个小文件合并成一个文件

这篇关于CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/371601

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

客户案例:安全海外中继助力知名家电企业化解海外通邮困境

1、客户背景 广东格兰仕集团有限公司(以下简称“格兰仕”),成立于1978年,是中国家电行业的领军企业之一。作为全球最大的微波炉生产基地,格兰仕拥有多项国际领先的家电制造技术,连续多年位列中国家电出口前列。格兰仕不仅注重业务的全球拓展,更重视业务流程的高效与顺畅,以确保在国际舞台上的竞争力。 2、需求痛点 随着格兰仕全球化战略的深入实施,其海外业务快速增长,电子邮件成为了关键的沟通工具。

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。