hadoop集群运行MR程序、mahout程序

2024-03-03 23:08

本文主要是介绍hadoop集群运行MR程序、mahout程序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

hadoop集群运行MR程序

  • 1. 启动集群
  • 2. 上传程序资源到hdfs
  • 3. 修改程序文件路径
  • 4. 安装mahout
  • 5. 提交程序到集群

本教程在配置完hadoop,可以正常运行的前提下进行

1. 启动集群

# 启动hdfs
sbin/start-dfs.sh# 启动yarn
sbin/start-yarn.sh

使用jps命令,看到如下图所示,启动成功。
进程


2. 上传程序资源到hdfs

  1. 第一步:把文件上传到服务器。
  2. 第二步:把文件上传到hdfs集群。
bin/hadoop dfs -put ../train_data /

看到如下图所示,上传成功。
hdfs文件目录


3. 修改程序文件路径

把程序读取文件的路径,修改为从参数中获取,本程序需要修改为如下代码:

package com.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.*;/*** @author affable* @description 处理数据* @date 2020/5/13 10:30*/
public class DataAnalysis {/*** 解析文件,取出userId*/static class ParseTxtMapper extends Mapper<LongWritable, Text, Text, Text> {Text k = new Text();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 切分获取每个字段值String[] fields = value.toString().split(",");// k -> userId// v -> workId,actionk.set(fields[0]);v.set(fields[1] + "," + fields[2]);context.write(k, v);}}/*** 按每个userId进行reducer*/static class ScoreReducer extends Reducer<Text, Text, Text, Text> {// 此userId对于所有浏览职位的actionMap<String, Integer> workActions = new HashMap<>();// 此userId对于所有已投递职位的actionMap<String, Integer> deliveryWorks = new HashMap<>();// 最大值double maxWorkAction = 0.0D;double maxDeliveryWorkAction = 0.0D;// 这次的userIdString userId;Text k = new Text();Text v = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {userId = key.toString();for (Text value : values) {String[] fields = value.toString().split(",");if ("2".equals(fields[1])) {// 如果投递了该职位// 如果map中没有,则放入1,如果有,则在此基础加1deliveryWorks.merge(fields[0], 1, Integer::sum);Integer count = deliveryWorks.get(fields[0]);maxDeliveryWorkAction = count > maxDeliveryWorkAction ? count : maxDeliveryWorkAction;} else {// 只是浏览了职位workActions.merge(fields[0], 1, Integer::sum);Integer count = workActions.get(fields[0]);maxWorkAction = count > maxWorkAction ? count : maxWorkAction;}}// 从浏览职位中去除已投递的deliveryWorks.forEach((k, v) -> workActions.remove(k));// 计算分数// 规则://     浏览量/最大浏览量*4  (0, 4]//     投递量/最大投递量+4  (4, 5]for (Map.Entry<String, Integer> entry : workActions.entrySet()) {String workId = entry.getKey();Integer count = entry.getValue();k.set(userId + "\t" + workId);v.set(String.format("%.2f", count / maxWorkAction * 4));context.write(k, v);}for (Map.Entry<String, Integer> entry : deliveryWorks.entrySet()) {String workId = entry.getKey();Integer count = entry.getValue();k.set(userId + "\t" + workId);v.set(String.format("%.2f", 4.0 + count / maxDeliveryWorkAction));context.write(k, v);}// 清空数据workActions.clear();deliveryWorks.clear();maxWorkAction = 0.0D;maxDeliveryWorkAction = 0.0D;}}public static void main(String[] args) throws Exception {String inputPath = args[0];String outputPath = args[1];Configuration conf = new Configuration();Job job = Job.getInstance(conf, "cal_user_score");job.setJarByClass(DataAnalysis.class);job.setMapperClass(ParseTxtMapper.class);job.setReducerClass(ScoreReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setNumReduceTasks(1);// 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));System.exit(job.waitForCompletion(true) ? 0 : 1);}}
package com.demo;import org.apache.commons.csv.CSVParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.impl.model.GenericDataModel;
import org.apache.mahout.cf.taste.impl.model.GenericPreference;
import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
import org.apache.mahout.cf.taste.impl.model.MemoryIDMigrator;
import org.apache.mahout.cf.taste.impl.recommender.svd.ALSWRFactorizer;
import org.apache.mahout.cf.taste.impl.recommender.svd.SVDRecommender;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.model.Preference;
import org.apache.mahout.cf.taste.model.PreferenceArray;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @author affable* @description 职位推荐* @date 2020/5/11 10:02*/
public class WorkRecommend {private static final int SIZE = 20;/*** 获取文件系统*/private static FileSystem getFiledSystem() throws IOException {Configuration configuration = new Configuration();return FileSystem.get(configuration);}/*** 读取hdfs文件* @param filePath 文件路径* @return 返回读取文件流*/private static BufferedReader readHDFSFile(String filePath) throws FileNotFoundException {FSDataInputStream fsDataInputStream = null;BufferedReader reader = null;try {Path path = new Path(filePath);fsDataInputStream = getFiledSystem().open(path);reader = new BufferedReader(new InputStreamReader(fsDataInputStream));} catch (IOException e) {e.printStackTrace();}return reader;}public static void main(String[] args) throws TasteException, IOException {// *******************************处理开始******************************************// 使用推荐模型之前,对数据的userId和workId映射成long类型// 防止模型把userId和workId转为long,出现数据异常// 数据存储路径String dataPath = args[0];Map<Long,List<Preference>> preferecesOfUsers = new HashMap<>(16);// 读取原始数据并处理BufferedReader reader = readHDFSFile(dataPath);CSVParser parser = new CSVParser(reader, '\t');String[] line;MemoryIDMigrator userIdMigrator = new MemoryIDMigrator();MemoryIDMigrator workIdMigrator = new MemoryIDMigrator();while((line = parser.getLine()) != null) {// string 转 longlong userIdLong = userIdMigrator.toLongID(line[0]);long workIdLong = workIdMigrator.toLongID(line[1]);userIdMigrator.storeMapping(userIdLong, line[0]);workIdMigrator.storeMapping(workIdLong, line[1]);List<Preference> userPrefList;if((userPrefList = preferecesOfUsers.get(userIdLong)) == null) {userPrefList = new ArrayList<>();preferecesOfUsers.put(userIdLong, userPrefList);}userPrefList.add(new GenericPreference(userIdLong, workIdLong, Float.parseFloat(line[2])));}FastByIDMap<PreferenceArray> preferecesOfUsersFastMap = new FastByIDMap<>();for(Map.Entry<Long, List<Preference>> entry : preferecesOfUsers.entrySet()) {preferecesOfUsersFastMap.put(entry.getKey(), new GenericUserPreferenceArray(entry.getValue()));}// ***********************************处理完成**************************************// 读取数据DataModel dataModel = new GenericDataModel(preferecesOfUsersFastMap);// 使用als求损失函数ALSWRFactorizer factorizer = new ALSWRFactorizer(dataModel, 5, 0.2, 200);// 使用SVD算法进行推荐Recommender recommender = new SVDRecommender(dataModel, factorizer);// 推荐测试LongPrimitiveIterator userIdIterator = dataModel.getUserIDs();int i = 0;while (userIdIterator.hasNext()) {long userIdLong = userIdIterator.nextLong();String userId = userIdMigrator.toStringID(userIdLong);List<RecommendedItem> recommendedItems = recommender.recommend(userIdLong, SIZE);for (RecommendedItem item : recommendedItems) {// 写出到mysql// MysqlUtils.insert(userId, workIdMigrator.toStringID(item.getItemID()), item.getValue());System.out.println(String.format("userId: %s, itemId: %s, score: %.2f",userId, workIdMigrator.toStringID(item.getItemID()), item.getValue()));i++;}}System.out.println(i);// 释放资源// MysqlUtils.release();reader.close();}}

4. 安装mahout

  1. 下载安装包。
  2. 上传到服务器,并解压。
  3. 修改环境变量,在/etc/profile中做如下修改,并重新加载profile文件。

修改环境变量

source /etc/profile
  1. 修改mahout配置(或者添加HADOOP_CONF_DIR环境变量也可以)
vim bin/mahout

修改配置
6. 在命令行输入 mahout,测试是否可以正常运行。


5. 提交程序到集群

  1. 本地使用以下命令对程序打包,并上传到服务器。
    最好把后缀为-with-dependencies.jar的包上传到服务器,防止hadoop集群没有对应的依赖包
# maven程序打包
mvn clean package
  1. 运行数据分析处理程序。
# 最后两个参数为:待处理数据路径,处理后结果的保存路径
bin/hadoop jar ../work-recommend-1.0.0-Release-jar-with-dependencies.jar com.demo.DataAnalysis /train_data/user_action.csv /work_out/user_score

如下图所示,运行成功。
成功
成功
3. 运行预测程序。

mahout hadoop jar ../work-recommend-1.0.0-Release-jar-with-dependencies.jar com.demo.WorkRecommend hdfs://localhost:9000/work_out/user_score/part-r-00000

执行结果如下
运行结果

这篇关于hadoop集群运行MR程序、mahout程序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/771212

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

EMLOG程序单页友链和标签增加美化

单页友联效果图: 标签页面效果图: 源码介绍 EMLOG单页友情链接和TAG标签,友链单页文件代码main{width: 58%;是设置宽度 自己把设置成与您的网站宽度一样,如果自适应就填写100%,TAG文件不用修改 安装方法:把Links.php和tag.php上传到网站根目录即可,访问 域名/Links.php、域名/tag.php 所有模板适用,代码就不粘贴出来,已经打