RocketMQ中ACL权限控制

2024-09-04 03:08
文章标签 控制 rocketmq 权限 acl

本文主要是介绍RocketMQ中ACL权限控制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、什么是ACL?

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

  • 用户
    用户是访问控制的基础要素,也不难理解,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。
  • 资源
    资源,需要保护的对象,在RocketMQ中,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。
  • 权限
    针对资源,能进行的操作,
  • 角色
    RocketMQ中,只定义两种角色:是否是管理员。

另外,RocketMQ还支持按照客户端IP进行白名单设置。

2、ACL基本流程图

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:
在这里插入图片描述
对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。

3、如何配置ACL

3.1 acl配置文件

acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。

3.1.1 globalWhiteRemoteAddresses

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:


  • 表示不设置白名单,该条规则默认返回false。
  • "*"
    表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用。
  • 192.168.0.{100,101}
    多地址配置模式,ip地址的最后一组,使用{},大括号中多个ip地址,用英文逗号(,)隔开。
  • 192.168.1.100,192.168.2.100
    直接使用,分隔,配置多个ip地址。
  • 192.168..或192.168.100-200.10-20
    每个IP段使用 "*" 或"-"表示范围。

3.1.2 accounts

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

3.1.2.1 accessKey

登录用户名,长度必须大于6个字符。

3.1.2.2 secretKey

登录密码。长度必须大于6个字符。

3.1.2.3 whiteRemoteAddress

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

3.1.2.4 admin

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

  • UPDATE_AND_CREATE_TOPIC
    更新或创建主题。
  • UPDATE_BROKER_CONFIG
    更新Broker配置。
  • DELETE_TOPIC_IN_BROKER
    删除主题。
  • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
    更新或创建订阅组信息。
  • DELETE_SUBSCRIPTIONGROUP
    删除订阅组信息。

3.1.2.5 defaultTopicPerm

默认topic权限。该值默认为DENY(拒绝)。

3.1.2.6 defaultGroupPerm

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

3.1.2.7 topicPerms

设置topic的权限。其类型为数组,其可选择值在下节介绍。

3.1.2.8 groupPerms

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

3.2 RocketMQ ACL权限可选值

  • DENY
    拒绝。
  • PUB
    拥有发送权限。
  • SUB
    拥有订阅权限。

3.3、权限验证流程

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。
在这里插入图片描述

4、使用示例

4.1 Broker端安装

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。

broker.conf的配置文件如下:

brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10915
storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
aclEnable=true

plain_acl.yml文件内容如下:

globalWhiteRemoteAddresses:accounts:
- accessKey: RocketMQsecretKey: 12345678whiteRemoteAddress:admin: falsedefaultTopicPerm: DENYdefaultGroupPerm: SUBtopicPerms:- TopicTest=PUBgroupPerms:# the group should convert to retry topic- oms_consumer_group=DENY- accessKey: adminsecretKey: 12345678whiteRemoteAddress:# if it is admin, it could access all resourcesadmin: true

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

4.2 消息发送端示例

public class AclProducer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 1; i++) {try {Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));}
}

运行效果如图所示:
在这里插入图片描述

4.3 消息消费端示例

public class AclConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));}
}

发现并不没有消费消息,符合预期。

参考Demo:消费者

