EMQ共享订阅

2024-05-05 06:48
文章标签 共享 订阅 emq

本文主要是介绍EMQ共享订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 共享订阅

多个客户端订阅了同一个主题,发布者发布主题时,每个客户端都会同时收到这个主题的消息。在客户端集群部署的场景下会出现消息重复处理的问题。
EMQ支持共享订阅,多个客户端订阅了同一个主题,发布者发布主题时,只有其中一个客户端接收到消息。
共享订阅有两种方式:
(1)共享订阅:订阅前缀$queue/
多个客户端订阅了$queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。
(2)分组订阅:订阅前缀$share/<group>/
多组客户端订阅了$queue/group1/topic、$queue/group2/topic...,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

2 Java客户端实现共享订阅

开发时发现,使用eclipse paho java客户端时,无法处理共享订阅。订阅$queue/topic能够订阅成功,并且跟踪代码能看到emq也把消息转发到了客户端,但是客户端丢弃掉了。
解决方法就是重写mqtt的回调函数,实现MqttCallback接口。

 

实现MqttCallback接口的代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

package com.emqtest.emqtest;

 

 

import java.util.HashMap;

import java.util.Map;

 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

 

public class SharedSubCallbackRouter implements MqttCallback {

    private Map<String, IMqttMessageListener> topicFilterListeners;

 

    public SharedSubCallbackRouter(Map<String, IMqttMessageListener> topicFilterListeners) {

        this.topicFilterListeners = topicFilterListeners;

    }

 

    public void addSubscriber(String topicFilter, IMqttMessageListener listener) {

        if (this.topicFilterListeners == null) {

             this.topicFilterListeners = new HashMap<>();

        }

        this.topicFilterListeners.put(topicFilter, listener);

    }

 

    @Override

    public void connectionLost(Throwable cause) {

 

    }

 

    @Override

    public void messageArrived(String topic, MqttMessage message) throws Exception {

        for (Map.Entry<String, IMqttMessageListener> listenerEntry : topicFilterListeners.entrySet()) {

            String topicFilter = listenerEntry.getKey();

            if (isMatched(topicFilter, topic)) {

                listenerEntry.getValue().messageArrived(topic, message);

            }

        }

    }

 

    @Override

    public void deliveryComplete(IMqttDeliveryToken token) {

 

    }

 

    /**

     * Paho topic matcher does not work with shared subscription topic filter of emqttd

     * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385

     * <p>

     * http://emqtt.io/docs/v2/advanced.html#shared-subscription

     *

     * @param topicFilter the topicFilter for mqtt

     * @param topic       the topic

     * @return boolean for matched

     */

    private boolean isMatched(String topicFilter, String topic) {

        if (topicFilter.startsWith("$queue/")) {

            topicFilter = topicFilter.replaceFirst("\\$queue/""");

        else if (topicFilter.startsWith("$share/")) {

            topicFilter = topicFilter.replaceFirst("\\$share/""");

            topicFilter = topicFilter.substring(topicFilter.indexOf('/'));

        }

        return MqttTopic.isMatched(topicFilter, topic);

    }

}

 

创建emq连接代码如下:

1

2

3

4

5

6

7

mqttClient = new MqttClient("tcp://localhost:1883""MqttClient");

mqttClient.connect();

Map<String, IMqttMessageListener> listeners = new HashMap<>();

IMqttMessageListener emqListener = new EmqListener();

listeners.put("$queue/testmqtt", emqListener);

mqttClient.setCallback(new SharedSubCallbackRouter(listeners));

mqttClient.subscribe("$queue/testmqtt"new EmqListener());

 

还要再写一个实现IMqttMessageListener接口的Emq消息处理类:

1

2

3

4

5

6

7

8

9

10

11

12

@Component

public class EmqListener implements IMqttMessageListener {

 

  @Override

