文章目录

前言

基于MqttWk v1.0.7。  

MqttWk是我见过最清晰、代码量最少的Broker了,分析其源码有利于初步了解。

一、代码结构分析

1、整体架构

mqtt-auth:验证权限方面的代码

mqtt-broker:Broker核心代码

mqtt-common:抽象出来的持久化接口

mqtt-store:持久化接口的Redis、Kafka实现

mqtt-zoo:简单测试代码

2、mqtt-auth代码结构

验证服务的实现,还有一些验证工具类

3、mqtt-broker核心代码结构

cluster:集群通信的实现,用的是Redis的发布订阅作消息总线

codec:基于Websocket的MQTT通信需要特殊的编解码器,这里是为websocket写的编解码器

config:Broker参数配置

handler:Broker是基于netty做网络通信的,所以这里是netty连接的核心handler

internal:内部通信代码(Redis集群通信和kafka转发)

protocol:MQTT协议逻辑核心代码

server:启动MQTT服务器的代码,主要是MQTT over websocket、MQTT TCP、链路加密SSL等分别启动在不同端口提供服务

service:Kafka转发的实现,就是一个Producer

MainLauncher:启动器,基于nutzboot,非常简洁,和Spring的Application启动器一样

resources:密钥,参数配置。这里的参数配置是直接properties文件配置,config里的参数配置是基于类的配置,都和Spring的概念一样。

protocol:在不同MQTT包发过来的时候,Broker应该做的逻辑操作

4、mqtt-common

auth:验证接口

message | DupPublish:QoS1/QoS2发布的缓存消息

message | DupPubRel:QoS2第二阶段的消息

message | MessageId:分布式的MessageId生成。这里做得不太好,所有的messageId都用这个,甚至没有区分不同客户端,更不用说入口和出口分开了。

message | RetainMessage:保留消息的存储接口

session:会话信息存储,包括clientID、遗嘱消息等等

subscribe:订阅信息,实际上是一棵构建于Redis之上的订阅树

5、mqtt-store

cache:Redis的增删改查实现,值得注意的是将精确订阅主题和通配符订阅主题分开存储了

kafka:分区的方式,这里采用按Topic分区,可以分得更均匀,避免热点问题。

message:消息存储的实现

session:会话存储的实现

starter:初始化Kafka存储

subscribe:订阅信息存储

storeutil:序列化、反序列化工具。因为netty默认的mqtt数据结构没有继承Serializable,序列化会出问题,所以作者自己写了一个。

6、mqtt-zoo

keystore:密钥存储,用于验证

mqtt-test-kafka:kafka测试

mqtt-test-websocket:websocket连接测试

二、初步运行

1、安装好本地的单机版Redis,推荐使用Redis Desktop Manager来可视化Redis。

2、配置broker-application.properties中的redis

redis.host=127.0.0.1
redis.port=6379
redis.mode=normal

3、运行MainLauncher.java

nutzboot会打印参数,这是我最喜欢的一点。

4、用mqtt-spy测试

注意端口默认8885而不是1883,可以在application.properties里面配置。

5、观察Redis存储的内容

都是JSON格式的,因为作者使用的FastJson存储。


转载请注明出处http://www.bewindoweb.com/247.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