资讯专栏INFORMATION COLUMN

Java中的线程池

tomato / 947人阅读

摘要:中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。代码中如果执行了方法,线程池会提前创建并启动所有核心线程。线程池最大数量线程池允许创建的线程最大数量。被称为是可重用固定线程数的线程池。

Java中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。那么线程池有什么好处呢,以及他的实现原理是怎么样的呢?

使用线程池的好处

在开发过程中,合理的使用线程池能够带来以下的一些优势,这些优势同时也是一些其他池化的优势,例如数据库连接池,http连接池等。

降低资源消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。

提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。

提高线程的可管理性 线程的比较稀缺的资源,如果无限的创建,不仅会消耗系统资源,还会降低系统的稳定性。

线程池实现原理

我们创建线程池完成之后,当把一个任务提交给线程池处理的时候,线程池的处理流程如下:
1)线程池判断核心线程池的任务是否都在执行任务,如果不是,则创建一个新的线程来执行任务,如何核心线程池的线程都在执行任务,则进入下一个流程
2)线程池判断工作队列是否已满,如果工作队列没有满,那么新提交的任务将会存储在工作队列中,如果工作队列满了,那会进入到下一个流程
3)线程池判断线程池中的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,那么使用饱和策略来处理这个任务

线程池使用
创建线程池

创建线程池可以使用java中已经内置的一些默认配置的线程池,也可以使用ThreadPoolExecutor来自己配置参数来创建线程池,阿里代码规约中推荐后者来创建线程池,这样创建的线程池更加符合业务的实际需求。
下面是我创建线程池的一个例子:

ExecutorService executor = new ThreadPoolExecutor(2, 5,  30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());

 public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

在上面的这段代码里new ThreadPoolExecutor中一共有6个参数,我们来解析一下这些参数的意义,这样下次创建线程池的时候能够更加灵活的使用。

corePoolSize (核心线程的数量):任务提交到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池核心数大小时就不再创建。代码中如果执行了prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize (线程池最大数量):线程池允许创建的线程最大数量。如果队列满了,并且创建的线程数小于最大线程数,那么线程池会再创建新的线程执行任务,如果使用无界队列,那么这个参数设置了意义不大。

keepAliveTime (线程保持活动时间):线程池的工作线程空闲后,保持存活的时间,如果任务很多,切任务执行时间较长,这个值可以设置的大一点。

TimeUnit (上面的那个时间的单位)

BlockingQueue workQueue (任务队列):用于保存等待执行的任务的阻塞队列。可以选择的有以下:

ArrayBlockingQueue:基于数组的有界阻塞队列 FIFO

LinkedBlockingQueue:基于链表的有界阻塞队列 FIFO 吞吐量高于ArrayBlockingQueue, FixedThreadPool基于这个实现

SynchronousQueue: 不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态方法:Executors.newCachedThreadPool使用了这个队列

PriorityBlockingQueue 具有优先级的无界阻塞队列

handler (饱和策略):当队列和线程池都满了,说明线程池已经处于饱和状态,那么必须采取一种策略处理新提交的任务,目前提供四种策略

AbortPolicy:直接抛出异常

CallerRunsPolicy: 只用调用者所在线程来运行任务

DiscardOldestPolicy :丢弃队列里最近的一个任务,执行当前任务

DiscardPolicy : 不处理丢弃掉

也可以实现RejectedExecutionHandler接口自定义处理方式。

向线程池提交任务

