纯净、安全、绿色的下载网站

首页

当前位置:首页IT学院IT技术

心跳与超时:高并发高性能的时间轮超时器

风火1989   2020-01-30 我要评论

目录

  • 心跳与超时:高并发高性能的时间轮超时器
    • 引言
    • JDK 原生提供的超时任务支持
      • java.util.Timer
      • ScheduledThreadPoolExecutor
    • 更高效的数据结构
      • 基本原理
      • 支撑更多超过范围的延迟时间
    • Netty 的时间轮实现
      • 接口定义
      • 构建循环数组
      • 新增延迟任务
      • 工作线程workerThread
    • 思考总结

心跳与超时:高并发高性能的时间轮超时器

引言

在许多业务场景中我们都会碰到延迟任务定时任务这种需求特别的在网络连接的场景中常常会出现一些超时控制由于服务端的连接数量很大这些超时任务的数量往往也是很庞大的实现对大量任务的超时管理并不是一个容易的事情

本章我们将介绍几种用于实现超时任务的数据结构并且最后分析 Netty 在超时任务上采取的结构和代码

欢迎加入技术交流群186233599讨论交流也欢迎关注笔者公众号:风火说

JDK 原生提供的超时任务支持

java.util.Timer

JDK 在 1.3 的时候引入了Timer数据结构用于实现定时任务Timer的实现思路比较简单其内部有两个主要属性:

  • TaskQueue:定时任务抽象类TimeTask的列表
  • TimerThread:用于执行定时任务的线程

Timer结构还定义了一个抽象类TimerTask并且继承了Runnable接口业务系统实现了这个抽象类的run方法用于提供具体的延时任务逻辑

TaskQueue内部采用大顶堆的方式依据任务的触发时间进行排序而TimerThread则以死循环的方式从TaskQueue获取队列头等待队列头的任务的超时时间到达后触发该任务并且将任务从队列中移除

Timer的数据结构和算法都很容易理解所有的超时任务都首先进入延时队列后台超时线程不断的从延迟队列中获取任务并且等待超时时间到达后执行任务延迟队列采用大顶堆排序在延迟任务的场景中有三种操作分别是:添加任务提取队列头任务查看队列头任务

查看队列头任务的事件复杂度是 O(1) 而添加任务和提取队列头任务的时间复杂度都是 O(Log2n) 当任务数量较大时添加和删除的开销也是比较大的此外由于Timer内部只有一个处理线程如果有一个延迟任务的处理消耗了较多的时间会对应的延迟后续任务的处理

ScheduledThreadPoolExecutor

由于Timer只有一个线程用来处理延迟任务在任务数量很多的时候显然是不足够的在 JDK1.5 引入线程池接口ExecutorService后也对应的提供了一个用于处理延时任务的ScheduledExecutorService子类接口该接口内部也一样使用了一个使用小顶堆进行排序的延迟队列存放任务线程池中的线程会在这个队列上等待直到有任务可以提取

ScheduledExecutorService的实现上有一些特殊只有一个线程能够提取到延迟队列头的任务并且根据任务的超时时间进行等待在这个等待期间其他的线程是无法获取任务的这样的实现是为了避免多个线程同时获取任务导致超时时间未到达就任务触发或者在等待任务超时时间时有新的任务被加入而无法响应

由于ScheduledExecutorService可以使用多个线程这样也缓解了因为个别任务执行时间长导致的后续任务被阻塞的情况不过延迟队列也是一样采用小顶堆的排序方式因此添加任务和删除任务的时间复杂度都是 O(Log2n) 在任务数量很大的情况下性能表现比较差

更高效的数据结构

虽然TimerScheduledThreadPoolExecutor都提供了对延迟任务的支撑能力但是由于新增任务和提取任务的时间复杂度都是 O(Log2n) 在任务数量很大比如几万十几万的时候性能的开销就变得很巨大

那么是否存在新增任务和提取任务比 O(Log2n) 复杂度更低的数据结构呢?答案是存在的在论文《Hashed and Hierarchical Timing Wheels》中设计了一种名为时间轮( Timing Wheels )的数据结构这种结构在处理延迟任务时其新增任务和删除任务的时间复杂度降低到了 O(1) 

基本原理

时间轮的数据结构很类似于我们钟表上的数据指针故而得名时间轮其数据结构用图示意如下

