使用MapReduce实现k-means算法

2024-06-20 18:18

本文主要是介绍使用MapReduce实现k-means算法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

主要的算法流程就是:

(1)随机选择k个点作为初始中心,放到磁盘上供各个节点共享

(2)每一个map读取中心点,每一条记录找到最近的Cluster,发出的记录是<(id),(cluster)>,Reduce的功能就是重新计算新的k均值,并写到hdfs中,供下一次的迭代使用

(3)当迭代停止,根据最终的中心点,分配所有的点,形成最终的聚类。

以下是具体的代码:

package kmeans;


import java.io.DataInput;


/*
 * k-means聚类算法簇信息
 */
public class Cluster implements Writable {
private int clusterID;
private long numOfPoints;
private Instance center;


public Cluster() {
this.setClusterID(-1);
this.setNumOfPoints(0);
this.setCenter(new Instance());
}


public Cluster(int clusterID, Instance center) {
this.setClusterID(clusterID);
this.setNumOfPoints(0);
this.setCenter(center);
}


public Cluster(String line) {
String[] value = line.split(",", 3);
clusterID = Integer.parseInt(value[0]);
numOfPoints = Long.parseLong(value[1]);
center = new Instance(value[2]);
}


public String toString() {
String result = String.valueOf(clusterID) + ","
+ String.valueOf(numOfPoints) + "," + center.toString();
return result;
}


public int getClusterID() {
return clusterID;
}


public void setClusterID(int clusterID) {
this.clusterID = clusterID;
}


public long getNumOfPoints() {
return numOfPoints;
}


public void setNumOfPoints(long numOfPoints) {
this.numOfPoints = numOfPoints;
}


public Instance getCenter() {
return center;
}


public void setCenter(Instance center) {
this.center = center;
}


public void observeInstance(Instance instance) {
try {
Instance sum = center.multiply(numOfPoints).add(instance);
numOfPoints++;
center = sum.divide(numOfPoints);
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void write(DataOutput out) throws IOException {
out.writeInt(clusterID);
out.writeLong(numOfPoints);
center.write(out);
}


@Override
public void readFields(DataInput in) throws IOException {
clusterID = in.readInt();
numOfPoints = in.readLong();
center.readFields(in);
}
}


package kmeans;


import java.io.DataInput;


/**
 * A dense numeric vector, used both for data points and cluster centers.
 * Supports element-wise add and scalar multiply/divide, plus Hadoop
 * Writable serialization.
 */
public class Instance implements Writable {
    ArrayList<Double> value; // the vector components, in order

    /** Creates an empty (zero-dimensional) instance. */
    public Instance() {
        value = new ArrayList<Double>();
    }

    /** Parses a comma-separated list of doubles, e.g. "2,1,3,4,1,4". */
    public Instance(String line) {
        String[] fields = line.split(",");
        value = new ArrayList<Double>(fields.length);
        for (int i = 0; i < fields.length; i++) {
            value.add(Double.parseDouble(fields[i]));
        }
    }

    /** Copy constructor. Double is immutable, so an element copy suffices. */
    public Instance(Instance ins) {
        value = new ArrayList<Double>(ins.getValue());
    }

    /** Creates a k-dimensional zero vector. */
    public Instance(int k) {
        value = new ArrayList<Double>(k);
        for (int i = 0; i < k; i++) {
            value.add(0.0);
        }
    }

    public ArrayList<Double> getValue() {
        return value;
    }

    /**
     * Element-wise sum. An empty operand acts as the identity (a copy of
     * the other operand is returned), which lets accumulation loops start
     * from {@code new Instance()}.
     *
     * @throws IllegalArgumentException if both vectors are non-empty and
     *         their dimensions differ. (The original printed the stack
     *         trace and returned null, deferring the failure to a later
     *         NullPointerException in the caller.)
     */
    public Instance add(Instance instance) {
        if (value.size() == 0)
            return new Instance(instance);
        else if (instance.getValue().size() == 0)
            return new Instance(this);
        else if (value.size() != instance.getValue().size())
            throw new IllegalArgumentException(
                    "can not add! dimension not compatible!"
                            + value.size() + "," + instance.getValue().size());
        else {
            Instance result = new Instance();
            for (int i = 0; i < value.size(); i++) {
                result.getValue()
                        .add(value.get(i) + instance.getValue().get(i));
            }
            return result;
        }
    }

    /** Returns a new instance scaled element-wise by {@code num}. */
    public Instance multiply(double num) {
        Instance result = new Instance();
        for (int i = 0; i < value.size(); i++) {
            result.getValue().add(value.get(i) * num);
        }
        return result;
    }

    /** Returns a new instance divided element-wise by {@code num}. */
    public Instance divide(double num) {
        Instance result = new Instance();
        for (int i = 0; i < value.size(); i++) {
            result.getValue().add(value.get(i) / num);
        }
        return result;
    }

    /**
     * Comma-separated textual form, e.g. "2.0,1.0,3.0". Returns the empty
     * string for a zero-dimensional instance (the original indexed
     * value.get(-1) and threw IndexOutOfBoundsException).
     */
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(value.get(i));
        }
        return sb.toString();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // length-prefixed encoding so readFields knows how much to read
        out.writeInt(value.size());
        for (int i = 0; i < value.size(); i++) {
            out.writeDouble(value.get(i));
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        int size = 0;
        value = new ArrayList<Double>();
        if ((size = in.readInt()) != 0) {
            for (int i = 0; i < size; i++) {
                value.add(in.readDouble());
            }
        }
    }
}


package kmeans;


import java.io.BufferedReader;


/**
 * One iteration of the k-means center-update step as a MapReduce job:
 * the mapper assigns each point to its nearest current center, the
 * combiner pre-aggregates per-cluster weighted sums to cut shuffle
 * volume, and the reducer emits the recomputed centers for the next
 * iteration.
 */
public class KMeans {
    public static class KMeansMapper extends
            Mapper<LongWritable, Text, IntWritable, Cluster> {
        // Current cluster centers, loaded once per mapper in setup().
        private ArrayList<Cluster> kClusters = new ArrayList<Cluster>();

        /**
         * Loads the current centers from every plain file under the
         * "clusterPath" directory named in the job configuration.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileStatus[] fileList = fs.listStatus(new Path(context
                    .getConfiguration().get("clusterPath")));
            for (int i = 0; i < fileList.length; i++) {
                if (!fileList[i].isDir()) {
                    FSDataInputStream fsi = fs.open(fileList[i].getPath());
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(fsi, "UTF-8"));
                    // Close each file as soon as it is consumed. The
                    // original closed only the streams of the LAST file and
                    // threw a NullPointerException when the directory held
                    // no plain files at all.
                    try {
                        String line = null;
                        while ((line = in.readLine()) != null) {
                            System.out.println("read a line:" + line);
                            Cluster cluster = new Cluster(line);
                            cluster.setNumOfPoints(0);
                            kClusters.add(cluster);
                        }
                    } finally {
                        in.close(); // also closes the underlying fsi stream
                    }
                }
            }
        }

        /**
         * Parses one input line into an Instance, finds the nearest
         * cluster, and emits (clusterID, singleton cluster holding the
         * point) so the combiner/reducer can average.
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Instance instance = new Instance(value.toString());
            int id;
            try {
                id = getNearest(instance);
                if (id == -1)
                    throw new InterruptedException("id == -1");
                else {
                    Cluster cluster = new Cluster(id, instance);
                    cluster.setNumOfPoints(1);
                    System.out.println("cluster that i emit is:"
                            + cluster.toString());
                    context.write(new IntWritable(id), cluster);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the ID of the loaded cluster whose center is closest
         * (Euclidean distance) to the instance, or -1 if no clusters
         * are loaded.
         */
        public int getNearest(Instance instance) throws Exception {
            int id = -1;
            double distance = Double.MAX_VALUE;
            Distance<Double> distanceMeasure = new EuclideanDistance<Double>();
            double newDis = 0.0;
            for (Cluster cluster : kClusters) {
                newDis = distanceMeasure.getDistance(cluster.getCenter()
                        .getValue(), instance.getValue());
                if (newDis < distance) {
                    id = cluster.getClusterID();
                    distance = newDis;
                }
            }
            return id;
        }

        /** Returns the loaded cluster with the given ID, or null if absent. */
        public Cluster getClusterByID(int id) {
            for (Cluster cluster : kClusters) {
                if (cluster.getClusterID() == id)
                    return cluster;
            }
            return null;
        }
    }