线程池创建完成之后,就可以处理提交的任务,任务如何提交到线程池呢,线程池提供了两个方法:submit()和execute()方法。

 public static void main(String[] args) {
    //提交没有返回结果的任务
    EXECUTOR_SERVICE.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello world");
        }
    });
    Future future = EXECUTOR_SERVICE.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello world1");
        }
    });
    //提交有返回结果的任务,返回一个future的对象,调用get方法获取返回值,阻塞直到返回
    Future future1 = EXECUTOR_SERVICE.submit(new Callable() {
        @Override
        public Object call() throws Exception {
            System.out.println("hello world1");
            return "succ";
        }
    });
    try {
        System.out.println(future.get());
        System.out.println(future1.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    EXECUTOR_SERVICE.shutdownNow();
}
关闭线程池

Java对ExecutorService关闭方式有两种,一是调用shutdown()方法,二是调用shutdownNow()方法。
这两个方法虽然都可以关闭线程池,但是有一些区别

shutdown()

1、调用之后不允许继续往线程池内继续添加线程,如果继续添加会出现RejectedExecutionException异常;
2、线程池的状态变为SHUTDOWN状态;
3、所有在调用shutdown()方法之前提交到ExecutorSrvice的任务都会执行;
4、一旦所有线程结束执行当前任务,ExecutorService才会真正关闭。

shutdownNow()

1、该方法返回尚未执行的 task 的 List;
2、线程池的状态变为STOP状态;
3、阻止所有正在等待启动的任务, 并且停止当前正在执行的任务;

最好的关闭线程池的方法:

package multithread.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author pangjianfei
 * @Date 2019/6/21
 * @desc 测试线程池
 */
public class TestThreadPoolExecutor {

    static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        EXECUTOR_SERVICE.execute( ()-> System.out.println("hello world"));
        Future future = EXECUTOR_SERVICE.submit(() -> System.out.println("hello world1"));
        Future future1 = EXECUTOR_SERVICE.submit(() -> {System.out.println("hello world1");return "succ";});
        Future future2 = EXECUTOR_SERVICE.submit(() -> {Thread.sleep(5000);System.out.println("hello world2");return "线程执行成功了";});
        try {
            System.out.println(future.get());
            System.out.println(future1.get());
            System.out.println(future2.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        Runnable run = () -> {
            long sum = 0;
            boolean flag = true;
            while (flag && !Thread.currentThread().isInterrupted()) {
                sum += 1;
                if (sum == Long.MAX_VALUE) {
                    flag = false;
                }
            }
        };
        EXECUTOR_SERVICE.execute(run);
        //先调用shutdown()使线程池状态改变为SHUTDOWN
        EXECUTOR_SERVICE.shutdown();
        try {
            //2s后检测线程池内的线程是否执行完毕
            if (!EXECUTOR_SERVICE.awaitTermination(2, TimeUnit.SECONDS)) {
                //内部实际通过interrupt来终止线程,所以当调用shutdownNow()时,isInterrupted()会返回true。
                EXECUTOR_SERVICE.shutdownNow();
            }
        } catch (InterruptedException e) {
            EXECUTOR_SERVICE.shutdownNow();
        }
    }
}
合理配置线程池

一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)

如果是CPU密集型应用,则线程池大小设置为N+1
如果是IO密集型应用,则线程池大小设置为2N+1
IO优化中,更加合理的方式是:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

linux查看CPU的数量:

# 查看物理CPU个数
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l

# 查看每个物理CPU中core的个数(即核数)
cat /proc/cpuinfo| grep "cpu cores"| uniq

# 查看逻辑CPU的个数
cat /proc/cpuinfo| grep "processor"| wc -l
线程池的监控

基于下面的方法可以对线程池中的一些数据进行监控:

//当前排队线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getQueue().size();
//当前活动的线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getActiveCount();
//执行完成的线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getCompletedTaskCount();
//总线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getTaskCount();
Executor框架
Executor框架的调度模型

Executor框架的调度模型是一种二级调度模型。
在HotSpot VM 的线程模型中,Java线程会被一对一的映射为本地操作系统的线程。Java线程的启动与销毁都与本地线程同步。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,我们通常会创建任务,然后将任务提交到调度器,调度器(Executor框架)将这些任务映射为对应数量的线程;底层,操作系统会将这些线程映射到硬件处理器上,这里不是由应用程序控制。模型图如下:

Executor框架的结构和成员

Executor框架主要包括三部分:

任务: 定义的被执行的任务需要实现两个接口之一:Runable、Callable;

任务的执行: 包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor、ForkJoinPool;

任务的异步计算结果: 包括Future接口和实现Future接口的FutureTask类、ForkJoinTask类。

Executor主要包含的类和接口如下:
Executor是一个接口,他是Executor框架的基础,他将任务的提交和执行分离。
ThreadPoolExecutor是线程池的核心类,用来执行被提交的任务
ScheduleThreadPoolExecutor是一个实现类,可以在给定的延迟后执行命令,或者定期执行命令,比Timer更加灵活。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果

ThreadPoolExecutor详解

Executor框架最核心的类就是ThreadPoolExecutor,他是线程池的实现类,我们在上面看到过创建线程池的例子
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); 里面包含了几个核心的参数,通过Executor框架的工具类Executors,可以直接创建3中类型的线程池。

FixedThreadPool