每一个时间“格子”我们称之为槽位槽位中存放着延迟任务队列槽位本身代表着一个时间单位比如 1 秒时间轮拥有的槽位个数就是该时间轮能够处理的最大延迟跨度的任务槽位的时间单位代表着时间轮的精度这意味着小于时间单位的时间在该时间轮是无法被区分的

槽位上的延迟任务队列中的任务都有相同的延迟时间每一个单位时间指针都会移动到下一个槽位当指针指向某一个槽位时该槽位的延迟任务队列中的任务都会被触发

当有一个延迟任务要插入时间轮时首先计算其延迟时间与单位时间的余值从指针指向的当前槽位移动余值的个数槽位就是该延迟任务需要被放入的槽位

举个例子时间轮有8个槽位编号为 0 ~ 7 指针当前指向槽位 2 新增一个延迟时间为 4 秒的延迟任务4 % 8 = 4因此该任务会被插入 4 + 2 = 6也就是槽位6的延迟任务队列

时间轮的槽位实现可以采用循环数组的方式达成也就是让指针在越过数组的边界后重新回到起始下标概括来说可以将时间轮的算法描述为

用队列来存储延迟任务同一个队列中的任务其延迟时间相同用循环数组的方式来存储元素数组中的每一个元素都指向一个延迟任务队列

有一个当前指针指向数组中的某一个槽位每间隔一个单位时间指针就移动到下一个槽位被指针指向的槽位的延迟队列其中的延迟任务全部被触发

在时间轮中新增一个延迟任务将其延迟时间除以单位时间得到的余值从当前指针开始移动余值对应个数的槽位就是延迟任务被放入的槽位

基于这样的数据结构插入一个延迟任务的时间复杂度就下降到 O(1) 而当指针指向到一个槽位时该槽位连接的延迟任务队列中的延迟任务全部被触发

延迟任务的触发和执行不应该影响指针向后移动的时间精确性因此一般情况下用于移动指针的线程只负责任务的触发任务的执行交由其他的线程来完成比如可以将槽位上的延迟任务队列放入到额外的线程池中执行然后在槽位上新建一个空白的新的延迟任务队列用于后续任务的添加

支撑更多超过范围的延迟时间

在基本原理中我们分析了时间轮的基础结构不过当时我们假设需要插入的延迟任务的时间不会超过时间轮的长度也就是说每一个槽位上的延迟任务队列中的任务的延迟时间都是相同的

在这种情况下要支持更大时间跨度的延迟任务要么增加时间轮的槽位数要么减少时间轮的精度也就是每一个槽位代表的单位时间时间轮的精度显然是一个业务上的硬性要求那么只能增加槽位数假设要求精度为 1 秒要能支持延迟时间为 1 天的延迟任务时间轮的槽位数需要 60 × 60 × 24 = 86400 这就需要消耗更多的内存显然单纯增加槽位数并不是一个好的解决方案

在论文中针对大跨度的延迟任务支持提供了两种扩展方案

方案一:不同轮次的延迟任务共存相同的延迟队列

在该方案中算法引入了“轮次”的概念延迟任务的延迟时间除以时间轮长度得到的商值为轮次延迟任务的延迟时间除以时间轮长度得到的余数为要插入的槽位偏移量

当插入延迟任务时首先计算轮次和槽位偏移量通过槽位偏移量确定延迟任务插入的槽位当指针指向某一个槽位时对槽位指向的延迟任务队列进行遍历其中轮次为0的延迟任务全部触发其余任务则等待下一个周期

通过引入轮次就可以在有限的槽位上支持无穷时间范围的延迟任务但是虽然插入任务的时间复杂度仍然是 O(1) 但是在延迟任务触发时却需要遍历延迟任务队列来确认其轮次是否为0任务触发时的时间复杂却上升为了 O(n) 

对于这个情况还有一个变化的细节可以采用就是将延迟任务队列按照轮次进行排序比方说使用小顶堆对延迟任务队列进行排序这样当指针指向一个槽位触发延迟任务时只需要不断的从队列头取出任务进行轮次检查一旦任务轮次不等于0就可以停止任务触发的时间复杂度下降为 O(1) 对应的由于队列是排序的了任务插入的时候除了需要定位插入的槽位还需要定位在队列中的插入位置插入的时间复杂度变化为 O(1) 和 O(Log2n) n 为该槽位上延迟任务队列的长度

