文章目录

Kafka中,必须要知道的概念就是消息和日志段。

一、V0和V1消息

在0.11之前,消息被称为Message,一批以数组形式保存的消息被称为消息集合(Message Set)。

消息集合只是简单的聚合,消息集合中的每条消息都有自己的元信息(Metadata),这个并没有聚合。0.10前Message version=0,0.10~0.11版本Message version=1,这个Message version被存储为魔术数字(Magic Number),放在消息里面。

1.1 V0消息  

V0版本的结构都比较简单:

  • attribute:1字节,低3位是压缩类型,0=NONE,1=GZIP,2=Snappy,3=LZ4(LZ4需要kafka 0.9.x以上)
  • key:key可以为null,如果为null则key length=-1
  • value:value可以为null,如果为null则value length=-1

因此,一个V0版本消息的最小长度为4+1+1+4+4=14B,这也是为什么日志段大小log.segement.bytes参数的下限值是14B的原因。我们假设设置key="key",value="value",则消息大小为8+4+14+3+5=34字节;如果写入key=null,value="value",则消息大小为8+4+14+0+5=31字节。

2.1 V1消息


从0.10.0.0开始,引入了V1格式,V1在magic之后多了一个8字节的timestamp字段,表示消息的时间戳。并且attribute的第4位,表示时间戳类型,0表示timestamp是内层消息的最大时间戳(CreateTime),也就是Producer创建这条消息的时间,1表示timestamp是Kafka服务器将这条消息写入日志段(Log Segment)的时间戳(LogAppendTime),V1版本消息最小大小为22字节。

2.2 V1为什么要引入时间戳

KIP-32里面有详细解释引入动机,中文注释版可以参考《KIP-32 Add timestamps to Kafka message》

如果你读到这里还不太清楚Kafka的日志机制也没有关系,当作阅读材料了解一下即可,后面了解了日志机制之后回头来看可以收获很多。

引入时间戳主要为了解决三个问题:

(1)日志保存(Log Retention)策略里面,Kafka可以根据设定的时间参数log.retention.hours等,这个参数的含义是“消息可以在Kafka的磁盘里面存储多久”,当超过这个过期时间,消息理应被删除以节省磁盘空间。旧Kafka的比较方式是比较日志段文件(一批消息会通过日志段的形式存储在磁盘里)的最新修改时间,如果当前时间减去最新修改时间大于这个过期时间值,则执行删除操作。然而如果分区发生了分区副本重分配(Replica Reassignment),比如我们水平扩展了一台新的Kafka机器加入集群,或者我们从Kafka集群剔除了一台机器,或者我们修改了分区的副本数量等等,那么分区的副本就需要和分区Leader进行同步,同步意味着我们可能会创建和追加日志段,这意味着日志段的“修改时间”会被更新。本来应该在log.retention.hours时间内删除的日志,却发现没有删除,这是很奇怪的事情。

(2)日志切分(Log Rolling)策略。日志会根据一些规则对日志段进行切分,其中一个规则就是根据时间log.roll.hours进行切分,只要到了这个时间就应当关闭当前日志段,创建一个新的日志段。日志段是否过期,同样取决于“当前时间-最近修改时间”的差值是否大于log.roll.hours,如果日志也因为副本重分配导致修改时间变化,就会无法执行切分操作,这个问题其实比无法删除更为严重,例如磁盘一旦有单个文件大小限制,就会导致无法写消息到磁盘。

(3)流处理。很多流处理系统都需要消息的时间戳,这也是Kafka 2.x以后一致在追求的定位。

2.3 消息压缩和消息包装

我们都知道,对于常见压缩算法,消息压缩的内容越多,消息压缩的效果就越明显。但Kafka单条消息通常不会太大,所以Kafka的压缩是将多条消息一起压缩。

具体而言,Kafka将消息构造为递归的模式(Recursive message),外层是一个消息集合,称为包装消息或外层消息(Wrapper message),其value值又是一个消息集合,称为内层消息(Inner message)。外层消息可能有多条,每条外层消息的value都包装了多条内层消息。这样在外层指定一个压缩方法,然后内层是用这种压缩方法压缩过的消息集合,就可以实现多条消息压缩了。

在位移计算方面,外层消息的offset是内层最后一条消息的offset。对于V0版本的消息,服务端必须在解压每条内层消息,为每条消息分配独立地分配偏移(Offset),并重新压缩;而对于V1版本的消息,只会为外层消息分配基础偏移,同样这个基础偏移是内层消息最后一条消息的偏移,内层消息则是相对偏移,从0开始顺序递增,这样可以直接计算出内层消息的偏移。注意这里的计算是减法不是加法,举个例子,假设外层消息偏移是100,内层有3条消息,相对偏移为0、1、2,那么内层第一条消息的偏移应该为(100-(2-0))=98,内层最后一条消息偏移为100。这样做的好处在于,Broker无需解压内层消息,消息会由消费者消费的时候解压,然后自行计算偏移,这样压缩是在Producer,解压缩是在Consumer,Broker不会有任何负担。

