资讯专栏INFORMATION COLUMN

线程池工作窃取实例

ruicbAndroid / 3456人阅读

摘要:序本文主要来展示一下简版的线程池的实现。默认提供了几个工厂方法思路主要用到的是双端队列,不过这里我们粗糙的实现的话,也可以不用到。测试实例输出从数据来看,还是相对均匀的。

本文主要来展示一下简版的work stealing线程池的实现。

Executors

Executors默认提供了几个工厂方法

</>复制代码

  1. /**
  2. * Creates a thread pool that maintains enough threads to support
  3. * the given parallelism level, and may use multiple queues to
  4. * reduce contention. The parallelism level corresponds to the
  5. * maximum number of threads actively engaged in, or available to
  6. * engage in, task processing. The actual number of threads may
  7. * grow and shrink dynamically. A work-stealing pool makes no
  8. * guarantees about the order in which submitted tasks are
  9. * executed.
  10. *
  11. * @param parallelism the targeted parallelism level
  12. * @return the newly created thread pool
  13. * @throws IllegalArgumentException if {@code parallelism <= 0}
  14. * @since 1.8
  15. */
  16. public static ExecutorService newWorkStealingPool(int parallelism) {
  17. return new ForkJoinPool
  18. (parallelism,
  19. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  20. null, true);
  21. }
  22. /**
  23. * Creates a work-stealing thread pool using all
  24. * {@link Runtime#availableProcessors available processors}
  25. * as its target parallelism level.
  26. * @return the newly created thread pool
  27. * @see #newWorkStealingPool(int)
  28. * @since 1.8
  29. */
  30. public static ExecutorService newWorkStealingPool() {
  31. return new ForkJoinPool
  32. (Runtime.getRuntime().availableProcessors(),
  33. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  34. null, true);
  35. }
思路

ForkJoinPool主要用到的是双端队列,不过这里我们粗糙的实现的话,也可以不用到deque。

</>复制代码

  1. public class WorkStealingChannel {
  2. private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class);
  3. BlockingDeque[] managedQueues;
  4. AtomicLongMap stat = AtomicLongMap.create();
  5. public WorkStealingChannel() {
  6. int nCPU = Runtime.getRuntime().availableProcessors();
  7. int queueCount = nCPU / 2 + 1;
  8. managedQueues = new LinkedBlockingDeque[queueCount];
  9. for(int i=0;i();
  10. }
  11. }
  12. public void put(T item) throws InterruptedException {
  13. int targetIndex = Math.abs(item.hashCode() % managedQueues.length);
  14. BlockingQueue targetQueue = managedQueues[targetIndex];
  15. targetQueue.put(item);
  16. }
  17. public T take() throws InterruptedException {
  18. int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length);
  19. int idx = rdnIdx;
  20. while (true){
  21. idx = idx % managedQueues.length;
  22. T item = null;
  23. if(idx == rdnIdx){
  24. item = managedQueues[idx].poll();
  25. }else{
  26. item = managedQueues[idx].pollLast();
  27. }
  28. if(item != null){
  29. LOGGER.info("take ele from queue {}",idx);
  30. stat.addAndGet(idx,1);
  31. return item;
  32. }
  33. idx++;
  34. if(idx == rdnIdx){
  35. break;
  36. }
  37. }
  38. //走完一轮没有,则随机取一个等待
  39. LOGGER.info("wait for queue:{}",rdnIdx);
  40. stat.addAndGet(rdnIdx,1);
  41. return managedQueues[rdnIdx].take();
  42. }
  43. public AtomicLongMap getStat() {
  44. return stat;
  45. }
  46. }

</>复制代码

  1. 这里根据cpu的数量建立了几个deque,然后每次put的时候,根据hashcode取模放到对应的队列。然后获取的时候,先从随机一个队列取,没有的话,再robbin round取其他队列的,还没有的话,则阻塞等待指定队列的元素。

