mapreduce实现全局排序

2024-06-16 20:18
文章标签 mapreduce 排序 全局 实现

本文主要是介绍mapreduce实现全局排序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接附代码,说明都在源码里了。

 

package com.hadoop.totalsort;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
public class SamplerInputFormat extends FileInputFormat<Text, Text> {  
static final String PARTITION_FILENAME = "_partition.lst";  
static final String SAMPLE_SIZE = "terasort.partitions.sample";  
private static JobConf lastConf = null;  
private static InputSplit[] lastResult = null;  
static class TextSampler implements IndexedSortable {  
public ArrayList<Text> records = new ArrayList<Text>();  
public int compare(int arg0, int arg1) {  
Text right = records.get(arg0);  
Text left = records.get(arg1);  
return right.compareTo(left);  
}  
public void swap(int arg0, int arg1) {  
Text right = records.get(arg0);  
Text left = records.get(arg1);  
records.set(arg0, left);  
records.set(arg1, right);  
}  
public void addKey(Text key) {  
records.add(new Text(key));  
}  
//将采集出来的key数据排序
public Text[] createPartitions(int numPartitions) {  
int numRecords = records.size();  
if (numPartitions > numRecords) {  
throw new IllegalArgumentException("Requested more partitions than input keys (" + numPartitions +  
" > " + numRecords + ")");  
}  
new QuickSort().sort(this, 0, records.size());  
float stepSize = numRecords / (float) numPartitions;  //采集的时候应该是采了100条记录,从10个分片查找的,此处再取numPartitions-1条
Text[] result = new Text[numPartitions - 1];  
for (int i = 1; i < numPartitions; ++i) {  
result[i - 1] = records.get(Math.round(stepSize * i));  
}  
return result;  
}  
}  
public static void writePartitionFile(JobConf conf, Path partFile) throws IOException {  
//前段代码从分片中采集数据,通过sampler.addKey存入TextSampler中的records数组
SamplerInputFormat inputFormat = new SamplerInputFormat();  
TextSampler sampler = new TextSampler();  
Text key = new Text();  
Text value = new Text();  
int partitions = conf.getNumReduceTasks(); // Reducer任务的个数   
long sampleSize = conf.getLong(SAMPLE_SIZE, 100); // 采集数据-键值对的个数   
InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks());// 获得数据分片   
int samples = Math.min(10, splits.length);// 采集分片的个数   ,采集10个分片
long recordsPerSample = sampleSize / samples;// 每个分片采集的键值对个数   
int sampleStep = splits.length / samples; // 采集分片的步长   ,总的分片个数/要采集的分片个数
long records = 0;  
for (int i = 0; i < samples; i++) {  //1...10分片数
RecordReader<Text, Text> reader = inputFormat.getRecordReader(splits[sampleStep * i], conf, null);  
while (reader.next(key, value)) {  
sampler.addKey(key);   //将key值增加到sampler的records数组
records += 1;  
if ((i + 1) * recordsPerSample <= records) {  //目的是均匀采集各分片的条数,比如采集到第5个分片,那么记录条数应该小于5个分片应该的条数
break;  
}  
}  
}  
FileSystem outFs = partFile.getFileSystem(conf);  
if (outFs.exists(partFile)) {  
outFs.delete(partFile, false);  
}  
SequenceFile.Writer writer = SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);  
NullWritable nullValue = NullWritable.get();  
for (Text split : sampler.createPartitions(partitions)) {  //调用createPartitions方法,排序采集出来的数据,并取partitions条
writer.append(split, nullValue);  
}  
writer.close();  
}  
static class TeraRecordReader implements RecordReader<Text, Text> {  
private LineRecordReader in;  
private LongWritable junk = new LongWritable();  
private Text line = new Text();  
private static int KEY_LENGTH = 10;  
public TeraRecordReader(Configuration job, FileSplit split) throws IOException {  
in = new LineRecordReader(job, split);  
}  
public void close() throws IOException {  
in.close();  
}  
public Text createKey() {  
// TODO Auto-generated method stub   
return new Text();  
}  
public Text createValue() {  
return new Text();  
}  
public long getPos() throws IOException {  
// TODO Auto-generated method stub   
return in.getPos();  
}  
public float getProgress() throws IOException {  
// TODO Auto-generated method stub   
return in.getProgress();  
}  
public boolean next(Text arg0, Text arg1) throws IOException {  
if (in.next(junk, line)) {   //调用父类方法,将value值赋给key
// if (line.getLength() < KEY_LENGTH) {   
arg0.set(line);  
arg1.clear();  
//                } else {   
//                    byte[] bytes = line.getBytes(); // 默认知道读取要比较值的前10个字节 作为key   
//                                                    // 后面的字节作为value;   
//                    arg0.set(bytes, 0, KEY_LENGTH);   
//                    arg1.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);   
//                }   
return true;  
} else {  
return false;  
}  
}  
}  
@Override  
public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {  
if (conf == lastConf) {  
return lastResult;  
}  
lastConf = conf;  
lastResult = super.getSplits(lastConf, splits);  
return lastResult;  
}  
public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,  
Reporter arg2) throws IOException {  
return new TeraRecordReader(arg1, (FileSplit) arg0);  
}  
}  


 

