kafka Interceptors and Listeners

2024-03-06 10:04

本文主要是介绍kafka Interceptors and Listeners,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Interceptors

ProducerInterceptor

https://www.cnblogs.com/huxi2b/p/7072447.html

Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们.

API

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率void onAcknowledgement(RecordMetadata metadata, Exception exception);//关闭interceptor,主要用于执行一些资源清理工作void close();
}

demo

    public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> props = new HashMap();props.put("bootstrap.servers", "localhost:9092");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);List<String> interceptors = new ArrayList<>();interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "test-topic";Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);producer.send(record).get();}// 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}

ConsumerInterceptor

https://blog.csdn.net/warybee/article/details/121980296

消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

  • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
  • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
  • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
  • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

API

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {/**该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。*/ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);/**当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。调用者将忽略此方法抛出的任何异常。*/void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);/***  关闭Interceptor之前调用*/void close();
}

配置

//如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");

Listeners

ProducerListener

https://blog.csdn.net/u014494148/article/details/125344184

Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:

API

public interface ProducerListener<K, V> {/*** Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).*/default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}/*** Invoked after an attempt to send a message has failed.*/default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,Exception exception) {}}

自定义Listener

public class MyProducerListener<K, V> implements ProducerListener<K, V> {private FallbackHandler<K, V> fallbackHandler;@Overridepublic void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {//fallbackHandler.process.//write error metrics...}@Overridepublic void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {//write success metrics...}
}

demo(KafkaTemplate.setProducerListener())

    public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();listener1.setFallbackHandler(fallbackHandler);kafkaTemplate.setProducerListener(listener1);return kafkaTemplate;}

KafkaListenerErrorHandler

当@KafkaListener方法抛出异常时调用的错误处理程序.

API

@FunctionalInterface
public interface KafkaListenerErrorHandler {/*** Handle the error.*/Object handleError(Message<?> message, ListenerExecutionFailedException exception);
}

自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费)

/*** 可以通过:* @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")* 来引入该配置*/
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {//记录了所有的 kafka MessageListenerContainerprivate final KafkaListenerEndpointRegistry endpointRegistry;public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {this.endpointRegistry = endpointRegistry;}@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception) {// 处理异常// 暂停消费者String listenerId = exception.getGroupId();MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);listenerContainer.pause();//滑动窗口算法 ---// 休眠一段时间(例如 30秒)try {Thread.sleep(30000); // 暂停 30 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 恢复消费者listenerContainer.resume();return null;}
}

demo

@org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")

Callback

producer.Callback

public interface Callback {//processed befeore listener...void onCompletion(RecordMetadata metadata, Exception exception);
}

demo

producer.send(producerRecord, (recordMetadata, exception) -> {if (exception == null) {System.out.println("Record written to offset " +recordMetadata.offset() + " timestamp " +recordMetadata.timestamp());} else {System.err.println("An error occurred");exception.printStackTrace(System.err);}
});

这篇关于kafka Interceptors and Listeners的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/779682

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录 1.配置文件2.消费者1.注解方式2.KafkaConsumer 3.依赖1.注解依赖2.KafkaConsumer依赖 本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得 1.配置文件 Yml配置 spring:kafka:bootstrap-servers: cons

Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower 消费者想要拉取主题分区的数据,首先必须要加入到一个组中。 但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办? 所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则: 同一个消费者组的消费者都订

Kafka【十三】消费者消费消息的偏移量

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据。如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。 【1】起始偏移量 在消费者的配置中,我们可以增加偏移量相关参数auto.offset.re

Kafka的三高设计原理

1.生产者缓存机制--高性能 生产者缓存机制的主要目的是将消息打包,减少网络IO频率 kafka生产者端存在消息累加器RecordAccumulator,它会对每个Partition维护一个双端队列,队列中消息到达一定数量后 或者 到达一定时间后,通过sender线程批量的将消息发送给kafka服务端。(批量发送) 2.发送应答机制--高可用 发送应发机制保证了消息可以安全到达服务端!

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Pulsar与Kafka消费模型对比

kafka kafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。只要 consumer-group 与 topic 之间的关系发生变更,就会动态触发 reblance 操