资讯专栏INFORMATION COLUMN

CountDownLatch + Callbale+FutureTask 实现异步变同步调用

张金宝 / 3599人阅读

摘要:背景通过接口实现调用发送数据,接口返回值为发送数据的对应结果。接口为同步阻塞,为异步回调方式。接收数据回调接收到数据后,通过闭锁释放阻塞的线程,同时设置结果返回给调用者

背景

通过HTTP接口实现调用MQTT Client发送数据,HTTP接口返回值为MQTT Client发送数据的对应结果。 HTTP接口为同步阻塞,MQTT Client 为异步回调方式。
如何实现在HTTP接口中调用MQTT Client发送数据后,能够阻塞等待MQTT返回结果,然后将结果返回?

解决方法

CountDownLatch + Callbale+FutureTask

1.CountDownLatch作用

CountDownLatch实现在MQTT Client 发送数据后 到接收数据后这段时间的阻塞。
HTTP每次请求,新建一个CountDownLatch,然后将CountDownLatch作为值和deviceId作为KEY保存到Map中,
调用MQTT Client 发送数据后,countDownLatch.await(),进行同步等待
在MQTT Client接收数据的回调方法中更加deviceId取出CountDwonLatch然后计数减一


2.Callbale+FutureTask作用

将调用MQTT Client发送数据的过程,封装成Callable,投递发送任务时,通过返回的FutureTask的get()方法,
同步阻塞,直到结果返回。

关键代码

1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回结果,以及将返回结果传递个FutureTask

    private final static ConcurrentMap countDownLatchMap = new ConcurrentHashMap<>();
    //线程池
    private final ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {
        Thread thread = new Thread(runnable, "mqtt thread");
        return thread;
    });

2.HTTP API 调用的发送MQTT 消息数据的接口

    /**
     * HTTP API 调用的发送MQTT 消息数据的接口
     * 同步阻塞
     */
    public Integer send(Long packageId, String deviceId) throws Exception {
        ......
       FutureTask futureTask = sendTask(publishDto));
       return futureTask.get()
    }

3.投递发送MQTT指令的task方法

   /**
     * 投递MQTT发送指令任务
     * 同步阻塞
     */ 
   private FutureTask sendTask(PublishDto publishDto) throws Exception {
        FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));
        threadPoolExecutor.execute(futureTask);
        //阻塞线程
        return futureTask;
    } 

4.封装CountDownLatch 和 Integer的对象,用于CountDownLatch阻塞控制和返回结果

    /**
     * 封装CountDownLatch 和 Integer
     * 用于CountDownLatch阻塞控制和返回结果
     */
    private class CountDownObj {
        private final CountDownLatch countDownLatch;
        private volatile Integer value;

        private CountDownObj(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }
    }

5.具体发送MQTT数据的Callbale线程Task,会新建CountDownLatch,并通过CountDownLatch.await()方法阻塞,直到MQTT回调接收到数据或者超时。

    /**
     * 发送MQTT消息的任务Callable
     */
    private class GetDatapointValueCallable implements Callable {
        private final PublishDto publishDto;

        GetDatapointValueCallable(PublishDto publishDto) {
            this.publishDto = publishDto;
        }

        @Override
        public Integer call() throws Exception {
            //mqtt client 发送数据,此处具体代码省略
            ......
            
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));
            //阻塞,超时时间3s
            countDownLatch.await(3, TimeUnit.SECONDS);
            //返回mqtt指令对应的结果或者null
            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();
        }

    }

6.MQTT接收数据回调,这里通过deviceId从MAP里面取到CountDownObj,释放闭锁(结束callable线程的等待)和设置MQTT返回的结果(即callable中call()返回的结果,也就是FutureTask的get()方法返回的结果)。

    /**
     * MQTT 接收数据回调
     */
    void mqttReceiveCallback(String deviceId, String datapointId, String value) {
        ......
        
        //接收到数据后,通过闭锁释放阻塞的线程,同时设置结果返回给调用者
        CountDownObj countDownObj=countDownLatchMap.get(deviceId);
        if(countDownObj!=null) {
            countDownObj.setValue(Integer.parseInt(value));
            countDownObj.getCountDownLatch().countDown();
        }
        
        .......
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68941.html

相关文章

  • Java多线程&高并发

    摘要:线程启动规则对象的方法先行发生于此线程的每一个动作。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。通过获取到数据,放入当前线程处理完之后将当前线程中的信息移除。主线程必须在启动其他线程后立即调用方法。 一、线程安全性 定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式,或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行...

    SQC 评论0 收藏0
  • 40道阿里巴巴JAVA研发岗多线程面试题详解,你能答出多少

    摘要:但是单核我们还是要应用多线程,就是为了防止阻塞。多线程可以防止这个问题,多条线程同时运行,哪怕一条线程的代码执行读取数据阻塞,也不会影响其它任务的执行。 1、多线程有什么用?一个可能在很多人看来很扯淡的一个问题:我会用多线程就好了,还管它有什么用?在我看来,这个回答更扯淡。所谓知其然知其所以然,会用只是知其然,为什么用才是知其所以然,只有达到知其然知其所以然的程度才可以说是把一个知识点...

    lpjustdoit 评论0 收藏0
  • bat等大公司常考java多线程面试题

    摘要:典型地,和被用在等待另一个线程产生的结果的情形测试发现结果还没有产生后,让线程阻塞,另一个线程产生了结果后,调用使其恢复。使当前线程放弃当前已经分得的时间,但不使当前线程阻塞,即线程仍处于可执行状态,随时可能再次分得时间。 1、说说进程,线程,协程之间的区别 简而言之,进程是程序运行和资源分配的基本单位,一个程序至少有一个进程,一个进程至少有一个线程.进程在执行过程中拥有独立的内存单元...

    Charlie_Jade 评论0 收藏0
  • 想进大厂?50个多线程面试题,你会多少?【后25题】(二)

    摘要:大多数待遇丰厚的开发职位都要求开发者精通多线程技术并且有丰富的程序开发调试优化经验,所以线程相关的问题在面试中经常会被提到。掌握了这些技巧,你就可以轻松应对多线程和并发面试了。进入等待通行准许时,所提供的对象。 最近看到网上流传着,各种面试经验及面试题,往往都是一大堆技术题目贴上去,而没有答案。 不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就...

    caozhijian 评论0 收藏0
  • java并发编程学习10--同步器--倒计时门栓

    摘要:每个工作线程在结束前将门栓计数器减一,门栓的计数变为就表明工作完成。常用方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程。使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。 【同步器 java.util.concurrent包包含几个能帮助人们管理相互合作的线程集的类。这些机制具有为线程直间的共用集结点模式提供的‘预制功能’。如果有一个相互合作的...

    stackfing 评论0 收藏0

发表评论

0条评论

张金宝

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<