@Overridepublic void run(String... args) throws Exception {System.out.println("通过实现CommandLineRunner接口,在spring boot项目启动后打印参数");// AccessKey 身份验证AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));consumer = new DefaultMQPushConsumer(consumerGroupId, rpcHook,new AllocateMessageQueueAveragely());//consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);//设置 TCP 接入域名consumer.setNamesrvAddr(nameserverMQ);//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setVipChannelEnabled(false);consumer.setConsumeTimeout(3);consumer.setMessageModel(MessageModel.BROADCASTING);//广播订阅方式//订阅主题和 标签( * 代表所有标签)下信息String later = "_" + rocketmqAreacode + "_" + hikvisionId;consumer.subscribe(MQIOTDEV001 + later,  MqServerTypeConstant.YCCK);// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// msgs中只收集同一个topic,同一个tag,并且key相同的message// 会把不同的消息分别放置到不同的队列中try {for (Message msg : msgs) {//消费者获取消息 这里只输出 不做后面逻辑处理String body = new String(msg.getBody(), "utf-8");log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);} catch (Exception e) {log.error("0x80517800消费远程操控接口失败"+e.getMessage());}}} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者 启动成功=======");}

消费者:

public void sendControl(BaseResponse baseResponse) {try {String later = "_" + rocketmqAreacode + "_" + hikvisionId;
//        		DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer("my_test_group");AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(remoteProperties.getAccessKey(),remoteProperties.getSecretKey()));DefaultMQProducer producer = new DefaultMQProducer(producerGroupIOTDEV001, rpcHook);producer.setNamesrvAddr(nameserverMQ);producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());producer.start();Message msg = new Message(MQIOTDEV001 + later, MqServerTypeConstant.YCCK,// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤MqServerTypeConstant.YCCK,// Message Body// 任何二进制形式的数据, MQ不做任何干预,// 需要Producer与Consumer协商好一致的序列化和反序列化方式JsonUtils.object2Json(baseResponse).getBytes("UTF-8"));//同步发送消息producer.send(msg);// 在callback返回之前即可取得msgId。// 在应用退出前,销毁Producer对象。注意:如果不销毁也没有问题log.info("推送反控响应信息到rocketmq:同步发送消息");producer.shutdown();}catch (Exception e) {log.error("0x80517800推送响应信息到rocketmq失败:"+e.getMessage());e.printStackTrace();}}

 

关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。

 

这篇关于RocketMQ中ACL权限控制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134827

相关文章

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

控制反转 的种类

之前对控制反转的定义和解释都不是很清晰。最近翻书发现在《Pro Spring 5》(免费电子版在文章最后)有一段非常不错的解释。记录一下,有道翻译贴出来方便查看。如有请直接跳过中文,看后面的原文。 控制反转的类型 控制反转的类型您可能想知道为什么有两种类型的IoC,以及为什么这些类型被进一步划分为不同的实现。这个问题似乎没有明确的答案;当然,不同的类型提供了一定程度的灵活性,但

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理 秒杀系统是应对高并发、高压力下的典型业务场景,涉及到并发控制、库存管理、事务管理等多个关键技术点。本文将深入剖析秒杀商品业务中常见的几个核心问题,包括 AOP 事务管理、同步锁机制、乐观锁、CAS 操作,以及用户限购策略。通过这些技术的结合,确保秒杀系统在高并发场景下的稳定性和一致性。 1. AOP 代理对象与事务管理 在秒杀商品

PostgreSQL中的多版本并发控制(MVCC)深入解析

引言 PostgreSQL作为一款强大的开源关系数据库管理系统,以其高性能、高可靠性和丰富的功能特性而广受欢迎。在并发控制方面,PostgreSQL采用了多版本并发控制(MVCC)机制,该机制为数据库提供了高效的数据访问和更新能力,同时保证了数据的一致性和隔离性。本文将深入解析PostgreSQL中的MVCC功能,探讨其工作原理、使用场景,并通过具体SQL示例来展示其在实际应用中的表现。 一、

Golang进程权限调度包runtime

关于 runtime 包几个方法: Gosched:让当前线程让出 cpu 以让其它线程运行,它不会挂起当前线程,因此当前线程未来会继续执行GOMAXPROCS:设置最大的可同时使用的 CPU 核数Goexit:退出当前 goroutine(但是defer语句会照常执行)NumGoroutine:返回正在执行和排队的任务总数GOOS:目标操作系统NumCPU:返回当前系统的 CPU 核数量 p

网络学习-eNSP配置ACL

AR1路由器配置 <Huawei>system-viewEnter system view, return user view with Ctrl+Z.[Huawei]undo info-center enableInfo: Information center is disabled.[Huawei]interface gigabitethernet 0/0/0[Huawei-G

vue2实践:el-table实现由用户自己控制行数的动态表格

需求 项目中需要提供一个动态表单,如图: 当我点击添加时,便添加一行;点击右边的删除时,便删除这一行。 至少要有一行数据,但是没有上限。 思路 这种每一行的数据固定,但是不定行数的,很容易想到使用el-table来实现,它可以循环读取:data所绑定的数组,来生成行数据,不同的是: 1、table里面的每一个cell,需要放置一个input来支持用户编辑。 2、最后一列放置两个b

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

android java.io.IOException: open failed: ENOENT (No such file or directory)-api23+权限受权

问题描述 在安卓上,清单明明已经受权了读写文件权限,但偏偏就是创建不了目录和文件 调用mkdirs()总是返回false. <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/><uses-permission android:name="android.permission.READ_E