资讯专栏INFORMATION COLUMN

【自己读源码】Netty4.X系列(四) Netty中的异步调用

Flands / 423人阅读

摘要:今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。值检查,整个设计中均没有对对象做的检查,容易引起。

Netty中的异步调用

如果大家观察仔细,会发现我们之前所写的代码都是串行执行的,这是什么意思?就是我们看到代码是什么顺序,最后程序就是按什么顺序执行的。

但是Netty作为一个高性能网络框架,他的调用很多都是异步的,这样,就可以不等上一步做完,继续行进下一步,达到多任务并行的作用。

实现概述

Netty是怎么实现他的异步调用呢,大致总结了下由以下几个核心部分
组成:

异步执行(executor)

异步结果(future and promise)

Listener

同步接口

首先,既然是异步调用,肯定要有异步执行,同学们这里肯定想到的是使用线程,没错,他的底层确实也是线程,只不过netty自身封装成了executor,增强了线程的调度。

其次,是要能获取到这次执行的结果,有的同学可能会说使用callable,没错这确实是一种解决方案,但是netty并没有使用这种,而是使用了一种更为巧妙的设计(也就是通过promise对象来传递执行的结果)来完成这种操作,下面我们会详细说明。

最后就是promise对象提供的各种接口,比如Listener:可以监听执行的完成。或者是同步接口:保证异步执行的方法顺序也是同步的。这篇文章中,我们主要就讲这两,三个,其他的各位童鞋可以自己去看源码。

Executor实现

Netty中每个Channel都有一个eventloop对象,实现还蛮复杂的,在这里不是重点,所以我们先实现一个,具有异步调用功能的exector。

自定义executor很简单,只要实现Executor接口就行

public class MyNettyExecutor implements Executor {
private ThreadFactory factory;

public MyNettyExecutor(ThreadFactory factory) {
    this.factory = factory;
}

public void execute(Runnable command) {
    factory.newThread(command).start();
}
}

然后在需要使用的时候,实例化这个类,这里为了增强使用,我们在类内部提供一个静态初始化方法,并提供最简单factory实现。

 public static Executor newExecutor(){
        return new MyNettyExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
    }
Promise详解 对furture/promise的理解

我对future的认识最开始源于Java的FutureTask框架,简单来说,FutureTask是Future接口的一种实现,Future则是异步执行的结果。
而promise,从接口注释上来看,是一种可修改的Future

/**
 * Special {@link Future} which is writable.
 */

那么现在来看,一个异步结果的程序主要有下面几步

生成promise对象

具体调用的地方传入promise参数

异步调用完成后,设置promise为完成

返回future对象

其中,第三步是发生在异步调用里的,所以我们看到的顺序其实就是1->2>4,让我们来画一张图。

这其实可以用一个现实中的例子来讲述。

今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说:好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。

在上面的例子中,预定蛋糕就是一个异步过程,我只要通知需要做这件事的人(execute),并拿到回复(Future),然后就可以做其他事情了。然后过一段时间打电话询问蛋糕做好没(isDone),如果没做好,那就请他做好的时候通知我(listener)

所以现在我们有了异步执行,还需要什么呢?

Future和Promise的定义接口

Promise实现

然后,我们理一下需要哪些接口

isDone 判断任务是否完成

addListener

trySuccess 设置任务完成并通知所有listener

sync 同步方法,等待任务完成

定义

首先定义接口

/*listener接口,提供complete方法**/
public interface MyFutureListener>  extends EventListener {

    void operationComplete(F future);
}
/*Future接口**/
public interface MyFuture {

    boolean isDone();

    MyFuture sync() throws InterruptedException ;

    MyFuture addListener(MyFutureListener> listener);


}
/*promise接口**/
public interface MyPromise extends MyFuture{

    boolean trySuccess();

    @Override
    MyPromise addListener(MyFutureListener> listener);


}
isDone

我们假设只有完成和未完成两个状态,Promise内维护着这个状态值(初始为null),那么判断是否完成只需要判断这个值不为空就行了。

    private volatile Object result = null;

    @Override
    public boolean isDone() {
        return result != null;
    }
trySucess

那么最简单的success实现就是给这个对象赋值

    @Override
    public boolean trySuccess() {
        result = new Object();
        return true;
    }

当然,这里很不严谨,我们后面再说。

Listener接口实现

上面我们定义了listener接口,这里要实现addListener方法

    private List>> listeners;

@Override
    public MyPromise addListener(MyFutureListener> listener) {
        synchronized (this) {
            if(listeners == null){
                listeners = new ArrayList>>();
                listeners.add(listener);
            }else {
                listeners.add(listener);
            }
        }
        if (isDone()){
            for (MyFutureListener f: listeners
                    ) {
                f.operationComplete(this);
            }
        }
        return this;
    }

