深入理解Kafka:核心设计与实践原理-朱忠华
- 书名: 深入理解Kafka:核心设计与实践原理
- 作者: 朱忠华
- 简介: 本书从Kafka的基础概念切入,循序渐进地转入对其内部原理的剖析。本书主要阐述了Kafka中生产者客户端、消费者客户端、主题与分区、日志存储、原理解析、监控管理、应用扩展及流式计算等内容。虽然Kafka的内核使用Scala语言编写,但本书基本以Java语言作为主要的示例语言,方便大多数读者的理解。虽然本书没有明确的界定,但总体上可以划分为三个部分:基础篇、原理篇和扩展篇,前4章为基础篇,包括基础概念、生产者、消费者,以及主题与分区,学习完这4章的内容完全可以应对绝大多数的开发场景。第5章至第8章为原理篇,包括对日志存储、协议设计、控制器、组协调器、事务、一致性、可靠性等内容的探究,学习完这4章的内容可以让读者对Kafka有一个深刻的认知。最后4章从应用扩展层面来做讲解,可以归类为扩展篇,主要内容包括监控、应用工具、应用扩展(延时队列、重试队列、死信队列、消息轨迹等)、与Spark的集成等,让读者可以对Kafka的生态有一个更加全面的认知。本
- 出版时间 2019-01-01 00:00:00
- ISBN: 9787121359026
- 分类: 计算机-编程设计
- 出版社: 电子工业出版社
高亮划线
封面
版权信息
前言
第1章 初识Kafka
1.1 基本概念
1.2 安装与配置
1.3 生产与消费
1.4 服务端参数配置
1.5 总结
第2章 生产者
2.1 客户端开发
-
📌 其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka 0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区 ^11-1468-1621
- ⏱ 2024-05-22 08:18:23
-
📌 key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单2-1中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>对应的就是消息中key和value的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如代码清单2-1中的org.apache.kafka.common.serialization.StringSerializer,单单指定StringSerializer是错误的,更多有关序列化的内容可以参考2.1.3节 ^11-2389-2844
- ⏱ 2024-05-22 08:23:06
-
📌 注意到代码清单2-1中的initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为“”。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”“producer-2”,即字符串“producer-”与数字的拼接。 ^11-2874-3044
- ⏱ 2024-05-22 08:24:05
-
📌 KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。 ^11-4238-4318
- ⏱ 2024-05-22 08:25:23
-
📌 清单2-1中我们已经了解了ProducerRecord的属性结构,其中topic属性和value属性是必填项,其余属性是选填项, ^11-4906-4970
- ⏱ 2024-05-22 08:29:13
-
📌 KafkaProducer 的 send()方法并非是 void 类型,而是 Future<RecordMetadata>类型,send()方法有2个重载方法,具体定义如下:[插图]要实现同步的发送方式,可以利用返回的Future对象实现,示例如下:[插图]实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。 ^11-5984-6677
- ⏱ 2024-05-22 08:32:07
-
📌 KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 1.4 节中提及的RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。 ^11-7559-7984
- ⏱ 2024-05-22 08:33:58
-
📌 onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。 ^11-9121-9226
- ⏱ 2024-05-22 08:37:52
-
📌 对于同一个分区而言,如果消息record1于record2之前先发送(参考上面的示例代码),那么KafkaProducer就可以保证对应的callback1在callback2之前调用,也就是说,回调函数的调用也可以保证分区有序。 ^11-9441-9556
- ⏱ 2024-05-22 08:39:08
-
📌 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的, ^11-11315-11346
- ⏱ 2024-05-22 08:47:39
-
📌 如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer,那么是无法解析出想要的数据的 ^11-11346-11429
- ⏱ 2024-05-22 08:47:50
-
📌 消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器(下一章会详细介绍)一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。 ^11-14056-14287
- ⏱ 2024-05-22 13:27:14
-
📌 如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。 ^11-15391-15529
- ⏱ 2024-05-22 13:29:56
-
📌 如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。 ^11-15587-15670
- ⏱ 2024-05-22 13:30:18
-
📌 Kafka一共有两种拦截器:生产者拦截器和消费者拦截器 ^11-16928-16955
- ⏱ 2024-05-22 13:32:29
-
📌 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。 ^11-17026-17110
- ⏱ 2024-05-22 13:32:48
-
📌 KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。 ^11-17696-17858
- ⏱ 2024-05-22 13:33:54
-
📌 在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。 ^11-20796-20842
- ⏱ 2024-05-22 13:35:26
2.2 原理分析
-
📌 在2.1.4节的开头介绍了消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图2-1所示。[插图]图2-1 生产者客户端的整体架构整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量 ^12-635-1956
- ⏱ 2024-05-22 19:13:22
-
📌 ProducerBatch和消息的具体格式有关,更多的详细内容可以参考 5.2 节。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区 ^12-1957-3062
- ⏱ 2024-05-22 19:13:34
-
📌 ;而对于 KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest,更多与 Kafka 协议有关的内容可以参考6.1节。请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。 ^12-3062-3839
- ⏱ 2024-05-22 19:13:45
-
📌 元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。 ^12-5376-5502
- ⏱ 2024-05-22 19:16:04
-
📌 当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。 ^12-5531-5900
- ⏱ 2024-05-22 19:17:48
2.3 重要的生产者参数
-
📌 acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。 ^13-759-824
- ⏱ 2024-05-22 20:15:46
-
📌 注意acks参数配置的值是一个字符串类型,而不是整数类型。举个例子,将acks参数设置为0,需要采用下面这两种形式: ^13-1570-1628
- ⏱ 2024-05-22 20:16:04
-
📌 retries参数用来配置生产者重试的次数,默认值为0 ^13-2953-2980
- ⏱ 2024-05-22 20:15:16
-
📌 重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。 ^13-3244-3312
- ⏱ 2024-05-22 20:15:07
-
📌 6.linger.ms这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。 ^13-4258-4501
- ⏱ 2024-05-22 20:20:03
2.4 总结
第3章 消费者
3.1 消费者与消费组
-
📌 每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。 ^16-552-601
- ⏱ 2024-05-22 08:05:45
-
📌 每一个分区只能被一个消费组中的一个消费者所消费。 ^16-833-857
- ⏱ 2024-05-22 08:09:46
-
📌 消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。 ^16-1995-2057
- ⏱ 2024-05-22 08:11:09
-
📌 对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。 ^16-2057-2133
- ⏱ 2024-05-22 08:12:07
-
📌 对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:· 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。· 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。 ^16-2847-3343
- ⏱ 2024-05-22 08:14:29
3.2 客户端开发
-
📌 bootstrap.servers: ^17-1855-1874
- ⏱ 2024-05-23 07:43:33
-
📌 这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。有关此参数的更多释义可以参考6.5.2节。 ^17-2068-2139
- ⏱ 2024-05-23 07:43:37
-
📌 group.id: ^17-2170-2179
- ⏱ 2024-05-23 07:43:24
-
📌 一般而言,这个参数需要设置成具有一定的业务意义的名称。 ^17-2329-2356
- ⏱ 2024-05-23 07:43:05
-
📌 消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能 ^17-5553-5649
- ⏱ 2024-05-23 07:49:53
-
📌 如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,下面示例中的三行代码的效果相同: ^17-8005-8100
- ⏱ 2024-05-23 07:52:08
-
📌 通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。 ^17-9024-9269
- ⏱ 2024-05-23 07:54:39
-
📌 在实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf或Protostuff等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。使用通用的序列化工具也需要实现 Serializer 和 Deserializer 接口,因为Kafka客户端的序列化和反序列化入口必须是这两个类型。 ^17-12124-12309
- ⏱ 2024-05-23 07:56:26
-
📌 Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。 ^17-13256-13337
- ⏱ 2024-05-23 07:59:01
-
📌 timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。 ^17-14467-14624
- ⏱ 2024-05-23 07:59:10
-
📌 到目前为止,可以简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的章节中会循序渐进地介绍这些内容。 ^17-17752-17866
- ⏱ 2024-05-23 08:05:26
-
📌 在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。 ^17-18691-18819
- ⏱ 2024-05-23 08:14:22
-
📌 在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。 ^17-22232-22348
- ⏱ 2024-05-23 08:16:59
3.3 总结
第4章 主题与分区
4.1 主题的管理
-
📌 主题、分区、副本和 Log(日志)的关系如图 4-1 所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个broker是最优的选择。再比如在分区数为3、副本因子为3,并且broker数同样为3的情况下,分配3、3、3的分区副本个数给各个broker是最优的选择,也就是每个broker中都拥有所有分区的一个副本。[插图]图4-1 主题、分区、副本和Log之间的关系 ^20-3112-3632
- ⏱ 2024-05-24 07:41:15
-
📌 我们不仅可以通过日志文件的根目录来查看集群中各个broker的分区副本的分配情况,还可以通过ZooKeeper客户端来获取。当创建一个主题时会在ZooKeeper的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例如下:[插图]示例数据中的"2":[1,2]表示分区 2 分配了 2 个副本,分别在 brokerId 为 1 和 2 的 broker节点中。 ^20-3671-4090
- ⏱ 2024-05-24 07:41:50
4.2 初识KafkaAdminClient
4.3 分区的管理
4.4 如何选择合适的分区数
4.5 总结
第5章 日志存储
5.1 文件目录布局
5.2 日志格式的演变
5.3 日志索引
5.4 日志清理
5.5 磁盘存储
5.6 总结
第6章 深入服务端
6.1 协议设计
- 📌 Kafka中最枯燥的莫过于它的上百个参数、几百个监控指标和几十种请求协议,掌握这三者的“套路”,相信你会对Kafka有更深入的理解。 ^33-7681-7747
- ⏱ 2024-05-24 17:27:50
6.2 时间轮
-
📌 DK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、ZooKeeper等组件中都存在时间轮的踪影。 ^34-564-723
- ⏱ 2024-05-24 17:35:33
-
📌 如图6-7所示,Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。 ^34-752-1237
- ⏱ 2024-05-24 17:49:02
-
📌 [插图]图6-7 时间轮结构若时间轮的tickMs为1ms且wheelSize等于20,那么可以计算得出总体时间跨度interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimeTaskList中的任务进行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中,currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间。 ^34-1276-1963
- ⏱ 2024-05-24 17:49:10
-
📌 如果此时有一个定时为350ms的任务该如何处理?直接扩充wheelSize的大小?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。如图6-8所示,复用之前的案例,第一层的时间轮tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。 ^34-1992-2446
- ⏱ 2024-05-24 17:49:25
-
📌 对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的TimerTaskList。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的TimerTaskList。注意到在到期时间为[400ms,800ms)区间内的多个任务(比如446ms、455ms和473ms的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的TimerTaskList的超时时间为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历40ms之后,此时这个任务又被“察觉”,不过还剩余10ms,还是不能立即执行到期操作 ^34-2475-2965
- ⏱ 2024-05-24 17:49:35
-
📌 所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。[插图]图6-8 多层时间轮 ^34-2966-3283
- ⏱ 2024-05-24 17:49:45