    /**
     * Local pre-aggregation: merges all points a mapper assigned to the
     * same cluster into one weighted mean before the shuffle.
     */
    public static class KMeansCombiner extends
            Reducer<IntWritable, Cluster, IntWritable, Cluster> {
        public void reduce(IntWritable key, Iterable<Cluster> value,
                Context context) throws IOException, InterruptedException {
            Instance instance = new Instance();
            // long, not int: matches Cluster.getNumOfPoints() and avoids
            // overflow on very large partitions.
            long numOfPoints = 0;
            for (Cluster cluster : value) {
                numOfPoints += cluster.getNumOfPoints();
                System.out.println("cluster is:" + cluster.toString());
                // center * count reconstructs the raw coordinate sum
                instance = instance.add(cluster.getCenter().multiply(
                        cluster.getNumOfPoints()));
            }
            Cluster cluster = new Cluster(key.get(), instance
                    .divide(numOfPoints));
            cluster.setNumOfPoints(numOfPoints);
            System.out.println("combiner emit cluster:" + cluster.toString());
            context.write(key, cluster);
        }
    }

    /**
     * Computes each cluster's new center from the combiner output and
     * writes it keyless; the output files feed the next iteration.
     */
    public static class KMeansReducer extends
            Reducer<IntWritable, Cluster, NullWritable, Cluster> {
        public void reduce(IntWritable key, Iterable<Cluster> value,
                Context context) throws IOException, InterruptedException {
            Instance instance = new Instance();
            // long accumulator for the same overflow reason as the combiner
            long numOfPoints = 0;
            for (Cluster cluster : value) {
                numOfPoints += cluster.getNumOfPoints();
                instance = instance.add(cluster.getCenter().multiply(
                        cluster.getNumOfPoints()));
            }
            Cluster cluster = new Cluster(key.get(), instance
                    .divide(numOfPoints));
            cluster.setNumOfPoints(numOfPoints);
            context.write(NullWritable.get(), cluster);
        }
    }
}


package kmeans;


import java.io.BufferedReader;


/**
 * Final assignment pass: once the center files for the last iteration
 * exist, this map-only job labels every input instance with the id of
 * its nearest cluster and writes (instance, clusterID) pairs to the
 * result files.
 *
 * @author KING
 */
public class KMeansCluster {
    public static class KMeansClusterMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        // Final cluster centers, loaded once per mapper in setup().
        private ArrayList<Cluster> kClusters = new ArrayList<Cluster>();

