ES 数据写入方式:直连 VS Flink 集成系统

2024-05-16 08:52

本文主要是介绍ES 数据写入方式:直连 VS Flink 集成系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES 作为一个分布式搜索引擎,从扩展能力和搜索特性上而言无出其右,然而它有自身的弱势存在,其作为近实时存储系统,由于其分片和复制的设计原理,也使其在数据延迟和一致性方面都是无法和 OLTP(Online Transaction Processing)系统相媲美的。

也正因如此,通常它的数据都来源于其他存储系统同步而来,做二次过滤和分析的。这就引入了一个关键节点,即 ES 数据的同步写入方式,本文介绍的则是 MySQL 同步 ES 方式。

将 MySQL 数据写入 ES,首先想到的一定是消费 Binlog 直连 ES 写入,这种方式简单明了,然而如果稍微考量维度多一点,就会发现该方式的一些弊端。因此还有另外一个方式,即【RocketMQ + Flink Consumer + ES Bulk】集成生态,我们将从同步延迟、消费特性,ES 写入性能、系统容灾能力四个方面评估这两种接入方式,希望给到大家灵感并选择适合业务的同步方式。

ES 基础写入原理

ES 写入属于追加式写入,先形成特定大小的 Segment,然后定时 Merge 小数据段为大数据段以减少内存碎片,提升查询效率的过程。一个 Index 由 N 个 Shard 及其副本构成,存储了同一种 Type 类型的 Documents,由 Mapping 定义了其索引方式,每一个 Shard 由 N 个 Segment 组成,每个 Shard 都是一个全功能且完整的 Lucene 索引,它是 ES 的最小处理单元;Segment 是 ES 最小的数据处理单位,每个 Segment 都是一个独立的倒排索引。

ES 写入其实是不断将数据写入到同一个 Segment(内存),然后触发 Refresh 刷新,将 Segment 刷新到 OS Cache(默认 1s),此时数据就可以查询到了,OS Cache 会由操作系统触发 Flush 操作持久化到磁盘。

引发思考:ES 是如何保证数据不丢失的呢?追加式写入的优劣点是什么?追加式写入是如何处理数据更新问题的?MySQL 是属于哪种写入方式呢?本文重点不在此处,大家可以另行查阅文章。

ES 基本概念

ES 写入过程

ES 直连写入

采用 ES 直连写入的优点是因为路径短,依赖组件少,加上 Dsyncer(异构存储转换系统)通常已经提供了完善的限流重试机制,所以消费延迟和消费的数据完整性都是可以保证的。

缺点:

  1. 不易于接入多机房容灾部署,目前 ES 容灾机房都属于独立部署,独立读写模式,所以如果采用该方式,则难以同时对多机房写入分别做管控,达不到容灾效果。Binlog-->Dsyncer 通常一个 MySQL Table 对应一个转换任务,如果为了写多机房起多个重复的转换任务,则显得有些愚笨。

  2. 如果自身业务场景有对同一条记录并发写场景,但写不一定全部来源于 Binlog 的情况下,那全局考虑直写 ES 则更容易遇到写入冲突问题,因为缺乏有序队列的保障。

通过 Flink 搭建 ES 集成系统

Flink 搭建 ES 集成系统,则指的是所有的 ES 写入都由 Flink 任务完成,Flink 监听 RocketMQ 实时数据流,既保证了数据的分区有序性,又充分利用了 ES 的批量写入能力,ES 的批量写入能力比单条写入性能高出多倍。同时由于 Flink 本身的容错性,即使在异常场景下,也能保证数据的最终一致性。

优点

  1. 通过 MQ 可以更快捷的接入多机房 ES 集群,写入解耦,三机房分别起消费者写入数据,彼此独立,当出现单机房故障时,只要有可用机房,直接处理读流量切流即可,容灾方案简单清晰

  2. 网络抖动等问题会导致 ES 暂时性写入失败时,不影响其他集群写入的情况下,RocketMQ 会暂存消息,Flink 会保存消费快照,不断重试直至成功,更好的保障了数据最终一致性

  3. 多数据源写入能保证全局分区一致性。

缺点

  1. 依赖了更多组件,会增加全链路数据同步延迟,而 ES 默认的 Refresh 频率是每秒一次,经测试该链路正常情况下数据延迟都是秒级的,不是完全不可接受;

  2. 依赖了更多组件,对基础组件的稳定性有更高的要求,RocketMQ 异常,或者 Flink 任务异常都会导致同步链路出现问题,增加一定的业务异常风险。

在这里需要注意的一个问题是有人可能会考虑接入多机房 ES 集群,是怎么保证多机房同时成功的、以及怎么保证写入成功后就可以查询得到?目前这两点暂时无法做到,因为多个机房都是独立写入的,互不影响,且 ES 集群属于弱数据一致性集群,无法保证写入成功立刻就能查到。

