资讯专栏INFORMATION COLUMN

徒手撸框架--高并发环境下的请求合并

刘东 / 416人阅读

摘要:我们就可以将这些请求合并,达到一定数量我们统一提交。总结一个比较生动的例子给大家讲解了一些多线程的具体运用。学习多线程应该多思考多动手,才会有比较好的效果。地址徒手撸框架系列文章地址徒手撸框架实现徒手撸框架实现

原文地址:https://www.xilidou.com/2018/01/22/merge-request/

在高并发系统中,我们经常遇到这样的需求:系统产生大量的请求,但是这些请求实时性要求不高。我们就可以将这些请求合并,达到一定数量我们统一提交。最大化的利用系统性IO,提升系统的吞吐性能。

所以请求合并框架需要考虑以下两个需求:

当请求收集到一定数量时提交数据

一段时间后如果请求没有达到指定的数量也进行提交

我们就聊聊一如何实现这样一个需求。

阅读这篇文章你将会了解到:

ScheduledThreadPoolExecutor

阻塞队列

线程安全的参数

LockSuppor的使用

设计思路和实现

我们就聊一聊实现这个东西的具体思路是什么。希望大家能够学习到分析问题,设计模块的一些套路。

底层使用什么数据结构来持有需要合并的请求?

既然我们的系统是在高并发的环境下使用,那我们肯定不能使用,普通的ArrayList来持有。我们可以使用阻塞队列来持有需要合并的请求。

我们的数据结构需要提供一个 add() 的方法给外部,用于提交数据。当外部add数据以后,需要检查队列里面的数据的个数是否达到我们限额?达到数量提交数据,不达到继续等待。

数据结构还需要提供一个timeOut()的方法,外部有一个计时器定时调用这个timeOut方法,如果方法被调用,则直接向远程提交数据。

条件满足的时候线程执行提交动作,条件不满足的时候线程应当暂停,等待队列达到提交数据的条件。所以我们可以考虑使用 LockSuppor.park()LockSuppor.unpark 来暂停和激活操作线程。

经过上面的分析,我们就有了这样一个数据结构:

private static class FlushThread implements Runnable{

        private final String name;

        //队列大小
        private final int bufferSize;
        //操作间隔
        private int flushInterval;

        //上一次提交的时间。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有数据的阻塞队列
        private final BlockingQueue queue;

        //达成条件后具体执行的方法
        private final Processor processor;

        //构造函数
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交数据的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供给外部的超时方法
        public void timeOut(){
            //超过两次提交超过时间间隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }
        
        //解除线程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //当前的数据是否大于提交的条件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //执行提交数据的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根据数据的尺寸和时间间隔判断是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //如果线程没有被打断,且不达到执行的条件,则阻塞线程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }

如何实现定时提交呢?

通常我们遇到定时相关的需求,首先想到的应该是使用 ScheduledThreadPoolExecutor定时来调用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力学习,多看源码了。

怎样进一步的提升系统的吞吐量?

我们使用的FlushThread 实现了 Runnable 所以我们可以考虑使用线程池来持有多个FlushThread

所以我们就有这样的代码:

public class Flusher {

    private final FlushThread[] flushThreads;

    private AtomicInteger index;

    //防止多个线程同时执行。增加一个随机数间隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor processor) {

        this.flushThreads = new FlushThread[threads];


        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread flushThread = new FlushThread(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定时调用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 对 index 取模,保证多线程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已经描述
    private static class FlushThread implements Runnable{
        ...省略
    }
}

面向接口编程,提升系统扩展性:

public interface Processor {
    void process(List list);
}
使用

我们写个测试方法测试一下:

//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor{
    @Override
    public void process(List list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}
public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

执行的结果:

start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

我们发现并没有达到10个数字就触发了flush。因为出发了超时提交,虽然还没有达到规定的5
个数据,但还是执行了 flush。

如果我们去除 Thread.sleep(1000); 再看看结果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush

每5个数一次提交。完美。。。。

总结

一个比较生动的例子给大家讲解了一些多线程的具体运用。学习多线程应该多思考多动手,才会有比较好的效果。希望这篇文章大家读完以后有所收获,欢迎交流。

github地址:https://github.com/diaozxin007/framework

徒手撸框架系列文章地址:

徒手撸框架--实现IoC

徒手撸框架--实现Aop

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68354.html

相关文章

  • JAVA 中的 CAS

    摘要:我们继续看代码的意思是这个是一段内嵌汇编代码。也就是在语言中使用汇编代码。就是汇编版的比较并交换。就是保证在多线程情况下,不阻塞线程的填充和消费。微观上看汇编的是实现操作系统级别的原子操作的基石。 原文地址:https://www.xilidou.com/2018/02/01/java-cas/ CAS 是现代操作系统,解决并发问题的一个重要手段,最近在看 eureka 的源码的时候。...

    CocoaChina 评论0 收藏0
  • 徒手一个简单的RPC框架

    摘要:徒手撸一个简单的框架之前在牛逼哄哄的框架,底层到底什么原理得知了远程过程调用简单来说就是调用远程的服务就像调用本地方法一样,其中用到的知识有序列化和反序列化动态代理网络传输动态加载反射这些知识点。 徒手撸一个简单的RPC框架 之前在牛逼哄哄的 RPC 框架,底层到底什么原理得知了RPC(远程过程调用)简单来说就是调用远程的服务就像调用本地方法一样,其中用到的知识有序列化和反序列化、动态...

    Gemini 评论0 收藏0
  • 徒手一个 Spring Boot 中的 Starter ,解密自动化配置黑魔法!

    摘要:先来看代码吧,一会松哥再慢慢解释关于这一段自动配置,解释如下首先注解表明这是一个配置类。本文的案例,松哥已经上传到上了,地址。我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中。Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小伙伴会觉得这个 Starter 好神奇呀!其实 Starter 也都...

    xiaochao 评论0 收藏0
  • 徒手UI之Paginator

    摘要:是一个组件库目前拥有的组件语法编写,无依赖原生模块化,以上支持,请开启静态服务器预览效果,静态服务器传送门采用变量配置样式辛苦造轮子,欢迎来仓库四月份找工作,求内推,坐标深圳写在前面去年年底项目中尝试着写过一个分页的组件,然后就有了写的想法 QingUI是一个UI组件库目前拥有的组件:DatePicker, TimePicker, Paginator, Tree, Cascader, ...

    liuhh 评论0 收藏0
  • 徒手框架--实现IoC

    摘要:从而能够进一步深入了解框架。至此我们框架开发完成。虽然说阅读源码是了解框架的最终手段。但是框架作为一个生产框架,为了保证通用和稳定,源码必定是高度抽象,且处理大量细节。下一篇文章应该会是徒手撸框架实现。 原文地址:https://www.xilidou.com/2018/... Spring 作为 J2ee 开发事实上的标准,是每个Java开发人员都需要了解的框架。但是Spring 的...

    rottengeek 评论0 收藏0

发表评论

0条评论

刘东

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<