Mongodb 开启oplog,java监听oplog并写入关系型数据库

2023-12-05 21:15

本文主要是介绍Mongodb 开启oplog,java监听oplog并写入关系型数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:replSetName: localoplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{"ok" : 1,"operationTime" : Timestamp(1627503341, 1),"$clusterTime" : {"clusterTime" : Timestamp(1627503341, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}}
}

监听Oplog日志

pom

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.10</version><relativePath/></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.7</version></dependency><dependency><groupId>com.vividsolutions</groupId><artifactId>jts</artifactId><version>1.13</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-spatial</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-java8</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>com.bedatadriven</groupId><artifactId>jackson-datatype-jts</artifactId><version>2.3</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate MongoTemplate mongoTemplate;@Resourceprivate EntityManager entityManager;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");MongoCollection<Document> oplog = db.getCollection("oplog.rs");BsonTimestamp startTS = getStartTimestamp();BsonTimestamp endTS = getEndTimestamp();Bson filter = Filters.and(Filters.gt("ts", startTS));MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();while (true) {if (cursor.hasNext()) {Document doc = cursor.next();String operation = doc.getString("op");if (!"n".equals(operation)) {String namespace = doc.getString("ns");String[] nsParts = StringUtils.split(namespace, ".");String collectionName = nsParts[1];String databaseName = nsParts[0];Document object = (Document) doc.get("o");log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);if ("i".equals(operation)) {insert((Document) doc.get("o"), databaseName, collectionName);} else if ("u".equals(operation)) {update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);} else if ("d".equals(operation)) {delete((Document) doc.get("o"), databaseName, collectionName);}}}}}private BsonTimestamp getStartTimestamp() {long currentSeconds = System.currentTimeMillis() / 1000;return new BsonTimestamp((int) currentSeconds, 1);}private BsonTimestamp getEndTimestamp() {return new BsonTimestamp(0, 1);}private void insert(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void update(Document object, Document update, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);String updateJson = JSON.serialize(update);Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");query.setParameter("json", json);query.setParameter("updateJson", updateJson);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void delete(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}
}

这篇关于Mongodb 开启oplog,java监听oplog并写入关系型数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/459166

相关文章

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java使用ANTLR4对Lua脚本语法校验详解

《Java使用ANTLR4对Lua脚本语法校验详解》ANTLR是一个强大的解析器生成器,用于读取、处理、执行或翻译结构化文本或二进制文件,下面就跟随小编一起看看Java如何使用ANTLR4对Lua脚本... 目录什么是ANTLR?第一个例子ANTLR4 的工作流程Lua脚本语法校验准备一个Lua Gramm

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

Java Optional的使用技巧与最佳实践

《JavaOptional的使用技巧与最佳实践》在Java中,Optional是用于优雅处理null的容器类,其核心目标是显式提醒开发者处理空值场景,避免NullPointerExce... 目录一、Optional 的核心用途二、使用技巧与最佳实践三、常见误区与反模式四、替代方案与扩展五、总结在 Java

基于Java实现回调监听工具类

《基于Java实现回调监听工具类》这篇文章主要为大家详细介绍了如何基于Java实现一个回调监听工具类,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录监听接口类 Listenable实际用法打印结果首先,会用到 函数式接口 Consumer, 通过这个可以解耦回调方法,下面先写一个

使用Java将DOCX文档解析为Markdown文档的代码实现

《使用Java将DOCX文档解析为Markdown文档的代码实现》在现代文档处理中,Markdown(MD)因其简洁的语法和良好的可读性,逐渐成为开发者、技术写作者和内容创作者的首选格式,然而,许多文... 目录引言1. 工具和库介绍2. 安装依赖库3. 使用Apache POI解析DOCX文档4. 将解析

Java字符串处理全解析(String、StringBuilder与StringBuffer)

《Java字符串处理全解析(String、StringBuilder与StringBuffer)》:本文主要介绍Java字符串处理全解析(String、StringBuilder与StringBu... 目录Java字符串处理全解析:String、StringBuilder与StringBuffer一、St

springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法

《springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法》:本文主要介绍springboot整合阿里云百炼DeepSeek实现sse流式打印,本文给大家介绍的非常详细,对大... 目录1.开通阿里云百炼,获取到key2.新建SpringBoot项目3.工具类4.启动类5.测试类6.测

数据库面试必备之MySQL中的乐观锁与悲观锁

《数据库面试必备之MySQL中的乐观锁与悲观锁》:本文主要介绍数据库面试必备之MySQL中乐观锁与悲观锁的相关资料,乐观锁适用于读多写少的场景,通过版本号检查避免冲突,而悲观锁适用于写多读少且对数... 目录一、引言二、乐观锁(一)原理(二)应用场景(三)示例代码三、悲观锁(一)原理(二)应用场景(三)示例

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三