        /**
         * Loads the final centers from every plain file under the
         * "clusterPath" directory named in the job configuration.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileStatus[] fileList = fs.listStatus(new Path(context
                    .getConfiguration().get("clusterPath")));
            for (int i = 0; i < fileList.length; i++) {
                if (!fileList[i].isDir()) {
                    FSDataInputStream fsi = fs.open(fileList[i].getPath());
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(fsi, "UTF-8"));
                    // Close each file as soon as it is consumed. The
                    // original closed only the streams of the LAST file and
                    // threw a NullPointerException when the directory held
                    // no plain files at all.
                    try {
                        String line = null;
                        while ((line = in.readLine()) != null) {
                            System.out.println("read a line:" + line);
                            Cluster cluster = new Cluster(line);
                            cluster.setNumOfPoints(0);
                            kClusters.add(cluster);
                        }
                    } finally {
                        in.close(); // also closes the underlying fsi stream
                    }
                }
            }
        }

        /**
         * Emits (raw input line, id of the nearest cluster) for each
         * instance.
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Instance instance = new Instance(value.toString());
            int id;
            try {
                id = getNearest(instance);
                if (id == -1)
                    throw new InterruptedException("id == -1");
                else {
                    context.write(value, new IntWritable(id));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the ID of the loaded cluster whose center is closest
         * (Euclidean distance) to the instance, or -1 if no clusters
         * are loaded.
         */
        public int getNearest(Instance instance) throws Exception {
            int id = -1;
            double distance = Double.MAX_VALUE;
            Distance<Double> distanceMeasure = new EuclideanDistance<Double>();
            double newDis = 0.0;
            for (Cluster cluster : kClusters) {
                newDis = distanceMeasure.getDistance(cluster.getCenter()
                        .getValue(), instance.getValue());
                if (newDis < distance) {
                    id = cluster.getClusterID();
                    distance = newDis;
                }
            }
            return id;
        }
    }
}


