ODPS MR开发 WordCount

2024-05-09 14:32
文章标签 开发 mr wordcount odps

本文主要是介绍ODPS MR开发 WordCount,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


参考:
ODPS初始篇--客户端配置和使用:http://blog.itpub.net/26613085/viewspace-1327313/
odps dship客户端使用:http://blog.itpub.net/26613085/viewspace-1328434/
有了上面两篇文章,就可以使用ODPS的客户端;使用ODPS DSHIP往ODPS上上传数据。

1、 在Eclipse中创建一个JAVA工程ODPS_WORD_COUNT
2、 下载ODPS JAVA SDK包,http://www.aliyun.com/product/odps/,“开发者资源”,“JAVA SDK包”,解压缩。把这些jar包搜罗到一个文件夹下吧,等下添加的时候比较省事。

3、 在工程中添加ODPS SDK的JAR包,需要将这些JAR文件添加到Eclipse Project的Build Path中:选中ODPS_WORD_COUNT,右键Bulid Path,Configure Build Path,Java Build Path,Libraries,Add External JARs,选中JAR,然后OK就可以。把上面JAVA SDK包中的Jar文件都添加进来。 (如果想要在本地测试MR程序,需要把ODPS CLT里面的jar包都添加到项目中)
4、 在ODPS中准备一张表,yangsw_test.word_count.
odps:sql:yangsw_test> create table word_count(content string);
InstanceId: 2014111201412840gqtigdx5
OK
odps:sql:yangsw_test> desc word_count;
+------------------------------------------------------------------------------------+
| Table: word_count                                                                  |
| Owner: ALIYUN$******@***.com     | Project: yangsw_test                       |
| TableComment:                                                                      |
+------------------------------------------------------------------------------------+
| CreatedTime:              2014-11-12 09:41:28                                      |
| LastMetaModifiedTime:     2014-11-12 09:41:28                                      |
| LastDataModifiedTime:     1970-01-01 08:00:00                                      |
+------------------------------------------------------------------------------------+
| Type : Table                 | Size: 0 Bytes                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Comment                                             |
+------------------------------------------------------------------------------------+
| content         | STRING     |                                                     |
+------------------------------------------------------------------------------------+

5、 准备数据文件word_count.txt
hello word
hello odps
let us begin cloud compute now
6、 dship导入到word_count表
dship upload -fd "~" -rd "\r\n" C:\Users\yangswa\Desktop\word_count.txt word_count
Upload session: 201411120951352581870a0025cfcc
2014-11-12 09:51:06     scanning file: 'word_count.txt'
2014-11-12 09:51:06     uploading file: 'word_count.txt'
2014-11-12 09:51:07     'word_count.txt' uploaded
OK


7、 查看下这张表的数据
odps:sql:yangsw_test> select * from word_count;
InstanceId: 2014111214471814gabmgdx5
SQL: .
+---------+
| content |
+---------+
| hello word |
| hello odps |
| let us begin cloud compute now |
+---------+
8、 再创建一个结果表
odps:sql:yangsw_test> create table word_count_result(word string,count bigint);
InstanceId: 20141112074523943g9l5pdx5
OK
odps:sql:yangsw_test> desc word_count_result;
+------------------------------------------------------------------------------------+
| Table: word_count_result                                                           |
| Owner: ALIYUN$**********@126.com     | Project: yangsw_test                       |
| TableComment:                                                                      |
+------------------------------------------------------------------------------------+
| CreatedTime:              2014-11-12 15:45:24                                      |
| LastMetaModifiedTime:     2014-11-12 15:45:24                                      |
| LastDataModifiedTime:     1970-01-01 08:00:00                                      |
+------------------------------------------------------------------------------------+
| Type : Table                 | Size: 0 Bytes                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Comment                                             |
+------------------------------------------------------------------------------------+
| word            | STRING     |                                                     |
| count           | BIGINT     |                                                     |
+------------------------------------------------------------------------------------+
9、 实现程序
先建立一个包吧,com.thomas.odps
先借用官方的一个WordCount.java代码,下一篇文章将这个类改写成mapper、reducer和main类分开的写法。这次先把流程都走通。
package com.thomas.odps;

import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;


public class WordCount {

 public static class TokenizerMapper extends MapperBase {
  Record word;
  Record one;

  @Override
  public void setup(TaskContext context) throws IOException {
   word = context.createMapOutputKeyRecord();
   one = context.createMapOutputValueRecord();
   one.setBigint(0, 1L);
  }

  @Override
  public void map(long recordNum, Record record, TaskContext context)
    throws IOException {
   for (int i = 0; i < record.getColumnCount(); i++) {
    String[] words = record.get(i).toString().split("\\s+");
    for (String w : words) {
     word.setString(0, w);
     context.write(word, one);
    }
   }
  }

  public static class SumCombiner extends ReducerBase {
   private Record count;

   @Override
   public void setup(TaskContext context) throws IOException {
    count = context.createMapOutputValueRecord();
   }

   @Override
   public void reduce(Record key, Iterator<Record> values,
     TaskContext context) throws IOException {
    long c = 0;
    while (values.hasNext()) {
     Record val = values.next();
     c += (Long) val.get(0);
    }
    count.set(0, c);
    context.write(key, count);
   }
  }

  public static class SumReducer extends ReducerBase {
   private Record result;

   @Override
   public void setup(TaskContext context) throws IOException {
    result = context.createOutputRecord();
   }

   @Override
   public void reduce(Record key, Iterator<Record> values,
     TaskContext context) throws IOException {
    long count = 0;
    while (values.hasNext()) {
     Record val = values.next();
     count += (Long) val.get(0);
    }
    result.set(0, key.get(0));
    result.set(1, count);
    context.write(result);
   }
  }

