一、MQTT主题和主题过滤器匹配需求
MQTT协议规定,客户端可以订阅(Subscribe)主题过滤器(TopicFilter),主题过滤器可能为:
- 完全精确的主题过滤器。例如:abc/def/123
- 含有单层通配符(+,SINGLE)的主题过滤器。例如:abc/+/123
- 含有多层通配符(#,MULTI)的主题过滤器。例如:abc/#
同时,客户端还可以向某个主题(Topic)发布(Publish)消息,比如abc/def/123,主题不能含有通配符。MQTT Broker需要查询有哪些客户端订阅了匹配这个主题的主题过滤器,并将消息发送给它们。例如下图所示情况:
客户端X是发布者,将数据发布到主题"abc/def/123";客户端A、B、C、D都是订阅者,但只有A、B、C订阅的主题过滤器和该主题匹配:
- A订阅"abc/+/123","+"匹配了单层任意字符"def";
- B订阅"abc/#","#"匹配了多层任意字符"def/123";
- C订阅"abc/def/123",完全精确地匹配。
那么,MQTT消息代理(MQTT Broker)应该如何做“主题”到“主题过滤器”的匹配呢?很容易想到一种基础实现——“暴力扫描”。
算法1-1 暴力扫描伪代码
/**
* 查询Topic匹配的订阅者
**/
FUNCTION Set findSubscriber(subscriptionArray, publishTopic)
// 订阅者
subscribers = []
// 发布主题token数组
topicTokenArray = split(publishTopic,"/")
// 遍历所有订阅
FOR i = 0 to size(subscriptionArray)
// 订阅主题过滤器token数组
topicFilterTokenArray = split(subscriptionArray[i].topicFilter,"/")
//逐个比对是否匹配
IF matchTopic(topicTokenArray, TopicFilterTokenArray)
subscribers.add(subscriptionArray[i])
END
END
RETURN subscribers
END
/**
* 匹配主题和主题过滤器
**/
FUNCTION boolean matchTopic(topicTokenArray, topicFilterTokenArray)
topicSize = size(topicTokenArray)
topicFilterSize = size(topicFilterTokenArray)
// 遍历所有主题token,逐个token比对
FOR i = 0 to topicSize
// 如果filter是单层通配符,或者token相同,则匹配
IF topicTokenFilterArray[i] == "+" || topicTokenFilterArray[i] == topicTokenArray[i]
CONTINUE
// 如果filter是多层通配符且在最后,则完全匹配
ELSEIF topicTokenFilterArray[i] == "#" && i == topicFilterSize
RETURN true
// 不匹配,直接返回
ELSE
RETURN false
END
// 如果遍历完成,说明topic过了一遍都匹配,则当二者长度相等时,说明完全匹配
IF (topicSize == topicFilterSize)
RETURN true
ELSE
RETURN false
END
END
很容易看出,两层for循环,时间复杂度O(S✖T),其中S是订阅信息数量(Subscription,不是订阅者Subscriber数量),T是发布主题的平均长度(划分成token的平均个数)。
仔细想一想,虽然实现了功能,但是在生产环境上,假设一个Broker连接2W设备,每个设备订阅10个TopicFilter,发布主题的平均长度约为10,执行一次匹配,要执行200W次token比较,要知道这还没有计算每秒钟客户端会发送多少条消息,这个时延都已经完全无法接受。
二、订阅树简介
2.1 订阅树的定义
订阅树(Topic Tree / Subscription Tree)是解决这个匹配问题的一种方法。TopicTree本质是一棵按token划分的字典树(Trie Tree),每个节点上存储着订阅信息,一棵典型的订阅树如下图所示:
订阅树有如下性质:
(1)有一个根节点Root,一个父节点可以有多个子节点
(2)每条边代表一个Token,从根节点遍历到子节点所经历的路径定义为TopicFilter
(3)任何节点的内容都可能为空,也可能包含订阅信息
例如,这棵订阅树有一个根节点Root,每条边上都有一个token。从根节点遍历到最左边的叶子节点经历的路径“abc/+/123”就是一个TopicFilter。叶子节点包含了一条订阅信息{客户端A以QoS0的质量订阅了abc/+/123}。并且中间的节点可能为空,也可能包含订阅信息;叶子节点可能包含订阅信息,也可能为空。
2.2 订阅树的匹配过程
有了订阅树的数据结构,如何匹配Topic和TopicFilter呢?仍以这棵树为例,我们给节点加上编号:
其中,订阅信息为:
节点编号 | 订阅客户端 | 订阅主题 | 订阅质量 |
---|---|---|---|
N5 | A | abc/+/123 | 0 |
N3 | B | abc/# | 1 |
N3 | A | abc/# | 0 |
N4 | B | abc/def | 0 |
N6 | B | abc/def/123 | 0 |
N6 | C | abc/def/123 | 1 |
N7 | D | abc/def/456 | 0 |
整个匹配过程是宽度优先搜索(BFS)或深度优先搜索(DFS)的过程,这里按DFS描述,假设客户端X发布主题为“abc/def/123”,从根节点开始遍历:
(1)进入Root,查找匹配的边,只有一条边abc,且和第一个token“abc”匹配,进入子节点N1
(2)查找匹配的边,找到三条边“+”、“#”、“def”
(3.1.1)进入子节点N2,查找匹配的边,找到匹配边“123”,进入N5
(3.1.2)进入N5,匹配完成,返回订阅信息{A,"abc/+/123",0}
(3.2.1)进入子节点N3,匹配完成,返回订阅信息{B,“abc/#”,1}和订阅信息{A,“abc/#”,0}
(3.3.1)进入子节点N4,查找匹配的边,只有一条边“123”,进入N6
(3.3.2)进入子节点N6,匹配完成,返回订阅信息{B,“abc/def/123”,0}和订阅信息{C,“abc/def/123”,1}
(4)回退到Root,查找结束
我们就得到了匹配这个主题的5条订阅信息:
节点编号 | 订阅客户端 | 订阅主题 | 订阅质量 |
---|---|---|---|
N5 | A | abc/+/123 | 0 |
N3 | B | abc/# | 1 |
N3 | A | abc/# | 0 |
N6 | B | abc/def/123 | 0 |
N6 | C | abc/def/123 | 1 |
注意,在这里A订阅的两个主题“abc/+/123”、"abc/#"都成功匹配,但下发给客户端A的时候要去重,只能下发1条消息,因为Broker是依赖MQTT-Publish给客户端下发消息的,发布的主题就是客户端X发布的主题“abc/+/123”,区分不出是客户端哪条订阅匹配的,所以发过去由客户端自己本地匹配出2条订阅(客户端本地也有订阅结构)。回头来看,客户端A这种“重复主题过滤器”的设计是不好的,在实际应用上应该极力避免这种设计,一个客户端的各个订阅都最好都是独立接受消息的。
如果区分出通配符和非通配符,那么查找的时间复杂度就是字典树的查找时间复杂度,为O(T),其中T是发布主题的平均长度(划分成token的平均个数)。
2.3 增加订阅
(1)增加订阅时部分节点不存在
当增加新订阅时,对于划分出来的token,订阅树可能会有一部分节点/边已经存在,有一部分节点/边不存在,那么就需要增加不存在的新节点/边。
如图所示,客户端F以QoS0的质量订阅新主题过滤器“abc/ghi/789”,这个时候“abc”是存在的,“ghi”、"789"都是不存在的,那么我们需要增加两个新节点和两条新边,并在最下面的节点放置F的这条订阅信息。
(2)添加订阅时节点已存在
如果节点已经存在,只需要增加订阅信息到对应节点中去即可。
如图所示,左侧为客户端F订阅“abc/+”,节点都已存在,将F加入到空节点中;
右侧为客户端F订阅“abc/+/123”,节点都已存在,A所在节点增加一条订阅信息F。
插入的时间复杂度同样为O(T),其中T是发布主题的平均长度(划分成token的平均个数)。
2.4 取消订阅
(1)取消订阅后没有订阅者
如果取消订阅后没有订阅者,可以不用管(可能存在内存溢出的风险),也可以通过一些方式删除该节点。
如图所示,假设客户端A取消了订阅“abc/+/123”,节点没有任何其他订阅信息了,那么该节点(灰色节点)可以被删除掉。
(2)取消订阅后仍然有订阅者
如果取消订阅后仍然有订阅者,那么只需要移除该订阅信息即可。
如图所示,假设客户端C取消了订阅“abc/def/123”,客户端B仍然存在订阅“abc/def/123”,那么只需要去掉此节点上C的订阅信息。
三、订阅树实现难点和业界解决方案
3.1 订阅树实现难点
(1)MQTT客户端可能并发地订阅、取消订阅,如何保证并发安全?
例如:A订阅abc/+的时候,B也订阅、取消订阅abc/+,需要保证并发安全。
(2)订阅树是内存结构,在分布式环境下如何同步?
例如:如下订阅、发布情况,Broker2怎么知道Broker1上有客户端订阅了这个主题?
(3)MQTT客户端订阅数据量非常大的时候,如何加快查找速度?
例如:10W客户端,每个客户端40个订阅,如果采用锁的机制控制并发,订阅/取消订阅会产生大量冲突造成查询获锁缓慢。
3.2 业界订阅树的实现
(1)并发安全
- EMQ:底层采用Erlang的Mnesia分布式数据库,利用分布式数据库的事务保证并发 —— Erlang语言开发者很少
- JMQTT:采用锁的机制控制并发
- Moquette:利用CAS控制并发
- Mosquitto:采用锁的机制控制并发
- MqttWk:无内存订阅树结构,订阅存放在分布式Redis里(通配符直接扫描),由Redis保证并发 —— 通配符扫描效率极低
- 知乎网关:采用锁的机制控制并发(单Broker2~3万客户端、2~3万订阅量)
(2)分布式同步
- EMQ:底层采用Erlang的Mnesia分布式数据库保证同步 —— Erlang语言开发者很少
- JMQTT 1.x:单机,无同步
- Moquette 0.10:采用Hazelcast,靠订阅数据全量广播来同步 ——量太大且不可靠(同时宕机数据丢失),水平扩展无法扩展消费速率
- Moquette 0.12+:单机,无同步
- Mosquitto:可通过“桥接”搭建伪分布式,靠订阅数据全量广播来同步 ——量太大且不可靠 ,水平扩展无法扩展消费速率
- MqttWk:无内存订阅树结构,订阅存放在分布式Redis里(通配符直接扫描),不需要同步
- 百度IoT:由Kafka存储订阅数据,靠读取Kafka同步 —— 比较好的实现机制
- HiveMQ:自制向量时钟组件,可以分析本地订阅树数据之间的差异,通过Jgroups将差异部分协商同步 —— 代码不开源,自己实现困难
- 某车联网代码:Broker自己作为MQTT客户端连接到其他Broker上,当真实的客户端在自己节点订阅时,自己再发出一个订阅请求到所有其他Broker节点 —— 类似桥接模式,非标准分布式模式,且订阅数量较少、客户端较少(一台5万连接)
(3)查找速度
- EMQ:底层采用Erlang的Mnesia分布式数据库保证速度 —— Erlang语言开发者很少
- JMQTT:无优化
- Moquette 0.10:无优化
- Moquette 0.12+:无优化
- Mosquitto:无优化 修改版
- Mosquitto:分离精确订阅和通配符订阅,精确订阅采用Hash表,通配符订阅采用订阅树 —— 比较好的优化方向
- MqttWk:无优化
- 知乎网关:根据clientId进行Hash,划分到100+个哈希表空间(没有通配符所以没用订阅树) —— 比较好的优化方向
总结
MQTT的订阅匹配最好这样处理:
- 订阅树 —— 负责通配符的匹配,查找速率O(T),需要注意控制树的深度宽度、取消订阅的内存回收
- 哈希表 —— 负责精确匹配,查找速率O(1),需要注意并发冲突
这是带通配符字符串匹配的常见解决方案——精确用表、前缀用树。具体判断用什么数据结构,主要依靠如下三点衡量:
- 数据总量有多大 —— 数据量较大,尽量不用树。
- 数据是否持续增长 —— 连接量有上限、订阅量有上限,所以订阅数据有上限,如果没有上限的话,重哈希会比较耗时。
- 查找是否频繁 —— 物联网消息非常多,非常频繁,因此能用哈希表就用哈希表。
订阅树的原理很简单,但是上了并发和分布式就会有很多问题,这些解决方案都需要压力测试和生产环境检验。