如何在MaxCompute上处理存储在OSS上的开源格式数据

2024-02-20 23:40

本文主要是介绍如何在MaxCompute上处理存储在OSS上的开源格式数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

为什么80%的码农都做不了架构师?>>>   hot3.png

前言

MaxCompute作为使用最广泛的大数据平台,内部存储的数据以EB量级计算。巨大的数据存储量以及大规模计算下高性能数据读写的需求,对于MaxCompute提出了各种高要求及挑战。处在大数据时代,数据的来源多种多样,开源社区经过十几年的发展,百花齐放,各种各样的数据格式不断的出现。 我们的用户也在各个场景上,通过各种计算框架,积累了各种不同格式的数据。怎样将MaxCompute强大的计算能力开放给这些使用开源格式存储沉淀下来的数据,在MaxCompute上挖掘这些数据中的信息,是MaxCompute团队希望解决的问题。

MaxCompute 2.0最近推出的非结构化计算框架【公测阶段】,旨在从存储介质和存储格式两个维度,打通计算与存储的通道。 在之前的文章中,我们已经介绍过怎样在MaxCompute上对存储在OSS上的文本,音频,图像等格式的数据,以及TableStore(OTS)的KV数据进行计算处理。在这里,则将介绍对于各种流行的开源数据格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎样将其存储在OSS上面,并通过非结构化框架在MaxCompute进行处理。

本着不重造轮子的原则,对于绝大部分这些开源数据格式的解析工作,在非结构化框架中会直接调用开源社区的实现,并且无缝的与MaxCompute系统做对接。

1. 创建EXTERNAL TABLE来绑定OSS外部数据

MaxCompute非结构化数据框架通过EXTERNAL TABLE的概念来提供MaxCompute与各种数据的联通,与读取OSS数据的使用方法类似,对OSS数据进行写操作,首先要通过CREATE EXTERNAL TABLE语句创建出一个外部表,而在读取开源数据格式时,创建外表的DDL语句格式如下:

DROP TABLE [IF EXISTS] <external_table>;CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>']
STORED AS <file format>
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'

可以看到,这个语法与HIVE的语法是相当接近的,而在这个CREATE EXTERNAL TABLE的ddl语句中,有如下几点要说明:

  1. 首先要特别说明的是这里使用的是STORED AS的关键字,而不是普通非结构化外表用的STORED BY关键字,这也是目前在读取开源兼容数据时独有的。
  2. 外部表的<column schemas> 必须与具体OSS上存储存储数据的schema相符合。
  3. ROW FORMAT SERDE 并非必选选项,只有在使用一些特殊的格式上,比如TEXTFILE时才需要使用。
  4. STORED AS后面接的是文件格式名字, 比如 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。
  5. 最后还要提到的是,在上面这个例子中,我们在LOCATION上使用了OSS明文AK,这只适用于在用户对于AK的保密性不敏感情况下使用。 对于数据安全比较敏感的场景,比如在多用户场景或者弹外集群上,则推荐使用通过STS/RAM体系事先进行鉴权,从而避免使用明文AK。

1.1 范例1: 关联OSS上存储的PARQUET数据