package kmeans;


import java.io.IOException;


/**
 * Drives the whole k-means run: generates k random initial centers,
 * runs a fixed number of center-update MapReduce jobs, then assigns
 * every input instance to its final cluster.
 */
public class KMeansDriver {
    private int k;              // number of clusters
    private int iterationNum;   // fixed number of center-update iterations
    private String sourcePath;  // input instances path
    private String outputPath;  // base output directory
    private Configuration conf; // shared Hadoop configuration

    public KMeansDriver(int k, int iterationNum, String sourcePath,
            String outputPath, Configuration conf) {
        this.k = k;
        this.iterationNum = iterationNum;
        this.sourcePath = sourcePath;
        this.outputPath = outputPath;
        this.conf = conf;
    }

    /**
     * Runs the iterative center-update jobs. Iteration i reads centers
     * from cluster-i and writes the recomputed centers to cluster-(i+1).
     *
     * @throws IOException if an iteration job fails
     */
    public void clusterCenterJob() throws IOException, InterruptedException,
            ClassNotFoundException {
        for (int i = 0; i < iterationNum; i++) {
            // Pass the driver's Configuration to the job; the original
            // called new Job() and silently discarded this.conf.
            Job clusterCenterJob = new Job(conf);
            clusterCenterJob.setJobName("clusterCenterJob" + i);
            clusterCenterJob.setJarByClass(KMeans.class);

            clusterCenterJob.getConfiguration().set("clusterPath",
                    outputPath + "/cluster-" + i + "/");

            clusterCenterJob.setMapperClass(KMeans.KMeansMapper.class);
            clusterCenterJob.setMapOutputKeyClass(IntWritable.class);
            clusterCenterJob.setMapOutputValueClass(Cluster.class);

            clusterCenterJob.setCombinerClass(KMeans.KMeansCombiner.class);
            clusterCenterJob.setReducerClass(KMeans.KMeansReducer.class);
            clusterCenterJob.setOutputKeyClass(NullWritable.class);
            clusterCenterJob.setOutputValueClass(Cluster.class);

            FileInputFormat
                    .addInputPath(clusterCenterJob, new Path(sourcePath));
            FileOutputFormat.setOutputPath(clusterCenterJob, new Path(
                    outputPath + "/cluster-" + (i + 1) + "/"));

            // Fail fast instead of running later iterations on missing
            // center files (the original ignored the boolean result).
            if (!clusterCenterJob.waitForCompletion(true)) {
                throw new IOException("clusterCenterJob" + i + " failed");
            }
            System.out.println("finished!");
        }
    }

    /**
     * Assigns every input instance to its nearest final center with a
     * map-only job. NOTE: the method name keeps the original "Jod" typo
     * for backward compatibility with existing callers.
     *
     * @throws IOException if the assignment job fails
     */
    public void KMeansClusterJod() throws IOException, InterruptedException,
            ClassNotFoundException {
        Job kMeansClusterJob = new Job(conf);
        kMeansClusterJob.setJobName("KMeansClusterJob");
        kMeansClusterJob.setJarByClass(KMeansCluster.class);

        // Bug fix: iteration i writes cluster-(i+1), so the final centers
        // live in cluster-<iterationNum>. The original read
        // cluster-(iterationNum - 1), i.e. the SECOND-TO-LAST centers.
        kMeansClusterJob.getConfiguration().set("clusterPath",
                outputPath + "/cluster-" + iterationNum + "/");

        kMeansClusterJob
                .setMapperClass(KMeansCluster.KMeansClusterMapper.class);
        kMeansClusterJob.setMapOutputKeyClass(Text.class);
        kMeansClusterJob.setMapOutputValueClass(IntWritable.class);

        // map-only: the mapper's (instance, clusterID) pairs are the result
        kMeansClusterJob.setNumReduceTasks(0);

        FileInputFormat.addInputPath(kMeansClusterJob, new Path(sourcePath));
        FileOutputFormat.setOutputPath(kMeansClusterJob, new Path(outputPath
                + "/clusteredInstances" + "/"));

        if (!kMeansClusterJob.waitForCompletion(true)) {
            throw new IOException("KMeansClusterJob failed");
        }
        System.out.println("finished!");
    }

