尚品汇-商品上下架完善(更新ES)、延迟消息(四十四)

2024-09-04 21:12

本文主要是介绍尚品汇-商品上下架完善(更新ES)、延迟消息(四十四),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录:

(1)改造商品搜索上下架

(2)延迟消息

(1)改造商品搜索上下架

定义商品上下架常量

rabbit-util模块中导入常量类MqConst
/*** 商品上下架.*/
public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";
public static final String ROUTING_GOODS_UPPER = "goods.upper";
public static final String ROUTING_GOODS_LOWER = "goods.lower";
//队列
public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";
public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";

service-list与service-product引入依赖与配置

<!--rabbitmq消息队列-->
<dependency><groupId>com.atguigu.gmall</groupId><artifactId>rabbit-util</artifactId><version>1.0</version>
</dependency>

service-product发送消息

我在商品上架与商品添加时发送消息

商品上架

实现类

@Override
@Transactional
public void onSale(Long skuId) {// 更改销售状态SkuInfo skuInfoUp = new SkuInfo();skuInfoUp.setId(skuId);skuInfoUp.setIsSale(1);skuInfoMapper.updateById(skuInfoUp);//商品上架 交换机 路由key 队列rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);
}

商品下架

@Override
@Transactional
public void cancelSale(Long skuId) {// 更改销售状态SkuInfo skuInfoUp = new SkuInfo();skuInfoUp.setId(skuId);skuInfoUp.setIsSale(0);skuInfoMapper.updateById(skuInfoUp);//商品下架  交换机 路由key 队列rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_LOWER, skuId);
}

service-list消费消息

package com.atguigu.gmall.list.receiver;import com.atguigu.gmall.constant.MqConst;
import com.atguigu.gmall.list.service.SearchService;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class ListReceiver {@Autowiredprivate SearchService searchService;//监听上架队列@SneakyThrows@RabbitListener(bindings = @QueueBinding(value =@Queue(value = MqConst.QUEUE_GOODS_UPPER,durable = "true",autoDelete = "false"),exchange =@Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS,autoDelete = "false"),key = {MqConst.ROUTING_GOODS_UPPER}))public void upperGoodsToEs(Long skuId, Message message, Channel channel){try {//判断if(skuId!=null){//操作搜索模块操作ES的上架方法searchService.upperGoods(skuId);}} catch (Exception e) {//写入日志文件 ,,写入数据库, 对接程序员手机短信e.printStackTrace();}//消息确认    // 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}//监听下架队列//监听上架队列@SneakyThrows@RabbitListener(bindings = @QueueBinding(value =@Queue(value = MqConst.QUEUE_GOODS_LOWER,durable = "true",autoDelete = "false"),exchange =@Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS,autoDelete = "false"),key = {MqConst.ROUTING_GOODS_LOWER}))public void lowerGoodsToEs(Long skuId, Message message, Channel channel){try {//判断if(skuId!=null){//操作搜索模块操作ES的下架方法searchService.lowerGoods(skuId);}} catch (Exception e) {//写入日志文件 ,,写入数据库, 对接程序员手机短信e.printStackTrace();}//消息确认    // 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试

启动后台管理页面

http://localhost:8888/#/product/sku/list

操作商品的上架,下架。动态更改es中的数据。

全部下架,页面查看:

上架几个:

(2)延迟消息

前面解决了搜索与商品服务的问题,下面解决这个问题订单取消问题:

 

延迟消息有两种实现方案:

  1. 基于死信队列
  2. 集成延迟插件

1.基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

如何设置TTL:

我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

死信交换机  Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

(2)上面的消息的TTL到了,消息过期了。

(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

 

我们现在可以测试一下延迟队列。

(1)创建死信队列

(2)创建交换机

(3)建立交换器与队列之间的绑定

(4)创建队列

代码实现

 

在service-mq 中添加配置类:前面我们使用注解的方式进行创建交换机队列,下面我们用配置类进行创建 

package com.atguigu.gmall.mq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterMqConfig {// 声明一些变量public static final String exchange_dead = "exchange.dead";public static final String routing_dead_1 = "routing.dead.1";public static final String routing_dead_2 = "routing.dead.2";public static final String queue_dead_1 = "queue.dead.1";public static final String queue_dead_2 = "queue.dead.2";// 定义交换机@Beanpublic DirectExchange exchange(){return new DirectExchange(exchange_dead,true,false,null);}@Beanpublic Queue queue1(){// 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!HashMap<String, Object> map = new HashMap<>();// 参数绑定 此处的key 固定值,不能随意写map.put("x-dead-letter-exchange",exchange_dead);map.put("x-dead-letter-routing-key",routing_dead_2);// 设置延迟时间map.put("x-message-ttl", 10 * 1000);// 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数return new Queue(queue_dead_1,true,false,false,map);}@Beanpublic Binding binding(){// 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);}// 这个队列二就是一个普通队列@Beanpublic Queue queue2(){return new Queue(queue_dead_2,true,false,false,null);}// 设置队列二的绑定规则@Beanpublic Binding binding2(){// 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);}
}

配置发送消息

package com.atguigu.gmall.mq.controller;@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitService rabbitService;@GetMapping("sendDeadLettle")public Result sendDeadLettle() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitService.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "我是延迟消息");System.out.println(" 消息发送时间:"+sdf.format(new Date()) );return Result.ok();}
}

 