搭建并运行一个 ES Flink 消费程序的必备条件

  • Flink 运行环境:首先需要有 Flink 任务的运行环境,通常企业级的 Flink 任务会作为一个 YARN 作业在分布式系统中被调度并分配资源执行,但同时 Flink 也可作为单机进程,亦或搭建一个独立集群运行。

  • ES 消息格式:需要约定一种 ES 消息传输格式和序列化方式,一套范式解决所有同步场景,目前流行的序列化方式是 pb 格式或 json 格式,目前我们都是推荐使用 pb 格式的,数据格式 Schema 定义:

字段名

值类型

必需/可选

描述

_index

string

必需

文档要写入索引的名称或别名

_type

string

必需/可选

文档的类型

_op_type

string

必需

文档写入操作类型,取值范围: index, create, update, upsert, delete

_id

string

可选

文档 ID,不指定时写入 ES 会自动生成,但同一条数据被重复消费写入 ES 会生成多个文档

_routing

string

可选

文档路由,不指定时默认使用 _id 字段值路由

_version

int64

可选

文档版本,指定时大于 0 且仅操作为 index/delete 有效,默认使用 external_gte 版本类型

_source

object

必需/可选

文档内容,操作类型为 delete 时可不指定

_script

object

可选

文档脚本,操作类型为 update/upsert 时有效,但和 _source 不能同时存在

syntax = "proto3";message ESIndexInfo {string Name = 1;  // 文档要写入索引的名称或别名
}enum ESOPType { // 文档写入操作类型DELETE = 0; // 删除文档INDEX = 1;  // 创建新文档或更新老文档,只能全量更新 (替换老文档)UPDATE = 2; // 更新老文档,支持部分更新 (合并老文档)UPSERT = 3; // 创建新文档或更新老文档,支持部分更新 (合并老文档)CREATE = 4; // 创建新文档,存在时报错丢弃
}message ESDocAction {ESIndexInfo IndexInfo = 1; // 索引信息 (必需)ESOPType OPType = 2;       // 操作类型 (必需)string ID = 3;             // 文档 ID (可选)string Doc = 4;            // 文档内容 (JSON 格式, 删除操作时不需要)int64 Version = 5;         // 文档版本 (可选, 大于 0 且操作为 index/create/delete 有效)string Routing = 6;        // 文档路由 (可选, 非空有效)string Script = 7;         // 文档脚本 (JSON 格式, 操作类型为 update/upsert 有效,但和 Doc 不能同时存在)
}
  • Flink 任务必要配置:监听的 RocketMQ Topic 信息,写 ES 集群信息;

  • Flink 执行函数:Flink 处理流式消息有流式 SQL 和自定义应用程序两种方式,流式 SQL 约束于本身的一些限制,比如不支持同一个 MQ 有多个索引消息,而自定义编程更加灵活,比如添加各种打点,日志,错误码处理等,推荐该方式;

  • Flink 资源配置:JobManager 资源配置,TaskManager 资源配置等等;

  • Flink 自定义参数配置:可以自定义一些与应用程序紧密相关的动态配置,方便动态调节 Flink 消费能力,比如:

参数名

用途

默认值

job.writer.connector.bulk-flush.max-actions

单次 bulk 最大文档数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 300

job.writer.connector.bulk-flush.max-size

单次 bulk 最大字节数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 10MB

job.writer.connector.bulk-flush.interval

两次 bulk 最大间隔,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 1000ms

job.writer.connector.global-rate-limit

全局写入限速值

默认 -1,不限速

job.writer.connector.failure-handler

指定自定义失败处理器,比如处理4xx错误,5xx错误的方式不同,429总是无限重试等;

global_parallelism_num

flink 任务全局并发度

rmq 是 queue/4,bmq/kafka 是 partition/3

max_parallelism_num

flink 任务最大并发度

mq 的 queue/partition 的个数

checkpoint_interval

创建 Checkpoint 的间隔,单位 ms (5min=300000)

默认 15min

checkpoint_timeout

创建 Checkpoint 的超时时间,单位 ms (5min=300000)

默认 10min

rebalance_enable

开启乱序消费

默认 false

对比建议

写入方式

同步延迟

写入特性

ES写入性能

消费者

容灾能力

直连

依赖组件少,延迟低

Binlog 单 key 有序

bulk写入

FaaS

较差

RocketMQ+Flink+ES

依赖组件多,延迟较高/秒级

全局单 key 有序

bulk写入

Flink

经过以上介绍如果业务在都可接受秒级延迟的条件下,使用 RocketMQ+Flink 的方式能够更好的实现有序性和容灾能力,Flink 在流式任务处理能力上也远优 FaaS,但是直连方式明显链路更加简洁,架构更加轻量,系统集成和维护成本较低,所以还是需要依照业务特性选择最适合的才是最好的。

这篇关于ES 数据写入方式:直连 VS Flink 集成系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/994447

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X