kafka(四)消息类型

2024-06-23 12:20
文章标签 kafka 类型 消息

本文主要是介绍kafka(四)消息类型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

这篇关于kafka(四)消息类型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087147

相关文章

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

C#中,decimal类型使用

在Microsoft SQL Server中numeric类型,在C#中使用的时候,需要用decimal类型与其对应,不能使用int等类型。 SQL:numeric C#:decimal

说一说三大运营商的流量类型,看完就知道该怎么选运营商了!

说一说三大运营商的流量类型,看完就知道该怎么选运营商了?目前三大运营商的流量类型大致分为通用流量和定向流量,比如: 中国电信:通用流量+定向流量 电信推出的套餐通常由通用流量+定向流量所组成,通用流量比较多,一般都在100G以上,而且电信套餐长期套餐较多,大多无合约期,自主激活的卡也是最多的,适合没有通话需求的朋友办理。 中国移动:通用流量+定向流量 移动推出的套餐通常由通用流量+定向

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

微服务中RPC的强类型检查与HTTP的弱类型对比

在微服务架构中,服务间的通信是一个至关重要的环节。其中,远程过程调用(RPC)和HTTP是两种最常见的通信方式。虽然它们都能实现服务间的数据交换,但在类型检查方面,RPC的强类型检查和HTTP的弱类型之间有着显著的差异。本文将深入探讨这两种通信方式在类型检查方面的优缺点,以及它们对微服务架构的影响。 一、RPC的强类型检查 RPC的强类型检查是其核心优势之一。在RPC通信中,客户端和服务端都使

消息认证码解析

1. 什么是消息认证码         消息认证码(Message Authentication Code)是一种确认完整性并进行认证的技术,取三个单词的首字母,简称为MAC。         消息认证码的输入包括任意长度的消息和一个发送者与接收者之间共享的密钥,它可以输出固定长度的数据,这个数据称为MAC值。         根据任意长度的消息输出固定长度的数据,这一点和单向散列函数很类似

一二三应用开发平台应用开发示例(4)——视图类型介绍以及新增、修改、查看视图配置

调整上级属性类型 前面为了快速展示平台的低代码配置功能,将实体文件夹的数据模型上级属性的数据类型暂时配置为文本类型,现在我们调整下,将其数据类型调整为实体,如下图所示: 数据类型需要选择实体,并在实体选择框中选择自身“文件夹” 这时候,再点击生成代码,平台会报错,提示“实体【文件夹】未设置主参照视图”。这是因为文件夹选择的功能页面,同样是基于配置产生的,因为视图我们还没有配置,所以会报错。

【面试干货】Java中的四种引用类型:强引用、软引用、弱引用和虚引用

【面试干货】Java中的四种引用类型:强引用、软引用、弱引用和虚引用 1、强引用(Strong Reference)2、软引用(Soft Reference)3、弱引用(Weak Reference)4、虚引用(Phantom Reference)5、总结 💖The Begin💖点点关注,收藏不迷路💖 在Java中,除了我们常见的强引用(Strong Refer

rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

业务描述 由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):  producer如何实现 producer只需发送消息时调用如下方法即可 /*** 发送有序消息** @param messageMap 消息数据* @param

Spring 集成 RabbitMQ 与其概念,消息持久化,ACK机制

目录 RabbitMQ 概念exchange交换机机制 什么是交换机binding?Direct Exchange交换机Topic Exchange交换机Fanout Exchange交换机Header Exchange交换机RabbitMQ 的 Hello - Demo(springboot实现)RabbitMQ 的 Hello Demo(spring xml实现)RabbitMQ 在生产环境