文章目录

一、MQTT报文通信流程

要做消息代理,首先需要解析协议。我们从抽象层面去考虑协议,不去更深挖到底哪个比特组合代表什么含义,这样有利于理清逻辑,而且早就有很多组件可以直接使用,只需要True和False代表什么含义,不需要知道001和010代表什么含义。以MQTT 3.1.1为例来描述细节。

1、术语定义

1)客户端 Client

使用MQTT的程序或设备,它可以做的操作是:

  • 发布消息给其他客户端
  • 订阅主题
  • 取消订阅主题
  • 断开连接

2)服务端 Server

作为发送消息的客户端和请求订阅的客户端之间的中介,也就是MQTT Broker,它可以:

  • 接受网络连接
  • 接受客户端发布的应用消息
  • 处理客户端的订阅和取消订阅请求
  • 转发应用消息给符合条件的已订阅客户端

3)订阅 Subscription

包含一个主题过滤器(Topic Filter)、一个最大服务质量等级QoS。

4)主题名 Topic Name

附加在应用消息上的一个标签,也就是发布(Publish)消息所携带的主题名称。

5)主题过滤器 Topic Filter

订阅中包含的一个表达式,用于表示相关的一个或多个主题,可以使用通配符,也就是订阅(Subscribe)消息所携带的主题名称。

6)会话 Session

就和HTTP的会话概念一致,每一个会话里包含着客户端的信息、客户端订阅了哪些主题、以及一些保存的消息等等。

7)控制报文 MQTT Control Packet

MQTT的信息数据包,一共有14种。

2、MQTT通信图

客户端首先通过发包CONNECT、收包CONNACK来建立MQTT连接;然后通过发心跳包PINGREQ、收心跳包PINGRESP来建立心跳,维护通信链路。

连接建立后,可以通过发包SUBSCRIBE、收包SUBACK来订阅主题,通过发包UNSUBSCRIBE、收包UNSUBACK来取消订阅主题。

对于QoS0的消息,直接发送PUBLISH即可,不需要接受回包。

对于QoS1的消息,发包PUBLISH、收包PUBACK,才算是消息发送成功了。

对于QoS2的消息,发包PUBLISH、收包PUBREC;再次发包PUBREL,再次收包PUBCOMP,消息才发送成功。

二、Broker的任务

1、CONNECT/CONNACK 连接服务端

1)判断是否是合法的MQTT包

是否是MQTT协议:不是,断开连接。

是否是支持的MQTT版本:不支持,返回CONNACK[不支持的协议级别]报文,断开连接。

2)用户名和密码是否正确

用户名是否合法:UTF-8、字符串,不合法,返回CONNACK[无效的用户名或密码],断开连接。

密码是否合法:0~65535字节,不合法,返回CONNACK[无效的用户名或密码],断开连接。

用户名、密码是否正确:密码完全可以不用明文密码,采用JWT等验证方式的数据,不正确,返回CONNACK[未授权],断开连接。

3)审查连接情况

ClientID是否合法:必须不为空,必须是UTF-8编码、大小写字母和数字、1~23字节长(1~23个数字或字母),不合法,返回CONNACK[标识符不合格],断开连接。

【自定义】协议规定服务端可以自己设计ClientID长度、字符,可以是零字节。如果是零字节,则服务器需要辅助生成ClientID,并且只能是临时会话(Clean Session为1),因为断开后下一次服务器就不认识这个客户端了。

是否已有在线连接占用了这个ClientID:MQTT要求ClientID唯一,如果占用,则踢掉旧会话,维持这个新会话。

是否清理会话:

① 如果不清理会话(要求持久化会话,CleanSession为False),那么服务端必须在客户端离线之后保留客户端的会话状态,包括:

  • 客户端的订阅信息
  • 已经发送给客户端,还未确认的QoS1、QoS2消息
  • 离线接收的QoS1、QoS2消息
  • 已从客户端接收,还未确认的QoS2消息
  • 可选,准备发给客户端的QoS0消息

持久化Session虽然是持久化,也需要设置过期时间

② 如果清理会话(临时会话,CleanSession为True),则服务端丢弃之前的所有数据,并开始一个新的临时会话。

如果客户端想要清理之前的数据,重新开始一个新连接,那么需要先CleanSession=True连接一次,断开连接,再重新建立新连接。

4)是否有遗嘱消息

遗嘱主题是否合法:UTF-8,字符串

遗嘱消息是否合法:0~65535字节

遗嘱消息是否保留:如果设置了保留,服务端将为这个遗嘱主题保留这条消息。

遗嘱消息会在以下情况下分发:

  • 服务端检测到I/O错误或者网络故障
  • 客户端在保持连接(Keep Alive)的时间段内未能通讯
  • 客户端没发送DISCCONECT报文强行关闭了连接
  • 由于协议错误服务端主动关闭了连接

遗嘱消息会在以下情况下被清理:

  • 客户端发送了DISCONNECT报文正常退出(无论是否是持久会话)