FixedThreadPool被称为是可重用固定线程数的线程池。为什么会这么说,我们看一下他的实现源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    /**
     * 核心线程数和最大线程数相同,线程等待时间为0表示多余的空闲线程会被立刻终止,LinkedBlockingQueue默认容量是Integer.MAX_VALUE,基本类似于无界队列
     */
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue());
}
SingleThreadExecutor

SingleThreadExecutor是单个Worker线程的Executor.他的实现源码如下:

/**
 * 可以看到核心线程数和最大线程数都是1,相当于是创建一个单线程顺序的执行队列中的任务
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue()));
}
CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,源码如下:

/**
 * 核心线程数为0,最大线程数为Integer.MAX_VALUE,线程存活时间为1分钟,而且是使用SynchronousQueue没有容量的队列,这种情况下任务提交快的是时候,会创建大量的线程,耗尽CPU的资源
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}
ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor用来在给定的延迟之后运行任务,或者定期执行任务,他比Timer更加灵活,Timer是创建一个后台线程,而这个线程池可以在构造函数中指定多个对应的后台线程。源码如下:

/**
 * 这里使用的是无界的队列,如果核心线程池已满,那么任务会一直提交到队列
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor的执行主要包含两部分

/**参数的作用:1:执行的线程  2、初始化延时  3、两次开始执行最小时间间隔  4、计时单位*/
/**scheduleAtFixedRate()是按照指定频率执行一个任务,就是前一个任务没有执行完成,但是下次执行时间到,仍然会启动线程执行新的任务*/
scheduledExecutorService.scheduleAtFixedRate(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS);
/**参数作用同上*/
/**scheduleWithFixedDelay()是按照指定频率间隔执行,本次执行结果之后,在延时10s执行下一次的任务*/
scheduledExecutorService.scheduleWithFixedDelay(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS);

上面的两种方法都是向ScheduledThreadPoolExecutor的DelayQueue中添加了一个实现了RunnableScheduleFuture接口的ScheduledTask.

 public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //这里对commond进行了封装
    ScheduledFutureTask sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

DelayQueue是一个支持优先级的队列,这里默认会按照执行时间进行排序,time小的排在前面。线程在获取到期需要执行的ScheduledTutureTask之后,执行这个任务,执行完成之后会修改time,将其变为下次要执行的时间,修改之后放回到DelayQueue中。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77839.html

相关文章

  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • Java线程

    摘要:中的线程池是运用场景最多的并发框架。才是真正的线程池。存放任务的队列存放需要被线程池执行的线程队列。所以线程池的所有任务完成后,它最终会收缩到的大小。饱和策略一般情况下,线程池采用的是,表示无法处理新任务时抛出异常。 Java线程池 1. 简介 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互,这个时候使用线程池可以提升性能,尤其是需要创建大量声明周期很短暂的线程时。Ja...

    jerry 评论0 收藏0
  • 一看就懂的Java线程分析详解

    摘要:任务性质不同的任务可以用不同规模的线程池分开处理。线程池在运行过程中已完成的任务数量。如等于线程池的最大大小,则表示线程池曾经满了。线程池的线程数量。获取活动的线程数。通过扩展线程池进行监控。框架包括线程池,,,,,,等。 Java线程池 [toc] 什么是线程池 线程池就是有N个子线程共同在运行的线程组合。 举个容易理解的例子:有个线程组合(即线程池,咱可以比喻为一个公司),里面有3...

    Yangder 评论0 收藏0
  • Java线程的工作原理,好处和注意事项

    摘要:线程池的工作原理一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行任务的任务队列阻塞队列。使用线程池可以对线程进行统一的分配和监控。线程池的注意事项虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。 线程池的工作原理一个线程池管理了一组工作线程, 同时它还包括了一个用于放置等待执行 任务的任务队列(阻塞队列) 。 一个线程池管理了一组工作线程, 同时它还...

    ZweiZhao 评论0 收藏0
  • Java线程(2):使用线程 ThreadPoolExecutor

    摘要:本文只介绍中线程池的基本使用,不会过多的涉及到线程池的原理。可缓存线程的线程池创建一个可缓存线程的线程池。首先是从接口继承到的方法使用该方法即将一个任务交给线程池去执行。方法方法的作用是向线程池发送关闭的指令。 首先,我们为什么需要线程池?让我们先来了解下什么是 对象池 技术。某些对象(比如线程,数据库连接等),它们创建的代价是非常大的 —— 相比于一般对象,它们创建消耗的时间和内存都...

    darry 评论0 收藏0

发表评论

0条评论

tomato

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<