EMQ共享订阅

2024-05-05 06:48
文章标签 共享 订阅 emq

本文主要是介绍EMQ共享订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 共享订阅

多个客户端订阅了同一个主题,发布者发布主题时,每个客户端都会同时收到这个主题的消息。在客户端集群部署的场景下会出现消息重复处理的问题。
EMQ支持共享订阅,多个客户端订阅了同一个主题,发布者发布主题时,只有其中一个客户端接收到消息。
共享订阅有两种方式:
(1)共享订阅:订阅前缀$queue/
多个客户端订阅了$queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。
(2)分组订阅:订阅前缀$share/<group>/
多组客户端订阅了$queue/group1/topic、$queue/group2/topic...,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

2 Java客户端实现共享订阅

开发时发现,使用eclipse paho java客户端时,无法处理共享订阅。订阅$queue/topic能够订阅成功,并且跟踪代码能看到emq也把消息转发到了客户端,但是客户端丢弃掉了。
解决方法就是重写mqtt的回调函数,实现MqttCallback接口。

 

实现MqttCallback接口的代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

package com.emqtest.emqtest;

 

 

import java.util.HashMap;

import java.util.Map;

 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

 

public class SharedSubCallbackRouter implements MqttCallback {

    private Map<String, IMqttMessageListener> topicFilterListeners;

 

    public SharedSubCallbackRouter(Map<String, IMqttMessageListener> topicFilterListeners) {

        this.topicFilterListeners = topicFilterListeners;

    }

 

    public void addSubscriber(String topicFilter, IMqttMessageListener listener) {

        if (this.topicFilterListeners == null) {

             this.topicFilterListeners = new HashMap<>();

        }

        this.topicFilterListeners.put(topicFilter, listener);

    }

 

    @Override

    public void connectionLost(Throwable cause) {

 

    }

 

    @Override

    public void messageArrived(String topic, MqttMessage message) throws Exception {

        for (Map.Entry<String, IMqttMessageListener> listenerEntry : topicFilterListeners.entrySet()) {

            String topicFilter = listenerEntry.getKey();

            if (isMatched(topicFilter, topic)) {

                listenerEntry.getValue().messageArrived(topic, message);

            }

        }

    }

 

    @Override

    public void deliveryComplete(IMqttDeliveryToken token) {

 

    }

 

    /**

     * Paho topic matcher does not work with shared subscription topic filter of emqttd

     * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385

     * <p>

     * http://emqtt.io/docs/v2/advanced.html#shared-subscription

     *

     * @param topicFilter the topicFilter for mqtt

     * @param topic       the topic

     * @return boolean for matched

     */

    private boolean isMatched(String topicFilter, String topic) {

        if (topicFilter.startsWith("$queue/")) {

            topicFilter = topicFilter.replaceFirst("\\$queue/""");

        else if (topicFilter.startsWith("$share/")) {

            topicFilter = topicFilter.replaceFirst("\\$share/""");

            topicFilter = topicFilter.substring(topicFilter.indexOf('/'));

        }

        return MqttTopic.isMatched(topicFilter, topic);

    }

}

 

创建emq连接代码如下:

1

2

3

4

5

6

7

mqttClient = new MqttClient("tcp://localhost:1883""MqttClient");

mqttClient.connect();

Map<String, IMqttMessageListener> listeners = new HashMap<>();

IMqttMessageListener emqListener = new EmqListener();

listeners.put("$queue/testmqtt", emqListener);

mqttClient.setCallback(new SharedSubCallbackRouter(listeners));

mqttClient.subscribe("$queue/testmqtt"new EmqListener());

 

还要再写一个实现IMqttMessageListener接口的Emq消息处理类:

1

2

3

4

5

6

7

8

9

10

11

12

@Component

public class EmqListener implements IMqttMessageListener {

 

  @Override

