记一次项目所学(中间件等)-动态提醒功能(RocketMQ)

2024-03-10 10:44

本文主要是介绍记一次项目所学(中间件等)-动态提醒功能(RocketMQ),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

记一次项目所学(中间件等)–动态提醒功能(RocketMQ)

订阅发布模式与观察者模式

在这里插入图片描述

在这里插入图片描述

RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务

Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快

RocketMQ相关工具类及配置实现

配置类

 
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.2.RELEASE</version></dependency>

生产者发送消息工具类

public class RocketMQUtil {//同步发送消息public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{SendResult result = producer.send(msg);System.out.println(result);}//异步发送消息public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}

RocketMQ配置类

@Configuration
public class RocketMQConfig {//  rocketMQ名称服务器的地址@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;//生产者@Bean("momentsProducer")public DefaultMQProducer momentsProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);producer.setNamesrvAddr(nameServerAddr);producer.start();return producer;}@Bean("momentsConsumer")//push 为推送,还有拉取等consumerpublic DefaultMQPushConsumer momentsConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);consumer.setNamesrvAddr(nameServerAddr);//订阅    *表示所有内容consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");//消费者监听器,监听到后下一步操作//registerMessageListener注册消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//ConsumeConcurrentlyStatus并发处理//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){MessageExt msg = msgs.get(0);if(msg == null){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//取出的是byte数组类型String bodyStr = new String(msg.getBody());UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);Long userId = userMoment.getUserId();//定位粉丝idList<UserFollowing>fanList = userFollowingService.getUserFans(userId);for(UserFollowing fan : fanList){//发到redis用户到redis拿String key = "subscribed-" + fan.getUserId();//把动态列表拿出来String subscribedListStr = redisTemplate.opsForValue().get(key);List<UserMoment> subscribedList;if(StringUtil.isNullOrEmpty(subscribedListStr)){subscribedList = new ArrayList<>();}else{//转换列表的类subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);}subscribedList.add(userMoment);//把列表再转成字符串放进去redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();return consumer;}

具体业务逻辑:

@Service
public class UserMomentsService {@Autowiredprivate UserMomentsDao userMomentsDao;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addUserMoments(UserMoment userMoment) throws Exception {userMoment.setCreateTime(new Date());//cruduserMomentsDao.addUserMoments(userMoment);DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");//主题 以及json的数组消息Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));RocketMQUtil.syncSendMsg(producer, msg);}// 查询订阅动态public List<UserMoment> getUserSubscribedMoments(Long userId) {String key = "subscribed-" + userId;//查出来的是String描述的json类型String listStr = redisTemplate.opsForValue().get(key);//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中return JSONArray.parseArray(listStr, UserMoment.class);}
}

PS:消费信息逻辑在配置类的Consumer中已经写好了

这篇关于记一次项目所学(中间件等)-动态提醒功能(RocketMQ)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/793988

相关文章

如何关闭 Mac 触发角功能或设置修饰键? mac电脑防止误触设置技巧

《如何关闭Mac触发角功能或设置修饰键?mac电脑防止误触设置技巧》从Windows换到iOS大半年来,触发角是我觉得值得吹爆的MacBook效率神器,成为一大说服理由,下面我们就来看看mac电... MAC 的「触发角」功能虽然提高了效率,但过于灵敏也让不少用户感到头疼。特别是在关键时刻,一不小心就可能触

Nginx实现高并发的项目实践

《Nginx实现高并发的项目实践》本文主要介绍了Nginx实现高并发的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录使用最新稳定版本的Nginx合理配置工作进程(workers)配置工作进程连接数(worker_co

Vue中动态权限到按钮的完整实现方案详解

《Vue中动态权限到按钮的完整实现方案详解》这篇文章主要为大家详细介绍了Vue如何在现有方案的基础上加入对路由的增、删、改、查权限控制,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、数据库设计扩展1.1 修改路由表(routes)1.2 修改角色与路由权限表(role_routes)二、后端接口设计

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Vue项目的甘特图组件之dhtmlx-gantt使用教程和实现效果展示(推荐)

《Vue项目的甘特图组件之dhtmlx-gantt使用教程和实现效果展示(推荐)》文章介绍了如何使用dhtmlx-gantt组件来实现公司的甘特图需求,并提供了一个简单的Vue组件示例,文章还分享了一... 目录一、首先 npm 安装插件二、创建一个vue组件三、业务页面内 引用自定义组件:四、dhtmlx

SpringBoot项目注入 traceId 追踪整个请求的日志链路(过程详解)

《SpringBoot项目注入traceId追踪整个请求的日志链路(过程详解)》本文介绍了如何在单体SpringBoot项目中通过手动实现过滤器或拦截器来注入traceId,以追踪整个请求的日志链... SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排

前端 CSS 动态设置样式::class、:style 等技巧(推荐)

《前端CSS动态设置样式::class、:style等技巧(推荐)》:本文主要介绍了Vue.js中动态绑定类名和内联样式的两种方法:对象语法和数组语法,通过对象语法,可以根据条件动态切换类名或样式;通过数组语法,可以同时绑定多个类名或样式,此外,还可以结合计算属性来生成复杂的类名或样式对象,详细内容请阅读本文,希望能对你有所帮助...

MobaXterm远程登录工具功能与应用小结

《MobaXterm远程登录工具功能与应用小结》MobaXterm是一款功能强大的远程终端软件,主要支持SSH登录,拥有多种远程协议,实现跨平台访问,它包括多会话管理、本地命令行执行、图形化界面集成和... 目录1. 远程终端软件概述1.1 远程终端软件的定义与用途1.2 远程终端软件的关键特性2. 支持的

Nginx实现动态封禁IP的步骤指南

《Nginx实现动态封禁IP的步骤指南》在日常的生产环境中,网站可能会遭遇恶意请求、DDoS攻击或其他有害的访问行为,为了应对这些情况,动态封禁IP是一项十分重要的安全策略,本篇博客将介绍如何通过NG... 目录1、简述2、实现方式3、使用 fail2ban 动态封禁3.1 安装 fail2ban3.2 配

Java中实现订单超时自动取消功能(最新推荐)

《Java中实现订单超时自动取消功能(最新推荐)》本文介绍了Java中实现订单超时自动取消功能的几种方法,包括定时任务、JDK延迟队列、Redis过期监听、Redisson分布式延迟队列、Rocket... 目录1、定时任务2、JDK延迟队列 DelayQueue(1)定义实现Delayed接口的实体类 (