2.4 V1的CreateTime和LogAppendTime

V1的时间戳有两种类型,CreateTime和LogAppendTime。前面已经提到过,CreateTime表示使用Proucer生产消息的时间,LogAppendTime表示使用Broker写入日志段的时间。

在Topic参数层面(每个Topic可以不一样),可以使用message.timestamp.type去设置默认使用哪种时间戳类型,如果选择CreateTime,则还可以设置max.message.time.difference.ms参数,当用户设置的消息时间与Broker的本地时间差值超过这个参数设定的最大值时,Broker将拒绝写入这条消息,以避免给日志段的保存(Retention)和切分(Rolling)带来隐藏问题,比如用户的时间异常到超过了当前时间几年,导致按时间让日志段过期的设置和预期表现不一致。在Kafka2.1.x里,message.timestamp.type参数的默认值是CreateTime。

对于压缩的消息,时间戳应该如何处理呢?我们仔细分析消息流转的各个场景:

(1)Producer生产消息

对于压缩的消息,无论Broker的message.timestamp.type是什么,Producer都会将内层消息的attributes的timestamp type设置为0(CreateTime)

(2)Broker接收到生产消息

如果message.timestamp.type=LogAppendTime,那么:

  • 如果消息是未压缩的:直接将消息的时间戳设置为当前Broker的本地时间,然后设置消息的attributes的timestamp type=1
  • 如果消息是已压缩的:用Broker本地时间覆盖外层消息的时间戳,然后将外层消息的attributes中timestamp type=1,内层消息的时间戳被忽略,不做任何修改,这样可以避免重新压缩。

如果message.timestamp.type=CreateTime,那么:

  • 如果消息是未压缩的:如果消息时间于Broker本地时间差值在max.message.time.difference.ms之内,则会接收消息并写入日志段,否则抛出异常拒绝消息。
  • 如果消息是已压缩的:除了判断 max.message.time.difference.ms外,Broker还会把外层消息的时间戳更新为内层消息中最大的时间戳。

这里可以看出,对于压缩消息,只有外层消息的attributes中的timestamp type会生效,内层消息的属性值会被忽略。

(3)Consumer收到消息

Consumer并不知道Broker的message.timestamp.type这个值是什么,它只会根据消息的attributes中的timestamp type进行处理。

如果消息是未被压缩的:无论是哪种时间戳,该消息的时间戳将会被直接使用。

如果消息是已经压缩的,那么: 

  • 如果外层消息的 timestamp type = 0(CreateTime),则内层消息的时间戳将被使用
  • 如果外层消息的  timestamp type = 1(LogAppendTime),则外层消息的时间戳将被用作内层消息的时间戳。

(4)时间戳索引

在提出V1的时间戳格式时,还并没有推出时间戳索引,所以当时的一个问题就是,如果设置为CreateTime,由于时间是Producer指定的,时间戳索引可能不是递增的。实际上,在后面的Kafka中,已经强行保证了时间戳索引的单调递增性。

二、V2消息

2.1 记录和消息

在0.11之后,开始使用V2格式的消息,消息被称为记录(Record),对应V1的消息(Message),一条记录有如下数据结构:

length: varint           # 长度
attributes: int8         # 属性值
	bit 0~7: unused
timestampDelta: varint   # 时间戳
offsetDelta: varint      # 偏移
keyLength: varint        # Key长度
key: byte[]              # Key
valueLen: varint         # Value长度
value: byte[]            # Value
Headers => [
	headerKeyLength: varint      # header的Key长度
	headerKey: String            # header的Key
	headerValueLength: varint    # header的Value长度
	Value: byte[]                # header的Value
]

这里的varint和Protobuf的varint是同一个技术,使用了ZigZag编码,用于紧凑地表示数字,数值越小占用字节数越少。

可以看到:

(1)V1的CRC32校验码,魔术数字都被移除了,原因是V2将其放到外面批量记录(Record Batch)里了。

在V1里,如果时间戳类型是LogAppendTime,Broker需要重写消息时间戳,这会导致CRC值需要重新计算;在V1里,如果要做V0版本兼容,同样由于时间戳是包含在CRC计算范围里的,需要重新计算CRC。所以V2将CRC转到了Record Batch里面。

