文章目录

一、MQTT主题和主题过滤器匹配需求

MQTT协议规定,客户端可以订阅(Subscribe)主题过滤器(TopicFilter),主题过滤器可能为:

  • 完全精确的主题过滤器。例如:abc/def/123
  • 含有单层通配符(+,SINGLE)的主题过滤器。例如:abc/+/123
  • 含有多层通配符(#,MULTI)的主题过滤器。例如:abc/#

同时,客户端还可以向某个主题(Topic)发布(Publish)消息,比如abc/def/123,主题不能含有通配符。MQTT Broker需要查询有哪些客户端订阅了匹配这个主题的主题过滤器,并将消息发送给它们。例如下图所示情况:

客户端X是发布者,将数据发布到主题"abc/def/123";客户端A、B、C、D都是订阅者,但只有A、B、C订阅的主题过滤器和该主题匹配:

  • A订阅"abc/+/123","+"匹配了单层任意字符"def";
  • B订阅"abc/#","#"匹配了多层任意字符"def/123";
  • C订阅"abc/def/123",完全精确地匹配。

那么,MQTT消息代理(MQTT Broker)应该如何做“主题”到“主题过滤器”的匹配呢?很容易想到一种基础实现——“暴力扫描”。

算法1-1 暴力扫描伪代码 
/**
* 查询Topic匹配的订阅者
**/
FUNCTION Set findSubscriber(subscriptionArray, publishTopic)
    // 订阅者
    subscribers = []
    // 发布主题token数组
    topicTokenArray = split(publishTopic,"/")
    // 遍历所有订阅
    FOR i = 0 to size(subscriptionArray)
        // 订阅主题过滤器token数组
        topicFilterTokenArray = split(subscriptionArray[i].topicFilter,"/")
        //逐个比对是否匹配
        IF matchTopic(topicTokenArray, TopicFilterTokenArray)
            subscribers.add(subscriptionArray[i])
        END
    END
    RETURN subscribers
END

/**
* 匹配主题和主题过滤器
**/
FUNCTION boolean matchTopic(topicTokenArray, topicFilterTokenArray)
    topicSize = size(topicTokenArray)
    topicFilterSize = size(topicFilterTokenArray)
    // 遍历所有主题token,逐个token比对
    FOR i = 0 to topicSize
        // 如果filter是单层通配符,或者token相同,则匹配
        IF topicTokenFilterArray[i] == "+" || topicTokenFilterArray[i] == topicTokenArray[i]
            CONTINUE
        // 如果filter是多层通配符且在最后,则完全匹配
        ELSEIF topicTokenFilterArray[i] == "#" && i == topicFilterSize
            RETURN true
        // 不匹配,直接返回
        ELSE
            RETURN false
    END
    // 如果遍历完成,说明topic过了一遍都匹配,则当二者长度相等时,说明完全匹配
    IF (topicSize == topicFilterSize)
        RETURN true
    ELSE
        RETURN false
    END
END

很容易看出,两层for循环,时间复杂度O(S✖T),其中S是订阅信息数量(Subscription,不是订阅者Subscriber数量),T是发布主题的平均长度(划分成token的平均个数)。

仔细想一想,虽然实现了功能,但是在生产环境上,假设一个Broker连接2W设备,每个设备订阅10个TopicFilter,发布主题的平均长度约为10,执行一次匹配,要执行200W次token比较,要知道这还没有计算每秒钟客户端会发送多少条消息,这个时延都已经完全无法接受。

二、订阅树简介

2.1 订阅树的定义

订阅树(Topic Tree / Subscription Tree)是解决这个匹配问题的一种方法。TopicTree本质是一棵按token划分的字典树(Trie Tree),每个节点上存储着订阅信息,一棵典型的订阅树如下图所示:

订阅树有如下性质:

(1)有一个根节点Root,一个父节点可以有多个子节点

(2)每条边代表一个Token,从根节点遍历到子节点所经历的路径定义为TopicFilter

(3)任何节点的内容都可能为空,也可能包含订阅信息

例如,这棵订阅树有一个根节点Root,每条边上都有一个token。从根节点遍历到最左边的叶子节点经历的路径“abc/+/123”就是一个TopicFilter。叶子节点包含了一条订阅信息{客户端A以QoS0的质量订阅了abc/+/123}。并且中间的节点可能为空,也可能包含订阅信息;叶子节点可能包含订阅信息,也可能为空。

2.2 订阅树的匹配过程

有了订阅树的数据结构,如何匹配Topic和TopicFilter呢?仍以这棵树为例,我们给节点加上编号:

其中,订阅信息为:

 节点编号  订阅客户端  订阅主题  订阅质量 
 N5 abc/+/123 0
 N3 abc/# 1
 N3 abc/# 0
 N4 abc/def 0
 N6B abc/def/123 0
 N6 abc/def/123 1
 N7 D abc/def/456 0

整个匹配过程是宽度优先搜索(BFS)或深度优先搜索(DFS)的过程,这里按DFS描述,假设客户端X发布主题为“abc/def/123”,从根节点开始遍历:

(1)进入Root,查找匹配的边,只有一条边abc,且和第一个token“abc”匹配,进入子节点N1

(2)查找匹配的边,找到三条边“+”、“#”、“def”

(3.1.1)进入子节点N2,查找匹配的边,找到匹配边“123”,进入N5

(3.1.2)进入N5,匹配完成,返回订阅信息{A,"abc/+/123",0}

(3.2.1)进入子节点N3,匹配完成,返回订阅信息{B,“abc/#”,1}和订阅信息{A,“abc/#”,0}

(3.3.1)进入子节点N4,查找匹配的边,只有一条边“123”,进入N6

(3.3.2)进入子节点N6,匹配完成,返回订阅信息{B,“abc/def/123”,0}和订阅信息{C,“abc/def/123”,1}

(4)回退到Root,查找结束

我们就得到了匹配这个主题的5条订阅信息:

 节点编号  订阅客户端  订阅主题  订阅质量 
 N5 abc/+/123 0
 N3 abc/# 1
 N3 abc/# 0
 N6B abc/def/123 0
 N6 abc/def/123 1

注意,在这里A订阅的两个主题“abc/+/123”、"abc/#"都成功匹配,但下发给客户端A的时候要去重,只能下发1条消息,因为Broker是依赖MQTT-Publish给客户端下发消息的,发布的主题就是客户端X发布的主题“abc/+/123”,区分不出是客户端哪条订阅匹配的,所以发过去由客户端自己本地匹配出2条订阅(客户端本地也有订阅结构)。回头来看,客户端A这种“重复主题过滤器”的设计是不好的,在实际应用上应该极力避免这种设计,一个客户端的各个订阅都最好都是独立接受消息的。

如果区分出通配符和非通配符,那么查找的时间复杂度就是字典树的查找时间复杂度,为O(T),其中T是发布主题的平均长度(划分成token的平均个数)。

2.3 增加订阅

(1)增加订阅时部分节点不存在

当增加新订阅时,对于划分出来的token,订阅树可能会有一部分节点/边已经存在,有一部分节点/边不存在,那么就需要增加不存在的新节点/边。

如图所示,客户端F以QoS0的质量订阅新主题过滤器“abc/ghi/789”,这个时候“abc”是存在的,“ghi”、"789"都是不存在的,那么我们需要增加两个新节点和两条新边,并在最下面的节点放置F的这条订阅信息。

(2)添加订阅时节点已存在

如果节点已经存在,只需要增加订阅信息到对应节点中去即可。

如图所示,左侧为客户端F订阅“abc/+”,节点都已存在,将F加入到空节点中;

右侧为客户端F订阅“abc/+/123”,节点都已存在,A所在节点增加一条订阅信息F。

插入的时间复杂度同样为O(T),其中T是发布主题的平均长度(划分成token的平均个数)。

2.4 取消订阅

(1)取消订阅后没有订阅者

如果取消订阅后没有订阅者,可以不用管(可能存在内存溢出的风险),也可以通过一些方式删除该节点。

如图所示,假设客户端A取消了订阅“abc/+/123”,节点没有任何其他订阅信息了,那么该节点(灰色节点)可以被删除掉。

(2)取消订阅后仍然有订阅者

如果取消订阅后仍然有订阅者,那么只需要移除该订阅信息即可。

如图所示,假设客户端C取消了订阅“abc/def/123”,客户端B仍然存在订阅“abc/def/123”,那么只需要去掉此节点上C的订阅信息。

三、订阅树实现难点和业界解决方案

3.1 订阅树实现难点

(1)MQTT客户端可能并发地订阅、取消订阅,如何保证并发安全?

例如:A订阅abc/+的时候,B也订阅、取消订阅abc/+,需要保证并发安全。

(2)订阅树是内存结构,在分布式环境下如何同步?

例如:如下订阅、发布情况,Broker2怎么知道Broker1上有客户端订阅了这个主题?

(3)MQTT客户端订阅数据量非常大的时候,如何加快查找速度?

例如:10W客户端,每个客户端40个订阅,如果采用锁的机制控制并发,订阅/取消订阅会产生大量冲突造成查询获锁缓慢。

3.2 业界订阅树的实现

(1)并发安全

  • EMQ:底层采用Erlang的Mnesia分布式数据库,利用分布式数据库的事务保证并发 —— Erlang语言开发者很少
  • JMQTT:采用锁的机制控制并发  
  • Moquette:利用CAS控制并发 
  • Mosquitto:采用锁的机制控制并发 
  • MqttWk:无内存订阅树结构,订阅存放在分布式Redis里(通配符直接扫描),由Redis保证并发 —— 通配符扫描效率极低
  • 知乎网关:采用锁的机制控制并发(单Broker2~3万客户端、2~3万订阅量)

(2)分布式同步

  • EMQ:底层采用Erlang的Mnesia分布式数据库保证同步 —— Erlang语言开发者很少
  • JMQTT 1.x:单机,无同步
  • Moquette 0.10:采用Hazelcast,靠订阅数据全量广播来同步 ——量太大且不可靠(同时宕机数据丢失),水平扩展无法扩展消费速率
  • Moquette 0.12+:单机,无同步
  • Mosquitto:可通过“桥接”搭建伪分布式,靠订阅数据全量广播来同步 ——量太大且不可靠 ,水平扩展无法扩展消费速率
  • MqttWk:无内存订阅树结构,订阅存放在分布式Redis里(通配符直接扫描),不需要同步
  • 百度IoT:由Kafka存储订阅数据,靠读取Kafka同步 —— 比较好的实现机制
  • HiveMQ:自制向量时钟组件,可以分析本地订阅树数据之间的差异,通过Jgroups将差异部分协商同步 —— 代码不开源,自己实现困难 
  • 某车联网代码:Broker自己作为MQTT客户端连接到其他Broker上,当真实的客户端在自己节点订阅时,自己再发出一个订阅请求到所有其他Broker节点 —— 类似桥接模式,非标准分布式模式,且订阅数量较少、客户端较少(一台5万连接)

(3)查找速度

  • EMQ:底层采用Erlang的Mnesia分布式数据库保证速度 —— Erlang语言开发者很少
  • JMQTT:无优化 
  • Moquette 0.10:无优化 
  • Moquette 0.12+:无优化
  • Mosquitto:无优化 修改版
  • Mosquitto:分离精确订阅和通配符订阅,精确订阅采用Hash表,通配符订阅采用订阅树 —— 比较好的优化方向
  • MqttWk:无优化 
  • 知乎网关:根据clientId进行Hash,划分到100+个哈希表空间(没有通配符所以没用订阅树) —— 比较好的优化方向

总结

MQTT的订阅匹配最好这样处理:

  • 订阅树 —— 负责通配符的匹配,查找速率O(T),需要注意控制树的深度宽度、取消订阅的内存回收
  • 哈希表 —— 负责精确匹配,查找速率O(1),需要注意并发冲突

这是带通配符字符串匹配的常见解决方案——精确用表、前缀用树。具体判断用什么数据结构,主要依靠如下三点衡量:

  • 数据总量有多大 —— 数据量较大,尽量不用树。
  • 数据是否持续增长 —— 连接量有上限、订阅量有上限,所以订阅数据有上限,如果没有上限的话,重哈希会比较耗时。
  • 查找是否频繁 —— 物联网消息非常多,非常频繁,因此能用哈希表就用哈希表。

订阅树的原理很简单,但是上了并发和分布式就会有很多问题,这些解决方案都需要压力测试和生产环境检验。


转载请注明出处http://www.bewindoweb.com/268.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