资讯专栏INFORMATION COLUMN

180621-一个简单的时间窗口设计与实现

XiNGRZ / 3122人阅读

摘要:如何设计一个计数的时间窗口时间窗口,通常对于一些实时信息展示中用得比较多,比如维持一个五分钟的交易明细时间窗口,就需要记录当前时间,到五分钟之前的所有交易明细,而五分钟之前的数据,则丢掉一个简单的实现就是用一个队列来做,新的数据在对头添加同

如何设计一个计数的时间窗口

时间窗口,通常对于一些实时信息展示中用得比较多,比如维持一个五分钟的交易明细时间窗口,就需要记录当前时间,到五分钟之前的所有交易明细,而五分钟之前的数据,则丢掉

一个简单的实现就是用一个队列来做,新的数据在对头添加;同时起一个线程,不断的询问队尾的数据是否过期,如果过期则丢掉

另外一中场景需要利用到这个时间窗口内的数据进行计算,如计算着五分钟交易中资金的流入流出总和,如果依然用上面的这种方式,会有什么问题?

如果时间窗口大,需要存储大量的明细数据

我们主要关心的只有资金流入流出;存这些明细数据得不偿失

每次新增or删除过期数据时,实时计算流入流出消耗性能

针对这种特殊的场景,是否有什么取巧的实现方式呢?

I. 方案设计 1. 基于队列的轮询删除方式

将时间窗口分割成一个一个的时间片,每个时间片中记录资金的流入流出总数,然后总的流入流出就是所有时间片的流入流出的和

新增数据:

若未跨时间片,则更新队头的值

若跨时间片,新增一个队列头

删除数据:

轮询任务,判断队列尾是否过期

队尾过期,则删除队尾,此时若队头数据未加入计算,也需要加入计算

2. 基于队列的新增时删除方式

相比较前面的轮询方式,这个的应用场景为另外一种,只有在新增数据时,确保数据的准确性即可,不需要轮询的任务去删除过期的数据

简单来说,某些场景下(比如能确保数据不会断续的进来,即每个时间片都至少有一个数据过来),此时希望我的时间窗口数据是由新增的数据来驱动并更新

新增数据:

未跨时间片,则更新队头值

跨时间片,新塞入一个,并删除旧的数据

II. 基于数组的时间窗口实现

针对上面第二种,基于数组给出一个简单的实现,本篇主要是给出一个基础的时间窗口的设计与实现方式,当然也需要有进阶的case,比如上面的资金流入流出中,我需要分别计算5min,10min,30min,1h,3h,6h,12h,24h的时间窗口,该怎么来实现呢?能否用一个队列就满足所有的时间窗口的计算呢?关于这些留待下一篇给出

1. 时间轮计算器

前面用队列的方式比较好理解,这里为什么用数组方式来实现?

固定长度,避免频繁的新增和删除对象

定位和更新数据方便

首先是需要实现一个时间轮计算器,根据传入的时间,获取需要删除的过期数据

@Data
public class TimeWheelCalculate {
    private static final long START = 0;

    private int period;
    private int length;

    /**
     * 划分的时间片个数
     */
    private int cellNum;

    private void check() {
        if (length % period != 0) {
            throw new IllegalArgumentException(
                    "length % period should be zero but not! now length: " + length + " period: " + period);
        }
    }

    public TimeWheelCalculate(int period, int length) {
        this.period = period;
        this.length = length;

        check();

        this.cellNum = length / period;
    }

    public int calculateIndex(long time) {
        return (int) ((time - START) % length / period);
    }

    /**
     * 获取所有过期的时间片索引
     *
     * @param lastInsertTime 上次更新时间轮的时间戳
     * @param nowInsertTime  本次更新时间轮的时间戳
     * @return
     */
    public List getExpireIndexes(long lastInsertTime, long nowInsertTime) {
        if (nowInsertTime - lastInsertTime >= length) {
            // 已经过了一轮,过去的数据全部丢掉
            return null;
        }

        List removeIndexList = new ArrayList<>();
        int lastIndex = calculateIndex(lastInsertTime);
        int nowIndex = calculateIndex(nowInsertTime);
        if (lastIndex == nowIndex) {
            // 还没有跨过这个时间片,则不需要删除过期数据
            return Collections.emptyList();
        } else if (lastIndex < nowIndex) {
            for (int tmp = lastIndex; tmp < nowIndex; tmp++) {
                removeIndexList.add(tmp);
            }
        } else {
            for (int tmp = lastIndex; tmp < cellNum; tmp++) {
                removeIndexList.add(tmp);
            }

            for (int tmp = 0; tmp < nowIndex; tmp++) {
                removeIndexList.add(tmp);
            }
        }
        return removeIndexList;
    }
}

这个计算器的实现比较简单,首先是指定时间窗口的长度(length),时间片(period),其主要提供两个方法

calculateIndex 根据当前时间,确定过期的数据在数组的索引

getExpireIndexes 根据上次插入的时间,和当前插入的时间,计算两次插入时间之间,所有的过期数据索引

2. 时间轮容器

容器内保存的时间窗口下的数据,包括实时数据,和过去n个时间片的数组,其主要的核心就是在新增数据时,需要判断

若跨时间片,则删除过期数据,更新实时数据,更新总数

若未跨时间片,则直接更新实时数据即可

@Data
public class TimeWheelContainer {
    private TimeWheelCalculate calculate;

    /**
     * 历史时间片计数,每个时间片对应其中的一个元素
     */
    private int[] counts;

    /**
     * 实时的时间片计数
     */
    private int realTimeCount;

