kafaka发送接收消息stream方式实例

2023-11-11 04:38

本文主要是介绍kafaka发送接收消息stream方式实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYgroup: account-devinput-order:
          consumer:
            headerMode: rawcontentType: text/plain;charset=UTF-8destination: ACCOUNT_ORDER_NOTIFYgroup: account-devoutput-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYoutput-watch:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_WATCH_NOTIFYkafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {String COLLECT_OUTPUT = "output-collect";
   String WATCH_OUTPUT ="output-watch" ;
   
   @Output(NotifyMessageChannel.COLLECT_OUTPUT)MessageChannel collectOutPut();

   @Output(NotifyMessageChannel.WATCH_OUTPUT)MessageChannel watchOutPut();

}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {@Autowired
    private NotifyMessageChannel notifyMessageChannel;
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {try {Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(mapper.writeValueAsString(userCollectCourseNotify)).build()) ;
            log.info("send result:"+result);
        } catch (Exception e) {log.error("Exception from create user UserCollectCourse.", e);
        }}
}

4.接收消息input配置

public interface ReceiveMessageChannel {String COLLECT_INPUT = "input-collect";
   String ORDER_INPUT ="input-order" ;
   
   @Input(ReceiveMessageChannel.COLLECT_INPUT)SubscribableChannel collectInput();

   @Input(ReceiveMessageChannel.ORDER_INPUT)SubscribableChannel orderInput();

}

5.监听接收到的消息,进行消费处理

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {private UserCollectCourseClient userCollectCourseService;
   
   private ObjectMapper mapper = new ObjectMapper();

   public CollectListener(UserCollectCourseClient userCollectCourseClient) {this.userCollectCourseService = userCollectCourseClient;
   }@StreamListener(ReceiveMessageChannel.COLLECT_INPUT)public void process(Message<String> message) {log.debug("Received Notify:[{}]",message.toString());

      String content = message.getPayload();
      UserCollectCourseNotify uccn;
      try {uccn = mapper.readValue(content,UserCollectCourseNotify.class);
         log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId());

         if(uccn!=null){log.info("receive UserCollectCourseNotify:"+uccn);
//          userCollectCourseService.saveUserCollectCourse(ucci);
//          log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId());
         }} catch (Exception e) {log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage());
      }}
}






这篇关于kafaka发送接收消息stream方式实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387596

相关文章

Java实现MD5加密的四种方式

《Java实现MD5加密的四种方式》MD5是一种广泛使用的哈希算法,其输出结果是一个128位的二进制数,通常以32位十六进制数的形式表示,MD5的底层实现涉及多个复杂的步骤和算法,本文给大家介绍了Ja... 目录MD5介绍Java 中实现 MD5 加密方式方法一:使用 MessageDigest方法二:使用

Linux配置IP地址的三种实现方式

《Linux配置IP地址的三种实现方式》:本文主要介绍Linux配置IP地址的三种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录环境RedHat9第一种安装 直接配置网卡文件第二种方式 nmcli(Networkmanager command-line

如何使用C#串口通讯实现数据的发送和接收

《如何使用C#串口通讯实现数据的发送和接收》本文详细介绍了如何使用C#实现基于串口通讯的数据发送和接收,通过SerialPort类,我们可以轻松实现串口通讯,并结合事件机制实现数据的传递和处理,感兴趣... 目录1. 概述2. 关键技术点2.1 SerialPort类2.2 异步接收数据2.3 数据解析2.

JAVA封装多线程实现的方式及原理

《JAVA封装多线程实现的方式及原理》:本文主要介绍Java中封装多线程的原理和常见方式,通过封装可以简化多线程的使用,提高安全性,并增强代码的可维护性和可扩展性,需要的朋友可以参考下... 目录前言一、封装的目标二、常见的封装方式及原理总结前言在 Java 中,封装多线程的原理主要围绕着将多线程相关的操

MyBatis-Plus中Service接口的lambdaUpdate用法及实例分析

《MyBatis-Plus中Service接口的lambdaUpdate用法及实例分析》本文将详细讲解MyBatis-Plus中的lambdaUpdate用法,并提供丰富的案例来帮助读者更好地理解和应... 目录深入探索MyBATis-Plus中Service接口的lambdaUpdate用法及示例案例背景

Golang中拼接字符串的6种方式性能对比

《Golang中拼接字符串的6种方式性能对比》golang的string类型是不可修改的,对于拼接字符串来说,本质上还是创建一个新的对象将数据放进去,主要有6种拼接方式,下面小编就来为大家详细讲讲吧... 目录拼接方式介绍性能对比测试代码测试结果源码分析golang的string类型是不可修改的,对于拼接字

MyBatis-Plus中静态工具Db的多种用法及实例分析

《MyBatis-Plus中静态工具Db的多种用法及实例分析》本文将详细讲解MyBatis-Plus中静态工具Db的各种用法,并结合具体案例进行演示和说明,具有很好的参考价值,希望对大家有所帮助,如有... 目录MyBATis-Plus中静态工具Db的多种用法及实例案例背景使用静态工具Db进行数据库操作插入

Linux下修改hostname的三种实现方式

《Linux下修改hostname的三种实现方式》:本文主要介绍Linux下修改hostname的三种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下修改ho编程stname三种方式方法1:修改配置文件方法2:hFvEWEostnamectl命

SpringBoot接收JSON类型的参数方式

《SpringBoot接收JSON类型的参数方式》:本文主要介绍SpringBoot接收JSON类型的参数方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、jsON二、代码准备三、Apifox操作总结一、JSON在学习前端技术时,我们有讲到过JSON,而在

Spring Security注解方式权限控制过程

《SpringSecurity注解方式权限控制过程》:本文主要介绍SpringSecurity注解方式权限控制过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、摘要二、实现步骤2.1 在配置类中添加权限注解的支持2.2 创建Controller类2.3 Us