摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。
之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 这四个方法。
当然 Executors 也是用不同的参数去 new ThreadPoolExecutor 实现的,本文先分析前四种线程创建方式,后在分析 new ThreadPoolExecutor 创建方式
由于使用了LinkedBlockingQueue所以maximumPoolSize没用,当corePoolSize满了之后就加入到LinkedBlockingQueue队列中。
每当某个线程执行完成之后就从LinkedBlockingQueue队列中取一个。
所以这个是创建固定大小的线程池。
源码分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
2.newSingleThreadPool()
创建线程数为1的线程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 没用,corePoolSize为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue())
);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
3.newCachedThreadPool()
创建可缓冲的线程池。没有大小限制。由于corePoolSize为0所以任务会放入SynchronousQueue队列中,SynchronousQueue只能存放大小为1,所以会立刻新起线程,由于maxumumPoolSize为Integer.MAX_VALUE所以可以认为大小为2147483647。受内存大小限制。
源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用 ThreadPoolExecutor 创建线程池
源码分析 ,ThreadPoolExecutor 的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数参数
1、corePoolSize 核心线程数大小,当线程数 < corePoolSize ,会创建线程执行 runnable
2、maximumPoolSize 最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中
3、keepAliveTime 保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。
4、unit 时间单位
5、workQueue 保存任务的阻塞队列
6、threadFactory 创建线程的工厂
7、handler 拒绝策略
任务执行顺序1、当线程数小于 corePoolSize时,创建线程执行任务。
2、当线程数大于等于 corePoolSize并且 workQueue 没有满时,放入workQueue中
3、线程数大于等于 corePoolSize并且当 workQueue 满时,新任务新建线程运行,线程总数要小于 maximumPoolSize
4、当线程总数等于 maximumPoolSize 并且 workQueue 满了的时候执行 handler 的 rejectedExecution。也就是拒绝策略。
JDK7提供了7个阻塞队列。(也属于并发容器)
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
什么是阻塞队列?阻塞队列是一个在队列基础上又支持了两个附加操作的队列。
2个附加操作:
支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。
几个方法在阻塞队列不可用的时候,上述2个附加操作提供了四种处理方法
| 方法处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | 不可用 | 不可用 |
JDK 7 提供了7个阻塞队列,如下
1、ArrayBlockingQueue 数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。
2、LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
此队列按照先出先进的原则对元素进行排序
3、PriorityBlockingQueue支持优先级的无界阻塞队列
4、DelayQueue支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素
5、SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。
6、LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法
transfer方法
如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法
用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
7、LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。
四个拒绝策略ThreadPoolExecutor默认有四个拒绝策略:
1、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException
2、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行
3、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务
4、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务
当然可以自己继承RejectedExecutionHandler来写拒绝策略.
TestThreadPoolExecutor 示例 TestThreadPoolExecutor.javapackage io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:39
**/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("创建任务并提交到线程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待所有线程执行完毕当前任务。
threadPool.shutdown();
boolean loop = true;
do {
//等待所有线程执行完毕当前任务结束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("所有线程执行完毕");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
ThreadPoolTask.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:40
**/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
遇到java.util.concurrent.RejectedExecutionException
第一
你的线程池 ThreadPoolExecutor 显示的 shutdown() 之后,再向线程池提交任务的时候。 如果你配置的拒绝策略是 AbortPolicy 的话,这个异常就会抛出来。
第二
当你设置的任务缓存队列过小的时候,或者说, 你的线程池里面所有的线程都在干活(线程数== maxPoolSize),并且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。
响应
可以看到线程 pool-1-thread-1 到5 循环使用
创建任务并提交到线程池中:task=1 开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1 创建任务并提交到线程池中:task=2 开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2 创建任务并提交到线程池中:task=3 开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3 创建任务并提交到线程池中:task=4 开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4 创建任务并提交到线程池中:task=5 开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5 创建任务并提交到线程池中:task=6 开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1 创建任务并提交到线程池中:task=7 开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2 创建任务并提交到线程池中:task=8 开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3 创建任务并提交到线程池中:task=9 开始执行任务:task=9任务,使用的线程池,线程名称:pool-1-thread-4 创建任务并提交到线程池中:task=10 开始执行任务:task=10任务,使用的线程池,线程名称:pool-1-thread-5 所有线程执行完毕 耗时:1015测试代码
github https://github.com/souyunku/ymq-example/tree/master/ymq-thread
Contact作者:鹏磊
出处:http://www.ymq.io
Email:admin@souyunku.com
版权归作者所有,转载请注明出处
Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/67766.html
摘要:创建方法最大线程数即源码单线程化的线程池有且仅有一个工作线程执行任务所有任务按照指定顺序执行,即遵循队列的入队出队规则创建方法源码还有一个结合了和,就不介绍了,基本不用。 *本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布 为什么用线程池 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率 >例如: > >记创建线程消耗时间T1,执行...
摘要:本文主要内容为简单总结中线程池的相关信息。方法簇方法簇用于创建固定线程数的线程池。三种常见线程池的对比上文总结了工具类创建常见线程池的方法,现对三种线程池区别进行比较。 概述 线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。在单核处理器架构下,操作系统一般使用分时的方式实现多线程;在多核处理器架构下,多个线程能够...
摘要:线程池常见实现线程池一般包含三个主要部分调度器决定由哪个线程来执行任务执行任务所能够的最大耗时等线程队列存放并管理着一系列线程这些线程都处于阻塞状态或休眠状态任务队列存放着用户提交的需要被执行的任务一般任务的执行的即先提交的任务先被执行调度 线程池常见实现 线程池一般包含三个主要部分: 调度器: 决定由哪个线程来执行任务, 执行任务所能够的最大耗时等 线程队列: 存放并管理着一系列线...
阅读 3195·2023-04-26 01:32
阅读 1757·2021-09-13 10:37
阅读 2526·2019-08-30 15:56
阅读 1908·2019-08-30 14:00
阅读 3343·2019-08-30 12:44
阅读 2133·2019-08-26 12:20
阅读 1409·2019-08-23 16:29
阅读 3465·2019-08-23 14:44