Ignite集成Spark之IgniteDataFrames

2024-04-30 13:38

本文主要是介绍Ignite集成Spark之IgniteDataFrames,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。

Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。

因此,这两种技术是可以互补的。

将Ignite与Spark整合

整合这两种技术会为Spark用户带来若干明显的好处:

  • 通过避免大量的数据移动,获得真正可扩展的内存级性能;
  • 提高RDD、DataFrame和SQL的性能;
  • 在Spark作业之间更方便地共享状态和数据。

下图中显示了如何整合这两种技术,并且标注了显著的优势: 

在第一篇文章中,主要聚焦于IgniteRDD,而本文会聚焦于IgniteDataFrames。

IgniteDataframes

Spark的DataFrame API为描述数据引入了模式的概念,Spark通过表格的形式进行模式的管理和数据的组织。

DataFrame是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame等同于关系数据库中的表,并允许Spark使用Catalyst查询优化器来生成高效的查询执行计划。而RDD只是跨集群节点分区化的元素集合。

Ignite扩展了DataFrames,简化了开发,改进了将Ignite作为Spark的内存存储时的数据访问时间,好处包括:

  • 通过Ignite读写DataFrames时,可以在Spark作业之间共享数据和状态;
  • 通过优化Spark的查询执行计划加快SparkSQL查询,这些主要是通过IgniteSQL引擎的高级索引以及避免了Ignite和Spark之间的网络数据移动实现的。

IgniteDataframes示例

下面通过一些代码以及搭建几个小程序的方式,了解Ignite DataFrames如何使用,如果想实际运行这些代码,可以从GitHub上下载。

一共会写两个Java的小应用,然后在IDE中运行,还会在这些Java应用中执行一些SQL。

一个Java应用会从JSON文件中读取一些数据,然后创建一个存储于Ignite的DataFrame,这个JSON文件Ignite的发行版中已经提供,另一个Java应用会从Ignite的DataFrame中读取数据然后使用SQL进行查询。

下面是写应用的代码:

public class DFWriter {private static final String CONFIG = "config/example-ignite.xml";public static void main(String args[]) {Ignite ignite = Ignition.start(CONFIG);SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();Logger.getRootLogger().setLevel(Level.OFF);Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);Dataset<Row> peopleDF = spark.read().json(resolveIgnitePath("resources/people.json").getAbsolutePath());System.out.println("JSON file contents:");peopleDF.show();System.out.println("Writing DataFrame to Ignite.");peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();System.out.println("Done!");Ignition.stop(false);}
}

DFWriter中,首先创建了SparkSession,它包含了应用名,之后会使用spark.read().json()读取JSON文件并且输出文件内容,下一步是将数据写入Ignite存储。下面是DFReader的代码:

public class DFReader {private static final String CONFIG = "config/example-ignite.xml";public static void main(String args[]) {Ignite ignite = Ignition.start(CONFIG);SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();Logger.getRootLogger().setLevel(Level.OFF);Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);System.out.println("Reading data from Ignite table.");Dataset<Row> peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();peopleDF.createOrReplaceTempView("people");Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");sqlDF.show();System.out.println("Done!");Ignition.stop(false);}
}

DFReader中,初始化和配置与DFWriter相同,这个应用会执行一些过滤,需求是查找所有的id > 0 以及 < 6的人,然后输出结果。

在IDE中,通过下面的代码可以启动一个Ignite节点:

public class ExampleNodeStartup {public static void main(String[] args) throws IgniteException {Ignition.start("config/example-ignite.xml");}
}

到此,就可以对代码进行测试了。

运行应用

首先在IDE中启动一个Ignite节点,然后运行DFWriter应用,输出如下:

JSON file contents:
+-------------------+---+------------------+
|         department| id|              name|
+-------------------+---+------------------+
|Executive Committee|  1|       Ivan Ivanov|
|Executive Committee|  2|       Petr Petrov|
|         Production|  3|          John Doe|
|         Production|  4|         Ann Smith|
|         Accounting|  5|    Sergey Smirnov|
|         Accounting|  6|Alexandra Sergeeva|
|                 IT|  7|         Adam West|
|        Head Office|  8|    Beverley Chase|
|        Head Office|  9|      Igor Rozhkov|
|                 IT| 10|Anastasia Borisova|
+-------------------+---+------------------+Writing DataFrame to Ignite.
Done!

如果将上面的结果与JSON文件的内容进行对比,会显示两者是一致的,这也是期望的结果。

下一步会运行DFReader,输出如下:

Reading data from Ignite table.
+-------------------+--------------+---+
|         DEPARTMENT|          NAME| ID|
+-------------------+--------------+---+
|Executive Committee|   Ivan Ivanov|  1|
|Executive Committee|   Petr Petrov|  2|
|         Production|      John Doe|  3|
|         Production|     Ann Smith|  4|
|         Accounting|Sergey Smirnov|  5|
+-------------------+--------------+---+Done!

这也是期望的输出。

总结

通过本文,会发现使用Ignite DataFrames是如何简单,这样就可以通过Ignite DataFrame进行数据的读写了。

未来,这些代码示例也会作为Ignite发行版的一部分进行发布。

关于Ignite和Spark的集成,内容就是这些了。

这篇关于Ignite集成Spark之IgniteDataFrames的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949017

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

SpringBoot集成SOL链的详细过程

《SpringBoot集成SOL链的详细过程》Solanaj是一个用于与Solana区块链交互的Java库,它为Java开发者提供了一套功能丰富的API,使得在Java环境中可以轻松构建与Solana... 目录一、什么是solanaj?二、Pom依赖三、主要类3.1 RpcClient3.2 Public

SpringBoot3集成swagger文档的使用方法

《SpringBoot3集成swagger文档的使用方法》本文介绍了Swagger的诞生背景、主要功能以及如何在SpringBoot3中集成Swagger文档,Swagger可以帮助自动生成API文档... 目录一、前言1. API 文档自动生成2. 交互式 API 测试3. API 设计和开发协作二、使用

SpringBoot如何集成Kaptcha验证码

《SpringBoot如何集成Kaptcha验证码》本文介绍了如何在Java开发中使用Kaptcha生成验证码的功能,包括在pom.xml中配置依赖、在系统公共配置类中添加配置、在控制器中添加生成验证... 目录SpringBoot集成Kaptcha验证码简介实现步骤1. 在 pom.XML 配置文件中2.

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

【Shiro】Shiro 的学习教程(三)之 SpringBoot 集成 Shiro

目录 1、环境准备2、引入 Shiro3、实现认证、退出3.1、使用死数据实现3.2、引入数据库,添加注册功能后端代码前端代码 3.3、MD5、Salt 的认证流程 4.、实现授权4.1、基于角色授权4.2、基于资源授权 5、引入缓存5.1、EhCache 实现缓存5.2、集成 Redis 实现 Shiro 缓存 1、环境准备 新建一个 SpringBoot 工程,引入依赖: