【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

本文主要是介绍【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_1.12</artifactId><version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 22:25:58*/
public class ElasticsearchSinkStreamJobQuickDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);public static void main(String[] args) throws Exception {// 创建elasticsearch集群的httpHost连接HttpHost httpHost = new HttpHost("localhost", 9200, "http");List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(httpHost);// 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = "flink_" + tradeTime;logger.info("交易流水={},数据写入索引{}成功", transId, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}};// 构建elasticsearchSink算子BuilderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);// 每个请求最多发送的文档数量esSinkBuilder.setBulkFlushMaxActions(1);// 每次发送请求的时间间隔esSinkBuilder.setBulkFlushInterval(1000);//构建elasticsearchSink算子ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(1000);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 数据源写入数据算子,进行输出到elasticsearchdataStreamSource.addSink(sink);// 执行任务env.execute();}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {//时间戳加工timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_elasticsearch_connector</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依赖管理--><dependencies><!-- fastJson工具类依赖 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具类依赖 end --><!-- log4j日志框架依赖 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依赖 end --><!-- Flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基础依赖 end --><!-- Flink Elasticsearch 连接器依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Elasticsearch 连接器依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;import java.io.Serializable;/*** 描述:elasticsearch实体类** @author 浅夏的猫* @version 1.0.0* @date 2024-02-10 20:08:20*/
public class ElasticsearchEntity implements Serializable {private static final long serialVersionUID = 1L;/*** 集群地址* */private String hosts;/*** 集群端口* */private Integer port;/***执行计划* */private String scheme;/*** 文档类型,es7一般都是_doc* */private String type;/*** 索引前缀* */private String indexPrefix;/*** 内部批量处理器,刷新前最大缓存的操作数* */private Integer bulkFlushMaxActions=1;/*** 刷新前最大缓存的数据量(以兆字节为单位)* */private Integer bulkFlushMaxSizeMb=10;/*** 刷新的时间间隔(不论缓存操作的数量或大小如何)* */private Integer bulkFlushInterval=10000;/*** 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。* 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。* */private Boolean bulkFlushBackoff=false;/*** 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试* */private Integer bulkFlushBackoffDelay=10000;/*** 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试* */private Integer bulkFlushBackoffRetries=3;/*** 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常* */private Integer connectTimeout=10000;/*** 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。* */private Integer socketTimeout=10000;/*** 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。* */private Integer connectionRequestTimeout=10000;/*** 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。* */private Boolean redirectsEnabled=false;/*** 客户端允许的最大重定向次数* */private Integer maxRedirects=3;/*** 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。* */private Boolean authenticationEnabled=true;/*** 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。* */private Boolean circularRedirectsAllowed=false;/*** 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。* */private Boolean contentCompressionEnabled=false;/*** 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。* 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。* */private Boolean expectContinueEnabled=false;/*** 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。* */private Boolean normalizeUri=false;/*** 用于设置 HTTP 请求的路径前缀。* 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接* */private String pathPrefix;public String getHosts() {return hosts;}public void setHosts(String hosts) {this.hosts = hosts;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getScheme() {return scheme;}public void setScheme(String scheme) {this.scheme = scheme;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getIndexPrefix() {return indexPrefix;}public void setIndexPrefix(String indexPrefix) {this.indexPrefix = indexPrefix;}public Integer getBulkFlushMaxActions() {return bulkFlushMaxActions;}public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {this.bulkFlushMaxActions = bulkFlushMaxActions;}public Integer getBulkFlushMaxSizeMb() {return bulkFlushMaxSizeMb;}public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;}public Integer getBulkFlushInterval() {return bulkFlushInterval;}public void setBulkFlushInterval(Integer bulkFlushInterval) {this.bulkFlushInterval = bulkFlushInterval;}public Boolean getBulkFlushBackoff() {return bulkFlushBackoff;}public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {this.bulkFlushBackoff = bulkFlushBackoff;}public Integer getBulkFlushBackoffDelay() {return bulkFlushBackoffDelay;}public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;}public Integer getBulkFlushBackoffRetries() {return bulkFlushBackoffRetries;}public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;}public Integer getConnectTimeout() {return connectTimeout;}public void setConnectTimeout(Integer connectTimeout) {this.connectTimeout = connectTimeout;}public Integer getSocketTimeout() {return socketTimeout;}public void setSocketTimeout(Integer socketTimeout) {this.socketTimeout = socketTimeout;}public Integer getConnectionRequestTimeout() {return connectionRequestTimeout;}public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {this.connectionRequestTimeout = connectionRequestTimeout;}public Boolean getRedirectsEnabled() {return redirectsEnabled;}public void setRedirectsEnabled(Boolean redirectsEnabled) {this.redirectsEnabled = redirectsEnabled;}public Integer getMaxRedirects() {return maxRedirects;}public void setMaxRedirects(Integer maxRedirects) {this.maxRedirects = maxRedirects;}public Boolean getAuthenticationEnabled() {return authenticationEnabled;}public void setAuthenticationEnabled(Boolean authenticationEnabled) {this.authenticationEnabled = authenticationEnabled;}public Boolean getCircularRedirectsAllowed() {return circularRedirectsAllowed;}public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {this.circularRedirectsAllowed = circularRedirectsAllowed;}public Boolean getContentCompressionEnabled() {return contentCompressionEnabled;}public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {this.contentCompressionEnabled = contentCompressionEnabled;}public Boolean getExpectContinueEnabled() {return expectContinueEnabled;}public void setExpectContinueEnabled(Boolean expectContinueEnabled) {this.expectContinueEnabled = expectContinueEnabled;}public Boolean getNormalizeUri() {return normalizeUri;}public void setNormalizeUri(Boolean normalizeUri) {this.normalizeUri = normalizeUri;}public String getPathPrefix() {return pathPrefix;}public void setPathPrefix(String pathPrefix) {this.pathPrefix = pathPrefix;}
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:设置用于创建 Elasticsearch REST 客户端的工厂* 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:12:15*/
public class CustomRestClientFactory implements RestClientFactory {private ElasticsearchEntity elasticsearchEntity;public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void configureRestClientBuilder(RestClientBuilder restClientBuilder) {//设置默认的 HTTP 头部信息,这些信息将在每个请求中包含Header contentType = new BasicHeader("Content-Type", "application/json");Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");Header[] headers = {contentType, authorization};restClientBuilder.setDefaultHeaders(headers);//设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作restClientBuilder.setFailureListener(new RestClient.FailureListener());//配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。restClientBuilder.setNodeSelector(NodeSelector.ANY);//为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());}//允许在创建每个请求的时候进行额外的请求配置。restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));//允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));//设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。restClientBuilder.setStrictDeprecationMode(false);}
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:* 允许在创建每个请求的时候进行额外的请求配置* @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:24:42*/
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {private ElasticsearchEntity elasticsearchEntity;public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return custom;}
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:客户端配置* 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:28:15*/
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {private ElasticsearchEntity elasticsearchEntity;CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {RequestConfig.Builder custom = RequestConfig.custom();// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());}
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:自定义elasticsearch sink 算子函数* ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。* 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引** @author 浅夏的猫* @version 1.0.0* @date 2024-02-12 23:49:22*/
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);private ElasticsearchEntity elasticsearchEntity;public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = elasticsearchEntity.getIndexPrefix() + tradeTime;logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}
}

7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:es写入异常处理** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:04:24*/
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);@Overridepublic void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {// 处理不同类型的异常if (throwable instanceof EsRejectedExecutionException) {// 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");// 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储// 例如: indexer.add(createAnotherRequest(action));} else {// 对于其他类型的异常,默认返回放弃策略logger.error("Unhandled failure, abandoning request: {}", action.toString());}}
}

7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器进阶Demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-11 22:06:45*/
public class ElasticsearchSinkStreamJobAdvancedDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);public static void main(String[] args) {try {// 读取配置参数ElasticsearchEntity elasticsearchEntity = paramsInit();// 设置elasticsearch节点List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);// 创建esSinkFunction函数ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);// 构建ElasticsearchSink算子builderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);// es参数配置esBuilderHandler(esSinkBuilder, elasticsearchEntity);// 构建sink算子ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(100);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("交易接入,原始报文={}", dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 构建sink算子dataStreamSource.addSink(esSink);// 运行作业env.execute();} catch (Exception e) {e.printStackTrace();}}/*** 描述:Flink参数配置读取** @return {@code ElasticsearchEntity }* @throws IOException*/private static ElasticsearchEntity paramsInit() throws IOException {// 通过flink内置工具类获取命令行参数String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();String hosts = paramsMap.get("es.cluster.hosts");int port = paramsMap.getInt("es.cluster.port");String scheme = paramsMap.get("es.cluster.scheme");String type = paramsMap.get("es.cluster.type");String indexPrefix = paramsMap.get("es.cluster.indexPrefix");int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");elasticsearchEntity.setHosts(hosts);elasticsearchEntity.setPort(port);elasticsearchEntity.setScheme(scheme);elasticsearchEntity.setType(type);elasticsearchEntity.setIndexPrefix(indexPrefix);elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);elasticsearchEntity.setConnectTimeout(connectTimeout);elasticsearchEntity.setSocketTimeout(socketTimeout);elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);elasticsearchEntity.setMaxRedirects(maxRedirects);elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);elasticsearchEntity.setNormalizeUri(normalizeUri);return elasticsearchEntity;}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}/*** 描述:es参数配置** @param esSinkBuilder       esSinkBuilder建造器* @param elasticsearchEntity es实体类*/private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {// 设置触发批量写入的最大动作数,// 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());// 设置触发批量写入的最大数据量// 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MBesSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());// 设置批量写入的时间间隔// 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());// 启用批量写入的退避策略// 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());// 设置批量写入的退避延迟时间// 解释:在发生写入失败后,等待指定的延迟时间后再进行重试esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());// 设置批量写入的最大重试次数// 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());// 设置写入失败时的处理策略// 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());// 设置用于创建 Elasticsearch REST 客户端的工厂// 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));}/*** 描述:* elasticsearch 节点配置** @param elasticsearchEntity es实体类* @return {@code List<HttpHost> }*/private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {List<HttpHost> httpHosts = new ArrayList<>();String[] clusterArray = elasticsearchEntity.getHosts().split(",");for (String node : clusterArray) {httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));}return httpHosts;}
}

这篇关于【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/716022

相关文章

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的

深入理解Redis大key的危害及解决方案

《深入理解Redis大key的危害及解决方案》本文主要介绍了深入理解Redis大key的危害及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、背景二、什么是大key三、大key评价标准四、大key 产生的原因与场景五、大key影响与危

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

基于Qt Qml实现时间轴组件

《基于QtQml实现时间轴组件》时间轴组件是现代用户界面中常见的元素,用于按时间顺序展示事件,本文主要为大家详细介绍了如何使用Qml实现一个简单的时间轴组件,需要的可以参考下... 目录写在前面效果图组件概述实现细节1. 组件结构2. 属性定义3. 数据模型4. 事件项的添加和排序5. 事件项的渲染如何使用

深入理解C++ 空类大小

《深入理解C++空类大小》本文主要介绍了C++空类大小,规定空类大小为1字节,主要是为了保证对象的唯一性和可区分性,满足数组元素地址连续的要求,下面就来了解一下... 目录1. 保证对象的唯一性和可区分性2. 满足数组元素地址连续的要求3. 与C++的对象模型和内存管理机制相适配查看类对象内存在C++中,规

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M