Spar(k)ql:SPARQL evaluation method on spark GraphX

2023-11-23 03:21

本文主要是介绍Spar(k)ql:SPARQL evaluation method on spark GraphX,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

abstract:
由于灵活和简易,RDF变的越来越受欢迎的框架。查询大量的RDF数据也是面临的巨大的挑战。in this paper ,我们研究如何评估分布式系统中的sparql查询。我们提出了一中利用graphx图形分析工具对sparql查询评估的新方法。通过此工具,我们成功利用RDF语句的类图结构。
introduction:
P:查询大量的数据集变成严峻的挑战。statement called triples:subject-predicate-object or entity-attr
ibute-value。hadoop is it store  the temporary computational result on the disk instead  of the memory. it is too slow for processing querying. Spark system has overcome this issue with the Resilient Distributed Dataset concept which allow us to store object in the memory.
GraphX is an API for graph computation . problems can be solved with iterative graph programs.   each vertex of graph  is  a processor and they has the same program (receive ,process , send message)    
        main idea : use the graph computation framework of GraphX to evaluate SPARQL queries distributively on the RDF dataset. 
Relation work:

hadoop system: is the open source implementation of the MapReduce paradigm,like H2RDF,hadoopsparql.
use hadoop but it has another query language: Pig(Pigsparql),Impara(Sempala) .
problem:hadoop system uses a lot of disk I/O, it can slow.  solution: Spark, How to evaluate sematic queries on Spark? we mentioned that the semantic data seems like a graph and the evalution on SPARQL with distributed graph computing is a new research area.  like GraphLab:store the rdf:type and the rdf:label properties on the node and use iteration to answer query. this paper:store all the data properties on the nodes and check that before sending message.
Formal model:      
definition 1:(RDF Triple) I: IRIs, B: blank node, L: literals(文字) 任意分离的集合。tuple (s,p,o) ∈ (I B) X I X (I B L). in this paper we handle blank nodes as IRIs.     
definition  2:  (Triple pattern , basic graph pattern)  V(变量variable)任意的集合,与 I,L分离。tuplet t (I V) X I X (I L V) is a triple pattern ->is a Basic graph pattern.
          definition 3: (Property Graph)  PG(p) = (V,E,P) a property graph.  v:节点的集合,E:边的集合,P:图中的属性。     Pv(i):节点i的属性。 Pe(i,j):边(i,j)的属性。
          definition 4:(variable substitution) 设q是一个基本的图形模式。变量替换是从变量q,var(q)= {v |∃(s,p,v)∈ q or ∃(v,p,o)∈ q }映射到IRI和文字集,即r:var(q)→(I×L)。
           definition 5 : 查询结果设q是与SPARQL查询对应的基本图模式,T是RDF数据集。查询的结果是一组变量替换R = {r:var(q)→(I×L)} apply(r,q)⊆T,其中apply(r,q)= {(s,p,o)|(∃(v,p,o) ∈ (V × I × (I ∪ L)) : (v,p,o) ∈ q∧(r(v),p,o)=(s,p,o)) or(∃(s,p,v) ∈ (I×I× V :(s,p,v) ∈ q∧(s,p,r(v)) = (s,p,o))}.
Evaluation Model:
分布式图处理系统使用节点之间的message来计算最终结果。in this model:程序仅将消息发送给适当的邻居实现   query plan generator,以确定响应查询所需的message。该生成器为每次迭代创建消息传递计划。它计算消息的适当边缘(appropriate edges)和方向。Vertex program在迭代步骤中以不同方式处理传入消息:sometimes 消息是连接先前到达的,sometimes 两者的结合正在计算。
                    A,Graph loading:
将数据加载到内存中,,将n-triple文件转化为分离的文件便于spark和graphx的读取。在n-triple 中,rdf的声明(实体-属性-取值)存储在不同的行中,so,图的节点的IRI可能出现在多个行,必须先收集才能创建节点。如果声明的对象是个literal,则存储该值为对应该subject节点的属性。节点创建,边根据声明也创建,其中s和o为IRIs。

                B,node model:
两种properties类型,一中是将两个资源(IRIs)链接起来的对象object属性,一种是描述关于资源的literal文字信息的数据属性data property。对象属性将是图的边,数据属性将存储在图的节点中。rdf:type是一个将类或类型分配给主题的对象属性。将rdf:type存储为节点node的属性的原因是,大多数SPARQL查询都具有rdf:type的三元组模式。这些模式可以用来选择重要的节点。 因此,我们将rdf:type作为数据属性并将其值存储在节点中。  查询应答过程是通过顶点程序实现的,节点在数据属性旁边存储子结果表。每个节点都会从其邻居获取消息,并根据存储的信息和传入消息计算子结果。子结果存储在Map数据结构中。 Map的键是查询的变量,值是数据表。数据表包含对答案进行变量替换的约束。列标题是查询的变量。表的一行表示r替换是否将节点分配给变量,则r应将列的值分配给列标题中的变量,以便成为答案的一部分。
                C,Message model:
GraphX上的算法使用迭代来分析图。 在每次迭代中,它会检查所有活动的边。 当其中一个节点处于活动状态时,边缘处于活动状态。 节点在从邻居获得消息时处于活动状态。 P问题是当查询不是一个链时。对于这个问题,我们创建了一个特殊的消息(活着)。 该消息在每次迭代中发送到后面迭代中需要的边上。 有了这个消息,节点在处理之前一直处于活动状态。 当GraphX启动时,每个节点都处于活动状态,因为GraphX会向节点发送初始消息。
另一种类型的消息是评估消息。 这些消息有一个变量和一个子结果表。 该变量表示该迭代中目的地的变量。 subresult的头部是变量,主体存储可用值。 在发送源节点之前,将边缘edge三元组与其子结果结合在一起。 GraphX的另一个必要部分是顶点程序。 这个程序在获得消息后会在所有节点上运行,并且我们需要编写如何将两条消息合并到一条消息。 该节点只在顶点程序中获得一条合并消息。
如何解释两条消息的合并? 可用的解决方案是加入join和联合union。 一些消息是不同的结果,我们需要它们的联合union,但是一些消息需要加入join到以前的子结果。
                D,Query plan:
定义评估顺修, 在评估的每一步中,节点都将消息从一个节点发送到另一个节点。最后一步是最终的结果。我们创建一棵树tree。 在这棵树中,root根是最终节点。 节点是三元组模式的主体s和对象o变量或常量。 边是三元组模式的属性。此方法只能评估查询计划是树的查询。message由子节点转到父节点。
创建查询数,广度优先搜索算法。first,将SPARQL查询拆分为对象属性object properties和数据属性data properties(其中model中,数据属性存储在节点上,对象属性图的边)。创建树,算法只使用对象属性,评估的时候用到数据属性,会得到一个初始化节点为树的根节点。
如果查询中没有任何对象属性,则算法不需要在图上发送任何message,结果中具有所有的数据属性的节点。另一种情况下,需要将根节点存储到队列中,队列存储为处理的变量,还需要一个vars集合来存储所有的变量和常量。结果和子结果与变量存储在节点上,则变量为节点的替代物。在评估期间,一个节点可以存储多个变量和子结果。故需要一个vars集合,因为最终结果只存储在节点的替代节点是根节点的节点,并且子结果的变量与vars集中的变量相等。有结果中没有全部变量的情况,如果节点或者前节点没有查询中所有的链接,但是该节点有最终三元组模式的链接会造成这种情况。
如果查询中有对象属性 ,则算法使用广度优先搜索创建查询树,从队列中获取一个变量,查询在subject和object上位置上有此变量三元组模式,并且它未处理,因此它不在查询树中。之后,算法通过边将subject和object位置上的变量和常量存储到vars集合中并且创建platitem,platitem具有三元组模式和资源,资源决定message发送的方向。实际的变量将是树中的父节点。如果它在subject位置上,则在object位置上的变量或常量将是子节点并且此节点需要将子结果发给父节点。如果实际变量位于object位置上,则subject位置上的变量将是子节点,并将子结果传递给实际变量。之后,算法将新的变量(还没有进入)放入队列,并将完成的platitem添加到评估列表中。最后,设置三元组模式进行处理。in our model,评估在一次迭代中只使用一个三元组,父节点可能含有多个子节点,故允许算法在一次迭代中评估这些三元组模式。为此,需要处理不同类型的消息。这是重要的因为一些message需要union,有的需要join到其他中。

算法创建platitem序列后,addAcceptHeaders函数创造platitem的条件,这个条件决定着message需要什么条件。如果platitem的源节点在一次迭代中不包含子结果中的所有变量,它将不会包含在最终结果中。如果源节点在前面的迭代中获取正确的message,那么它的子结果包含所有的变量和常量。利用这种方法来减少message的数量。
算法2中,创建addAcceptHeaders。第一步,反转platitem的序列,因为树的创建是从根节点开始的,但是评估是从叶子节点开始的。
算法为headers创建映射map,存储在先前步骤中创建的条件。第一步,根据源变量和目标变量的集合创建一个联合union,初始化为空集。它添加源变量到目标变量的集合中。最后,它为platitem设置acceptHeader。
创建query plan的最后一步是创建GraphX需要的alive message。此消息需要保持边的活跃直到被需要。GraphX读取活动边,我们可以用边的节点创建消息。算法利用最新迭代中的三元组模式创建alive message。每次迭代中,我们需要与评估中剩下的迭代一样多的alive message。alive message的类型为三元组模式的属性。
               
                E,Evaluation the SPARQL Query:
sparkql评估算法。第一步,sparkql读取文件到内存,之后读取sparql查询并创建query plan。每个节点都会获得初始化的空message。message让节点处于活跃状态。在第一次迭代中,每个节点和边处于活跃。GraphX在边上迭代并当边的标签与query plan中给定的迭代相等时生成message。一次迭代会存储多个标签,因为迭代中包含评估message和alive message。之后,算法会根据节点的属性收集filters的边。我们只有在源节点具有先前迭代的子结果并且包含所有必要的变量的情况下发送message。如果节点不包含此子结果,它的结果不会成为最终结果的一部分。另一个filter是数据属性的检查,如果源或者目标节点不包含必要的属性,算法不会发送message。这个属性是由query plan 计算calculated的。如果节点不包含数据属性也不会成为最终结果的一部分。我们在节点上存储数据属性,但是不存储有关谓词是数据属性的信息。
如果三元组模式包含数据属性部分,query plan 不知道这是数据属性。对于此问题,我们需要创建循环边loop edges。当GraphX检查边时,我们检查节点的数据属性。如果在节点上数据属性存在,算法会创建message并通过loop edge发送。







这篇关于Spar(k)ql:SPARQL evaluation method on spark GraphX的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/414992

相关文章

BD错误集锦7——在集成Spring MVC + MyBtis时使用c3p0作为数据库时报错Method com/mchange/v2/c3p0/impl/NewProxyPreparedStatem

异常信息如下: Type Exception ReportMessage Handler dispatch failed; nested exception is java.lang.AbstractMethodError: Method com/mchange/v2/c3p0/impl/NewProxyPreparedStatement.isClosed()Z is abstractDescr

openfire+spark 在linux下安装,配置

文章转自:点击打开链接 相关软件下载 链接: https://pan.baidu.com/s/1boJs61h 密码: 2wd7 Openfire 在linux下安装和配置 + spark 在windows下配置 本机环境 系统:CentOS 6.7 64 位JDK 1.7 64 位MySQL 5.6 Openfir

任务5.1 初识Spark Streaming

实战概述:使用Spark Streaming进行词频统计 1. 项目背景与目标 背景: Spark Streaming是Apache Spark的流处理框架,用于构建可伸缩、高吞吐量的实时数据处理应用。目标: 实现一个实时词频统计系统,能够处理流式数据并统计文本中的单词出现频率。 2. 技术要点 Spark Streaming集成: 与Spark生态的其他组件如Spark SQL、ML

「Debug R」报错unable to find an inherited method for function是如何产生的

在一个群里看到这样一条报错,截图如下: 报错信息 当然这种问题解决起来也很快,无非就是把报错信息复制出来放在搜索引擎上,只不过你要挑选合适的搜索引擎。 百度 谷歌 必应 解决方案就是用dplyr::select。 虽然报错解决了,但是我还想着要重复出这个报错。因为只有能重复出报错,才能证明你不是运气好才解

abstract的method是否可同时是static,是否可同时是native,是否可同时是synchronized

1,abstract的method是否可同时是static,是否可同时是native,是否可同时是synchronized 都不可以,因为abstract申明的方法是要求子类去实现的,abstract只是告诉你有这样一个接口,你要去实现,至于你的具体实现可以是native和synchronized,也可以不是,抽象方法是不关心这些事的,所以写这两个是没有意义的。然后,static方法是不会被覆

Spark算子:RDD行动Action操作(3)–aggregate、fold、lookup

aggregate def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意se

Spark算子:RDDAction操作–first/count/reduce/collect/collectAsMap

first def first(): T first返回RDD中的第一个元素,不排序。 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at mak

Spark算子:RDD键值转换操作(4)–cogroup/join

cogroup 函数原型:最多可以组合4个RDD,可以通过partitioner和numsPartitions设置 def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) :RDD[(K, (Iterable[V],

Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally

groupByKey def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] 该函数用于将RDD[K,V]中每个K对应

Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues

partitionBy       def partitionBy(partitioner: Partitioner): RDD[(K, V)]       该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)rd