  public static void main(String[] args) throws OdpsException {
   if (args.length != 2) {
    System.err.println("Usage: wordcount <in_table> <out_table>");
    System.exit(2);
   }
   JobConf job = new JobConf();
   job.setMapperClass(TokenizerMapper.class);
   job.setCombinerClass(SumCombiner.class);
   job.setReducerClass(SumReducer.class);
   job.setMapOutputKeySchema(new Column[] { new Column("word",
     OdpsType.STRING) });
   job.setMapOutputValueSchema(new Column[] { new Column("count",
     OdpsType.BIGINT) });
   InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
   OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
   RunningJob rj = JobClient.runJob(job);
   rj.waitForCompletion();
  }
 }
}
10、 本地测试
WordCount.java右键,“Run As”,“Run Configurations …”:

Main”,”Project”:
ODPS_WORD_COUNT
Main”,”Main class”:
com.thomas.odps.WordCount$TokenizerMapper
(这个本来应该是com.thomas.odps.WordCount但是系统自动带出来上面的,改掉在本地执行时还报错因此就设置成上面的吧)
Arguments”,”Program arguments”:
word_count word_count_result
Arguments”,”VM arguments”:
-Dodps.runner.mode=local
-Dodps.project.name=yangsw_test
-Dodps.end.point=http://service.odps.aliyun.com/api
-Dodps.access.id=**********
-Dodps.access.key=**********
配置好后点击“Run”,启动本地测试

控制台输出:
Running open mr on old console.
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
信息: run mapreduce job in local mode
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
信息: job id: mr_20141113161535_909_4984
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner processInput
信息: Processing input: word_count
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count\__schema__
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTableSchemeAndData
信息: Create table scheme of word_count, Path: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTable
信息: Start to download table: yangsw_test.word_count partSpec: null to D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\__schema__
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\output\__default__\__schema__
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
信息: Start to run mappers, num: 1
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
信息: Map M_000000: input: temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\data:0+56
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
信息: create record reader: finished
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
信息: Start to run Mapper: M_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
信息: Start to run Combiner
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
信息: Fininshed run Combiner
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
信息: Fininshed run Reducer: M_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
信息: Start to run reduces, num: 1
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
信息: Start to run Reducer: R_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
信息: Fininshed run Reducer: R_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner moveOutputs
信息: Copy output to warehouse: label=__default__ -> D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count_result
Summary:
counters: 8
 map-reduce framework
  combine_input_groups=9
  combine_output_records=9
  map_input_bytes=56
  map_input_records=3
  map_output_records=10
  reduce_output_[word_count_result]_bytes=74
  reduce_output_[word_count_result]_records=9

OK
InstanceId: mr_20141113161535_909_4984

执行完成后,会在该工程的目录下面多了如下目录结构:
./temp
./ warehouse
/ yangsw_test
    /word_count
        / __schema__
        / data
    /word_count_result
        / __schema__
        / R_000000
其中word_count下面的__schema__内容包含word_count源表的项目和表结构:
project=yangsw_test
table=word_count
columns=content:STRING
word_count下面的data是word_count表的数据:
hello word
hello odps
let us begin cloud compute now
word_count_result下面的__schema__是目标表的项目和表结构:
project=yangsw_test
table=word_count_result
columns=word:STRING,count:BIGINT
word_count_result下面的R_000000是输出结果,本次只有一个reduce所以只有一个文件:
begin,1
cloud,1
compute,1
hello,2
let,1
now,1
odps,1
us,1
word,1
上面就是wordcount本地运行的结果了
11、 发布jar包
create resource jar C:\Users\yangswa\Desktop\word_count.jar word_count.jar
12、 在服务端运行
odps:yangsw_test> jar -resources word_count.jar -classpath C:\Users\yangswa\Desktop\word_count.jar com.thomas.odps.WordCount
word_count word_count_result
Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava
.lang.String;)
        at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:47)
Caused by: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava.lang.String;)
        at java.lang.Class.getMethod(Unknown Source)
        at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:35)
这个居然报错:main方法不存在,在odps用户中心的售后服务中,提交了个工单,得到的反馈是ODPS MR功能还没有正式开放,要等到12月底才能正式使用。好吧,就先等等吧.

另外一个需要注意的地方时,-classpath要制定本地jar文件的位置,而不是只是指向资源。 

这篇关于ODPS MR开发 WordCount的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/973645

相关文章

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

Vue3项目开发——新闻发布管理系统(六)

文章目录 八、首页设计开发1、页面设计2、登录访问拦截实现3、用户基本信息显示①封装用户基本信息获取接口②用户基本信息存储③用户基本信息调用④用户基本信息动态渲染 4、退出功能实现①注册点击事件②添加退出功能③数据清理 5、代码下载 八、首页设计开发 登录成功后,系统就进入了首页。接下来,也就进行首页的开发了。 1、页面设计 系统页面主要分为三部分,左侧为系统的菜单栏,右侧

v0.dev快速开发

探索v0.dev:次世代开发者之利器 今之技艺日新月异,开发者之工具亦随之进步不辍。v0.dev者,新兴之开发者利器也,迅速引起众多开发者之瞩目。本文将引汝探究v0.dev之基本功能与优势,助汝速速上手,提升开发之效率。 何谓v0.dev? v0.dev者,现代化之开发者工具也,旨在简化并加速软件开发之过程。其集多种功能于一体,助开发者高效编写、测试及部署代码。无论汝为前端开发者、后端开发者

pico2 开发环境搭建-基于ubuntu

pico2 开发环境搭建-基于ubuntu 安装编译工具链下载sdk 和example编译example 安装编译工具链 sudo apt install cmake gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib 注意cmake的版本,需要在3.17 以上 下载sdk 和ex