测试实例

</>复制代码

  1. public class WorkStealingDemo {
  2. static final WorkStealingChannel channel = new WorkStealingChannel<>();
  3. static volatile boolean running = true;
  4. static class Producer extends Thread{
  5. @Override
  6. public void run() {
  7. while(running){
  8. try {
  9. channel.put(UUID.randomUUID().toString());
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }
  16. static class Consumer extends Thread{
  17. @Override
  18. public void run() {
  19. while(running){
  20. try {
  21. String value = channel.take();
  22. System.out.println(value);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }
  29. public static void stop(){
  30. running = false;
  31. System.out.println(channel.getStat());
  32. }
  33. public static void main(String[] args) throws InterruptedException {
  34. int nCPU = Runtime.getRuntime().availableProcessors();
  35. int consumerCount = nCPU / 2 + 1;
  36. for (int i = 0; i < nCPU; i++) {
  37. new Producer().start();
  38. }
  39. for (int i = 0; i < consumerCount; i++) {
  40. new Consumer().start();
  41. }
  42. Thread.sleep(30*1000);
  43. stop();
  44. }
  45. }

输出

</>复制代码

  1. {0=660972, 1=660613, 2=661537, 3=659846, 4=659918}

</>复制代码

  1. 从数据来看,还是相对均匀的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70405.html

相关文章

  • Java™ 教程(执行器)

    执行器 在前面的所有示例中,由新的线程(由其Runnable对象定义)和线程本身(由Thread对象定义)完成的任务之间存在紧密的联系,这适用于小型应用程序,但在大型应用程序中,将线程管理和创建与应用程序的其余部分分开是有意义的,封装这些函数的对象称为执行器,以下小节详细描述了执行器。 执行器接口定义三个执行器对象类型。 线程池是最常见的执行器实现类型。 Fork/Join是一个利用多个处理器的...

    马忠志 评论0 收藏0
  • 想进大厂?50个多线程面试题,你会多少?【后25题】(二)

    摘要:大多数待遇丰厚的开发职位都要求开发者精通多线程技术并且有丰富的程序开发调试优化经验,所以线程相关的问题在面试中经常会被提到。掌握了这些技巧,你就可以轻松应对多线程和并发面试了。进入等待通行准许时,所提供的对象。 最近看到网上流传着,各种面试经验及面试题,往往都是一大堆技术题目贴上去,而没有答案。 不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就...

    caozhijian 评论0 收藏0
  • Java多线程进阶(四三)—— J.U.C之executors框架:Fork/Join框架(1) 原

    摘要:同时,它会通过的方法将自己注册到线程池中。线程池中的每个工作线程都有一个自己的任务队列,工作线程优先处理自身队列中的任务或顺序,由线程池构造时的参数决定,自身队列为空时,以的顺序随机窃取其它队列中的任务。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首发于一世流云的专栏:https://segmentfau...

    cooxer 评论0 收藏0
  • 基于Fork/Join框架实现对大型浮点数数组排序(归并算法和插入排序算法)

    摘要:方法接受对象数组作为参数,目标是对数组进行升序排序。创建一个对象,并调用方法将它提交给线程池。此排序算法不直接返回结果给调用方,因此基于类。 分支/合并框架 说明 重点是那个浮点数数组排序的例子,从主函数展开,根据序号看 1、GitHub代码欢迎star。你们轻轻的一点,对我鼓励特大,我有一个习惯,看完别人的文章是会点赞的。 2、个人认为学习语言最好的方式就是模仿、思考别人为什么这么写...

    yuxue 评论0 收藏0
  • Java多线程进阶(四三)—— J.U.C之executors框架:Fork/Join框架(2)实现

    摘要:并不会为每个任务都创建工作线程,而是根据实际情况构造线程池时的参数确定是唤醒已有空闲工作线程,还是新建工作线程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我们...

    FingerLiu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<