同样,由于魔术数字通常都是重复的,这个值也可以存在RecordBatch里。

(2)attributes名称没变,但作用已经变化了,V1的attributes有一些标记压缩类型、时间戳类型等作用,V2的Record里attributes全部都为保留位(相当于废弃),而V2的Record Batch增加了16位的attribute,实现并扩展了V1的attributes的功能。

(3)同时timestamp也变为了timestampDelta,这样可以通过RecordBatch中包含的时间戳去计算,充分发挥varint的节省空间的作用。

(4)V1的每条消息的前面,其实是有2个固定参数,offset和message size的,V2将其移到Record内部了,变为对应的offsetDelta和length。其中offsetDelta是和Record Batch起始位移的差值,同样是为了节省空间。

(5)增加了headers。这样可以扩展应用级别的属性。

2.2 批量记录和消息集合

同样的,Record也通常是使用批量的形式,被称为批量记录(Record Batch),对应于V0/V1的消息集合(Message Set)概念。记录总是被包装为批量记录,当只有1个记录时,它会被包装为只包含1个记录的批量记录。

批量记录的格式是:

baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
	bit 0~2:
		0: no compression
		1: gzip
		2: snappy
		3: lz4
		4: zstd
	bit 3: timestampType
	bit 4: isTransactional (0 means not transactional)
	bit 5: isControlBatch (0 means not a control batch)
	bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]

V1的消息集合更像是把所有的消息用一个数组存放起来,是一个消息的排列,比如“消息1,消息2,……”,每条消息都有自己的偏移和消息长度。V2的批量记录参考了V1消息的压缩时使用的“外层消息、内层消息”这样的结构,将消息的头部信息抽离出来,类似“基本信息、记录数据1、记录数据2”。

(1)baseOffset:当前RecordBatch的起始位移。

(2)batchLength:RecordBatch的总长度。

(3)partitionLeaderEpoch:这个值的作用比较深,和Kafka的Leader选举有关,主要作用就是标记分区Leader的纪元,保证在进行Leader选举时消费不会发生异常。

(4)magic:魔术数字,这里是V2消息,所以值是2。

(5)crc:计算了从属性值到末尾的全部数据的校验码,partitionLeaderEpoch不在CRC里面是因为每次Broker收到消息时都会赋值partitionLeaderEpoch,如果包含在CRC里面会导致需要重新计算CRC。

(6)attributes:相比于V1,V2将其放在了RecordBatch里,且从8位扩展为了16位,0-2仍然是压缩类型,3仍然是时间戳类型,4表示是否是事务型记录。所谓“事务”是Kafka的新功能,开启事务之后,只有在事务提交之后,事务型消费者才可以看到记录。5表示是否是Control Record,这类记录总是单条出现,被包含在一个control record batch里面,它可以用于标记“事务是否已经提交”、“事务是否已经中止”等,它只会在Broker内处理,不会被传输给应用程序,对客户端是透明的。

(7)last offset delta:RecordBatch最后一个Record的相对位移,用于Broker确认RecordBatch中Records的组装正确性。

(8)firstTimestamp:RecordBatch第一条Record的时间戳

(9)maxTimestamp:RecordBatch中最大的时间戳,一般情况下是最后一条消息的时间戳,用于Broker确认RecordBatch中Records的组装正确性。

(10)producer id:生产者编号,用于支持幂等性(有且只有一次,Exactly Once Delivery),参考KIP-98 - Exactly Once Delivery and Transactional Messaging

(11)producer epoch:生产者纪元,用于支持幂等性

(12)base sequence:基础序号,用于支持幂等性,用于校验是否是重复消息

(13)records count:记录的数量。这个参数在Kafka官网上即使是最新版本的文档(2.6.x)里也没有提到,但看到某些其他网络资料上描述的有,于是去翻看了源码(2.1.x以上)的确是有的:

public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
static final int RECORDS_COUNT_LENGTH = 4;

所以如果要分析真实的消息格式时,一定不要忘记这里的4字节records count。

2.3 V2消息压缩

由于V2的RecordBatch采用了良好的格式,再也不用像V1一样构造递归消息了,直接用RecordBatch标记为压缩,然后压缩records的全部内容即可。注意,这里只有records的内容会被压缩。

总结

消息格式是Kafka的基础,了解消息格式的演进有利于理解Kafka遇到过哪些问题,以及如何解决这些问题的。

V2的Record源代码在kafka/clients/src/main/java/org/apache/kafka/common/record目录下,注释都非常详细,值得一看。

参考资料:


转载请注明出处http://www.bewindoweb.com/299.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