资讯专栏INFORMATION COLUMN

Java多线程进阶(三六)—— J.U.C之collections框架:DelayQueue

enda / 2468人阅读

摘要:之后,会重复上一步,新唤醒的线程可能取代成为新的线程。这其实是一种名为的多线程设计模式。我们之前说了,线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。线程池框架中的就是一种延时阻塞队列。

本文首发于一世流云专栏:https://segmentfault.com/blog...
一、DelayQueue简介

DelayQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于已有的PriorityBlockingQueue实现:

DelayQueue也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue中的所有元素必须实现Delayed接口:

/**
 * 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
 * 

* 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable { /** * 返回与此对象相关的剩余有效时间,以给定的时间单位表示. */ long getDelay(TimeUnit unit); }

可以看到,Delayed接口除了自身的getDelay方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了能够比较两个对象,以便排序。

也就是说,如果一个类实现了Delayed接口,当创建该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队

另外,由于DelayQueue内部委托了PriorityBlockingQueue对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素

DelayQueue的特点简要概括如下:

DelayQueue是无界阻塞队列;

队列中的元素必须实现Delayed接口,元素过期后才会从队列中取走;

二、DelayQueue示例

为了便于理解DelayQueue的功能,我们先来看一个使用DelayQueue的示例。

队列元素

第一节说了,队列元素必须实现Delayed接口,我们先来定义一个Data类,作为队列元素:

public class Data implements Delayed {
    private static final AtomicLong atomic = new AtomicLong(0);
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n");

    // 数据的失效时间点
    private final long time;

    // 序号
    private final long seqno;

    /**
     * @param deadline 数据失效时间点
     */
    public Data(long deadline) {
        this.time = deadline;
        this.seqno = atomic.getAndIncrement();
    }

    /**
     * 返回剩余有效时间
     *
     * @param unit 时间单位
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * 比较两个Delayed对象的大小, 比较顺序如下:
     * 1. 如果是对象本身, 返回0;
     * 2. 比较失效时间点, 先失效的返回-1,后失效的返回1;
     * 3. 比较元素序号, 序号小的返回-1, 否则返回1.
     * 4. 非Data类型元素, 比较剩余有效时间, 剩余有效时间小的返回-1,大的返回1,相同返回0
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this)  // compare zero if same object
            return 0;

        if (other instanceof Data) {
            Data x = (Data) other;

            // 优先比较失效时间
            long diff = this.time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;

            else if (this.seqno < x.seqno)    // 剩余时间相同则比较序号
                return -1;
            else
                return 1;
        }

        // 一般不会执行到此处,除非元素不是Data类型
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    @Override
    public String toString() {
        return "Data{" +
            "time=" + time +
            ", seqno=" + seqno +
            "}, isValid=" + isValid();
    }

    private boolean isValid() {
        return this.getDelay(TimeUnit.NANOSECONDS) > 0;
    }
}

关于队列元素Data类,需要注意以下几点:

每个元素的time字段保存失效时间点)的纳秒形式(构造时指定,比如当前时间+60s);

seqno字段表示元素序号,每个元素唯一,仅用于失效时间点一致的元素之间的比较。

getDelay方法返回元素的剩余有效时间,可以根据入参的TimeUnit选择时间的表示形式(秒、微妙、纳秒等),一般选择纳秒以提高精度;

compareTo方法用于比较两个元素的大小,以便在队列中排序。由于DelayQueue基于优先级队列实现,所以内部是“堆”的形式,我们定义的规则是先失效的元素将先出队,所以先失效元素应该在堆顶,即compareTo方法返回结果<0的元素优先出队;

生产者-消费者

还是以“生产者-消费者”模式来作为DelayQueued的示例:

生产者

public class Producer implements Runnable {
    private final DelayQueue queue;

    public Producer(DelayQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {

            long currentTime = System.nanoTime();
            long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L);

            Data data = new Data(currentTime + validTime);
            queue.put(data);

            System.out.println(Thread.currentThread().getName() + ": put " + data);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者

public class Consumer implements Runnable {
    private final DelayQueue queue;

    public Consumer(DelayQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Data data = queue.take();
                System.out.println(Thread.currentThread().getName() + ": take " + data);

                Thread.yield();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

调用

public class Main {
    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue<>();

        Thread c1 = new Thread(new Consumer(queue), "consumer-1");
        Thread p1 = new Thread(new Producer(queue), "producer-1");

        c1.start();
        p1.start();
    }
}

执行结果:

producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=true
consumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=true
consumer-1: take Data{time=73265083111776, seqno=5}, isValid=false

上面示例中,我们创建了一个生产者,一个消费者,生产者不断得入队元素,每个元素都会有个截止有效期;消费者不断得从队列者获取元素。从输出可以看出,消费者每次获取到的元素都是有效期最小的,且都是已经失效了的。(因为DelayQueue每次出队只会删除有效期最小且已经过期的元素)

三、DelayQueue原理

介绍完了DelayQueued的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来我们看下Doug Lea是如何实现DelayQueued的。

构造

DelayQueued提供了两种构造器,都非常简单:

/**
 * 默认构造器.
 */
public DelayQueue() {
}
/**
 * 从已有集合构造队列.
 */
public DelayQueue(Collection c) {
    this.addAll(c);
}

可以看到,内部的PriorityQueue并非在构造时创建,而是对象创建时生成:

public class DelayQueue extends AbstractQueue
    implements BlockingQueue {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue q = new PriorityQueue();

    /**
     * leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
     * 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
     */
    private Thread leader = null;

    /**
     * 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
     */
    private final Condition available = lock.newCondition();

    //...

}

上述比较特殊的是leader字段,我们之前已经说过,DelayQueue每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在available这个条件队列上无限等待。

为了提升性能,DelayQueue并不会让所有出队线程都无限等待,而是用leader保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在available条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。这样,就避免了无效的等待,提升了性能。这其实是一种名为“Leader-Follower pattern”的多线程设计模式。

入队——put

put方法没有什么特别,由于是无界队列,所以也不会阻塞线程。

/**
 * 入队一个指定元素e.
 * 由于是无界队列, 所以该方法并不会阻塞线程.
 */
public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 调用PriorityQueue的offer方法
        if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
需要注意的是当首次入队元素时,需要唤醒一个出队线程,因为此时可能已有出队线程在空队列上等待了,如果不唤醒,会导致出队线程永远无法执行。
if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
    leader = null;
    available.signal();
}
出队——take

