文章目录

一、M10 简介

Moquette 0.10(以下简称M10)的订阅树实现在moquette-0.10/broker/src/main/java/io/moquette/spi/impl/subscriptions,包括:

  • SubscriptionsDirectory:订阅树的增删改查
  • Token:片段,由topicFilter按“/”分割而成的最小字符串单位
  • Topic:主题/主题过滤器,包含一系列方法,比如比较主题和主题过滤器是否匹配
  • Subscription:订阅信息,{clientId, topicFilter,qos,active}
  • TreeNode:树节点

查看源码的时候应该从SubscriptionsDirectory开始阅读。

二、M10 订阅树表示

假设有如下订阅信息:

 节点编号  订阅客户端  订阅主题  订阅质量 
 N5 abc/+/123 0
 N3 abc/# 1
 N3 abc/# 0
 N4 abc/def 0
 N6B abc/def/123 0
 N6 abc/def/123 1
 N7 D abc/def/456 0

则可以构建一棵典型的M10订阅树:

 变量名  数据类型  数据含义 
 m_token  Token 该节点代表片段
 m_children List<TreeNode> 子节点
 m_subscriptions Set<ClientTopicCouple>[1] 该节点订阅信息集合
 subtreeSubscriptions Integer 子树订阅信息数量和[2]

[1]  ClientTopicCouple是{clientId,topicFilter},相比Subscription少了一些字段,减少内存占用。 

[2] 不是子节点数量之和,而是本节点订阅数量与每个子节点下面的订阅数量之和(递归遍历)

可以看到,M10订阅树的信息全在节点内(因为并没有所谓“边”的数据结构)。

三、M10 数据结构

3.1 节点 TreeNode

(1)数据结构:TreeNode

(2)属性值

 数据名  数据类型  数据含义 
 m_token Token 本节点片段,例如“abc”
 m_children List<TreeNode> 子节点
 m_subscription Set<ClientTopicCouple> 本节点订阅信息,例如{client123,"+"}

(3)重要方法

  • copy:浅拷贝,新建一个TreeNode,拷贝TreeNode的所有属性,由于m_children和m_subscription是引用,这里直接拷贝了引用。
  • childWithToken:查询所有的子节点,找到其中具有token片段的节点,如果找不到,返回null
  • updateChild:移除旧子节点,增加新子节点
  • remove:移除本节点上特定的订阅信息
  • matches:查询树中所有匹配某个主题Token的订阅信息。 该方法会递归比对每个Token是否和该节点上所有订阅信息的TopicToken匹配,如果匹配,加入结果。

3.2 主题(主题过滤器)Topic

(1)数据结构:Topic

(2)属性值

 数据名  数据类型  数据含义 
 topic String 主题或主题过滤器
 tokens transient List<Token> 划分的片段(不会被序列化)
 valid transient boolean 是否是合法的主题(不会被序列化)

(3)重要方法

  • getTokens:生成Token列表,只会生成1次,且会判断主题是否合法,比如“/abc/#/123”是非法的 
  • isValid:主题是否合法 
  • match:返回是否匹配某个订阅主题。要求本主题不含通配符,被匹配的主题可以含通配符。
  • asTopic:根据String生成Topic对象

3.3 片段 Token

(1)数据结构:Token

(2)属性值

 数据名  数据类型  数据含义 
 name String 片段内容

(3)重要方法

  • match:两个Token是否匹配。要求本Token可以含通配符,被比较的Token不能含通配符。

3.4 简要订阅信息 ClientTopicCouple

(1)数据结构:ClientTopicCouple

(2)属性值

 数据名  数据类型  数据含义 
 topicFilter

Topic

 主题过滤器
 clientID String MQTT协议的clientID,客户端ID

3.5 订阅信息 Subscription

(1)数据结构:Subscription

(2)属性值

 数据名  数据类型  数据含义 
 topicFilter

 Topic

 主题过滤器
 clientID String MQTT协议的clientID,客户端
 requestQos MQTTQoS 订阅质量
 active boolean 订阅是否有效,暂未使用这个字段

3.6 订阅树 SubscriptionsDirectory