    /** Randomly picks k input instances and writes them to cluster-0. */
    public void generateInitialCluster() {
        RandomClusterGenerator generator = new RandomClusterGenerator(conf,
                sourcePath, k);
        generator.generateInitialCluster(outputPath + "/");
    }

    /**
     * Entry point. Args: k, iterationNum, sourcePath, outputPath.
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        System.out.println("start");
        Configuration conf = new Configuration();
        int k = Integer.parseInt(args[0]);
        int iterationNum = Integer.parseInt(args[1]);
        String sourcePath = args[2];
        String outputPath = args[3];
        KMeansDriver driver = new KMeansDriver(k, iterationNum, sourcePath,
                outputPath, conf);
        driver.generateInitialCluster();
        System.out.println("initial cluster finished");
        driver.clusterCenterJob();
        driver.KMeansClusterJod();
    }
}


package kmeans;


import java.io.IOException;


/**
 * Generates the initial cluster centers used as input for the iterative
 * jobs. It scans all instances; the first k become clusters, and each
 * later instance replaces a randomly chosen existing cluster with
 * constant probability 1/(k + 1).
 *
 * NOTE(review): a constant replacement probability is NOT classical
 * reservoir sampling (which would use k/m for the m-th instance), so the
 * selection is biased toward later instances — acceptable for seeding
 * k-means, but confirm if uniform sampling is required.
 */
public final class RandomClusterGenerator {
    private int k;                     // number of centers to keep
    private FileStatus[] fileList;     // input files to scan
    private FileSystem fs;
    private ArrayList<Cluster> kClusters; // the current candidate centers
    private Configuration conf;
    // Single RNG reused for every decision; the original allocated two
    // fresh Random instances per call, which is wasteful and can produce
    // correlated values when seeded by the clock in a tight loop.
    private final Random random = new Random();