整个take方法在一个自旋中完成,其实就分为两种情况:

1.队列为空

这种情况直接阻塞出队线程。(在available条件队列等待)

2.队列非空

队列非空时,还要看队首元素的状态(有效期),如果队首元素过期了,那直接出队就行了;如果队首元素未过期,就要看当前线程是否是第一个到达的出队线程(即判断leader是否为空),如果不是,就无限等待,如果是,则限时等待。

/**
 * 队首出队元素.
 * 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            E first = q.peek();     // 读取队首元素
            if (first == null)      // CASE1: 队列为空, 直接阻塞
                available.await();
            else {                  // CASE2: 队列非空
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)                             // CASE2.0: 队首元素已过期
                    return q.poll();

                // 执行到此处说明队列非空, 且队首元素未过期
                first = null;
                if (leader != null)                         // CASE2.1: 已存在leader线程
                    available.await();      // 无限期阻塞当前线程
                else {                                      // CASE2.2: 不存在leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;    // 将当前线程置为leader线程
                    try {
                        available.awaitNanos(delay);        // 阻塞当前线程(限时等待剩余有效时间)
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)             // 不存在leader线程, 则唤醒一个其它出队线程
            available.signal();
        lock.unlock();
    }
}
需要注意,自旋结束后如果leader == null && q.peek() != null,需要唤醒一个等待中的出队线程。
leader == null && q.peek() != null的含义就是——没有leader线程但队列中存在元素。我们之前说了,leader线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。
四、总结

DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计。

考虑一种使用场景:

异步通知的重试,在很多系统中,当用户完成服务调用后,系统有时需要将结果异步通知到用户的某个URI。由于网络等原因,很多时候会通知失败,这个时候就需要一种重试机制。

这时可以用DelayQueue保存通知失败的请求,失效时间可以根据已通知的次数来设定(比如:2s、5s、10s、20s),这样每次从队列中take获取的就是剩余时间最短的请求,如果已重复通知次数超过一定阈值,则可以把消息抛弃。

后面,我们在讲J.U.C之executors框架的时候,还会再次看到DelayQueue的身影。JUC线程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue就是一种延时阻塞队列。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77112.html

相关文章

  • Java线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java线程进阶(二六)—— J.U.Ccollections框架:ConcurrentSkip

    摘要:我们来看下的类继承图可以看到,实现了接口,在多线程进阶二五之框架中,我们提到过实现了接口,以提供和排序相关的功能,维持元素的有序性,所以就是一种为并发环境设计的有序工具类。唯一的区别是针对的仅仅是键值,针对键值对进行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首发于一世流云专栏:https://seg...

    levius 评论0 收藏0
  • Java线程进阶(二七)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:仅仅当有多个线程同时进行写操作时,才会进行同步。可以看到,上述方法返回一个迭代器对象,的迭代是在旧数组上进行的,当创建迭代器的那一刻就确定了,所以迭代过程中不会抛出并发修改异常。另外,迭代器对象也不支持修改方法,全部会抛出异常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首发于一世流云专栏:https://...

    garfileo 评论0 收藏0
  • Java线程进阶(二八)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:我们之前已经介绍过了,底层基于跳表实现,其操作平均时间复杂度均为。事实上,内部引用了一个对象,以组合方式,委托对象实现了所有功能。线程安全内存的使用较多迭代是对快照进行的,不会抛出,且迭代过程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首发于一世流云专栏:https://segmentfa...

    NeverSayNever 评论0 收藏0
  • Java线程进阶(三七)—— J.U.Ccollections框架:LinkedBlocking

    摘要:接口截止目前为止,我们介绍的阻塞队列都是实现了接口。该类在构造时一般需要指定容量,如果不指定,则最大容量为。另外,由于内部通过来保证线程安全,所以的整体实现时比较简单的。另外,双端队列相比普通队列,主要是多了队尾出队元素队首入队元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首发于一世流云专栏:ht...

    light 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<