消息接收方

package com.atguigu.gmall.mq.receiver;@Component
@Configuration
public class DeadLetterReceiver {//消费的是队列2,队列1,没有人消息,超过超时时间后会变成死信@RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)public void get(String msg) {System.out.println("Receive:" + msg);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息接收时间: " + sdf.format(new Date()));System.out.println("消息内容为:"+msg)}
}

 

2.基于延迟插件实现延迟消息

Rabbitmq实现了一个插件x-delay-message来实现延时队列

插件安装

1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html

2. 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下

3. 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录

4. 执行 ls -l|grep delay  命令查看插件是否copy成功

5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange  命令启用插件

6. exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器

代码实现

在service-mq 中添加类

配置队列

package com.atguigu.gmall.mq.config;@Configuration
public class DelayedMqConfig {public static final String exchange_delay = "exchange.delay";public static final String routing_delay = "routing.delay";public static final String queue_delay_1 = "queue.delay.1";@Beanpublic Queue delayQeue1() {// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化return new Queue(queue_delay_1, true);}@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);}@Beanpublic Binding delayBbinding1() {return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();}
}

发送消息:这个延迟插件,需要使用rabbitTemplate,上面的可以使用rabbitService封装的service

@GetMapping("sendelay")
public Result sendDelay() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, "我是延迟插件的消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);//消息的延迟时间System.out.println("延迟插件消息发送时间:"+sdf.format(new Date()) );return message;//放行消息}});return Result.ok();
}

接收消息

package com.atguigu.gmall.mq.receiver;@Component
public class DelayReceiver {@RabbitListener(queues = DelayedMqConfig.queue_delay_1)public void get(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息接受时间: " + sdf.format(new Date()));System.out.println("消息内容是:"+msg)}}

报错了不用管,因为我们灭有使用封装的发送消息的rabbittservice 发送消息,没有往Redis放消息,发送延迟消息发送失败,会消息回调重新发送,会获取key,没有key就会报错:

修改消息回调 

加一个为空的判断:并且在接受消息的时候加一个消息确认 

 更改后就不报错了

这篇关于尚品汇-商品上下架完善(更新ES)、延迟消息(四十四)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1137044

相关文章

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

Linux Mint Xia 22.1重磅发布: 重要更新一览

《LinuxMintXia22.1重磅发布:重要更新一览》Beta版LinuxMint“Xia”22.1发布,新版本基于Ubuntu24.04,内核版本为Linux6.8,这... linux Mint 22.1「Xia」正式发布啦!这次更新带来了诸多优化和改进,进一步巩固了 Mint 在 Linux 桌面

SpringCloud配置动态更新原理解析

《SpringCloud配置动态更新原理解析》在微服务架构的浩瀚星海中,服务配置的动态更新如同魔法一般,能够让应用在不重启的情况下,实时响应配置的变更,SpringCloud作为微服务架构中的佼佼者,... 目录一、SpringBoot、Cloud配置的读取二、SpringCloud配置动态刷新三、更新@R

Ubuntu 24.04 LTS怎么关闭 Ubuntu Pro 更新提示弹窗?

《Ubuntu24.04LTS怎么关闭UbuntuPro更新提示弹窗?》Ubuntu每次开机都会弹窗提示安全更新,设置里最多只能取消自动下载,自动更新,但无法做到直接让自动更新的弹窗不出现,... 如果你正在使用 Ubuntu 24.04 LTS,可能会注意到——在使用「软件更新器」或运行 APT 命令时,

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

MyBatis延迟加载的处理方案

《MyBatis延迟加载的处理方案》MyBatis支持延迟加载(LazyLoading),允许在需要数据时才从数据库加载,而不是在查询结果第一次返回时就立即加载所有数据,延迟加载的核心思想是,将关联对... 目录MyBATis如何处理延迟加载?延迟加载的原理1. 开启延迟加载2. 延迟加载的配置2.1 使用

poj3468(线段树成段更新模板题)

题意:包括两个操作:1、将[a.b]上的数字加上v;2、查询区间[a,b]上的和 下面的介绍是下解题思路: 首先介绍  lazy-tag思想:用一个变量记录每一个线段树节点的变化值,当这部分线段的一致性被破坏我们就将这个变化值传递给子区间,大大增加了线段树的效率。 比如现在需要对[a,b]区间值进行加c操作,那么就从根节点[1,n]开始调用update函数进行操作,如果刚好执行到一个子节点,

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

hdu1689(线段树成段更新)

两种操作:1、set区间[a,b]上数字为v;2、查询[ 1 , n ]上的sum 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#include<queue>#include<set>#include<map>#include<stdio.h>#include<stdl