本文主要是介绍Spark——Spark读写Greenplum/Greenplum-Spark Connector高速写Greenplum,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 问题背景
- 解决方式
- 代码实现
- Spark写Greenplum
- Spark读Greenplum
- 参考
问题背景
通过数据平台上的DataX把Hive表数据同步至Greenplum(因为DataX原生不支持Greenplum Writer,只能采用PostgreSQL驱动的方式),但是同步速度太慢了,<100Kb/s(DataX服务器和Greenplum服务器都在内网,实测服务器间传输文件速率可以达到170Mb/s+),根本没法用。
解决方式
查看Greenplum官网,给出了以下几种将外部数据写入Greenplum方式:
- JDBC:JDBC方式,写大数据量会很慢。
- gpload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考gpload。
- Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。
而我们之前采用的PostgreSQL驱动的方式就是因为使用了JDBC,导致写入速度非常慢。综合官网提供的这3中方式,我们最终选择了Greenplum-Spark Connector这种方式,但是只提供了Spark2.3版本支持,其他版本未验证过。
Greenplum-Spark Connector具体的读写架构和流程,请参考Greenplum官网文档:https://cn.greenplum.org/greenplum-spark-connector/。
代码实现
Greenplum-Spark Connector需要引入两个依赖包:
- greenplum-spark_2.11-2.3.0.jar
- postgresql-42.2.27.jar
greenplum-spark_2.11-2.3.0.jar无法通过Maven自动下载,需要到上面网址手动下载,且要先注册网址账号才允许下载。
Spark写Greenplum
代码实现:
package com.demoimport org.apache.spark.sql.{SaveMode, SparkSession}import java.time.LocalDateTime
import java.time.format.DateTimeFormatterobject SparkWriteGreenplum {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Spark to Greenplum").enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("INFO")// main函数传参数获取表名val tableName = args(0)val days = args(1).toLong/** spark写greenplum *///Greenplum配置信息val gscWriteOptionMap = Map("url" -> "jdbc:postgresql://host:5432/db","user" -> "u","password" -> "p","dbschema" -> "schema","dbtable" -> "table")// Hiv表分区val ds = LocalDateTime.now().minusDays(days).format(DateTimeFormatter.ofPattern("yyyyMMdd"))// 读取Hive表val df = spark.sql("select * from db." + tableName + " where ds = " + ds)// Dataframe写Greenplumdf.write.format("greenplum").mode(SaveMode.Overwrite).options(gscWriteOptionMap).save()spark.stop()}
}
最终以4个executor、每个executor 1核1G执行Spark任务,1400w+条数据,3分钟左右就导完了,效果提升非常明显。
Spark读Greenplum
// spark读greenplumval gscReadOptionMap = Map("url" -> "jdbc:postgresql://host:5432/db","user" -> "u","password" -> "p","dbschema" -> "sc","dbtable" -> "table")val df: DataFrame = spark.read.format("greenplum").options(gscReadOptionMap).load()df.show()
参考
- https://cn.greenplum.org/greenplum-spark-connector/
- https://greenplum-spark-connector.readthedocs.io/en/latest/Write-data-from-Spark-into-Greenplum.html
- https://network.pivotal.io/products/vmware-greenplum#/releases/1427678/file_groups/17497
这篇关于Spark——Spark读写Greenplum/Greenplum-Spark Connector高速写Greenplum的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!