然后完善下success方法,成功的时候调用每一个listener的complete方法。

@Override
    public boolean trySuccess() {
        result = new Object();

        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }
同步接口实现

同步也很简单,就是先判断任务是否完成,没有完成就wait一下。注意,wait之前我们要保持同步,引入synchronized原语。

@Override
    public MyFuture sync() throws InterruptedException {
        if (isDone()){
            return this;
        }
       
         synchronized (this){
            while (!isDone()) {
            waiters++;
                try {
                    wait();
                }finally {
                    waiters--;
                }
            }
        }
      
        return this;
    }

同理,需要有地方去唤醒它,我们继续完善success方法,最终我们的trySuccess方法如下

private synchronized void checkNotify(){
        if (waiters > 0){
            notifyAll();
        }
    }

 @Override
    public boolean trySuccess() {
        result = new Object();
        checkNotify();
        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }
Demo

轮子造好了,是时候写个demo测试一下

public class MyExecutorDemo {
    public static void main(String[] args) {

        MyFuture future = asyncHello().addListener((MyFutureListener>) future1 -> System.out.println("监听到完成"));
        if (future.isDone()){
            System.out.println("异步执行完成");
        }else{
            try {
                future.sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static MyFuture asyncHello(){
        Executor executor = MyNettyExecutor.newExecutor();
        final DefaultPromise promise = new DefaultPromise();
        executor.execute(() -> {
            System.out.println("Hello Async");
            try {
                //模拟一些操作
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            promise.trySuccess();
        });
        return promise;
    }
}
警告

不可用于生产,这个Future/promise的设计仅仅为了说明异步执行和结果,距离netty中的异步框架还缺少很多。

NULL值检查,整个设计中均没有对对象做NULL的检查,容易引起NullPointException。

异常处理缺失,对可能失败的地方做异常处理(这也是是否能用于生产的合格检验)

非完全异步,listener的通知没有使用异步

待补充(以我现在的水平,暂时想不到)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68619.html

相关文章

  • 自己源码Netty4.X系列(三) Channel Register

    摘要:我想这很好的解释了中,仅仅一个都这么复杂,在单线程或者说串行的程序中,编程往往是很简单的,说白了就是调用,调用,调用然后返回。 Netty源码分析(三) 前提概要 这次停更很久了,原因是中途迷茫了一段时间,不过最近调整过来了。不过有点要说下,前几天和业内某个大佬聊天,收获很多,所以这篇博文和之前也会不太一样,我们会先从如果是我自己去实现这个功能需要怎么做开始,然后去看netty源码,与...

    darkbug 评论0 收藏0
  • 自己源码Netty4.X系列(二) 启动类成员Channel

    摘要:下面无耻的贴点源码。启动类我们也学,把启动类抽象成两层,方便以后写客户端。别着急,我们慢慢来,下一篇我们会了解以及他的成员,然后,完善我们的程序,增加其接收数据的能力。文章的源码我会同步更新到我的上,欢迎大家,哈哈。 废话两句 这次更新拖了很长时间,第一是自己生病了,第二是因为最开始这篇想写的很大,然后构思了很久,发现不太合适把很多东西写在一起,所以做了点拆分,准备国庆前完成这篇博客。...

    waterc 评论0 收藏0
  • 自己源码Netty4.X系列(一) 启动类概览

    摘要:一些想法这个系列想开很久了,自己使用也有一段时间了,利用也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 一些想法 这个系列想开很久了,自己使用netty也有一段时间了,利用netty也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 结构 这里先看下net...

    qingshanli1988 评论0 收藏0
  • Netty4.x 源码实战系列):Pipeline全剖析

    摘要:在上一篇源码实战系列三全剖析中,我们详细分析了的初始化过程,并得出了如下结论在中,每一个都有一个对象,并且其内部本质上就是一个双向链表本篇我们将深入源码内部,对其一探究竟,给大家一个全方位解析。 在上一篇《Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析》中,我们详细分析了NioServerSocketChannel的初始化过程,并得出了如下结论...

    13651657101 评论0 收藏0
  • Netty4.x 源码实战系列(二):服务端bind流程详解

    摘要:对于,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。 在上一篇《ServerBootstrap 与 Bootstrap 初探》中,我们已经初步的了解了ServerBootstrap是netty进行服务端开发的引导类。 且在上一篇的服务端示例中,我们也看到了,在使用netty进行网络编程时,我...

    laoLiueizo 评论0 收藏0

发表评论

0条评论

Flands

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<