时间轮笔记

如果一台机器上有10w个定时任务,如何做到高效触发?

rpc调用时超时怎么实现?

订单超时未支付变更订单状态怎么实现?

(1) 方案一

维护一个映射 Map<id, TaskVo> 每次扫描映射里的所有数据,把超时的拿出来执行。

缺点

时间复杂度O(N) , 数据量很大时需要扫描所有数据,非常耗时,而且浪费CPU


(2) 方案二

使用Java里的延迟队列 DelayQueue, 使用优先队列 PriorityQueue 实现,底层数据结构是 二叉小顶堆 。

时间复杂度 log n

(2) 方案二

时间轮/时钟轮

(2.1) 时间轮原理

时钟轮的实现原理就是参考了生活中的时钟跳动的原理。

时钟有 时针、分针和秒针,
秒针跳动一圈之后,也就是跳动 60 个刻度之后,分针跳动 1 次
分针跳动 60 个刻度,时针走动一步。

时钟轮的运行机制和生活中的时钟也是一样的,每隔固定的单位时间,就会从一个时间槽位跳到下一个时间槽位,相当于秒针跳动了一次;
时钟轮可以分为多层,下一层时钟轮中每个槽位的单位时间是当前时间轮整个周期的时间,这就相当于 1 分钟等于 60 秒钟;
当时钟轮将一个周期的所有槽位都跳动完之后,就会从下一层时钟轮中取出一个槽位的任务,重新分布到当前的时钟轮中,当前时钟轮则从第 0 槽位从新开始跳动,这就相当于下一分钟的第 1 秒。

(2.2) 时间轮数据结构

时钟

时间轮

单个时间轮

(2.3) 时间轮的实现

时间轮的基本功能

  1. 添加延时任务
  2. 执行时间到的延时任务
  3. 删除执行完成的延时任务
  4. 查看延时任务个数
  5. 提前执行延时任务
  6. 延迟执行延时任务

以kafka的时间轮为例
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。在Kafka源码中对这个TimeTaskList是用一个名称为buckets的数组表示的。

tickMs :时间轮由多个 时间格/单位 组成,每个 时间格/单位 就是tickMs,它代表当前时间轮的基本时间跨度。
wheelSize :代表每一层时间轮的格数
interval :当前时间轮的总体时间跨度,interval = tickMs × wheelSize
startMs :构造当层时间轮时候的当前时间,第一层的时间轮的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上层时间轮的startMs为下层时间轮的currentTime。
currentTime :表示时间轮当前所处的时间,currentTime是tickMs的整数倍(通过currentTime=startMs - (startMs % tickMs来保正currentTime一定是tickMs的整数倍),这个运算类比钟表中65秒分钟指针指向的还是1分钟)。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。

(2.3.1) 添加任务

添加延时任务逻辑

  1. 检查任务是否取消
  2. 检查任务是否过期
  3. 检查任务是否在当前时间轮,如果在找到对应的bucket,放入延时队列
  4. 任务不在当前时间轮,放入上一层的时间轮。 如果上一层时间轮不存在,创建时间轮。

添加任务 kafka.utils.timer.TimingWheel::add


def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // 任务取消
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // 过期
    // Already expired
    false
  } else if (expiration < currentTime + interval) {
    // 任务过期时间比当前时间轮时间加周期小说明任务过期时间在本时间轮周期内
    // Put in its own bucket
    val virtualId = expiration / tickMs
    //找到任务对应本时间轮的bucket
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // 只有本bucket内的任务都过期后才会bucket.setExpiration返回true此时将bucket放入延迟队列
    // Set the bucket expiration time
    if (bucket.setExpiration(virtualId * tickMs)) {

      // bucket是一个TimerTaskList,它实现了java.util.concurrent.Delayed接口,里面是一个多任务组成的链表
      // The bucket needs to be enqueued because it was an expired bucket
      // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
      // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
      // will pass in the same value and hence return false, thus the bucket with the same expiration will not
      // be enqueued multiple times.
      queue.offer(bucket)
    }
    true
  } else {
    // 任务的过期时间不在本时间轮周期内说明需要升级时间轮,如果不存在则构造上一层时间轮,继续用上一层时间轮添加任务
    // Out of the interval. Put it into the parent timer
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}

bucket 被放入了 延时队列 DelayQueue
bucket 就是 TimerTaskList 存放了同一 时间格/单位 里的所有延时任务
TimerTaskEntry 是一个延时任务,被放到 TimerTaskList 集合

(2.3.2) 执行时间到的延时任务

// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
  (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

可以看到其实是通过线程池启动一个固定线程去执行延时任务

(2.3.x) kafka.utils.timer.Timer

package kafka.utils.timer

import java.util.concurrent.{DelayQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock

import kafka.utils.threadsafe
import org.apache.kafka.common.utils.{KafkaThread, Time}

trait Timer {
  /** 
    * 添加新任务,在执行延迟任务后执行新增任务
    * Add a new task to this executor. It will be executed after the task's delay
    * (beginning from the time of submission)
    * @param timerTask the task to add
    */
  def add(timerTask: TimerTask): Unit

  /**
    * Advance the internal clock, executing any tasks whose expiration has been
    * reached within the duration of the passed timeout.
    * @param timeoutMs
    * @return whether or not any tasks were executed
    */
  def advanceClock(timeoutMs: Long): Boolean

  /** 
    * 获取待执行的任务数
    * Get the number of tasks pending execution  
    * @return the number of tasks
    */
  def size: Int

  /** 
    * 关闭计时器服务,保留未执行的任务
    * Shutdown the timer service, leaving pending tasks unexecuted
    */
  def shutdown(): Unit
}

scala 中的 trait 相当于 java中的 interface

可以看到 Timer接口里定义了 新加任务、提前执行、获取待执行的任务数、关闭计时器 的方法

kafka的延迟队列使用时间轮实现,能够支持大量任务的高效触发,但是在kafka延迟队列实现方案里还是看到了delayQueue的影子,使用delayQueue是对时间轮里面的bucket放入延迟队列,以此来推动时间轮滚动,但是基于将插入和删除操作则放入时间轮中,将这些操作的时间复杂度都降为O(1),提升效率。

在 RPC 框架中,只要涉及到定时任务,我们都可以应用时钟轮,比较典型的就是调用端的超时处理、调用端与服务端的启动超时以及定时心跳等等。

60s超时,就创建一个index从0到60的环形队列(本质是个数组)
环上每一个slot是一个Set,任务集合
同时还有一个Map<tid, index>,记录tid落在环上的哪个slot里

当有一个任务时:

新增任务
将tid加入到slot中,具体是哪一个slot呢 => Current Index + 超时时间
更新Map,这个tid对应slot的index值

删除任务
Current Index每秒种移动一个slot,这个slot对应的Set中所有tid都应该被集体超时

这样基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)

References

[1] 时间轮在Kafka的实践
[2] 20 | 详解时钟轮在RPC中的应用 - RPC实战与核心原理
[3] timer/TimingWheel.scala