    public RandomClusterGenerator(Configuration conf, String filePath, int k) {
        this.k = k;
        try {
            fs = FileSystem.get(URI.create(filePath), conf);
            fileList = fs.listStatus((new Path(filePath)));
            kClusters = new ArrayList<Cluster>(k);
            this.conf = conf;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans every input file, feeding each instance to makeDecision(),
     * then writes the chosen k centers to destinationPath/cluster-0.
     *
     * @param destinationPath directory the initial "cluster-0" file is
     *                        written into
     */
    public void generateInitialCluster(String destinationPath) {
        Text line = new Text();
        try {
            for (int i = 0; i < fileList.length; i++) {
                FSDataInputStream fsi = fs.open(fileList[i].getPath());
                // Close each stream as soon as its file is consumed; the
                // original closed only the last stream and threw a
                // NullPointerException in finally when fileList was empty.
                try {
                    LineReader lineReader = new LineReader(fsi, conf);
                    while (lineReader.readLine(line) > 0) {
                        System.out.println("read a line:" + line);
                        Instance instance = new Instance(line.toString());
                        // decide whether this instance joins the center set
                        makeDecision(instance);
                    }
                } finally {
                    fsi.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        writeBackToFile(destinationPath);
    }

    /**
     * Keeps the instance as a new cluster while fewer than k exist;
     * afterwards replaces a random existing cluster with probability
     * 1/(k + 1), reusing the evicted cluster's id.
     */
    public void makeDecision(Instance instance) {
        if (kClusters.size() < k) {
            // ids are 1-based: first cluster gets id 1
            Cluster cluster = new Cluster(kClusters.size() + 1, instance);
            kClusters.add(cluster);
        } else {
            int choice = randomChoose(k);
            if (!(choice == -1)) {
                int id = kClusters.get(choice).getClusterID();
                kClusters.remove(choice);
                Cluster cluster = new Cluster(id, instance);
                kClusters.add(cluster);
            }
        }
    }

    /**
     * With probability 1/(k + 1) returns a uniformly random index in
     * [0, k - 1]; otherwise returns -1.
     *
     * @param k number of candidate indices
     * @return a random index, or -1 for "keep the current set"
     */
    public int randomChoose(int k) {
        if (random.nextInt(k + 1) == 0) {
            return random.nextInt(k);
        } else
            return -1;
    }

    /**
     * Writes the k chosen clusters, one per line, to
     * destinationPath + "cluster-0".
     */
    public void writeBackToFile(String destinationPath) {
        Path path = new Path(destinationPath + "cluster-0");
        FSDataOutputStream fsi = null;
        try {
            fsi = fs.create(path);
            for (Cluster cluster : kClusters) {
                fsi.write((cluster.toString() + "\n").getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // guard: fs.create may have thrown before fsi was assigned,
            // in which case the original NPE'd here
            if (fsi != null) {
                try {
                    fsi.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


数据:

2,1,3,4,1,4
3,2,5,2,3,5
4,4,4,3,1,5
2,3,1,2,0,3
4,0,1,1,1,5
1,2,3,5,0,1
5,3,2,2,1,3
3,4,1,1,2,1
0,2,3,3,1,4
0,2,5,0,2,2
2,1,4,5,4,3
4,1,4,3,3,2
0,3,2,2,0,1
1,3,1,0,3,0
3,3,4,2,1,3
3,5,3,5,3,2
2,3,2,3,0,1
4,3,3,2,2,3
1,4,3,4,3,1
3,2,3,0,2,5
1,0,2,1,0,4
4,4,3,5,5,4
5,1,4,3,5,2
3,4,4,4,1,1
2,2,4,4,5,5
5,2,0,3,1,3
1,1,3,1,1,3
2,4,2,0,3,5
1,1,1,1,0,4
1,1,4,1,3,0

这篇关于使用MapReduce实现k-means算法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1078888

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

C++对象布局及多态实现探索之内存布局(整理的很多链接)

本文通过观察对象的内存布局,跟踪函数调用的汇编代码。分析了C++对象内存的布局情况,虚函数的执行方式,以及虚继承,等等 文章链接:http://dev.yesky.com/254/2191254.shtml      论C/C++函数间动态内存的传递 (2005-07-30)   当你涉及到C/C++的核心编程的时候,你会无止境地与内存管理打交道。 文章链接:http://dev.yesky

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

Vim使用基础篇

本文内容大部分来自 vimtutor,自带的教程的总结。在终端输入vimtutor 即可进入教程。 先总结一下,然后再分别介绍正常模式,插入模式,和可视模式三种模式下的命令。 目录 看完以后的汇总 1.正常模式(Normal模式) 1.移动光标 2.删除 3.【:】输入符 4.撤销 5.替换 6.重复命令【. ; ,】 7.复制粘贴 8.缩进 2.插入模式 INSERT

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

如何免费的去使用connectedpapers?

免费使用connectedpapers 1. 打开谷歌浏览器2. 按住ctrl+shift+N,进入无痕模式3. 不需要登录(也就是访客模式)4. 两次用完,关闭无痕模式(继续重复步骤 2 - 4) 1. 打开谷歌浏览器 2. 按住ctrl+shift+N,进入无痕模式 输入网址:https://www.connectedpapers.com/ 3. 不需要登录(也就是

通过SSH隧道实现通过远程服务器上外网

搭建隧道 autossh -M 0 -f -D 1080 -C -N user1@remotehost##验证隧道是否生效,查看1080端口是否启动netstat -tuln | grep 1080## 测试ssh 隧道是否生效curl -x socks5h://127.0.0.1:1080 -I http://www.github.com 将autossh 设置为服务,隧道开机启动

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测 目录 时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测基本介绍程序设计参考资料 基本介绍 MATLAB实现LSTM时间序列未来多步预测-递归预测。LSTM是一种含有LSTM区块(blocks)或其他的一种类神经网络,文献或其他资料中LSTM区块可能被描述成智能网络单元,因为

vue项目集成CanvasEditor实现Word在线编辑器

CanvasEditor实现Word在线编辑器 官网文档:https://hufe.club/canvas-editor-docs/guide/schema.html 源码地址:https://github.com/Hufe921/canvas-editor 前提声明: 由于CanvasEditor目前不支持vue、react 等框架开箱即用版,所以需要我们去Git下载源码,拿到其中两个主

代码随想录算法训练营:12/60

非科班学习算法day12 | LeetCode150:逆波兰表达式 ,Leetcode239: 滑动窗口最大值  目录 介绍 一、基础概念补充: 1.c++字符串转为数字 1. std::stoi, std::stol, std::stoll, std::stoul, std::stoull(最常用) 2. std::stringstream 3. std::atoi, std