5)保持心跳

Keep Alive声明了客户端想要保持的心跳间隔,以秒为单位,一共2个字节。一般是设置为60秒或几分钟,最大值为18小时12分15秒。

如果值非零,服务端如果在1.5倍KeepAlive时间内没有收到任何控制报文,则视为客户端离开,需要断开连接。

如果值为零,服务端不需要判断客户端是否活跃,但只要服务端认为不活跃,就可以主动断开连接。

需要核查客户端声明的心跳间隔,否则如果时间过短服务端承受不了那么大的压力。

6)返回CONNACK

设置Session Present:

如果CleanSession为0(要求持久化会话),并且服务端保存了旧会话的数据,那么必须将SessionPresent设置为1,表明服务端存在旧会话,方便客户端选择是否要使用这个旧会话。其余情况均设置SessionPresent为0。

返回报文:

除了明确的异常断开的情况,其他所有异常返回CONNACK[服务端不可用]。

如果连接被接受,则返回CONNACK[已接受]。

2、DISCONNECT 断开连接

1)丢弃遗嘱消息

2)断开网络连接

3、SUBSCRIBE/SUBACK 订阅主题

1)解析订阅请求

一个SUBSCRIBE报文可以包含一系列的主题过滤器,以及对应的QoS等级。

2)权限检查

如果客户端没有权限订阅这个主题过滤器,则需要标记订阅失败。

3)发布主题的保留消息

对于设置了保留消息的主题,如果任何的主题过滤器能够匹配上,都需要将保留消息发布给该客户端。

如果订阅的主题过滤器已经存在(可能QoS等级不同),那么用新的订阅替换旧订阅,并重发匹配主题的保留消息。  

4)回复SUBACK

如果SUBSCRIBE是多个主题过滤器,那么回复也必须在一个SUBACK报文中回复全部的主题过滤器订阅结果。

一个主题过滤器的订阅结果包含主题过滤器、QoS等级。如果服务端不支持这么高的服务质量,QoS等级回复支持的最大服务质量或者回复订阅失败。如果主题过滤器不合法,QoS等级则是订阅失败。

4、UNSUBSCRIBE/UNSUBACK 取消订阅

同样在一个报文可以包含希望取消的多个主题过滤器。在取消订阅后,已经在分发的消息仍然可以继续分发,但服务端不能在接受任何该主题的新消息了。

服务端是无法清空所有订阅的,如果要清空,需要客户端准确的取消订阅所有订阅主题,或者清除服务端的Session重新连接。

5、PUBLISH 发布消息

1)检查主题名是否正确

主题名只有一个,而且是精确的主题,不能包含任何通配符。

2)检查客户端是否有权限向某个主题发布消息

防止客户端非法地发布消息。

3)重发标志DUP

如果收到重发标志,则表明这是重发的PUBLISH报文。

唯一要求重发的场景是客户端设置CleanSession为0(持久化会话)重连,服务端需要重发未确认的PUBLISH报文和PUBREL报文。

这里有个疑问是,假设服务端发送了QoS1的PUBLISH报文,在一定时间内没有收到PUBACK报文,那么需要重新尝试分发吗?虽然底层是TCP保证了消息不会丢失,但可能客户端出了错或者发生了什么,导致没有回复PUBACK报文。我的理解是当然需要重发,因为在发布客户端那边,我们已经做完了QoS1通信,发布客户端会以为消息已经按QoS1送达了,而Broker转发给接受客户端这边,必须确保接收客户端收到了这条消息,所以一旦超时,必须要重发。

消息排序:重发任何之前的PUBLISH报文时,必须按原始PUBLISH报文的发送顺序重发。这里不是说要等待前一条消息确认后才能下发新一条消息,否则就变成同步了效率低下,这里是说当未收到ACK需要重发的时候,需要按照相同的顺序进行重发。例如发布1、2、3、4消息,订阅者收到的可能是1、2、3、2、3、4,后面的2、3是重发的报文,必须按照2、3的顺序,整个过程也并不是同步的而是并发的,否则应该是1、2、2、3、3、4了。

4)保留消息Retain

一个主题的保留消息只有一条。

作用:如果Retain为1(True),则服务端需要保存这条消息及其QoS,以便分发给未来的新订阅者。

设置:如果Retain为1,则服务端必须丢弃之前为这个主题的保留消息,将这条消息作为最新的保留消息保存。

清除:如果Retain为1且有效载荷为0字节,那么会被当做正常消息进行分发,并且该主题的保留消息会被移除,服务端不能存储零字节的保留消息。

下发:如果客户端的新订阅匹配了一条保留消息,那么服务端给它发送的PUBLISH报文的Retain标志必须置为1,以提醒客户端这是来自于主题的保留消息而非正常消息。  

5)主题名

主题名只能是精确的,不能包含通配符。那如果我们有需求是向“北京地区”的客户端分发消息应该怎么办呢?那就需要北京地区的客户端订阅通配符主题表明自己是北京地区的,而不是发送者向通配符主题发送消息。

