目录
- 心跳与超时:高并发高性能的时间轮超时器
- 引言
- JDK 原生提供的超时任务支持
- java.util.Timer
- ScheduledThreadPoolExecutor
- 更高效的数据结构
- 基本原理
- 支撑更多超过范围的延迟时间
- Netty 的时间轮实现
- 接口定义
- 构建循环数组
- 新增延迟任务
- 工作线程workerThread
- 思考总结
心跳与超时:高并发高性能的时间轮超时器
引言
在许多业务场景中我们都会碰到延迟任务定时任务这种需求特别的在网络连接的场景中常常会出现一些超时控制由于服务端的连接数量很大这些超时任务的数量往往也是很庞大的实现对大量任务的超时管理并不是一个容易的事情
本章我们将介绍几种用于实现超时任务的数据结构并且最后分析 Netty 在超时任务上采取的结构和代码
欢迎加入技术交流群186233599讨论交流也欢迎关注笔者公众号:风火说
JDK 原生提供的超时任务支持
java.util.Timer
JDK 在 1.3 的时候引入了Timer
数据结构用于实现定时任务Timer
的实现思路比较简单其内部有两个主要属性:
TaskQueue
:定时任务抽象类TimeTask
的列表TimerThread
:用于执行定时任务的线程
Timer
结构还定义了一个抽象类TimerTask
并且继承了Runnable
接口业务系统实现了这个抽象类的run
方法用于提供具体的延时任务逻辑
TaskQueue
内部采用大顶堆的方式依据任务的触发时间进行排序而TimerThread
则以死循环的方式从TaskQueue
获取队列头等待队列头的任务的超时时间到达后触发该任务并且将任务从队列中移除
Timer
的数据结构和算法都很容易理解所有的超时任务都首先进入延时队列后台超时线程不断的从延迟队列中获取任务并且等待超时时间到达后执行任务延迟队列采用大顶堆排序在延迟任务的场景中有三种操作分别是:添加任务提取队列头任务查看队列头任务
查看队列头任务的事件复杂度是 O(1) 而添加任务和提取队列头任务的时间复杂度都是 O(Log2n) 当任务数量较大时添加和删除的开销也是比较大的此外由于Timer
内部只有一个处理线程如果有一个延迟任务的处理消耗了较多的时间会对应的延迟后续任务的处理
ScheduledThreadPoolExecutor
由于Timer
只有一个线程用来处理延迟任务在任务数量很多的时候显然是不足够的在 JDK1.5 引入线程池接口ExecutorService
后也对应的提供了一个用于处理延时任务的ScheduledExecutorService
子类接口该接口内部也一样使用了一个使用小顶堆进行排序的延迟队列存放任务线程池中的线程会在这个队列上等待直到有任务可以提取
ScheduledExecutorService
的实现上有一些特殊只有一个线程能够提取到延迟队列头的任务并且根据任务的超时时间进行等待在这个等待期间其他的线程是无法获取任务的这样的实现是为了避免多个线程同时获取任务导致超时时间未到达就任务触发或者在等待任务超时时间时有新的任务被加入而无法响应
由于ScheduledExecutorService
可以使用多个线程这样也缓解了因为个别任务执行时间长导致的后续任务被阻塞的情况不过延迟队列也是一样采用小顶堆的排序方式因此添加任务和删除任务的时间复杂度都是 O(Log2n) 在任务数量很大的情况下性能表现比较差
更高效的数据结构
虽然Timer
和ScheduledThreadPoolExecutor
都提供了对延迟任务的支撑能力但是由于新增任务和提取任务的时间复杂度都是 O(Log2n) 在任务数量很大比如几万十几万的时候性能的开销就变得很巨大
那么是否存在新增任务和提取任务比 O(Log2n) 复杂度更低的数据结构呢?答案是存在的在论文《Hashed and Hierarchical Timing Wheels》中设计了一种名为时间轮( Timing Wheels )的数据结构这种结构在处理延迟任务时其新增任务和删除任务的时间复杂度降低到了 O(1)
基本原理
时间轮的数据结构很类似于我们钟表上的数据指针故而得名时间轮其数据结构用图示意如下
每一个时间“格子”我们称之为槽位槽位中存放着延迟任务队列槽位本身代表着一个时间单位比如 1 秒时间轮拥有的槽位个数就是该时间轮能够处理的最大延迟跨度的任务槽位的时间单位代表着时间轮的精度这意味着小于时间单位的时间在该时间轮是无法被区分的
槽位上的延迟任务队列中的任务都有相同的延迟时间每一个单位时间指针都会移动到下一个槽位当指针指向某一个槽位时该槽位的延迟任务队列中的任务都会被触发
当有一个延迟任务要插入时间轮时首先计算其延迟时间与单位时间的余值从指针指向的当前槽位移动余值的个数槽位就是该延迟任务需要被放入的槽位
举个例子时间轮有8个槽位编号为 0 ~ 7 指针当前指向槽位 2 新增一个延迟时间为 4 秒的延迟任务4 % 8 = 4因此该任务会被插入 4 + 2 = 6也就是槽位6的延迟任务队列
时间轮的槽位实现可以采用循环数组的方式达成也就是让指针在越过数组的边界后重新回到起始下标概括来说可以将时间轮的算法描述为
用队列来存储延迟任务同一个队列中的任务其延迟时间相同用循环数组的方式来存储元素数组中的每一个元素都指向一个延迟任务队列
有一个当前指针指向数组中的某一个槽位每间隔一个单位时间指针就移动到下一个槽位被指针指向的槽位的延迟队列其中的延迟任务全部被触发
在时间轮中新增一个延迟任务将其延迟时间除以单位时间得到的余值从当前指针开始移动余值对应个数的槽位就是延迟任务被放入的槽位
基于这样的数据结构插入一个延迟任务的时间复杂度就下降到 O(1) 而当指针指向到一个槽位时该槽位连接的延迟任务队列中的延迟任务全部被触发
延迟任务的触发和执行不应该影响指针向后移动的时间精确性因此一般情况下用于移动指针的线程只负责任务的触发任务的执行交由其他的线程来完成比如可以将槽位上的延迟任务队列放入到额外的线程池中执行然后在槽位上新建一个空白的新的延迟任务队列用于后续任务的添加
支撑更多超过范围的延迟时间
在基本原理中我们分析了时间轮的基础结构不过当时我们假设需要插入的延迟任务的时间不会超过时间轮的长度也就是说每一个槽位上的延迟任务队列中的任务的延迟时间都是相同的
在这种情况下要支持更大时间跨度的延迟任务要么增加时间轮的槽位数要么减少时间轮的精度也就是每一个槽位代表的单位时间时间轮的精度显然是一个业务上的硬性要求那么只能增加槽位数假设要求精度为 1 秒要能支持延迟时间为 1 天的延迟任务时间轮的槽位数需要 60 × 60 × 24 = 86400 这就需要消耗更多的内存显然单纯增加槽位数并不是一个好的解决方案
在论文中针对大跨度的延迟任务支持提供了两种扩展方案
方案一:不同轮次的延迟任务共存相同的延迟队列
在该方案中算法引入了“轮次”的概念延迟任务的延迟时间除以时间轮长度得到的商值为轮次延迟任务的延迟时间除以时间轮长度得到的余数为要插入的槽位偏移量
当插入延迟任务时首先计算轮次和槽位偏移量通过槽位偏移量确定延迟任务插入的槽位当指针指向某一个槽位时对槽位指向的延迟任务队列进行遍历其中轮次为0的延迟任务全部触发其余任务则等待下一个周期
通过引入轮次就可以在有限的槽位上支持无穷时间范围的延迟任务但是虽然插入任务的时间复杂度仍然是 O(1) 但是在延迟任务触发时却需要遍历延迟任务队列来确认其轮次是否为0任务触发时的时间复杂却上升为了 O(n)
对于这个情况还有一个变化的细节可以采用就是将延迟任务队列按照轮次进行排序比方说使用小顶堆对延迟任务队列进行排序这样当指针指向一个槽位触发延迟任务时只需要不断的从队列头取出任务进行轮次检查一旦任务轮次不等于0就可以停止任务触发的时间复杂度下降为 O(1) 对应的由于队列是排序的了任务插入的时候除了需要定位插入的槽位还需要定位在队列中的插入位置插入的时间复杂度变化为 O(1) 和 O(Log2n) n 为该槽位上延迟任务队列的长度
方案二:多层次时间轮
看看手表的设计有秒针分针时针像秒针与分针虽然都有 60 格 但是各自的格子代表的时间长度不同参考这个思路我们可以声明多个不同层级的时间轮每一个时间轮的槽位的时间跨度是其次级时间轮的整体时间范围
当低层级的时间轮的指针完整的走完一圈其对应的高层级时间轮对应的移动一个槽位并且高层级时间轮指针指向的槽位中的任务按照延迟时间计算重新放入到低层级时间轮的不同槽位中这样的方式保证了每一个时间轮中的每一个槽位的延迟任务队列中的任务都具备相同时间精度的延迟时间
以精度为 1 秒时间范围为 1 天的时间轮为例子可以设计三级时间轮:秒级时间轮有 60 个槽位每个槽位的时间为 1 秒分钟级时间轮有 60 个槽位每个槽位的时间为 60 秒小时级时间轮有24个槽位每个槽位的时间为 60 分钟当秒级时间轮走完 60 秒后秒级时间轮的指针再次指向下标为0的槽位而分钟级时间轮的指针向后移动一个槽位并且将该槽位上的延迟任务全部取出并且重新计算后放入秒级时间轮
总共只需要 60 + 60 + 24 = 144 个槽位即可支撑对比上面提到的单级时间轮需要 86400 个槽位而言节省了相当的内存
层级时间轮有两种常见的做法:
- 固定时间范围:时间轮的个数以及不同层级的时间轮的槽位数是通过构造方法的入参指定这意味着时间轮整体能够支撑的时间范围是在构造方法的时候被确定
- 非固定时间范围:定义好一个时间轮的槽位个数以及最小的时间轮的槽位时间当插入的延迟任务的时间超过时间轮范围时则动态生成更高层级的时间轮由于时间轮是在运行期生成并且根据任务的延迟时间计算当已经存在的时间轮不满足其延迟时间范围要求时动态生成高层级时间轮因此整体能够支撑的时间范围是没有上限的
Netty 的时间轮实现
时间轮算法的核心思想就是通过循环数组和指针移动的方式将新增延迟任务的时间复杂度下降到 O(1) 但是在具体实现上包括如何处理更大时间跨度的延迟任务上各家不同的实现都会有一些细节上的变化下面我们以 Netty 中都时间轮实现为例子来进行代码分析
接口定义
Netty 的实现自定义了一个超时器的接口io.netty.util.Timer
其方法如下
public interface Timer
{
//新增一个延时任务入参为定时任务TimerTask和对应的延迟时间
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
//停止时间轮的运行并且返回所有未被触发的延时任务
Set < Timeout > stop();
}
public interface Timeout
{
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
Timeout
接口是对延迟任务的一个封装其接口方法说明其实现内部需要维持该延迟任务的状态后续我们分析其实现内部代码时可以更容易的看到
Timer
接口有唯一实现HashedWheelTimer
首先来看其构造方法如下
构建循环数组
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
//省略代码省略参数非空检查内容
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
//省略代码省略槽位时间范围检查避免溢出以及小于 1 毫秒
workerThread = threadFactory.newThread(worker);
//省略代码省略资源泄漏追踪设置以及时间轮实例个数检查
}
首先是方法createWheel
用于创建时间轮的核心数据结构循环数组来看下其方法内容
private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
//省略代码确认 ticksPerWheel 处于正确的区间
//将 ticksPerWheel 规范化为 2 的次方幂大小
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++)
{
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
数组的长度为 2 的次方幂方便进行求商和取余计算
HashedWheelBucket
内部存储着由HashedWheelTimeout
节点构成的双向链表并且存储着链表的头节点和尾结点方便于任务的提取和插入
新增延迟任务
方法HashedWheelTimer#newTimeout
用于新增延迟任务下面来看下代码
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
//省略代码用于参数检查
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if(delay > 0 && deadline < 0)
{
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
可以看到在新增任务的时候任务并不是直接进入到循环数组中而是首先被放入到一个队列也就是属性timeouts
该队列是一个 MPSC 类型的队列采用这个模式主要出于提升并发性能考虑因为这个队列只有线程workerThread
会进行任务提取操作
该线程是在构造方法中通过调用workerThread = threadFactory.newThread(worker)
被创建但是创建之后并不是马上执行线程的start
方法其启动的时机是这个时间轮第一次新增延迟任务的时候也就是本方法中的start
方法的内容下面是其代码
public void start()
{
switch(WORKER_STATE_UPDATER.get(this))
{
case WORKER_STATE_INIT:
if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
{
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while(startTime == 0)
{
try
{
startTimeInitialized.await();
}
catch(InterruptedException ignore)
{
// Ignore - it will be ready very soon.
}
}
}
方法很明显的分为两个部分第一部分为Switch
方法块通过对状态变量的 CAS 操作确保只有一个线程能够执行workerThread.start()
方法来启动工作线程避免并发异常第二部分为阻塞等待通过CountDownLatch
类型变量startTimeInitialized
执行阻塞等待用于等待工作线程workerThread
真正进入工作状态
从newTimeout
方法的角度来看插入延迟任务首先是放入队列中之前分析数据结构的时候也说过任务的触发是指针指向时间轮中某个槽位时进行那么必然存在一个需要将队列中的延迟任务放入到时间轮的数组之中的工作这个动作显然就是就是由workerThread
工作线程来完成下面就来看下这个线程的具体代码内容
工作线程workerThread
工作线程是依托于HashedWheelTimer.Worker
这个实现了Runnable
接口的类进行工作的那下面看下其对run
方法的实现代码如下
public void run()
{
{//代码块①
startTime = System.nanoTime();
if(startTime == 0)
{
//使用startTime==0 作为线程进入工作状态模式标识因此这里重新赋值为1
startTime = 1;
}
//通知外部初始化工作线程的线程工作线程已经启动完毕
startTimeInitialized.countDown();
}
{//代码块②
do {
final long deadline = waitForNextTick();
if(deadline > 0)
{
int idx = (int)(tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
{//代码块③
for(HashedWheelBucket bucket: wheel)
{
bucket.clearTimeouts(unprocessedTimeouts);
}
for(;;)
{
HashedWheelTimeout timeout = timeouts.poll();
if(timeout == null)
{
break;
}
if(!timeout.isCancelled())
{
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}
线程启动与准备工作
为了方便阅读这边将run
方法的内容分为三个代码块首先来看代码块①通过系统调用System.nanoTime
为启动时间startTime
设置初始值该变量代表了时间轮的基线时间用于后续相对时间的计算赋值完毕后通过startTimeInitialized
变量对外部的等待线程进行通知
驱动指针和任务触发
接着来看代码块②这是主要的工作部分整体是在一个while
循环中确保工作线程只在时间轮没有被终止的时候工作首先来看方法waitForNextTick
在时间轮中指针移动一次称之为一个tick
这个方法显然内部应该是用于等待指针移动到下一个tick
来看具体代码如下
private long waitForNextTick()
{
long deadline = tickDuration * (tick + 1);
for(;;)
{
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if(sleepTimeMs <= 0)
{
if(currentTime == Long.MIN_VALUE)
{
return -Long.MAX_VALUE;
}
else
{
return currentTime;
}
}
if(PlatformDependent.isWindows())
{
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try
{
Thread.sleep(sleepTimeMs);
}
catch(InterruptedException ignored)
{
if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
{
return Long.MIN_VALUE;
}
}
}
}
整个方法的思路很简单前面说过时间轮每移动一次指针意味着一个tick
这里tick
可以看成是指针移动的次数由于槽位的时间范围是固定的因此可以简单的计算出来指针移动到下一个槽位理论上应该经过的时间也就是long deadline = tickDuration * (tick + 1)
之后再计算从时间轮启动到当前实际经过的时间也就是long currentTime = System.nanoTime() - startTime
二者的差值就是线程所需要睡眠的时间
如果差值小于0意味着实际经过的时间超过了理论时间此时已经超出了应该休眠的范围方法需要立即返回由于在这个方法的执行过程中可能会遇到时间轮被停止的情况因此使用一个特殊值来表达这个事件也就是Long.MIN_VALUE
这也是为什么currentTime
要避开这个值的原因
还有一点需要注意Thread.sleep
方法的实现是依托于操作系统提供的中断检查也就是操作系统会在每一个中断的时候去检查是否有线程需要唤醒并且提供CPU资源默认情况下 Linux 的中断间隔是 1 毫秒而 Windows 的中断间隔是 10 毫秒或者 15 毫秒具体取决于硬件识别
如果是在 Windows 平台下当方法调用Thread.sleep
传入的参数不是10的整数倍时其内部会调用系统方法timeBeginPeriod()
和timeEndPeriod()
来修改中断周期为 1 毫秒并且在休眠结束后再次设置回默认值这样的目的是为了保证休眠时间的准确性但是在 Windows 平台下频繁的调用修改中断周期会导致 Windows 时钟出现异常大多数时候的表现是导致时钟加快这将导致比如尝试休眠 10 秒时实际上只休眠了 9 秒所以在这里通过sleepTimeMs = sleepTimeMs / 10 * 10
保证了sleepTimeMs
是 10 的整数倍从而避免了 Windows 的这个 BUG
当方法waitForNextTick
返回后并且返回的值是正数意味着当前tick
的休眠等待已经完成可以进行延迟任务的触发处理了通过int idx = (int)(tick & mask)
调用确定下一个被触发延迟任务的槽位在循环数组中的下标在处理触发任务之前首先将已经取消的延迟任务从槽位所指向的延迟任务队列中删除每次调用HashedWheelTimer#newTimeout
新增延迟任务时都会返回一个Timeout
对象可以通过cancle
方法将这个延迟任务取消当执行取消动作的时候并不会直接从延迟队列中删除而是将这个对象放入到取消队列也就是HashedWheelTimer.cancelledTimeouts
属性在准备遍历槽位上延迟任务队列之前通过方法processCancelledTasks
来遍历这个取消队列将其中的延迟任务从各自槽位上的延迟任务队列中删除使用这种方式的好处在于延迟任务的删除只有一个线程会进行避免了多线程带来的并发干扰减少了开发难度
在处理完取消的延迟任务后调用方法transferTimeoutsToBuckets
来将新增延迟任务队列HashedWheelTimer.timeouts
中的延迟任务分别添加到合适其延迟时间的槽位中方法的代码很简单就是循环不断从timeouts
取出任务并且计算其延迟时间与时间轮范围的商值和余数结果分别为其轮次与槽位下标根据槽位下标将该任务添加到槽位对应的延迟任务队列中
在这里可以看到 Netty 作者对时间轮这一结构的并发设计新增任务是向 MPSC 队列新增元素实现而槽位上的延迟任务队列只有时间轮本身的线程能够进行新增和删除设计为了 SPSC 模式前者是为了提高无锁并发下的性能后者则是通过约束减少了设计难度
transferTimeoutsToBuckets
方法每次最多只会转移 100000 个延迟任务到合适的槽位中这是为了避免外部循环添加任务导致的饿死方法执行完毕后就到了槽位上延迟任务的触发处理也就是方法HashedWheelBucket#expireTimeouts
的功能方法内的逻辑也很简单遍历队列如果延迟任务的轮次不为 0则减 1否则触发任务执行方法也就是HashedWheelTimeout#expire
该方法内部依然通过 CAS 方式对状态进行更新避免方法的触发和取消之间的竞争冲突从这个方法的实现可以看到Netty 采用了轮次的方式来对超出时间轮范围的延迟时间进行支持多层级时间轮的实现相比轮次概念的实现更为复杂考虑到在网络IO应用中超出时间轮范围的场景比较少使用轮次的方式去支撑更大的时间是一个相对容易实现的方案
当需要被触发的延迟任务都被触发后通过tick
加 1 来表达指针移动到下一个槽位
时间轮停止
外部线程通过调用HashedWheelTimer#stop
方法来停止时间轮停止的方式很简单就是通过 CAS 调用来修改时间轮的状态属性而在代码块②中通过循环的方式在每一次tick
都会检查这个状态位代码块③的内容很简单遍历所有的槽位并且遍历槽位的延迟任务队列将所有未到达延迟时间并且未取消的任务都放入到一个集合中最终将这个集合返回这个集合内存储的就是所有未能执行的延迟任务
思考总结
在处理大量延迟任务的场景中时间轮是一个很高效的算法与数据结构Netty 在对时间轮的实现上在添加任务过期任务删除任务等环节进行了一些细节上的调整实际上不同中间件中都有对时间轮的一些实现各自也都有区别但是核心都是围绕在循环数组与槽位过期这个概念上不同的细节变化有各自适合的场景和考量
相关文章
猜您喜欢
LightningChart® .NET 8.5版重磅上线新年特惠
新年回馈用户新年伊始全球抢先的数据可视化图表工具LightningChart®正式发布了.Net 8.5版..- 我是用的版本是2018.3.6别的朋友运用的是2019的某个版本不过关都不影响破解下载jar包:链接:h..
- 01-30Idea破解至2089年
- 01-30双指针BFS与图论(一)
- 01-30深入理解大数据之——事务及其ACID特性
- 01-30T117897 七步洗手法 / PJT1(洛谷)
- 01-30Java内部类总结
- 01-30es6 语法