    /**
     * 整个时间轮计数
     */
    private int timeWheelCount;

    private Long lastInsertTime;


    public TimeWheelContainer(TimeWheelCalculate calculate) {
        this.counts = new int[calculate.getCellNum()];
        this.calculate = calculate;
        this.realTimeCount = 0;
        this.timeWheelCount = 0;
        this.lastInsertTime = null;
    }

    public void add(long now, int amount) {
        if (lastInsertTime == null) {
            realTimeCount = amount;
            lastInsertTime = now;
            return;
        }


        List removeIndex = calculate.getExpireIndexes(lastInsertTime, now);
        if (removeIndex == null) {
            // 两者时间间隔超过一轮,则清空计数
            realTimeCount = amount;
            lastInsertTime = now;
            timeWheelCount = 0;
            clear();
            return;
        }

        if (removeIndex.isEmpty()) {
            // 没有跨过时间片,则只更新实时计数
            realTimeCount += amount;
            lastInsertTime = now;
            return;
        }

        // 跨过了时间片,则需要在总数中删除过期的数据,并追加新的数据
        for (int index : removeIndex) {
            timeWheelCount -= counts[index];
            counts[index] = 0;
        }
        timeWheelCount += realTimeCount;
        counts[calculate.calculateIndex(lastInsertTime)] = realTimeCount;
        lastInsertTime = now;
        realTimeCount = amount;
    }

    private void clear() {
        for (int i = 0; i < counts.length; i++) {
            counts[i] = 0;
        }
    }
}
3. 测试

主要就是验证上面的实现有没有明显的问题,为什么是明显的问题?

深层次的bug在实际的使用中,更容易暴露

public class CountTimeWindow {

    public static void main(String[] args) {
        TimeWheelContainer timeWheelContainer = new TimeWheelContainer(new TimeWheelCalculate(2, 20));

        timeWheelContainer.add(0, 1);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 0, "first");

        timeWheelContainer.add(1, 1);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 0, "first");

        timeWheelContainer.add(2, 1);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 2, "second");
        Assert.isTrue(timeWheelContainer.getCounts()[0] == 2, "second");

        for (int i = 3; i < 20; i++) {
            timeWheelContainer.add(i, 1);
            System.out.println("add index: " + i + " count: " + timeWheelContainer.getTimeWheelCount());
        }

        // 刚好一轮

        timeWheelContainer.add(20, 3);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 20, "third");
        timeWheelContainer.add(21, 3);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 20, "third");


        // 减去过期的那个数据
        timeWheelContainer.add(22, 3);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 26 - 2, "fourth");
        Assert.isTrue(timeWheelContainer.getCounts()[0] == 6, "fourth");


        timeWheelContainer.add(26, 3);
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 24 - 2 - 2 + 3, "fifth");
        System.out.println(Arrays.toString(timeWheelContainer.getCounts()));


        timeWheelContainer.add(43, 3);
        System.out.println(Arrays.toString(timeWheelContainer.getCounts()));
        Assert.isTrue(timeWheelContainer.getTimeWheelCount() == 6, "six");
    }
}
III. 其他 1. 一灰灰Blog: https://liuyueyi.github.io/he...

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

微博地址: 小灰灰Blog

QQ: 一灰灰/3302797840

3. 扫描关注

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/71290.html

相关文章

  • “怎么做好云迁移”? 深蓝云海资深架构师给你答案

    摘要:基于云迁移的三个阶段细分为八个主要步骤,评估阶段主要包括项目启动现状梳理以及应用系统关联关系分析三个步骤,设计阶段包括云架构优化设计和云迁移方案设计,实施阶段包括目标架构迁移演练及实施和试运行三个步骤。 在云计算市场规模不断扩大的大背景下,云迁移的需求越来越大且面临挑战。云迁移不是一个迁移软件工具,而是一种服务。前IBM资深架构师姜亚杰从云迁移的三个阶段、四个维度到八个步骤的方法,简述...

    kk_miles 评论0 收藏0
  • TBSSQL 那些事 | TiDB Hackathon 2018 优秀项目分享

    摘要:当我们正准备做前期调研和设计的时候,主办方把唐长老拉去做现场导师,参赛规则规定导师不能下场比赛,囧,于是就这样被被动放了鸽子。川总早早来到现场。 本文作者是来自 TiBoys 队的崔秋同学,他们的项目 TBSSQL 在 TiDB Hackathon 2018 中获得了一等奖。TiDB Batch and Streaming SQL(简称 TBSSQL)扩展了 TiDB 的 SQL 引擎...

    KnewOne 评论0 收藏0
  • 限流器及Guava实现分析

    摘要:计数限流算法无论固定窗口还是滑动窗口核心均是对请求进行计数,区别仅仅在于对于计数时间区间的处理。令牌桶限流实现原理令牌桶限流的实现原理在有详细说明。因此由此为入口进行分析。目前可返回的实现子类包括及两种,具体不同下文详细分析。 限流 限流一词常用于计算机网络之中,定义如下: In computer networks, rate limiting is used to control t...

    xcc3641 评论0 收藏0
  • vivo统一告警平台设计实践

    摘要:告警当一个问题通过告警系统将消息以短信电话邮件等方式告知给用户时,我们称之为一条告警。图统一告警系统结构图告警收敛对于告警平台每天会产生数以万计的告警,这些告警对于运维或开发人员都需要去分析甄别优先级并处理故障。 一、背景一套监控系统检测和告警是密不可分的,检测用来发现异常,告警用来将问题信息发送给相应的人。v...

    Rocko 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<