kafaka发送接收消息stream方式实例

2023-11-11 04:38

本文主要是介绍kafaka发送接收消息stream方式实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYgroup: account-devinput-order:
          consumer:
            headerMode: rawcontentType: text/plain;charset=UTF-8destination: ACCOUNT_ORDER_NOTIFYgroup: account-devoutput-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYoutput-watch:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_WATCH_NOTIFYkafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {String COLLECT_OUTPUT = "output-collect";
   String WATCH_OUTPUT ="output-watch" ;
   
   @Output(NotifyMessageChannel.COLLECT_OUTPUT)MessageChannel collectOutPut();

   @Output(NotifyMessageChannel.WATCH_OUTPUT)MessageChannel watchOutPut();

}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {@Autowired
    private NotifyMessageChannel notifyMessageChannel;
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {try {Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(mapper.writeValueAsString(userCollectCourseNotify)).build()) ;
            log.info("send result:"+result);
        } catch (Exception e) {log.error("Exception from create user UserCollectCourse.", e);
        }}
}

4.接收消息input配置

public interface ReceiveMessageChannel {String COLLECT_INPUT = "input-collect";
   String ORDER_INPUT ="input-order" ;
   
   @Input(ReceiveMessageChannel.COLLECT_INPUT)SubscribableChannel collectInput();

   @Input(ReceiveMessageChannel.ORDER_INPUT)SubscribableChannel orderInput();

}

5.监听接收到的消息,进行消费处理

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {private UserCollectCourseClient userCollectCourseService;
   
   private ObjectMapper mapper = new ObjectMapper();

   public CollectListener(UserCollectCourseClient userCollectCourseClient) {this.userCollectCourseService = userCollectCourseClient;
   }@StreamListener(ReceiveMessageChannel.COLLECT_INPUT)public void process(Message<String> message) {log.debug("Received Notify:[{}]",message.toString());

      String content = message.getPayload();
      UserCollectCourseNotify uccn;
      try {uccn = mapper.readValue(content,UserCollectCourseNotify.class);
         log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId());

         if(uccn!=null){log.info("receive UserCollectCourseNotify:"+uccn);
//          userCollectCourseService.saveUserCollectCourse(ucci);
//          log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId());
         }} catch (Exception e) {log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage());
      }}
}






这篇关于kafaka发送接收消息stream方式实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387596

相关文章

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

用命令行的方式启动.netcore webapi

用命令行的方式启动.netcore web项目 进入指定的项目文件夹,比如我发布后的代码放在下面文件夹中 在此地址栏中输入“cmd”,打开命令提示符,进入到发布代码目录 命令行启动.netcore项目的命令为:  dotnet 项目启动文件.dll --urls="http://*:对外端口" --ip="本机ip" --port=项目内部端口 例: dotnet Imagine.M

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

【即时通讯】轮询方式实现

技术栈 LayUI、jQuery实现前端效果。django4.2、django-ninja实现后端接口。 代码仓 - 后端 代码仓 - 前端 实现功能 首次访问页面并发送消息时需要设置昵称发送内容为空时要提示用户不能发送空消息前端定时获取消息,然后展示在页面上。 效果展示 首次发送需要设置昵称 发送消息与消息展示 提示用户不能发送空消息 后端接口 发送消息 DB = []@ro

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

脏页的标记方式详解

脏页的标记方式 一、引言 在数据库系统中,脏页是指那些被修改过但还未写入磁盘的数据页。为了有效地管理这些脏页并确保数据的一致性,数据库需要对脏页进行标记。了解脏页的标记方式对于理解数据库的内部工作机制和优化性能至关重要。 二、脏页产生的过程 当数据库中的数据被修改时,这些修改首先会在内存中的缓冲池(Buffer Pool)中进行。例如,执行一条 UPDATE 语句修改了某一行数据,对应的缓

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似