记一次项目所学(中间件等)-动态提醒功能(RocketMQ)

2024-03-10 10:44

本文主要是介绍记一次项目所学(中间件等)-动态提醒功能(RocketMQ),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

记一次项目所学(中间件等)–动态提醒功能(RocketMQ)

订阅发布模式与观察者模式

在这里插入图片描述

在这里插入图片描述

RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务

Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快

RocketMQ相关工具类及配置实现

配置类

 
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.2.RELEASE</version></dependency>

生产者发送消息工具类

public class RocketMQUtil {//同步发送消息public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{SendResult result = producer.send(msg);System.out.println(result);}//异步发送消息public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}

RocketMQ配置类

@Configuration
public class RocketMQConfig {//  rocketMQ名称服务器的地址@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;//生产者@Bean("momentsProducer")public DefaultMQProducer momentsProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);producer.setNamesrvAddr(nameServerAddr);producer.start();return producer;}@Bean("momentsConsumer")//push 为推送,还有拉取等consumerpublic DefaultMQPushConsumer momentsConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);consumer.setNamesrvAddr(nameServerAddr);//订阅    *表示所有内容consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");//消费者监听器,监听到后下一步操作//registerMessageListener注册消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//ConsumeConcurrentlyStatus并发处理//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){MessageExt msg = msgs.get(0);if(msg == null){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//取出的是byte数组类型String bodyStr = new String(msg.getBody());UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);Long userId = userMoment.getUserId();//定位粉丝idList<UserFollowing>fanList = userFollowingService.getUserFans(userId);for(UserFollowing fan : fanList){//发到redis用户到redis拿String key = "subscribed-" + fan.getUserId();//把动态列表拿出来String subscribedListStr = redisTemplate.opsForValue().get(key);List<UserMoment> subscribedList;if(StringUtil.isNullOrEmpty(subscribedListStr)){subscribedList = new ArrayList<>();}else{//转换列表的类subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);}subscribedList.add(userMoment);//把列表再转成字符串放进去redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();return consumer;}

具体业务逻辑:

@Service
public class UserMomentsService {@Autowiredprivate UserMomentsDao userMomentsDao;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addUserMoments(UserMoment userMoment) throws Exception {userMoment.setCreateTime(new Date());//cruduserMomentsDao.addUserMoments(userMoment);DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");//主题 以及json的数组消息Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));RocketMQUtil.syncSendMsg(producer, msg);}// 查询订阅动态public List<UserMoment> getUserSubscribedMoments(Long userId) {String key = "subscribed-" + userId;//查出来的是String描述的json类型String listStr = redisTemplate.opsForValue().get(key);//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中return JSONArray.parseArray(listStr, UserMoment.class);}
}

PS:消费信息逻辑在配置类的Consumer中已经写好了

这篇关于记一次项目所学(中间件等)-动态提醒功能(RocketMQ)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/793988

相关文章

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

Node.js 数据库 CRUD 项目示例详解(完美解决方案)

《Node.js数据库CRUD项目示例详解(完美解决方案)》:本文主要介绍Node.js数据库CRUD项目示例详解(完美解决方案),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考... 目录项目结构1. 初始化项目2. 配置数据库连接 (config/db.js)3. 创建模型 (models/

使用Python开发一个带EPUB转换功能的Markdown编辑器

《使用Python开发一个带EPUB转换功能的Markdown编辑器》Markdown因其简单易用和强大的格式支持,成为了写作者、开发者及内容创作者的首选格式,本文将通过Python开发一个Markd... 目录应用概览代码结构与核心组件1. 初始化与布局 (__init__)2. 工具栏 (setup_t

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

MySQL中动态生成SQL语句去掉所有字段的空格的操作方法

《MySQL中动态生成SQL语句去掉所有字段的空格的操作方法》在数据库管理过程中,我们常常会遇到需要对表中字段进行清洗和整理的情况,本文将详细介绍如何在MySQL中动态生成SQL语句来去掉所有字段的空... 目录在mysql中动态生成SQL语句去掉所有字段的空格准备工作原理分析动态生成SQL语句在MySQL

Spring Boot项目部署命令java -jar的各种参数及作用详解

《SpringBoot项目部署命令java-jar的各种参数及作用详解》:本文主要介绍SpringBoot项目部署命令java-jar的各种参数及作用的相关资料,包括设置内存大小、垃圾回收... 目录前言一、基础命令结构二、常见的 Java 命令参数1. 设置内存大小2. 配置垃圾回收器3. 配置线程栈大小

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.