MongoDB CDC 导入 Elasticsearch

2024-08-29 08:20

本文主要是介绍MongoDB CDC 导入 Elasticsearch,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、docker-compose

version: '3'
services:mongo:image: "mongo:4.0-xenial"command: --replSet rs0 --smallfiles --oplogSize 128ports:- "27017:27017"environment:- MONGO_INITDB_ROOT_USERNAME=mongouser- MONGO_INITDB_ROOT_PASSWORD=mongopwelasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"

二、进入 MongoDB 容器,初始化副本集和数据

docker-compose exec mongo /usr/bin/mongo -u mongouser -p mongopw
// 1. 初始化副本集
rs.initiate();
rs.status();// 2. 切换数据库
use mgdb;// 3. 初始化数据
db.orders.insertMany([{order_id: 101,order_date: ISODate("2020-07-30T10:08:22.001Z"),customer_id: 1001,price: NumberDecimal("50.50"),product: {name: 'scooter',description: 'Small 2-wheel scooter'},order_status: false},{order_id: 102, order_date: ISODate("2020-07-30T10:11:09.001Z"),customer_id: 1002,price: NumberDecimal("15.00"),product: {name: 'car battery',description: '12V car battery'},order_status: false},{order_id: 103,order_date: ISODate("2020-07-30T12:00:30.001Z"),customer_id: 1003,price: NumberDecimal("25.25"),product: {name: 'hammer',description: '16oz carpenter hammer'},order_status: false}
]);db.customers.insertMany([{ customer_id: 1001, name: 'Jark', address: 'Hangzhou' },{ customer_id: 1002, name: 'Sally',address: 'Beijing'},{ customer_id: 1003,name: 'Edward',address: 'Shanghai'}
]);

三、下载以下 jar 包到 <FLINK_HOME>/lib/

下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-mongodb-cdc-2.4.0.jar

 四、然后启动 Flink 集群,再启动 SQL CLI.

-- Flink SQL
-- 设置间隔时间为3秒                       
Flink SQL> SET execution.checkpointing.interval = 3s;-- 设置本地时区为 Asia/Shanghai
Flink SQL> SET table.local-time-zone = Asia/Shanghai;Flink SQL> CREATE TABLE orders (_id STRING,order_id INT,order_date TIMESTAMP_LTZ(3),customer_id INT,price DECIMAL(10, 5),product ROW<name STRING, description STRING>,order_status BOOLEAN,PRIMARY KEY (_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017','username' = 'mongouser','password' = 'mongopw','database' = 'mgdb','collection' = 'orders');Flink SQL> CREATE TABLE customers (_id STRING,customer_id INT,name STRING,address STRING,PRIMARY KEY (_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017','username' = 'mongouser','password' = 'mongopw','database' = 'mgdb','collection' = 'customers');Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP_LTZ(3),customer_id INT,price DECIMAL(10, 5),product ROW<name STRING, description STRING>,order_status BOOLEAN,customer_name STRING,customer_address STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders');Flink SQL> INSERT INTO enriched_ordersSELECT o.order_id,o.order_date,o.customer_id,o.price,o.product,o.order_status,c.name,c. addressFROM orders AS oLEFT JOIN customers AS c ON o.customer_id = c.customer_id;

五、修改 MongoDB 里面的数据,观察 elasticsearch 里的结果 

db.orders.insert({order_id: 104,order_date: ISODate("2020-07-30T12:00:30.001Z"),customer_id: 1004,price: NumberDecimal("25.25"),product: {name: 'rocks',description: 'box of assorted rocks'},order_status: false
});db.customers.insert({customer_id: 1004,name: 'Jacob',address: 'Shanghai'
});db.orders.updateOne({ order_id: 104 },{ $set: { order_status: true } }
);db.orders.deleteOne({ order_id : 104 }
);

这篇关于MongoDB CDC 导入 Elasticsearch的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117340

相关文章

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

Go Mongox轻松实现MongoDB的时间字段自动填充

《GoMongox轻松实现MongoDB的时间字段自动填充》这篇文章主要为大家详细介绍了Go语言如何使用mongox库,在插入和更新数据时自动填充时间字段,从而提升开发效率并减少重复代码,需要的可以... 目录前言时间字段填充规则Mongox 的安装使用 Mongox 进行插入操作使用 Mongox 进行更

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

使用Python实现操作mongodb详解

《使用Python实现操作mongodb详解》这篇文章主要为大家详细介绍了使用Python实现操作mongodb的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、示例二、常用指令三、遇到的问题一、示例from pymongo import MongoClientf

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

Python模块导入的几种方法实现

《Python模块导入的几种方法实现》本文主要介绍了Python模块导入的几种方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录一、什么是模块?二、模块导入的基本方法1. 使用import整个模块2.使用from ... i

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空