(1)数据结构:SubscriptionsDirectory

(2)属性值

 数据名  数据类型  数据含义 
 subscriptions AtomicReference<TreeNode> 根节点

(3)重要方法

  • init:从持久化存储里初始化订阅信息
  • add:新增订阅
  • removeSubscription:删除一个订阅
  • removeForClient:移除一个客户端的所有订阅
  • matches:查询一个发布主题(不含通配符)匹配的订阅信息

四、M10 匹配订阅信息过程

M10的订阅匹配实现是DFS的:

算法4-1 递归匹配 
FUNCTION void matches(Queue<Token> tokens, List<ClientTopicCouple> matchingSubs)
    // 弹出队列头片段
    t = tokens.poll()
    // 如果已经匹配完了
    IF t == null
       // 说明完全匹配,加入所有的订阅信息
       matchingSubs.addAll(m_subscriptions)
      // 子节点如果含有通配符,也加入子节点订阅信息,因为通配符可以匹配空层,比如“/abc”是能够匹配上“/abc/#”的
       FOR childNode IN m_children
           IF childNode.m_token == "#" || childNode.m_token == "+"
               matchingSubs.addAll(childNode.m_subscriptions) 
           END
       END
       RETURN
    END


    // 如果本层token是通配符(对于订阅Topic来说不可能含有通配符),则加入所有订阅信息
    IF m_token == "#"
        matchingSubs.addAll(m_subscriptions) 
        RETURN
    END


    // 遍历子节点,是否有匹配的,如果匹配,递归调用
    FOR childNode in m_children
        IF childNode.m_token matches t
            matches(tokens, mathingSubs)
        END
    END
END

例如,我们仍然保持前面的订阅,订阅树也一样:

客户端X向主题abc/def/123发布消息,从根节点开始遍历:

(1)进入Root,弹出第一个token“abc”,查找子节点发现N1的m_token"abc"匹配

(2)进入N1,弹出第二个token“def”,发现N2的m_token“+”匹配,N3的m_token“#”也匹配,N4的m_token“def”也匹配

(3.1.1)进入子节点N2,弹出第三个token“123”,发现N5的m_token"123"匹配

(3.1.2)进入子节点N5,队列空,匹配完成,将N5的m_subscriptions[{A, abc/+/123}]加入订阅信息集合