  public void messageArrived(String topic, MqttMessage message) throws Exception {

    try {

      System.out.println("topic: " + topic);

    catch (Exception e) {

      e.printStackTrace();

    }

  }

}

 

 

 

 

参考链接:

1 emq的github上关于这个问题的讨论:
https://github.com/emqx/emqx/issues/921#event-1023359646
2 网上有人给的一个解决方法示例代码:
https://github.com/yogin16/paho-shared-sub-example
3 eclipse paho的github链接:
https://github.com/eclipse/paho.mqtt.java

这篇关于EMQ共享订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/961042

相关文章

Android实现两台手机屏幕共享和远程控制功能

《Android实现两台手机屏幕共享和远程控制功能》在远程协助、在线教学、技术支持等多种场景下,实时获得另一部移动设备的屏幕画面,并对其进行操作,具有极高的应用价值,本项目旨在实现两台Android手... 目录一、项目概述二、相关知识2.1 MediaProjection API2.2 Socket 网络

Linux samba共享慢的原因及解决方案

《Linuxsamba共享慢的原因及解决方案》:本文主要介绍Linuxsamba共享慢的原因及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux samba共享慢原因及解决问题表现原因解决办法总结Linandroidux samba共享慢原因及解决

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

使用Nginx来共享文件的详细教程

《使用Nginx来共享文件的详细教程》有时我们想共享电脑上的某些文件,一个比较方便的做法是,开一个HTTP服务,指向文件所在的目录,这次我们用nginx来实现这个需求,本文将通过代码示例一步步教你使用... 在本教程中,我们将向您展示如何使用开源 Web 服务器 Nginx 设置文件共享服务器步骤 0 —

Python使用pysmb库访问Windows共享文件夹的详细教程

《Python使用pysmb库访问Windows共享文件夹的详细教程》本教程旨在帮助您使用pysmb库,通过SMB(ServerMessageBlock)协议,轻松连接到Windows共享文件夹,并列... 目录前置条件步骤一:导入必要的模块步骤二:配置连接参数步骤三:实例化SMB连接对象并尝试连接步骤四:

Linux使用粘滞位 (t-bit)共享文件的方法教程

《Linux使用粘滞位(t-bit)共享文件的方法教程》在Linux系统中,共享文件是日常管理和协作中的常见任务,而粘滞位(StickyBit或t-bit)是实现共享目录安全性的重要工具之一,本文将... 目录文件共享的常见场景基础概念linux 文件权限粘滞位 (Sticky Bit)设置共享目录并配置粘

怎么让1台电脑共享给7人同时流畅设计

在当今的创意设计与数字内容生产领域,图形工作站以其强大的计算能力、专业的图形处理能力和稳定的系统性能,成为了众多设计师、动画师、视频编辑师等创意工作者的必备工具。 设计团队面临资源有限,比如只有一台高性能电脑时,如何高效地让七人同时流畅地进行设计工作,便成为了一个亟待解决的问题。 一、硬件升级与配置 1.高性能处理器(CPU):选择多核、高线程的处理器,例如Intel的至强系列或AMD的Ry

# VMware 共享文件

VMware tools快速安装 VMware 提供了 open-vm-tools,这是 VMware 官方推荐的开源工具包,通常不需要手动安装 VMware Tools,因为大多数 Linux 发行版(包括 Ubuntu、CentOS 等)都包含了 open-vm-tools,并且已经优化以提供与 VMware 环境的兼容性和功能支持。 建议按照以下步骤安装 open-vm-tools 而不

未来工作趋势:零工小程序在共享经济中的作用

经济在不断发展的同时,科技也在飞速发展。零工经济作为一种新兴的工作模式,正在全球范围内迅速崛起。特别是在中国,随着数字经济的蓬勃发展和共享经济模式的深入推广,零工小程序在促进就业、提升资源利用效率方面显示出了巨大的潜力和价值。 一、零工经济的定义及现状 零工经济是指通过临时性、自由职业或项目制的工作形式,利用互联网平台快速匹配供需双方的新型经济模式。这种模式打破了传统全职工作的界限,为劳动