资讯专栏INFORMATION COLUMN

并发编程中级篇三----并行设计模式----生产者-消费者模式

Aldous / 1469人阅读

摘要:生产者消费者模式是一个经典的多线程设计模式,它为多线程的协作提供了良好的解决方案。生产者消费者模式中的内存缓冲区的主要功能是数据在多线程间的共享。

生产者-消费者模式是一个经典的多线程设计模式,它为多线程的协作提供了良好的解决方案。在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。

生产者-消费者模式中的内存缓冲区的主要功能是数据在多线程间的共享。

1.创建一个被消费的对象

    public final class Data{
        
        private String id;
        
        private String name;
        
        //getter/setter(),toString()省略,构造方法省略
    }

2.创建一个生产者

    public class Provider implements Runnable{
        
        //共享缓存区
        private BlockingQueue queue;
        //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
        private volatile boolean isRunning = true;
        //id生成器
        private static AtomicInteger count = new AtomicInteger();
        //随机对象
        private static Random r = new Random(); 
        
        public Provider(BlockingQueue queue){
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while(isRunning){
                try {
                    //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
                    Thread.sleep(r.nextInt(1000));
                    //获取的数据进行累计...
                    int id = count.incrementAndGet();
                    //比如通过一个getData方法获取了
                    Data data = new Data(Integer.toString(id), "数据" + id);
                    System.out.println("当前线程:" +
                     Thread.currentThread().getName() +
                      ", 获取了数据,id为:" + id +
                       ", 进行装载到公共缓冲区中...");
                    if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                        System.out.println("提交缓冲区数据失败....");
                        //do something... 比如重新提交
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public void stop(){
            this.isRunning = false;
        }
    }

3.添加一个消费者

    public class Consumer implements Runnable{
        
        private BlockingQueue queue;
    
        public Consumer(BlockingQueue queue){
            this.queue = queue;
        }
        
        //随机对象
        private static Random r = new Random(); 
    
        @Override
        public void run() {
            while(true){
                try {
                    //获取数据
                    Data data = this.queue.take();
                    //进行数据处理。休眠0 - 1000毫秒模拟耗时
                    Thread.sleep(r.nextInt(1000));
                    System.out.println("当前消费线程:" + 
                    Thread.currentThread().getName() +
                     ", 消费成功,消费数据为id: " + data.getId());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

4.定义一个测试类

    public class Main{
        
        public static void main(String[] args) throws Exception {
            //内存缓冲区
            BlockingQueue queue = new LinkedBlockingQueue(10);
            //生产者
            Provider p1 = new Provider(queue);
            
            Provider p2 = new Provider(queue);
            Provider p3 = new Provider(queue);
            //消费者
            Consumer c1 = new Consumer(queue);
            Consumer c2 = new Consumer(queue);
            Consumer c3 = new Consumer(queue);
            //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,
            //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
    
            ExecutorService cachePool = Executors.newCachedThreadPool();
            cachePool.execute(p1);
            cachePool.execute(p2);
            cachePool.execute(p3);
            cachePool.execute(c1);
            cachePool.execute(c2);
            cachePool.execute(c3);
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            p1.stop();
            p2.stop();
            p3.stop();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
    //        cachePool.shutdown(); 
    //        cachePool.shutdownNow();
            
    
        }
    }

运行结果如下所示

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70455.html

相关文章

  • 并发编程中级篇二----并行设计模式----Master-Wroker模式

    摘要:模式是常用的并行计算模式,它的核心思想是系统是由两类进程协助工作。负责接收和分配任务,负责处理子任务。当各个子进程处理完成后,会返回结果给,由做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。 Master-worker模式是常用的并行计算模式,它的核心思想是系统是由两类进程协助工作。Master负责接收和分配任务,worker负责处理子任务。当各...

    RebeccaZhong 评论0 收藏0
  • 并发编程中级篇二----并行设计模式----Future模式

    摘要:模式类似于用户提交商品订单,下单成功以后后台异步的执行耗时的业务在包中接口是线程模式的实现,可以来进行异步计算。 Future模式类似于用户提交商品订单,下单成功以后后台异步的执行耗时的业务在java.util.concurrent包中.Future接口是Java线程Future模式的实现,可以来进行异步计算。 showImg(https://segmentfault.com/img/...

    lx1036 评论0 收藏0
  • 你和阿里资深架构师之间,差的不仅仅是年龄(进阶必看)

    摘要:导读阅读本文需要有足够的时间,笔者会由浅到深带你一步一步了解一个资深架构师所要掌握的各类知识点,你也可以按照文章中所列的知识体系对比自身,对自己进行查漏补缺,觉得本文对你有帮助的话,可以点赞关注一下。目录一基础篇二进阶篇三高级篇四架构篇五扩 导读:阅读本文需要有足够的时间,笔者会由浅到深带你一步一步了解一个资深架构师所要掌握的各类知识点,你也可以按照文章中所列的知识体系对比自身,对自己...

    huaixiaoz 评论0 收藏0
  • Java 多线程编程基础——Thread 类

    摘要:程序执行时,至少会有一个线程在运行,这个运行的线程被称为主线程。程序的终止是指除守护线程以外的线程全部终止。多线程程序由多个线程组成的程序称为多线程程序。线程休眠期间可以被中断,中断将会抛出异常。 线程 我们在阅读程序时,表面看来是在跟踪程序的处理流程,实际上跟踪的是线程的执行。 单线程程序 在单线程程序中,在某个时间点执行的处理只有一个。 Java 程序执行时,至少会有一个线程在运行...

    zhoutk 评论0 收藏0
  • 并发(concurrency)与并行(parallellism)

    摘要:并发与并行并发与并行的概念并行多个实例或者多台机器同时执行一段处理逻辑,是真正的同时。并发通过调度算法,让用户看上去同时执行,实际上从操作层面不是真正的同时。并行与并发的异同点相似性都是为了合理且最大化利用系统的资源。 并发(concurrency)与并行(parallellism) 并发与并行的概念   并行:多个cpu实例或者多台机器同时执行一段处理逻辑,是真正的同时。 ...

    KavenFan 评论0 收藏0

发表评论

0条评论

Aldous

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<