方案二:多层次时间轮

看看手表的设计有秒针分针时针像秒针与分针虽然都有 60 格 但是各自的格子代表的时间长度不同参考这个思路我们可以声明多个不同层级的时间轮每一个时间轮的槽位的时间跨度是其次级时间轮的整体时间范围

当低层级的时间轮的指针完整的走完一圈其对应的高层级时间轮对应的移动一个槽位并且高层级时间轮指针指向的槽位中的任务按照延迟时间计算重新放入到低层级时间轮的不同槽位中这样的方式保证了每一个时间轮中的每一个槽位的延迟任务队列中的任务都具备相同时间精度的延迟时间

以精度为 1 秒时间范围为 1 天的时间轮为例子可以设计三级时间轮:秒级时间轮有 60 个槽位每个槽位的时间为 1 秒分钟级时间轮有 60 个槽位每个槽位的时间为 60 秒小时级时间轮有24个槽位每个槽位的时间为 60 分钟当秒级时间轮走完 60 秒后秒级时间轮的指针再次指向下标为0的槽位而分钟级时间轮的指针向后移动一个槽位并且将该槽位上的延迟任务全部取出并且重新计算后放入秒级时间轮

总共只需要 60 + 60 + 24 = 144 个槽位即可支撑对比上面提到的单级时间轮需要 86400 个槽位而言节省了相当的内存

层级时间轮有两种常见的做法:

  • 固定时间范围:时间轮的个数以及不同层级的时间轮的槽位数是通过构造方法的入参指定这意味着时间轮整体能够支撑的时间范围是在构造方法的时候被确定
  • 非固定时间范围:定义好一个时间轮的槽位个数以及最小的时间轮的槽位时间当插入的延迟任务的时间超过时间轮范围时则动态生成更高层级的时间轮由于时间轮是在运行期生成并且根据任务的延迟时间计算当已经存在的时间轮不满足其延迟时间范围要求时动态生成高层级时间轮因此整体能够支撑的时间范围是没有上限的

Netty 的时间轮实现

时间轮算法的核心思想就是通过循环数组和指针移动的方式将新增延迟任务的时间复杂度下降到 O(1) 但是在具体实现上包括如何处理更大时间跨度的延迟任务上各家不同的实现都会有一些细节上的变化下面我们以 Netty 中都时间轮实现为例子来进行代码分析

接口定义

Netty 的实现自定义了一个超时器的接口io.netty.util.Timer其方法如下

public interface Timer
{
    //新增一个延时任务入参为定时任务TimerTask和对应的延迟时间
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    //停止时间轮的运行并且返回所有未被触发的延时任务
    Set < Timeout > stop();
}
public interface Timeout
{
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

Timeout接口是对延迟任务的一个封装其接口方法说明其实现内部需要维持该延迟任务的状态后续我们分析其实现内部代码时可以更容易的看到

Timer接口有唯一实现HashedWheelTimer首先来看其构造方法如下

构建循环数组

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
    //省略代码省略参数非空检查内容
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    //省略代码省略槽位时间范围检查避免溢出以及小于 1 毫秒
    workerThread = threadFactory.newThread(worker);
    //省略代码省略资源泄漏追踪设置以及时间轮实例个数检查
}

首先是方法createWheel用于创建时间轮的核心数据结构循环数组来看下其方法内容

