本文主要是介绍kafka(四)消息类型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、同步消息
1、生产者
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
二、异步消息
1、生产者
异步消息有两种:
1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法//(2)消息发送失败 exception != null 也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
三、顺序消息
这篇关于kafka(四)消息类型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!