mongodb-elasticsearch-rive源码解析

2024-02-08 00:40

本文主要是介绍mongodb-elasticsearch-rive源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MongoDBRiverPlugin

MongoDBRiverPlugin类是插件注册类,它继承自AbstractPlugin,其功能是

1.      在RiverModule中注册一个MongoDBRiver

2.      在RestModule中注册一个RestMongoDBRiverAction

package org.elasticsearch.plugin.river.mongodb;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.rest.action.mongodb.RestMongoDBRiverAction;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.river.mongodb.MongoDBRiver;
import org.elasticsearch.river.mongodb.MongoDBRiverModule;
/**
* @author flaper87 (Flavio Percoco Premoli)
* @author aparo (Alberto Paro)
* @author kryptt (Rodolfo Hansen)
*/
public class MongoDBRiverPlugin extends AbstractPlugin {
@Override
public String name() {
return MongoDBRiver.NAME;
}
@Override
public String description() {
return MongoDBRiver.DESCRIPTION;
}
/**
* Register the MongoDB river to Elasticsearch node
*
* @param module
*/
public void onModule(RiversModule module) {
module.registerRiver(MongoDBRiver.TYPE, MongoDBRiverModule.class);
}
/**
* Register the REST move to Elasticsearch node
*
* @param module
*/
public void onModule(RestModule module) {
module.addRestAction(RestMongoDBRiverAction.class);
}
}


MongoDBRiver

首先看river部分 org.elasticsearch.river.mongodb.MongoDBRiver是核心类,构造函数中都是都是elasticsearch 的配置信息和服务

参数类型
参数名称含义取值
RiverNameriverName名称 
RiverSettingssettings设置信息 
StringriverIndexName索引名 
Clientclient客户端 
ScriptServicescriptService 脚本服务 
MongoDBRiverDefinitiondefinition解析后的定义MongoDBRiverDefinition.parseSettings(riverName.name(),riverIndexName, settings, scriptService);

还有一个参数stream表示操作流,用来存储需要放在mongo oplog中的数据队列

BlockingQueue<QueueEntry> stream = 
definition.getThrottleSize() == -1 ? 
new LinkedTransferQueue<QueueEntry>() 
: new ArrayBlockingQueue<QueueEntry>(definition.getThrottleSize());

可以看到,如果definition中设定的阈值大小没有设定的话,使用一个链表数据结构作为队列,否则使用一个数组队列。不过两种情况使用的数据结构都是多线程使用的数据结构BlockingQueue阻塞队列。阻塞队列是用在“生产者-消费者”模式的主要数据结构,其作用是如果队列空,则消费者阻塞;如果队列满,则生产者阻塞。而且队列支持多个生产者和消费者线程。其中QueueEntry定义如下,其中Operation是一个枚举,包含了各种mongodb操作:INSERT,UPDATE, DELETE, DROP_COLLECTION, DROP_DATABASE, COMMAND, UNKNOWN;

protected static class QueueEntry {
private final DBObject data;
private final Operation operation;
private final Timestamp<?> oplogTimestamp;
private final String collection;
public QueueEntry(DBObject data, String collection) {
this(null, Operation.INSERT, data, collection);
}
public QueueEntry(Timestamp<?> oplogTimestamp, Operation oplogOperation, DBObject data, String collection) {
this.data = data;
this.operation = oplogOperation;
this.oplogTimestamp = oplogTimestamp;
this.collection = collection;
}
public boolean isOplogEntry() {
return oplogTimestamp != null;
}
public boolean isAttachment() {
return (data instanceof GridFSDBFile);
}
public DBObject getData() {
return data;
}
public Operation getOperation() {
return operation;
}
public Timestamp<?> getOplogTimestamp() {
return oplogTimestamp;
}
public String getCollection() {
return collection;
}
}
}

最后MongoDBRiver构造函数里面还有一个全局参数SharedContext context,这个参数包含了这个队列的引用,并且包含了整体运行状态的一个上下文状态:UNKNOWN, START_FAILED, RUNNING, STOPPED, IMPORT_FAILED,INITIAL_IMPORT_FAILED, SCRIPT_IMPORT_FAILED, RIVER_STALE;

this.context = new SharedContext(stream, Status.STOPPED);
初始化之后就可以,elasticsearch将通过start方法启动这个插件,启动逻辑如下:

* 首先是各种状态的检查:

     1、 用client获取elastic的状态,转成Status

client.prepareGet("_river", "mongodb-river", "_riverstatus").get()
XContentMapValues.extractValue("mongodb.status")
     2、如果状态是IMPORT_FAILED、INITIAL_IMPORT_FAILED、SCRIPT_IMPORT_FAILED Status.START_FAILED 或者 STOPPED;表示有问题,直接打印日志并返回
    

    3、 如果没有问题,则使用方法设定river为启动状态:

MongoDBRiverHelper.setRiverStatus(client, riverName.getName(), Status.RUNNING);
context.setStatus(Status.RUNNING);

    4、如果不存在索引则建立之
// Create the index if it does not exist
client.admin().indices().prepareCreate(definition.getIndexName()).get();

   5、 如果是GridFS要做一些额外的索引工作

client.admin().indices().preparePutMapping(definition.getIndexName()).setType(definition.getTypeName()).setSource(getGridFSMapping()).get();
    6、 然后我们开始启动相关的线程:

如果是mongos,就启动多个OpLog处理线程,否则使用一个线程,创建方式如下:

EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_slurper").newThread(new Slurper(definition.getMongoServers(), definition, context, client));
   7、启动之后再启动Indexer进程
EsExecutors.daemonThreadFactory(settings.globalSettings(),"mongodb_river_indexer").newThread(new Indexer(this, definition, context, client, scriptService));
   8、最后再启动一个状态监测进程:
EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_status").newThread(new StatusChecker(this, definition, context));

*  所以代码的核心就是三个线程:

收割 new Slurper(definition.getMongoServers(), definition, context,client)

索引处理 new Indexer(this, definition, context, client, scriptService)

状态检查 new StatusChecker(this, definition, context)

可以看到共同的参数都是:一个definition包含所有的配置,context包含了操作队列和状态


Slurper收割线程

其逻辑是:

1、  如果driver的状态是Running,则查找OpLog的信息并放入stream队列中

 

2、 如果无法获取oplogCollection队列,则退出线程failed to assign oplogCollection orslurpedCollection

3、  增量处理是按照上次注入时间点为查询条件的

cursor = oplogCursor(startTimestamp);
if (cursor == null) {cursor = processFullOplog();
}

查询条件是

filter.put(MongoDBRiver.OPLOG_TIMESTAMP,new BasicDBObject(QueryOperators.GTE, time));ts > time

4、获得数据库指针之后,处理每一个OpLog的数据

while (cursor.hasNext()) {DBObject item = cursor.next();startTimestamp = processOplogEntry(item, startTimestamp);
}

处理这些数据最后就是调用 addToStream 或 addInsertToStream 加入stream中

 

初始化导入

上面的过程只适合于从当前时间开始的数据,如果需要把原来的数据导入的话,还需要做一个initialimport

当程序配置满足一下条件的时候,才会在第一次运行该线程的时候进行初始化导入:

SkipInitialImport == false
InitialTimestamp == null // initial timestamp 为空
MongoDBRiver.getIndexCount(client, definition) == 0 // 没有index过
MongoDBRiver.getLastTimestamp(client, definition) == null;
Get the latest timestamp for a given namespace.

满足这些条件之后才会进行数据的初始化导入:初始化导入会查看一下设置,如果是ImportAllCollections,则检查每一个collection并注入否则,找出设定的collection并注入

核心代码是这样的:
if (!definition.isSkipInitialImport()) {if (!riverHasIndexedFromOplog() && definition.getInitialTimestamp() == null) {if (!isIndexEmpty()) {MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);break;}if (definition.isImportAllCollections()) {for (String name : slurpedDb.getCollectionNames()) {DBCollection collection = slurpedDb.getCollection(name);startTimestamp = doInitialImport(collection);}} else {DBCollection collection = slurpedDb.getCollection(definition.getMongoCollection());startTimestamp = doInitialImport(collection);}}} else {logger.info("Skip initial import from collection {}", definition.getMongoCollection());}

 Indexer线程

其逻辑是:

1、如果driver的状态是Running,则从stream队列中获取信息并放入Index中

在构造函数初始化的时候会做一些MongoDBRiverBulkProcessor的创建 build:

  SimpleEntry<String, String> entry = new SimpleEntry<String, String>(index, type);if (!processors.containsKey(entry)) {processors.put(new SimpleEntry<String, String>(index, type), new MongoDBRiverBulkProcessor.Builder(river, definition, client,index, type).build());}return processors.get(entry);

然后在业务逻辑中读取entry,并processBlockingQueue processBlockingQueue就是根据不同的业务的内容做不同的处理,就是对不同的操作用相关的es api加以处理。

 // 1. Attempt to fill as much of the bulk request as possibleQueueEntry entry = context.getStream().take();lastTimestamp = processBlockingQueue(entry);while ((entry = context.getStream().poll(definition.getBulk().getFlushInterval().millis(), MILLISECONDS)) != null) {lastTimestamp = processBlockingQueue(entry);}// 2. Update the timestampif (lastTimestamp != null) {MongoDBRiver.setLastTimestamp(definition, lastTimestamp,getBulkProcessor(definition.getIndexName(), definition.getTypeName()).getBulkProcessor());}

StatusChecker

状态检查就是更具用户的命令进行开/关

 

就是检查elastic中的最新状态【用户设定的状态】:MongoDBRiverHelper.getRiverStatus(client, riverName);

如果状态和当前状态不一致,就进行driver的start或stop

 

用一个流程图来解释这几个线程之间的关系就是这样的:


RestModule

注册这个模块的作用是在原来es支持的rest api基础上,增加针对mongodb的新的api类型,具体实现可以参考一下这篇文章,这里不再赘述了:

http://elasticsearchserverbook.com/creating-custom-elasticsearch-rest-action/


参考文档:

https://github.com/richardwilly98/elasticsearch-river-mongodb

http://elasticsearchserverbook.com/creating-custom-elasticsearch-rest-action/

http://blog.csdn.net/vernonzheng/article/details/8247564

http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html

这篇关于mongodb-elasticsearch-rive源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/689440

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

OWASP十大安全漏洞解析

OWASP(开放式Web应用程序安全项目)发布的“十大安全漏洞”列表是Web应用程序安全领域的权威指南,它总结了Web应用程序中最常见、最危险的安全隐患。以下是对OWASP十大安全漏洞的详细解析: 1. 注入漏洞(Injection) 描述:攻击者通过在应用程序的输入数据中插入恶意代码,从而控制应用程序的行为。常见的注入类型包括SQL注入、OS命令注入、LDAP注入等。 影响:可能导致数据泄

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。