SpringBoot集成redis的JedisCluster和RedisTemplate 实现redis的消息队列消费者-生产者模式,订阅者发布者模式

本文主要是介绍SpringBoot集成redis的JedisCluster和RedisTemplate 实现redis的消息队列消费者-生产者模式,订阅者发布者模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。
定义:

  • 生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
  • 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
在这里插入图片描述
由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。

Redis List的主要操作为lpush/lpop/rpush/rpop四种,分别代表从头部和尾部的push/pop,除此之外List还提供了两种pop操作的阻塞版本blpop/brpop,用于阻塞获取一个对象。

Redis通常都被用做一个处理各种后台工作或消息任务的消息服务器。 一个简单的队列模式就是:生产者把消息放入一个列表中,等待消息的消费者用 RPOP 命令(用轮询方式), 或者用 BRPOP 命令(如果客户端使用阻塞操作会更好)来得到这个消息。

以下列举SpringBoot集成redis的JedisCluster和RedisTemplate

引入依赖到pom.xml

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>

application.yml增加redis集群配置

spring:redis:password:clusters: 10.10.1.238:7000,10.10.1.238:7001,10.10.1.238:7002,10.10.1.238:7003,10.10.1.238:7004,10.10.1.238:7005

RedisConfig配置

package com.example.myframe.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;import java.lang.reflect.Field;@Configuration
public class RedisConfig extends CachingConfigurerSupport {/**redis密码**/@Value("${spring.redis.password}")public String password;@Value("${spring.redis.clusters}")public String cluster;@Beanpublic KeyGenerator keyGenerator() {return (target, method, params) -> {StringBuilder sb = new StringBuilder();sb.append(target.getClass().getName());sb.append(method.getName());for (Object obj : params) {sb.append(obj.toString());}return sb.toString();};}public  Object getFieldValueByObject (Object object , String targetFieldName) throws Exception {// 获取该对象的ClassClass objClass = object.getClass();// 获取所有的属性数组Field[] fields = objClass.getDeclaredFields();for (Field field:fields) {// 属性名称field.setAccessible(true);String currentFieldName = field.getName();if(currentFieldName.equals(targetFieldName)){return field.get(object); // 通过反射拿到该属性在此对象中的值(也可能是个对象)}}return null;}/*** 通过反射获取JedisCluster* @param factory* @return*/@Beanpublic JedisCluster redisCluster(RedisConnectionFactory factory){Object object =null;try {object= getFieldValueByObject(factory,"cluster");} catch (Exception e) {e.printStackTrace();}return (JedisCluster)object;}@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {StringRedisTemplate template = new StringRedisTemplate(factory);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);template.setValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Bean(name="factory")public RedisConnectionFactory factory(RedisClusterConfiguration clusterConfig){JedisConnectionFactory redisConnectionFactory=new JedisConnectionFactory(clusterConfig);String redisPassword = password;redisConnectionFactory.setPassword(redisPassword);redisConnectionFactory.setPoolConfig(createJedisPoolConfig());redisConnectionFactory.setTimeout(30000);redisConnectionFactory.setUsePool(true);  return redisConnectionFactory;  }@Bean(name="clusterConfig")public RedisClusterConfiguration clusterConfig(){RedisClusterConfiguration config = new RedisClusterConfiguration();String[] nodes = cluster.split(",");for(String node : nodes){String[] host =  node.split(":");RedisNode redis = new RedisNode(host[0], Integer.parseInt(host[1]));config.addClusterNode(redis);}return config;}public JedisPoolConfig createJedisPoolConfig(){JedisPoolConfig config = new JedisPoolConfig();//连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认trueconfig.setBlockWhenExhausted(false);//设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy");//是否启用pool的jmx管理功能, 默认trueconfig.setJmxEnabled(true);//MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好.config.setJmxNamePrefix("pool");//是否启用后进先出, 默认trueconfig.setLifo(true);//最大空闲连接数, 默认8个config.setMaxIdle(2000);//最大连接数, 默认8个config.setMaxTotal(5000);//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1config.setMaxWaitMillis(10000);//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)config.setMinEvictableIdleTimeMillis(1800000);//最小空闲连接数, 默认0config.setMinIdle(0);//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3config.setNumTestsPerEvictionRun(3);//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断  (默认逐出策略)   config.setSoftMinEvictableIdleTimeMillis(1800000);//在获取连接的时候检查有效性, 默认falseconfig.setTestOnBorrow(false);//在空闲时检查有效性, 默认falseconfig.setTestWhileIdle(false);//逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1config.setTimeBetweenEvictionRunsMillis(-1);return config;}}

RedisService.java辅助类

package com.example.myframe.redis.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Service;import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;@Service("redisService")
public class RedisService {@Autowiredprivate RedisTemplate redisTemplate;/*** 写入缓存* @param key* @param value* @return*/public boolean set(final String key, Object value) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 写入缓存设置时效时间* @param key* @param value* @return*/public boolean set(final String key, Object value, Long expireTime) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 批量删除对应的value* @param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量删除key* @param pattern*/public void removePattern(final String pattern) {Set<Serializable> keys = redisTemplate.keys(pattern);if (keys.size() > 0){redisTemplate.delete(keys);}}/*** 删除对应的value* @param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判断缓存中是否有对应的value* @param key* @return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 读取缓存* @param key* @return*/public Object get(final String key) {Object result = null;ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();result = operations.get(key);return result;}/*** 哈希 添加* @param key* @param hashKey* @param value*/public void hmSet(String key, Object hashKey, Object value){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();hash.put(key,hashKey,value);}/*** 哈希 获取哈希的key集合* @param key* @return*/public Set<Object> hmKeys(String key){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();return hash.keys(key);        }/*** 哈希 删除哈希的key* @param key* @param hashKey*/public void hmDelete(String key,String hashKey){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();hash.delete(key, hashKey);       }    /*** 哈希获取数据* @param key* @param hashKey* @return*/public Object hmGet(String key, Object hashKey){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();return hash.get(key,hashKey);}/*** 获取所有key值* @param key* @return*/public Set<Object>  hmKeySet(String key){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();return hash.keys(key);	}/*** 获取所有key值* @param key* @return*/public void  hmRemove(String key, Object hashKey){HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();hash.delete(key, hashKey); 	}/*** 列表添加* @param k* @param v*/public void lPush(String k,Object v){ListOperations<String, Object> list = redisTemplate.opsForList();list.rightPush(k,v);}/*** 列表获取* @param k* @param l* @param l1* @return*/public List<Object> lRange(String k, long l, long l1){ListOperations<String, Object> list = redisTemplate.opsForList();return list.range(k,l,l1);}/*** 集合添加* @param key* @param value*/public void add(String key,Object value){SetOperations<String, Object> set = redisTemplate.opsForSet();set.add(key,value);}/*** 集合获取* @param key* @return*/public Set<Object> setMembers(String key){SetOperations<String, Object> set = redisTemplate.opsForSet();return set.members(key);}/*** 集合长度* @param key* @return*/public Long setSize(String key){SetOperations<String, Object> set = redisTemplate.opsForSet();return set.size(key);}/*** 集合获取* @param key* @param count* @return*/public Set<Object> setMembers(String key, int count){SetOperations<String, Object> set = redisTemplate.opsForSet();return set.distinctRandomMembers(key, count);}/*** 删除集合数据* @param key* @param value*/public void remove(String key, Object value){SetOperations<String, Object> set = redisTemplate.opsForSet();set.remove(key, value);}/*** 有序集合添加* @param key* @param value* @param scoure*/public void zAdd(String key,Object value,double scoure){ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();zset.add(key,value,scoure);}/*** 有序集合获取* @param key* @param scoure* @param scoure1* @return*/public Set<Object> rangeByScore(String key,double scoure,double scoure1){ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();return zset.rangeByScore(key, scoure, scoure1);}/*** 消息队列实现* @param channel* @param message*/public void convertAndSend(String channel, Object message){redisTemplate.convertAndSend(channel, message);}/*** 数列添加* @param key* @param value*/public void addList(String key,Object value){ListOperations<String, Object> list = redisTemplate.opsForList();list.rightPush(key, value);}/*** 数列获取* @param key* @return*/public List<Object> getList(String key){ListOperations<String, Object> list = redisTemplate.opsForList();return list.range(key, 0, list.size(key));}/*** 左弹出数列* @param key* @return*/public Object popList(String key) {ListOperations<String, Object> list = redisTemplate.opsForList();return list.leftPop(key);}public Long increment(String k, Long l) {return redisTemplate.opsForValue().increment(k, l);}}

TestRedisController.java

package com.example.myframe.controller;import com.example.myframe.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping(value = "/redis")
public class TestRedisController {@AutowiredRedisUtil redisUtil;/*** 生产者 通过此方法来往redis的list的尾部插入数据*/@RequestMapping("/shengchangzhe")public void shengChangZhe() {redisUtil.dealShengChangZhe();}/*** 消费者 ,通过此方法往redis的list的头部获取数据,直到list没有数据 阻塞,等到一有数据又继续获取*/@RequestMapping("/xiaoFeiZhe")public void xiaoFeiZhe() {redisUtil.dealXiaoFeiZhe();}/*** 发布者,发布信息,监听器一旦接收到监听,就进行操作*/@RequestMapping("/faBuDingYue")public void faBuDingYue() {redisUtil.dealFaBuDingYue();}}
package com.example.myframe.redis;import com.example.myframe.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;import java.util.List;@Component
public class RedisUtil {@AutowiredJedisCluster jedisCluster;@Autowiredprivate RedisService redisService;/*** 生产者*/public void dealShengChangZhe() {for (int i = 0; i < 10; i++) {jedisCluster.rpush("ceshi", "value1_" + i);}}/*** 消费者*/public void dealXiaoFeiZhe() {while (true) {//阻塞式brpop,List中无数据时阻塞,参数0表示一直阻塞下去,直到List出现数据List<String> listingList = jedisCluster.blpop(0, "ceshi");System.out.println("线程取数据:{}" + listingList.get(1));}}/*** 发布订阅模式*/public void dealFaBuDingYue() {redisService.convertAndSend("dealFaBuDingYue", "我是来发布信息的");}
}

redis消息队列监听信息

package com.example.myframe.redis.msg;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configuration
public class RedisMsgListener {@AutowiredReceiver receiver;/*** redis消息队列监听信息* * @param connectionFactory* @param listenerAdapter* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new PatternTopic("dealFaBuDingYue"));return container;}/*** 监听方法* * @return*/@Bean(name = "listenerAdapter")MessageListenerAdapter listenerAdapter() {// 回调数据处理方法return new MessageListenerAdapter(receiver, "dealJt");}
}
package com.example.myframe.redis.msg;import com.example.myframe.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;@Service("receiver")
public class Receiver{@Autowiredprivate RedisService redisService;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 清除外部广告位本地缓存* @param message*/public  void dealJt(String message){System.out.println("我是用来监听信息的");System.out.println(message);}/*** 清除外部广告位本地缓存* @param message*/public  void dealJt1(String message){System.out.println("我是用来监听信息的1");System.out.println(message);}}

这篇关于SpringBoot集成redis的JedisCluster和RedisTemplate 实现redis的消息队列消费者-生产者模式,订阅者发布者模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/400031

相关文章

pandas中位数填充空值的实现示例

《pandas中位数填充空值的实现示例》中位数填充是一种简单而有效的方法,用于填充数据集中缺失的值,本文就来介绍一下pandas中位数填充空值的实现,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是中位数填充?为什么选择中位数填充?示例数据结果分析完整代码总结在数据分析和机器学习过程中,处理缺失数

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

Pandas使用AdaBoost进行分类的实现

《Pandas使用AdaBoost进行分类的实现》Pandas和AdaBoost分类算法,可以高效地进行数据预处理和分类任务,本文主要介绍了Pandas使用AdaBoost进行分类的实现,具有一定的参... 目录什么是 AdaBoost?使用 AdaBoost 的步骤安装必要的库步骤一:数据准备步骤二:模型

Spring Boot中JSON数值溢出问题从报错到优雅解决办法

《SpringBoot中JSON数值溢出问题从报错到优雅解决办法》:本文主要介绍SpringBoot中JSON数值溢出问题从报错到优雅的解决办法,通过修改字段类型为Long、添加全局异常处理和... 目录一、问题背景:为什么我的接口突然报错了?二、为什么会发生这个错误?1. Java 数据类型的“容量”限制

使用Pandas进行均值填充的实现

《使用Pandas进行均值填充的实现》缺失数据(NaN值)是一个常见的问题,我们可以通过多种方法来处理缺失数据,其中一种常用的方法是均值填充,本文主要介绍了使用Pandas进行均值填充的实现,感兴趣的... 目录什么是均值填充?为什么选择均值填充?均值填充的步骤实际代码示例总结在数据分析和处理过程中,缺失数

Java对象转换的实现方式汇总

《Java对象转换的实现方式汇总》:本文主要介绍Java对象转换的多种实现方式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录Java对象转换的多种实现方式1. 手动映射(Manual Mapping)2. Builder模式3. 工具类辅助映

SpringBoot请求参数接收控制指南分享

《SpringBoot请求参数接收控制指南分享》:本文主要介绍SpringBoot请求参数接收控制指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring Boot 请求参数接收控制指南1. 概述2. 有注解时参数接收方式对比3. 无注解时接收参数默认位置

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