  public void messageArrived(String topic, MqttMessage message) throws Exception {

    try {

      System.out.println("topic: " + topic);

    catch (Exception e) {

      e.printStackTrace();

    }

  }

}

 

 

 

 

参考链接:

1 emq的github上关于这个问题的讨论:
https://github.com/emqx/emqx/issues/921#event-1023359646
2 网上有人给的一个解决方法示例代码:
https://github.com/yogin16/paho-shared-sub-example
3 eclipse paho的github链接:
https://github.com/eclipse/paho.mqtt.java

这篇关于EMQ共享订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/961042

相关文章

怎么让1台电脑共享给7人同时流畅设计

在当今的创意设计与数字内容生产领域,图形工作站以其强大的计算能力、专业的图形处理能力和稳定的系统性能,成为了众多设计师、动画师、视频编辑师等创意工作者的必备工具。 设计团队面临资源有限,比如只有一台高性能电脑时,如何高效地让七人同时流畅地进行设计工作,便成为了一个亟待解决的问题。 一、硬件升级与配置 1.高性能处理器(CPU):选择多核、高线程的处理器,例如Intel的至强系列或AMD的Ry

# VMware 共享文件

VMware tools快速安装 VMware 提供了 open-vm-tools,这是 VMware 官方推荐的开源工具包,通常不需要手动安装 VMware Tools,因为大多数 Linux 发行版(包括 Ubuntu、CentOS 等)都包含了 open-vm-tools,并且已经优化以提供与 VMware 环境的兼容性和功能支持。 建议按照以下步骤安装 open-vm-tools 而不

未来工作趋势:零工小程序在共享经济中的作用

经济在不断发展的同时,科技也在飞速发展。零工经济作为一种新兴的工作模式,正在全球范围内迅速崛起。特别是在中国,随着数字经济的蓬勃发展和共享经济模式的深入推广,零工小程序在促进就业、提升资源利用效率方面显示出了巨大的潜力和价值。 一、零工经济的定义及现状 零工经济是指通过临时性、自由职业或项目制的工作形式,利用互联网平台快速匹配供需双方的新型经济模式。这种模式打破了传统全职工作的界限,为劳动

【C++】作用域指针、智能指针、共享指针、弱指针

十、智能指针、共享指针 从上篇文章 【C++】如何用C++创建对象,理解作用域、堆栈、内存分配-CSDN博客 中我们知道,你的对象是创建在栈上还是在堆上,最大的区别就是对象的作用域不一样。所以在C++中,一旦程序进入另外一个作用域,那其他作用域的对象就自动销毁了。这种机制有好有坏。我们可以利用这个机制,比如可以自动化我们的代码,像智能指针、作用域锁(scoped_lock)等都是利用了这种机制。

OpenStack:Glance共享与上传、Nova操作选项解释、Cinder操作技巧

目录 Glance member task Nova lock shelve rescue Cinder manage local-attach transfer backup-export 总结 原作者:int32bit,参考内容 从2013年开始折腾OpenStack也有好几年的时间了。在使用过程中,我发现有很多很有用的操作,但是却很少被提及。这里我暂不直接

基于springboot+vue+uniapp的“共享书角”图书借还管理系统小程序

开发语言:Java框架:springboot+uniappJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包:Maven3.3.9 系统展示 后台登录界面 管理员功能界面 出借者管理 图书信息管理 图书归还管理 出租收入管理

C++编程:ZeroMQ进程间(订阅-发布)通信配置优化

文章目录 0. 概述1. 发布者同步发送(pub)与订阅者异步接收(sub)示例代码可能的副作用: 2. 适度增加缓存和队列示例代码副作用: 3. 动态的IPC通道管理示例代码副作用: 4. 接收消息的超时设置示例代码副作用: 5. 增加I/O线程数量示例代码副作用: 6. 异步消息发送(使用`dontwait`标志)示例代码副作用: 7. 其他可以考虑的优化项7.1 立即发送(ZMQ_IM

Nacos Config 配置中心支持配置共享

文章目录 一、什么是配置中心二、Nacos Config2.1 Nacos Config 工作原理 (★)2.2 Nacos Config 的使用2.3 动态刷新2.4 配置共享2.4.1 同一个微服务的不同环境之间共享配置2.4.2 不同微服务中间共享配置 一、什么是配置中心 微服务架构下关于配置文件的存在以下问题: 配置文件相对分散。在一个微服务架构下,配置文件会随

【0324】Postgres内核 Shared Buffer Access Rules (共享缓冲区访问规则)说明

0. 章节内容 1. 共享磁盘缓冲区访问机制 (shared disk buffers) 共享磁盘缓冲区有两套独立的访问控制机制:引用计数(a/k/a pin 计数)和缓冲区内容锁。(实际上,还有第三级访问控制:在访问任何属于某个关系表的页面之前,必须持有该关系表的适当类型的锁。这里不讨论关系级锁。) Pins 在对缓冲区做任何操作之前,必须“对缓冲区pin”(即增加其引用计数, re