private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
    //省略代码确认 ticksPerWheel 处于正确的区间
    //将 ticksPerWheel 规范化为 2 的次方幂大小
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for(int i = 0; i < wheel.length; i++)
    {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

数组的长度为 2 的次方幂方便进行求商和取余计算

HashedWheelBucket内部存储着由HashedWheelTimeout节点构成的双向链表并且存储着链表的头节点和尾结点方便于任务的提取和插入

新增延迟任务

方法HashedWheelTimer#newTimeout用于新增延迟任务下面来看下代码

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
    //省略代码用于参数检查
    start();
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if(delay > 0 && deadline < 0)
    {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

可以看到在新增任务的时候任务并不是直接进入到循环数组中而是首先被放入到一个队列也就是属性timeouts该队列是一个 MPSC 类型的队列采用这个模式主要出于提升并发性能考虑因为这个队列只有线程workerThread会进行任务提取操作

该线程是在构造方法中通过调用workerThread = threadFactory.newThread(worker)被创建但是创建之后并不是马上执行线程的start方法其启动的时机是这个时间轮第一次新增延迟任务的时候也就是本方法中的start方法的内容下面是其代码

public void start()
{
    switch(WORKER_STATE_UPDATER.get(this))
    {
        case WORKER_STATE_INIT:
            if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
            {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    while(startTime == 0)
    {
        try
        {
            startTimeInitialized.await();
        }
        catch(InterruptedException ignore)
        {
            // Ignore - it will be ready very soon.
        }
    }
}

方法很明显的分为两个部分第一部分为Switch方法块通过对状态变量的 CAS 操作确保只有一个线程能够执行workerThread.start()方法来启动工作线程避免并发异常第二部分为阻塞等待通过CountDownLatch类型变量startTimeInitialized执行阻塞等待用于等待工作线程workerThread真正进入工作状态

newTimeout方法的角度来看插入延迟任务首先是放入队列中之前分析数据结构的时候也说过任务的触发是指针指向时间轮中某个槽位时进行那么必然存在一个需要将队列中的延迟任务放入到时间轮的数组之中的工作这个动作显然就是就是由workerThread工作线程来完成下面就来看下这个线程的具体代码内容

工作线程workerThread

工作线程是依托于HashedWheelTimer.Worker这个实现了Runnable接口的类进行工作的那下面看下其对run方法的实现代码如下

public void run()
{
    {//代码块①
        startTime = System.nanoTime();
        if(startTime == 0)
        {
            //使用startTime==0 作为线程进入工作状态模式标识因此这里重新赋值为1
            startTime = 1;
        }
        //通知外部初始化工作线程的线程工作线程已经启动完毕
        startTimeInitialized.countDown();
    }
    {//代码块②
        do {
            final long deadline = waitForNextTick();
            if(deadline > 0)
            {
                int idx = (int)(tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    }
    {//代码块③
        for(HashedWheelBucket bucket: wheel)
        {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for(;;)
        {
            HashedWheelTimeout timeout = timeouts.poll();
            if(timeout == null)
            {
                break;
            }
            if(!timeout.isCancelled())
            {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
}

线程启动与准备工作

为了方便阅读这边将run方法的内容分为三个代码块首先来看代码块①通过系统调用System.nanoTime为启动时间startTime设置初始值该变量代表了时间轮的基线时间用于后续相对时间的计算赋值完毕后通过startTimeInitialized变量对外部的等待线程进行通知

驱动指针和任务触发

接着来看代码块②这是主要的工作部分整体是在一个while循环中确保工作线程只在时间轮没有被终止的时候工作首先来看方法waitForNextTick在时间轮中指针移动一次称之为一个tick这个方法显然内部应该是用于等待指针移动到下一个tick来看具体代码如下

private long waitForNextTick()
{
    long deadline = tickDuration * (tick + 1);
    for(;;)
    {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if(sleepTimeMs <= 0)
        {
            if(currentTime == Long.MIN_VALUE)
            {
                return -Long.MAX_VALUE;
            }
            else
            {
                return currentTime;
            }
        }
        if(PlatformDependent.isWindows())
        {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try
        {
            Thread.sleep(sleepTimeMs);
        }
        catch(InterruptedException ignored)
        {
            if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
            {
                return Long.MIN_VALUE;
            }
        }
    }
}

整个方法的思路很简单前面说过时间轮每移动一次指针意味着一个tick这里tick可以看成是指针移动的次数由于槽位的时间范围是固定的因此可以简单的计算出来指针移动到下一个槽位理论上应该经过的时间也就是long deadline = tickDuration * (tick + 1) 之后再计算从时间轮启动到当前实际经过的时间也就是long currentTime = System.nanoTime() - startTime 二者的差值就是线程所需要睡眠的时间

如果差值小于0意味着实际经过的时间超过了理论时间此时已经超出了应该休眠的范围方法需要立即返回由于在这个方法的执行过程中可能会遇到时间轮被停止的情况因此使用一个特殊值来表达这个事件也就是Long.MIN_VALUE这也是为什么currentTime要避开这个值的原因

还有一点需要注意Thread.sleep方法的实现是依托于操作系统提供的中断检查也就是操作系统会在每一个中断的时候去检查是否有线程需要唤醒并且提供CPU资源默认情况下 Linux 的中断间隔是 1 毫秒而 Windows 的中断间隔是 10 毫秒或者 15 毫秒具体取决于硬件识别

如果是在 Windows 平台下当方法调用Thread.sleep传入的参数不是10的整数倍时其内部会调用系统方法timeBeginPeriod()timeEndPeriod()来修改中断周期为 1 毫秒并且在休眠结束后再次设置回默认值这样的目的是为了保证休眠时间的准确性但是在 Windows 平台下频繁的调用修改中断周期会导致 Windows 时钟出现异常大多数时候的表现是导致时钟加快这将导致比如尝试休眠 10 秒时实际上只休眠了 9 秒所以在这里通过sleepTimeMs = sleepTimeMs / 10 * 10保证了sleepTimeMs 是 10 的整数倍从而避免了 Windows 的这个 BUG 

当方法waitForNextTick返回后并且返回的值是正数意味着当前tick的休眠等待已经完成可以进行延迟任务的触发处理了通过int idx = (int)(tick & mask)调用确定下一个被触发延迟任务的槽位在循环数组中的下标在处理触发任务之前首先将已经取消的延迟任务从槽位所指向的延迟任务队列中删除每次调用HashedWheelTimer#newTimeout新增延迟任务时都会返回一个Timeout对象可以通过cancle方法将这个延迟任务取消当执行取消动作的时候并不会直接从延迟队列中删除而是将这个对象放入到取消队列也就是HashedWheelTimer.cancelledTimeouts属性在准备遍历槽位上延迟任务队列之前通过方法processCancelledTasks来遍历这个取消队列将其中的延迟任务从各自槽位上的延迟任务队列中删除使用这种方式的好处在于延迟任务的删除只有一个线程会进行避免了多线程带来的并发干扰减少了开发难度

在处理完取消的延迟任务后调用方法transferTimeoutsToBuckets来将新增延迟任务队列HashedWheelTimer.timeouts中的延迟任务分别添加到合适其延迟时间的槽位中方法的代码很简单就是循环不断从timeouts取出任务并且计算其延迟时间与时间轮范围的商值和余数结果分别为其轮次与槽位下标根据槽位下标将该任务添加到槽位对应的延迟任务队列中

在这里可以看到 Netty 作者对时间轮这一结构的并发设计新增任务是向 MPSC 队列新增元素实现而槽位上的延迟任务队列只有时间轮本身的线程能够进行新增和删除设计为了 SPSC 模式前者是为了提高无锁并发下的性能后者则是通过约束减少了设计难度

transferTimeoutsToBuckets方法每次最多只会转移 100000 个延迟任务到合适的槽位中这是为了避免外部循环添加任务导致的饿死方法执行完毕后就到了槽位上延迟任务的触发处理也就是方法HashedWheelBucket#expireTimeouts的功能方法内的逻辑也很简单遍历队列如果延迟任务的轮次不为 0则减 1否则触发任务执行方法也就是HashedWheelTimeout#expire该方法内部依然通过 CAS 方式对状态进行更新避免方法的触发和取消之间的竞争冲突从这个方法的实现可以看到Netty 采用了轮次的方式来对超出时间轮范围的延迟时间进行支持多层级时间轮的实现相比轮次概念的实现更为复杂考虑到在网络IO应用中超出时间轮范围的场景比较少使用轮次的方式去支撑更大的时间是一个相对容易实现的方案

当需要被触发的延迟任务都被触发后通过tick加 1 来表达指针移动到下一个槽位

时间轮停止

外部线程通过调用HashedWheelTimer#stop方法来停止时间轮停止的方式很简单就是通过 CAS 调用来修改时间轮的状态属性而在代码块②中通过循环的方式在每一次tick都会检查这个状态位代码块③的内容很简单遍历所有的槽位并且遍历槽位的延迟任务队列将所有未到达延迟时间并且未取消的任务都放入到一个集合中最终将这个集合返回这个集合内存储的就是所有未能执行的延迟任务

思考总结

在处理大量延迟任务的场景中时间轮是一个很高效的算法与数据结构Netty 在对时间轮的实现上在添加任务过期任务删除任务等环节进行了一些细节上的调整实际上不同中间件中都有对时间轮的一些实现各自也都有区别但是核心都是围绕在循环数组与槽位过期这个概念上不同的细节变化有各自适合的场景和考量


相关文章

猜您喜欢

网友评论

Copyright 2022 版权所有 软件发布

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们