现在再来看一个具体的例子,假设我们有一些PARQUET文件存放在一个OSS路径上,每个文件都是PARQUET格式,存放着schema为16列(4列BINGINT, 4列DOUBLE, 8列STRING)的数据,那么可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_parquet
(l_orderkey bigint,l_partkey bigint,l_suppkey bigint,l_linenumber bigint,l_quantity double,l_extendedprice double,l_discount double,l_tax double,l_returnflag string,l_linestatus string,l_shipdate string,l_commitdate string,l_receiptdate string,l_shipinstruct string,l_shipmode string,l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';

1.2 范例2:分区表关联OSS上存储的TEXTFILE数据

同样的数据,如果是每行以JSON格式,存储成OSS上TEXTFILE文件;同时,数据在OSS通过多个目录组织,这时是可以使用MaxCompute分区表和数据关联,则可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_textfile
(l_orderkey bigint,l_partkey bigint,l_suppkey bigint,l_linenumber bigint,l_quantity double,l_extendedprice double,l_discount double,l_tax double,l_returnflag string,l_linestatus string,l_shipdate string,l_commitdate string,l_receiptdate string,l_shipinstruct string,l_shipmode string,l_comment string
)
PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';

如果OSS表目录下面的子目录是以Partition Name方式组织,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
...

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");

如果OSS分区目录不是按这种方式组织,或者根本不在表目录下,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
...

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
...

2. 读取以及处理 OSS 上面的开源格式数据

对比上面的两个范例,可以看出对于不同文件类型,只要简单修改STORED AS后的格式名。在接下来的例子中,我们将只集中描述对上面PARQUET数据对应的外表(tpch_lineitem_parquet)的处理,如果要处理不同的文件类型,只要在DDL创建外表时指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE即可,处理数据的语句则是一样的。

2.1 直接读取以及处理OSS上面的开源数据

在创建数据外表后,直接对外表就可以进行与普通MaxCompute表的操作,直接对存储在OSS上的数据进行处理,比如:

SELECT l_returnflag,l_linestatus,SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,AVG(l_quantity) AS avg_qty,COUNT(*) AS count_order
FROM tpch_lineitem_parquet
WHERE l_shipdate <= '1998-09-02'
GROUP BYl_returnflag,l_linestatus;

可以看到,在这里tpch_lineitem_parquet这个外表被当作一个普通的内部表一样使用。唯一不同的只是在MaxCompute内部计算引擎将从OSS上去读取对应的PARQUET数据来进行处理。

但是我们应该强调的是,在这里直接使用外表,每次读取的时候都需要涉及外部OSS的IO操作,并且MaxCompute系统本身针对内部存储做的许多高性能优化都用不上了,所以性能上会有所损失。 所以如果是需要对数据进行反复计算以及对计算的高效性比较敏感的场景上,我们推荐下面这种用法:先将数据导入MaxCompute内部,再进行计算。

注意,上面例子中的tpch_lineitem_textfile表,因为使用了ROW FORMAT + STORED AS,需要手动设置flag(只使用STORED AS,odps.sql.hive.compatible默认为TRUE),再进行读取,否则会有报错。

SELECT * FROM tpch_lineitem_textfile LIMIT 1;
FAILED: ODPS-0123131:User defined function exception - Traceback:
com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper--需要手动设置hive兼容flag
set odps.sql.hive.compatible=true;
SELECT * FROM tpch_lineitem_textfile LIMIT 1;
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| l_orderkey | l_partkey  | l_suppkey  | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax      | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| 5640000001 | 174458698  | 9458733    | 1            | 14.0       | 23071.58        | 0.08       | 0.06       | N            | O            | 1998-01-26 | 1997-11-16   | 1998-02-18    | TAKE BACK RETURN | SHIP       | cuses nag silently. quick |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+

2.2 将OSS上的开源数据导入MaxCompute,再进行计算

  • 首先创建一个与外部表schema一样的内部表tpch_lineitem_internal,然后将OSS上的开源数据导入MaxCompute内部表,以cFile格式存储在MaxCompute内部:
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;INSERT OVERWRITE TABLE tpch_lineitem_internal
SELECT * FROM tpch_lineitem_parquet;
  • 直接就可以对内部表进行同样的操作:
SELECT l_returnflag,l_linestatus,SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,AVG(l_quantity) AS avg_qty,COUNT(*) AS count_order
FROM tpch_lineitem_internal
WHERE l_shipdate <= '1998-09-02'
GROUP BYl_returnflag,l_linestatus;

通过这样子将数据先导入系统的情况下,对同样数据的计算就会更高效得多。

4. 结语

开源的种种数据格式往往由各种数据处理生态产生,而MaxCompute非结构化数据处理框架通过实现计算与存储的互联,希望打通阿里云核心计算平台与各种数据的通路。在这个基础上,各种各样依赖于不同数据格式的应用,将能在MaxCompute计算平台上实现,后继我们会对一些具体的这种应用,比如基因计算等,再做一些具体的case study以及介绍。我们也欢迎有对开源数据进行处理分析的更多应用,能在MaxCompute强大计算能力的基础上开花结果。

原文链接

本文为云栖社区原创内容,未经允许不得转载。

转载于:https://my.oschina.net/yunqi/blog/1821933

这篇关于如何在MaxCompute上处理存储在OSS上的开源格式数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/729905

相关文章

Python将博客内容html导出为Markdown格式

《Python将博客内容html导出为Markdown格式》Python将博客内容html导出为Markdown格式,通过博客url地址抓取文章,分析并提取出文章标题和内容,将内容构建成html,再转... 目录一、为什么要搞?二、准备如何搞?三、说搞咱就搞!抓取文章提取内容构建html转存markdown

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

python+opencv处理颜色之将目标颜色转换实例代码

《python+opencv处理颜色之将目标颜色转换实例代码》OpenCV是一个的跨平台计算机视觉库,可以运行在Linux、Windows和MacOS操作系统上,:本文主要介绍python+ope... 目录下面是代码+ 效果 + 解释转HSV: 关于颜色总是要转HSV的掩膜再标注总结 目标:将红色的部分滤