如何使用Spark大规模并行构建索引

2024-05-15 03:38

本文主要是介绍如何使用Spark大规模并行构建索引,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。 

先看下,整体的拓扑图: 





然后,再来看下,使用scala写的spark程序: 

Java代码   收藏代码
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下  
  16. case class Record(  
  17.                    @(Field@field)("rowkey")     rowkey:String,  
  18.                    @(Field@field)("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark构建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客户端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的条数  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分区数据(一个迭代器集合),然后进行处理 
  56.     * @param lines 处理每个分区的数据 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代处理每条数据,符合条件会提交数据  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引数据到solr中 
  69.     * 
  70.     * @param datas 索引数据 
  71.     * @param isEnd 是否为最后一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //仅仅最后一次提交和集合长度等于批处理的数量时才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交数据  
  78.             datas.clear();//清空集合,便于重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 得到分区的数据具体每一行,并映射 
  85.     * 到Model,进行后续索引处理 
  86.     * 
  87.     * @param line 每行具体数据 
  88.     * @param datas 添加数据的集合,用于批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //数组数据清洗转换  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //将清洗完后的数组映射成Tuple类型  
  94.     val tuple=buildTuble(fields)  
  95.     //将Tuple转换成Bean类型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //将实体类添加至集合,方便批处理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 将数组映射成Tuple集合,方便与Bean绑定 
  106.     * @param array field集合数组 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  对field进行加工处理 
  118.     * 空值替换为null,这样索引里面就不会索引这个字段 
  119.     * ,正常值就还是原样返回 
  120.     * 
  121.     * @param field 用来走特定规则的数据 
  122.     * @return 映射完的数据 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根据条件清空某一类索引数据 
  133.     * @param query 删除的查询条件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("删除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根据条件删除一些数据  
  144.     deleteSolrByQuery("t1:03")  
  145.     //远程提交时,需要提交打包后的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上传运行时依赖的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目录下所有的数据,将会被构建索引,格式一定是约定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //通过rdd构建索引  
  159.     indexRDD(rdd);  
  160.     //关闭索引资源  
  161.     client.close();  
  162.     //关闭SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 处理rdd数据,构建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍历分区,构建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  


ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建 

这篇关于如何使用Spark大规模并行构建索引的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990701

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

Spring Cloud:构建分布式系统的利器

引言 在当今的云计算和微服务架构时代,构建高效、可靠的分布式系统成为软件开发的重要任务。Spring Cloud 提供了一套完整的解决方案,帮助开发者快速构建分布式系统中的一些常见模式(例如配置管理、服务发现、断路器等)。本文将探讨 Spring Cloud 的定义、核心组件、应用场景以及未来的发展趋势。 什么是 Spring Cloud Spring Cloud 是一个基于 Spring

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

Vim使用基础篇

本文内容大部分来自 vimtutor,自带的教程的总结。在终端输入vimtutor 即可进入教程。 先总结一下,然后再分别介绍正常模式,插入模式,和可视模式三种模式下的命令。 目录 看完以后的汇总 1.正常模式(Normal模式) 1.移动光标 2.删除 3.【:】输入符 4.撤销 5.替换 6.重复命令【. ; ,】 7.复制粘贴 8.缩进 2.插入模式 INSERT

mysql索引四(组合索引)

单列索引,即一个索引只包含单个列,一个表可以有多个单列索引,但这不是组合索引;组合索引,即一个索引包含多个列。 因为有事,下面内容全部转自:https://www.cnblogs.com/farmer-cabbage/p/5793589.html 为了形象地对比单列索引和组合索引,为表添加多个字段:    CREATE TABLE mytable( ID INT NOT NULL, use

mysql索引三(全文索引)

前面分别介绍了mysql索引一(普通索引)、mysql索引二(唯一索引)。 本文学习mysql全文索引。 全文索引(也称全文检索)是目前搜索引擎使用的一种关键技术。它能够利用【分词技术】等多种算法智能分析出文本文字中关键词的频率和重要性,然后按照一定的算法规则智能地筛选出我们想要的搜索结果。 在MySql中,创建全文索引相对比较简单。例如:我们有一个文章表(article),其中有主键ID(

mysql索引二(唯一索引)

前文中介绍了MySQL中普通索引用法,和没有索引的区别。mysql索引一(普通索引) 下面学习一下唯一索引。 创建唯一索引的目的不是为了提高访问速度,而只是为了避免数据出现重复。唯一索引可以有多个但索引列的值必须唯一,索引列的值允许有空值。如果能确定某个数据列将只包含彼此各不相同的值,在为这个数据列创建索引的时候就应该使用关键字UNIQUE,把它定义为一个唯一索引。 添加数据库唯一索引的几种

mysql索引一(普通索引)

mysql的索引分为两大类,聚簇索引、非聚簇索引。聚簇索引是按照数据存放的物理位置为顺序的,而非聚簇索引则不同。聚簇索引能够提高多行检索的速度、非聚簇索引则对单行检索的速度很快。         在这两大类的索引类型下,还可以降索引分为4个小类型:         1,普通索引:最基本的索引,没有任何限制,是我们经常使用到的索引。         2,唯一索引:与普通索引

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

如何免费的去使用connectedpapers?

免费使用connectedpapers 1. 打开谷歌浏览器2. 按住ctrl+shift+N,进入无痕模式3. 不需要登录(也就是访客模式)4. 两次用完,关闭无痕模式(继续重复步骤 2 - 4) 1. 打开谷歌浏览器 2. 按住ctrl+shift+N,进入无痕模式 输入网址:https://www.connectedpapers.com/ 3. 不需要登录(也就是