package com.hadoop.totalsort;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SamplerSort extends Configured implements Tool {  
// 自定义的Partitioner   
public static class TotalOrderPartitioner implements Partitioner<Text, Text> {  
private Text[] splitPoints;  
public TotalOrderPartitioner() {  
}  
public int getPartition(Text arg0, Text arg1, int arg2) {  
// TODO Auto-generated method stub   
return findPartition(arg0);  
}  
public void configure(JobConf arg0) {  
try {  
FileSystem fs = FileSystem.getLocal(arg0);  
Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);  
splitPoints = readPartitions(fs, partFile, arg0); // 读取采集文件   
} catch (IOException ie) {  
throw new IllegalArgumentException("can't read paritions file", ie);  
}  
}  
public int findPartition(Text key) // 分配可以到多个reduce   
{  
int len = splitPoints.length;  
for (int i = 0; i < len; i++) {  
int res = key.compareTo(splitPoints[i]);  
if (res > 0 && i < len - 1) {  
continue;  
} else if (res == 0) {  
return i;  
} else if (res < 0) {  
return i;  
} else if (res > 0 && i == len - 1) {  
return i + 1;  
}  
}  
return 0;  
}  
private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException {  
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);  
List<Text> parts = new ArrayList<Text>();  
Text key = new Text();  
NullWritable value = NullWritable.get();  
while (reader.next(key, value)) {  
parts.add(key);  
}  
reader.close();  
return parts.toArray(new Text[parts.size()]);  
}  
}  
public int run(String[] args) throws Exception {  
JobConf job = (JobConf) getConf();  
// job.set(name, value);   
Path inputDir = new Path(args[0]);  
inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));  
Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);  
URI partitionUri = new URI(partitionFile.toString() +  
"#" + SamplerInputFormat.PARTITION_FILENAME);  
SamplerInputFormat.setInputPaths(job, new Path(args[0]));  
FileOutputFormat.setOutputPath(job, new Path(args[1]));  
job.setJobName("SamplerTotalSort");  
job.setJarByClass(SamplerSort.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(Text.class);  
job.setInputFormat(SamplerInputFormat.class);  
job.setOutputFormat(TextOutputFormat.class);  
job.setPartitionerClass(TotalOrderPartitioner.class);  
job.setNumReduceTasks(4);  
SamplerInputFormat.writePartitionFile(job, partitionFile); // 数据采集并写入文件   
DistributedCache.addCacheFile(partitionUri, job); // 将这个文件作为共享文件   
DistributedCache.createSymlink(job);  
// SamplerInputFormat.setFinalSync(job, true);   
JobClient.runJob(job);  
return 0;  
}  
public static void main(String[] args) throws Exception {  
int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);  
System.exit(res);  
}  
}  


 

 

这篇关于mapreduce实现全局排序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067441

相关文章

C++对象布局及多态实现探索之内存布局(整理的很多链接)

本文通过观察对象的内存布局,跟踪函数调用的汇编代码。分析了C++对象内存的布局情况,虚函数的执行方式,以及虚继承,等等 文章链接:http://dev.yesky.com/254/2191254.shtml      论C/C++函数间动态内存的传递 (2005-07-30)   当你涉及到C/C++的核心编程的时候,你会无止境地与内存管理打交道。 文章链接:http://dev.yesky

