文章目录

前言

基于MqttWkv1.0.7。

MqttWk没有构建内存中的订阅树,直接利用存储在Redis中的订阅信息来获取订阅者。

订阅树算法分析

主要的代码在cn.wizzer.iot.mqtt.server.store.subscribe.SubscribeStoreService:

@Override
    public List<SubscribeStore> search(String topic) {
        List<SubscribeStore> subscribeStores = new ArrayList<SubscribeStore>();
        List<SubscribeStore> list = subscribeNotWildcardCache.all(topic);
        if (list.size() > 0) {
            subscribeStores.addAll(list);
        }
        subscribeWildcardCache.all().forEach((topicFilter, map) -> {
            if (StrUtil.split(topic, '/').size() >= StrUtil.split(topicFilter, '/').size()) {
                List<String> splitTopics = StrUtil.split(topic, '/');//a
                List<String> spliteTopicFilters = StrUtil.split(topicFilter, '/');//#
                String newTopicFilter = "";
                for (int i = 0; i < spliteTopicFilters.size(); i++) {
                    String value = spliteTopicFilters.get(i);
                    if (value.equals("+")) {
                        newTopicFilter = newTopicFilter + "+/";
                    } else if (value.equals("#")) {
                        newTopicFilter = newTopicFilter + "#/";
                        break;
                    } else {
                        newTopicFilter = newTopicFilter + splitTopics.get(i) + "/";
                    }
                }
                newTopicFilter = StrUtil.removeSuffix(newTopicFilter, "/");
                if (topicFilter.equals(newTopicFilter)) {
                    Collection<SubscribeStore> collection = map.values();
                    List<SubscribeStore> list2 = new ArrayList<SubscribeStore>(collection);
                    subscribeStores.addAll(list2);
                }
            }
        });
        return subscribeStores;
    }

对于普通的精确订阅,直接根据主题名得到对应的订阅者。

对于通配符订阅,直接获取所有通配符订阅Topic和订阅者,一个一个比对是否匹配,匹配的话就把订阅者加进去。

普通精确订阅正是之前有人提出mosquitto的订阅树优化成Hash表的方法。我惊讶的是通配符订阅直接拉取了所有通配符Topic和订阅者,这个量不会巨大吗。后来仔细想想,设备端的订阅全都是以设备deviceId或者什么唯一标识符的精确订阅,根本不会订阅通配符主题,订阅通配符主题的只有后端服务,因为后端服务才会想要/+/connected这种全局的信息。后端服务总共就那么多,所以这样拉取也不会太多,一个一个遍历时长也能够接受。

同时采用Redis做集中存储的好处是,集群不需要同步内存中的订阅树,Redis的单线程保证了并发修改订阅信息不会出现问题。


转载请注明出处http://www.bewindoweb.com/251.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