如果客户端同时订阅了通配符和精确匹配的主题,那么匹配的通配符和精确匹配的主题过滤器都将会收到消息。例如订阅了/abc/#和/abc/123,当发布/abc/123的时候,两个主题都会收到消息,哪怕它们是一个客户端订阅的。  

① 主题由主题层级分隔符(Topic Level Separator)斜杠“/”进行分隔,两个斜杠可以相邻,表示一个零长度的主题层级,虽然不推荐这样做。

② 多层通配符(Mutil-Level wildcard)井号"#"可以匹配任意层级,但必须跟在主题层级分隔符之后,并且必须是最后一个字符。

【非法格式】

"sport/tennis#"是无效的,因为tennis和#处于了同一层级

"sport/tennis/#/ranking"是无效的,因为它不是最后一个字符。

"sport/tennis/#ranking"是无效的,因为它不是最后一个字符。  

【匹配策略】

如果客户端订阅了"sport/tennis/player1/#",那么:

"sport/tennis/player1"可以发布消息给它;

"sport/tennis/player1/ranking"可以发布消息给它;

"sport/tennis/player1/score/wimbledon"可以发布消息给它;

③ 单层通配符加号“+”可以匹配单层,可以放到任意位置,可以和多层通配符一起使用,但必须独占整个层级。

【非法格式】

"sport+"是无效的,因为没有独占整个层级。

【匹配策略】

如果客户端订阅了"sport/tennis/+",那么:

"sport/tennis/player1"可以发消息给它;

"sport/tennis/player1/ranking"不能发消息给它,因为多了一层。

  "sport/tennis/"可以发消息给它,斜杠后面没有数据视为零长度的层级。

④ $开头的主题

通配符(#或+)不能匹配以$开头的主题,例如发布消息到"$TEST/monitor",订阅"#"、"+/monitor"的主题将不会收到消息,只有“$TEST/#”等头部精确匹配的才会收到消息。

通常$SYS会被用作服务器内部的统计信息主题。

⑤ 主题格式规定

主题名至少包含1个字符(不能为空);

主题名大小写敏感;

主题名可以包含空格;

只包含一个“/”的主题名是合法的,含义为[空]/[空];

主题名是UTF-8字符串,不能超过65535个字节。

6)报文标识符

报文标识符只会在QoS1、QoS2出现,它用来标记一对完整的通信:

QoS1-PUBLISH / QoS1-PUBACK

QoS2-PUBLISH / QoS2- PUBREC

QoS2-PUBREL / QoS2-PUBCOMP

由于发送消息不是同步等待发完一条收到回包再发下一条的,而是同时发送多条消息同时等待回包,所以通信双方通过设定的报文标识符来知道这是哪一次通信,从而去做相应处理。

报文标识符范围是1~65535,注意是非零的。

7)QoS的具体流程

QoS0,只需要简单地接受PUBLISH报文,然后投递的时候也发送PUBLISH报文即可,不需要管回包。


QoS1:  

1.1 发布者发布消息给Broker

1.2 Broker持久化这条消息

1.3 以相同packetId=12回复ACK

2.1 Broker发布消息给订阅者

2.2 订阅者以相同packetId=631回复ACK

2.3 Broker删除持久化的消息

对于Broker来说,需要关注收到消息后的操作;对于订阅者来说,当Broker未收到ACK的时候会持续重复发送Publish消息,所以可能重复但必定到达;两侧的packetId是由各自的客户端session分别维护的,没有任何关系。


QoS2:  

1.1 发布者发布消息给Broker

1.2 Broker存储消息

2.1 Broker回复发布者Publish REC,此后收到任何packetId=13的Publish消息都视为重复消息进行丢弃。

2.2 发布者发布Publish REL,Broker将存储的消息移动到下发队列,向订阅者下发消息

2.3 Broker回复发布者Publish COMP,完成发布端的QoS2全部阶段。

3.1 订阅者回复Publish REC,Broker删除存储的Publish消息(第一阶段),重新存储一个Publish REL消息并下发。

3.2 订阅者回复Publish REL,Broker删除存储的Publish REL消息(第二阶段),回复订阅者Publish COMP,完成订阅端的QoS2全部阶段。

订阅者此时才会向上转发消息,上层收到的QoS2消息有且只会有一条。  

也许你会担心例如联网的天然气灶,如果之前一直发送打开消息没有发送成功,半夜的时候突然重连上网络,接收到了打开消息怎么办!或者联网的防盗门,半夜才重连,接收到了打开消息怎么办!不用担心,一般这种消息都会带有时间戳的,完全可以在客户端上做逻辑操作,例如超时5分钟则丢弃消息;同时服务器保存的session具有过期时间,一般为2小时,过期后哪怕是持久化的会话,所有离线消息也都将被丢弃。


转载请注明出处http://www.bewindoweb.com/243.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