通过SSH隧道实现通过远程服务器上外网

搭建隧道 autossh -M 0 -f -D 1080 -C -N user1@remotehost##验证隧道是否生效,查看1080端口是否启动netstat -tuln | grep 1080## 测试ssh 隧道是否生效curl -x socks5h://127.0.0.1:1080 -I http://www.github.com 将autossh 设置为服务,隧道开机启动

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测 目录 时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测基本介绍程序设计参考资料 基本介绍 MATLAB实现LSTM时间序列未来多步预测-递归预测。LSTM是一种含有LSTM区块(blocks)或其他的一种类神经网络,文献或其他资料中LSTM区块可能被描述成智能网络单元,因为

vue项目集成CanvasEditor实现Word在线编辑器

CanvasEditor实现Word在线编辑器 官网文档:https://hufe.club/canvas-editor-docs/guide/schema.html 源码地址:https://github.com/Hufe921/canvas-editor 前提声明: 由于CanvasEditor目前不支持vue、react 等框架开箱即用版,所以需要我们去Git下载源码,拿到其中两个主

React+TS前台项目实战(十七)-- 全局常用组件Dropdown封装

文章目录 前言Dropdown组件1. 功能分析2. 代码+详细注释3. 使用方式4. 效果展示 总结 前言 今天这篇主要讲全局Dropdown组件封装,可根据UI设计师要求自定义修改。 Dropdown组件 1. 功能分析 (1)通过position属性,可以控制下拉选项的位置 (2)通过传入width属性, 可以自定义下拉选项的宽度 (3)通过传入classN

android一键分享功能部分实现

为什么叫做部分实现呢,其实是我只实现一部分的分享。如新浪微博,那还有没去实现的是微信分享。还有一部分奇怪的问题:我QQ分享跟QQ空间的分享功能,我都没配置key那些都是原本集成就有的key也可以实现分享,谁清楚的麻烦详解下。 实现分享功能我们可以去www.mob.com这个网站集成。免费的,而且还有短信验证功能。等这分享研究完后就研究下短信验证功能。 开始实现步骤(新浪分享,以下是本人自己实现

基于Springboot + vue 的抗疫物质管理系统的设计与实现

目录 📚 前言 📑摘要 📑系统流程 📚 系统架构设计 📚 数据库设计 📚 系统功能的具体实现    💬 系统登录注册 系统登录 登录界面   用户添加  💬 抗疫列表展示模块     区域信息管理 添加物资详情 抗疫物资列表展示 抗疫物资申请 抗疫物资审核 ✒️ 源码实现 💖 源码获取 😁 联系方式 📚 前言 📑博客主页:

探索蓝牙协议的奥秘:用ESP32实现高质量蓝牙音频传输

蓝牙(Bluetooth)是一种短距离无线通信技术,广泛应用于各种电子设备之间的数据传输。自1994年由爱立信公司首次提出以来,蓝牙技术已经经历了多个版本的更新和改进。本文将详细介绍蓝牙协议,并通过一个具体的项目——使用ESP32实现蓝牙音频传输,来展示蓝牙协议的实际应用及其优点。 蓝牙协议概述 蓝牙协议栈 蓝牙协议栈是蓝牙技术的核心,定义了蓝牙设备之间如何进行通信。蓝牙协议

python实现最简单循环神经网络(RNNs)

Recurrent Neural Networks(RNNs) 的模型: 上图中红色部分是输入向量。文本、单词、数据都是输入,在网络里都以向量的形式进行表示。 绿色部分是隐藏向量。是加工处理过程。 蓝色部分是输出向量。 python代码表示如下: rnn = RNN()y = rnn.step(x) # x为输入向量,y为输出向量 RNNs神经网络由神经元组成, python

利用Frp实现内网穿透(docker实现)

文章目录 1、WSL子系统配置2、腾讯云服务器安装frps2.1、创建配置文件2.2 、创建frps容器 3、WSL2子系统Centos服务器安装frpc服务3.1、安装docker3.2、创建配置文件3.3 、创建frpc容器 4、WSL2子系统Centos服务器安装nginx服务 环境配置:一台公网服务器(腾讯云)、一台笔记本电脑、WSL子系统涉及知识:docker、Frp