(3.2.1)返回节点N1,进入子节点N3,队列空,匹配完成,将N3的订阅信息{A,“abc/#”}和订阅信息{B,“abc/#”}加入集合

(3.3.1)返回节点N1,进入子节点N4,发现子节点N6的m_token"123"匹配

(3.3.2)进入子节点N6,队列空,将N6的订阅信息[{B, abc/def/123},{C, abc/def/123}]加入集合

(4)返回N1,返回Root,查找结束

这样就成功匹配到了5条订阅信息:

 节点编号  订阅客户端  订阅主题  订阅质量 
 N5 abc/+/123 0
 N3 abc/# 1
 N3 abc/# 0
 N6B abc/def/123 0
 N6 abc/def/123 1

五、M10增加订阅

5.1 路径重建算法 RP

M10的增加订阅和取消订阅都会调用路径重建算法(recreatePath,RP)。在了解路径重建算法之前,需要知道M10订阅树一个性质:

【特性1】给定一个主题,一旦发现某个token之前没有建立过节点,其后的所有token都必将建立新节点

【解释】给定一个主题"abc/+/123/new1/new2/new3",前面的token"abc","+","123"由于A客户端的订阅已经构建了对应的节点TreeNode,发现其后的第一个没有构建过的token“new1”,则其后的“new2”、"new3"都肯定没有构建过。

【证明】假设后面的某个token Tn是以前建立过的,那么必然存在之前的某个订阅构建了从Root到Tn的完整路径,且当前主题S一定也遍历的这个路径(否则不会访问到Tn); 但现在已经在这条路径上遇到了一个新节点T0,矛盾,所以Tn必然不可能被建立过。  

RP算法如下:

算法5-2 路径重建recreatePath 
FUNCTION NodeCouple recreatePath(Topic topic, final TreeNode oldRoot)
    // 新建节点,浅拷贝子节点,增加两个指针
    newRoot = oldRoot.copy()
    parent = newRoot
    current = newRoot
    // 遍历片段
    For token IN Topic
       // 如果已经存在某个子节点具有这个token
       IF EXIST token IN children
           // 浅拷贝这个子节点,当前节点指针下移
           current = matchChild.copy()
           // 删掉旧子节点,增加新子节点
           parrent.update(matchChild, current)
           // 父节点指针下移
           parrent = current
       ELSE
           // 新建一个子节点
           matchChild = new TreeNode
           matchChild.m_token = token
           // 当前节点加入新子节点
           current.addChild(matchChild)
           // 当前节点指针下移
           current = matchChild
       END
    END
    
    RETURN NodeCouple(newRoot, current)
END

5.2 增加订阅过程

举一个详细的例子,假设现在客户端F产生了一个新订阅"abc/+/123/new1/new2/new3",我们看看是如何插入的:

(1)建立初始数据结构

浅拷贝的newRoot,两个指针P,C:

(2)发现“abc”有子节点,浅拷贝,指针下移

(3)发现“+”有子节点,浅拷贝,指针下移

(4)发现“123”有子节点,浅拷贝,指针下移

(5)发现“new1”没有节点了,增加节点

(6)同理new2、new3

(7)很明显,newRoot构建了一个新的树,包含了最新的订阅,current指向了创建的最后一个节点,我们更改颜色来更好地直观感受

5.3 增加订阅剩余工作

目前只做了路径重建,没有完成订阅,剩下三个工作:

(1)将订阅信息加入节点 有了current指针,就能够直接到达最后创建的节点,将订阅信息加入即可

(2)重新计算订阅数量subtreeSubscriptions 重新执行一次递归计算(耗时)

(3)替换旧树根节点 需要考虑并发,这里M10用的是经典的CAS,替换根节点,旧节点的内存回收由JAVA垃圾回收器来做。 

我们来看一下CAS的这段源码:

do {
            oldRoot = subscriptions.get();
            couple = recreatePath(newSubscription.topicFilter, oldRoot);
            couple.createdNode.addSubscription(newSubscription); //createdNode could be null?
            couple.root.recalculateSubscriptionsSize();
            //spin lock repeating till we can, swap root, if can't swap just re-do the operation
} while (!subscriptions.compareAndSet(oldRoot, couple.root));

当CAS替换不成功的时候(比如同时有并发的插入操作且它们执行成功导致根节点发生变化),while条件判断为true,会丢弃掉之前生成的节点,重新执行生成操作。利用这个思路可以写很多CAS的代码,例如:

do {
	num = myAtomNum.intValue();
	if (num >=  maxNum) {
		do something 1 ……
                return;
	}
 } while (!myAtomNum.compareAndSet(num, num + 1));
 
 do something 2 ……

myAtomNum是一个原子类型,我们希望当myAtomNum的值在达到maxNum后执行“do something 1”,而没有达到时正常执行“do something 2”。如果仅仅写一个if,就会有并发问题。如果像上面这样写,则保证了并发安全。

  • 假设maxNum=50,num=40,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 2。
  • 假设maxNum=50,num=49,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 1。
  • 假设maxNum=50,num=50,此时有2个并发线程进入,则都将执行do something 1。

六、M10 取消订阅

M10取消订阅同样会使用RP算法,找到这棵树的最后一个订阅节点(current指针),然后移除其中的订阅信息。

例如A取消订阅“abc/+/123”,如下图所示:

再进行和订阅同样的后续操作。

七、总结

可以发现M10的订阅树缺点:

(1)只要订阅过,哪怕取消订阅后,这个订阅的节点也不会删除

(2)无论通配符订阅还是非通配符订阅,都需要遍历一遍树结构。

(3)每次订阅、取消订阅,递归更新订阅数量的时间耗费是很大的,但是这个“统计订阅量”的需求并不需要实时,也不是频繁发生的


转载请注明出处http://www.bewindoweb.com/269.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