【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

本文主要是介绍【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_1.12</artifactId><version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 22:25:58*/
public class ElasticsearchSinkStreamJobQuickDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);public static void main(String[] args) throws Exception {// 创建elasticsearch集群的httpHost连接HttpHost httpHost = new HttpHost("localhost", 9200, "http");List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(httpHost);// 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = "flink_" + tradeTime;logger.info("交易流水={},数据写入索引{}成功", transId, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}};// 构建elasticsearchSink算子BuilderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);// 每个请求最多发送的文档数量esSinkBuilder.setBulkFlushMaxActions(1);// 每次发送请求的时间间隔esSinkBuilder.setBulkFlushInterval(1000);//构建elasticsearchSink算子ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(1000);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 数据源写入数据算子,进行输出到elasticsearchdataStreamSource.addSink(sink);// 执行任务env.execute();}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {//时间戳加工timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_elasticsearch_connector</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依赖管理--><dependencies><!-- fastJson工具类依赖 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具类依赖 end --><!-- log4j日志框架依赖 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依赖 end --><!-- Flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基础依赖 end --><!-- Flink Elasticsearch 连接器依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Elasticsearch 连接器依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;import java.io.Serializable;/*** 描述:elasticsearch实体类** @author 浅夏的猫* @version 1.0.0* @date 2024-02-10 20:08:20*/
public class ElasticsearchEntity implements Serializable {private static final long serialVersionUID = 1L;/*** 集群地址* */private String hosts;/*** 集群端口* */private Integer port;/***执行计划* */private String scheme;/*** 文档类型,es7一般都是_doc* */private String type;/*** 索引前缀* */private String indexPrefix;/*** 内部批量处理器,刷新前最大缓存的操作数* */private Integer bulkFlushMaxActions=1;/*** 刷新前最大缓存的数据量(以兆字节为单位)* */private Integer bulkFlushMaxSizeMb=10;/*** 刷新的时间间隔(不论缓存操作的数量或大小如何)* */private Integer bulkFlushInterval=10000;/*** 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。* 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。* */private Boolean bulkFlushBackoff=false;/*** 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试* */private Integer bulkFlushBackoffDelay=10000;/*** 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试* */private Integer bulkFlushBackoffRetries=3;/*** 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常* */private Integer connectTimeout=10000;/*** 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。* */private Integer socketTimeout=10000;/*** 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。* */private Integer connectionRequestTimeout=10000;/*** 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。* */private Boolean redirectsEnabled=false;/*** 客户端允许的最大重定向次数* */private Integer maxRedirects=3;/*** 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。* */private Boolean authenticationEnabled=true;/*** 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。* */private Boolean circularRedirectsAllowed=false;/*** 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。* */private Boolean contentCompressionEnabled=false;/*** 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。* 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。* */private Boolean expectContinueEnabled=false;/*** 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。* */private Boolean normalizeUri=false;/*** 用于设置 HTTP 请求的路径前缀。* 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接* */private String pathPrefix;public String getHosts() {return hosts;}public void setHosts(String hosts) {this.hosts = hosts;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getScheme() {return scheme;}public void setScheme(String scheme) {this.scheme = scheme;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getIndexPrefix() {return indexPrefix;}public void setIndexPrefix(String indexPrefix) {this.indexPrefix = indexPrefix;}public Integer getBulkFlushMaxActions() {return bulkFlushMaxActions;}public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {this.bulkFlushMaxActions = bulkFlushMaxActions;}public Integer getBulkFlushMaxSizeMb() {return bulkFlushMaxSizeMb;}public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;}public Integer getBulkFlushInterval() {return bulkFlushInterval;}public void setBulkFlushInterval(Integer bulkFlushInterval) {this.bulkFlushInterval = bulkFlushInterval;}public Boolean getBulkFlushBackoff() {return bulkFlushBackoff;}public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {this.bulkFlushBackoff = bulkFlushBackoff;}public Integer getBulkFlushBackoffDelay() {return bulkFlushBackoffDelay;}public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;}public Integer getBulkFlushBackoffRetries() {return bulkFlushBackoffRetries;}public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;}public Integer getConnectTimeout() {return connectTimeout;}public void setConnectTimeout(Integer connectTimeout) {this.connectTimeout = connectTimeout;}public Integer getSocketTimeout() {return socketTimeout;}public void setSocketTimeout(Integer socketTimeout) {this.socketTimeout = socketTimeout;}public Integer getConnectionRequestTimeout() {return connectionRequestTimeout;}public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {this.connectionRequestTimeout = connectionRequestTimeout;}public Boolean getRedirectsEnabled() {return redirectsEnabled;}public void setRedirectsEnabled(Boolean redirectsEnabled) {this.redirectsEnabled = redirectsEnabled;}public Integer getMaxRedirects() {return maxRedirects;}public void setMaxRedirects(Integer maxRedirects) {this.maxRedirects = maxRedirects;}public Boolean getAuthenticationEnabled() {return authenticationEnabled;}public void setAuthenticationEnabled(Boolean authenticationEnabled) {this.authenticationEnabled = authenticationEnabled;}public Boolean getCircularRedirectsAllowed() {return circularRedirectsAllowed;}public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {this.circularRedirectsAllowed = circularRedirectsAllowed;}public Boolean getContentCompressionEnabled() {return contentCompressionEnabled;}public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {this.contentCompressionEnabled = contentCompressionEnabled;}public Boolean getExpectContinueEnabled() {return expectContinueEnabled;}public void setExpectContinueEnabled(Boolean expectContinueEnabled) {this.expectContinueEnabled = expectContinueEnabled;}public Boolean getNormalizeUri() {return normalizeUri;}public void setNormalizeUri(Boolean normalizeUri) {this.normalizeUri = normalizeUri;}public String getPathPrefix() {return pathPrefix;}public void setPathPrefix(String pathPrefix) {this.pathPrefix = pathPrefix;}
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:设置用于创建 Elasticsearch REST 客户端的工厂* 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:12:15*/
public class CustomRestClientFactory implements RestClientFactory {private ElasticsearchEntity elasticsearchEntity;public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void configureRestClientBuilder(RestClientBuilder restClientBuilder) {//设置默认的 HTTP 头部信息,这些信息将在每个请求中包含Header contentType = new BasicHeader("Content-Type", "application/json");Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");Header[] headers = {contentType, authorization};restClientBuilder.setDefaultHeaders(headers);//设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作restClientBuilder.setFailureListener(new RestClient.FailureListener());//配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。restClientBuilder.setNodeSelector(NodeSelector.ANY);//为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());}//允许在创建每个请求的时候进行额外的请求配置。restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));//允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));//设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。restClientBuilder.setStrictDeprecationMode(false);}
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:* 允许在创建每个请求的时候进行额外的请求配置* @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:24:42*/
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {private ElasticsearchEntity elasticsearchEntity;public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return custom;}
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:客户端配置* 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:28:15*/
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {private ElasticsearchEntity elasticsearchEntity;CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {RequestConfig.Builder custom = RequestConfig.custom();// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());}
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:自定义elasticsearch sink 算子函数* ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。* 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引** @author 浅夏的猫* @version 1.0.0* @date 2024-02-12 23:49:22*/
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);private ElasticsearchEntity elasticsearchEntity;public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = elasticsearchEntity.getIndexPrefix() + tradeTime;logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}
}

7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:es写入异常处理** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:04:24*/
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);@Overridepublic void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {// 处理不同类型的异常if (throwable instanceof EsRejectedExecutionException) {// 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");// 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储// 例如: indexer.add(createAnotherRequest(action));} else {// 对于其他类型的异常,默认返回放弃策略logger.error("Unhandled failure, abandoning request: {}", action.toString());}}
}

7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器进阶Demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-11 22:06:45*/
public class ElasticsearchSinkStreamJobAdvancedDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);public static void main(String[] args) {try {// 读取配置参数ElasticsearchEntity elasticsearchEntity = paramsInit();// 设置elasticsearch节点List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);// 创建esSinkFunction函数ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);// 构建ElasticsearchSink算子builderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);// es参数配置esBuilderHandler(esSinkBuilder, elasticsearchEntity);// 构建sink算子ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(100);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("交易接入,原始报文={}", dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 构建sink算子dataStreamSource.addSink(esSink);// 运行作业env.execute();} catch (Exception e) {e.printStackTrace();}}/*** 描述:Flink参数配置读取** @return {@code ElasticsearchEntity }* @throws IOException*/private static ElasticsearchEntity paramsInit() throws IOException {// 通过flink内置工具类获取命令行参数String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();String hosts = paramsMap.get("es.cluster.hosts");int port = paramsMap.getInt("es.cluster.port");String scheme = paramsMap.get("es.cluster.scheme");String type = paramsMap.get("es.cluster.type");String indexPrefix = paramsMap.get("es.cluster.indexPrefix");int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");elasticsearchEntity.setHosts(hosts);elasticsearchEntity.setPort(port);elasticsearchEntity.setScheme(scheme);elasticsearchEntity.setType(type);elasticsearchEntity.setIndexPrefix(indexPrefix);elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);elasticsearchEntity.setConnectTimeout(connectTimeout);elasticsearchEntity.setSocketTimeout(socketTimeout);elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);elasticsearchEntity.setMaxRedirects(maxRedirects);elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);elasticsearchEntity.setNormalizeUri(normalizeUri);return elasticsearchEntity;}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}/*** 描述:es参数配置** @param esSinkBuilder       esSinkBuilder建造器* @param elasticsearchEntity es实体类*/private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {// 设置触发批量写入的最大动作数,// 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());// 设置触发批量写入的最大数据量// 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MBesSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());// 设置批量写入的时间间隔// 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());// 启用批量写入的退避策略// 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());// 设置批量写入的退避延迟时间// 解释:在发生写入失败后,等待指定的延迟时间后再进行重试esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());// 设置批量写入的最大重试次数// 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());// 设置写入失败时的处理策略// 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());// 设置用于创建 Elasticsearch REST 客户端的工厂// 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));}/*** 描述:* elasticsearch 节点配置** @param elasticsearchEntity es实体类* @return {@code List<HttpHost> }*/private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {List<HttpHost> httpHosts = new ArrayList<>();String[] clusterArray = elasticsearchEntity.getHosts().split(",");for (String node : clusterArray) {httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));}return httpHosts;}
}

这篇关于【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/716022

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

认识、理解、分类——acm之搜索

普通搜索方法有两种:1、广度优先搜索;2、深度优先搜索; 更多搜索方法: 3、双向广度优先搜索; 4、启发式搜索(包括A*算法等); 搜索通常会用到的知识点:状态压缩(位压缩,利用hash思想压缩)。

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言