资讯专栏INFORMATION COLUMN

【JAVA新生】拿协程开始写个异步io应用

singerye / 1267人阅读

摘要:接下来,就看怎么用协程来实现异步了。直接拿的原始写代码会死人的。引入协程就是为了把上下连续的业务逻辑放在一个协程里,把与业务关系不大的的处理部分放到框架的里。第三部分是放弃掉执行权。这样一个只能接收打印一行的异步应用就写好了。

前面已经准备好了greenlet对应的Java版本了,一个删减后的kilim(http://segmentfault.com/blog/taowen/1190000000697487)。接下来,就看怎么用协程来实现异步io了。首先,拿一段最最简单的tcp socket accept的代码:

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9090));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("listening...");
selector.select();
scheduler.accept(serverSocketChannel);
System.out.println("hello");

这里使用的是java 6的NIO1的selector模型。直接拿NIO的原始api写代码会死人的。引入协程就是为了把上下连续的业务逻辑放在一个协程里,把与业务关系不大的selector的处理部分放到框架的ioloop里。也就是把一段交织的代码,分成两个关注点不同的组成部分。
改造之后的代码在这里: https://github.com/taowen/daili/tree/1e319f929678213a8d8f63ee5e8b8cf016637317
这是改造之后的效果:

Scheduler scheduler = new Scheduler();
DailiTask task = new DailiTask(scheduler) {
    @Override
    public void execute() throws Pausable, Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9090));
        serverSocketChannel.configureBlocking(false);
        System.out.println("listening...");
        scheduler.accept(serverSocketChannel);
        System.out.println("hello");
    }
};
scheduler.callSoon(task);
scheduler.loop();

其中最关键的一行是 scheduler.accept(serverSocketChannel); 这个调用是阻塞的。但是只阻塞调用它的Task协程。如果有多个Task并行的话,别的Task可以在这个时候被运行。那么scheduler.accept是如何做到把NIO的selector api转换成这样的形式的呢?

public SocketChannel accept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable {
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (null != socketChannel) {
        return socketChannel;
    }
    SelectionKey selectionKey = serverSocketChannel.keyFor(selector);
    if (null == selectionKey) {
        selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new WaitingSelectorIO());
    } else {
        selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
    }
    WaitingSelectorIO waitingSelectorIO = (WaitingSelectorIO) selectionKey.attachment();
    waitingSelectorIO.acceptBlockedAt = System.currentTimeMillis();
    waitingSelectorIO.acceptTask = (Task) Task.getCurrentTask();
    selectionKey.attach(waitingSelectorIO);
    Task.pause(waitingSelectorIO);
    return serverSocketChannel.accept();
}

这个函数分成四部分:第一部分是尝试去accept,如果有戏就不用NIO了。第二部分是注册selection key,说明我希望知道什么时候可以accept了,并把task作为附件加上去。第三部分是Task.pause放弃掉执行权。第四部分是task被回调了,说明等待的accept已经ok了,可以去调用了。
但是Task.pause了之后,是谁在把这个暂停的task重新拉起来执行的呢?这个就是scheduler的loop干的活了

public void loop() throws IOException {
    while (true) {
        executeReadyTasks();
        selector.select();
        Set selectionKeys = selector.selectedKeys();
        for (SelectionKey selectionKey : selectionKeys) {
            WaitingSelectorIO waitingSelectorIO = (WaitingSelectorIO) selectionKey.attachment();
            if (selectionKey.isAcceptable()) {
                Task taskToCall = waitingSelectorIO.acceptTask;
                waitingSelectorIO.acceptBlockedAt = 0;
                waitingSelectorIO.acceptTask = null;
                callSoon(taskToCall);
            }
        }
    }
}

在循环中调用selector.select获得网络事件的通知。如果selection key就绪了,就把附件里的task取出来回调。具体的回调发生在executeReadyTasks内部,其实就是调用一下resume而已。

private void executeReadyTasks() {
    Task task;
    while((task = readyTasks.poll()) != null) {
        executeTask(task);
    }
}

private void executeTask(Task task) {
    try {
        task.resume();
    } catch (Exception e) {
        LOGGER.error("failed to execute task: " + task, e);
    }
}

这样一个只能接收telnet 127.0.0.1 9090打印一行hello的异步io应用就写好了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64155.html

相关文章

  • JAVA新生】echo server的第n种写法

    摘要:基本上所有的网络应用都会示范一个的写法。除了这些操作的主体是而不是,操作的是,而不是。以为例其过程是这样的这段代码就是创建一个,并注册一个,并把附着到上。关键之一显然是利用了协程的和,把回调转换成顺序的逻辑执行。 基本上所有的网络应用都会示范一个tcp的echo写法。前面我们已经看到了如何使用协程和异步io来做tcp服务器的第一步,accept。下面是一个完整的echo server的...

    Luosunce 评论0 收藏0
  • JAVA新生】nio attach引发的问题

    摘要:理由是如果到了上,而这个对应的操作迟迟不能就绪被出来。但我认为这其实是一个超时处理问题。问题是,原生的是没有超时支持的。如果是回调性质的,一般的做法是正常就绪给一个,超时给另外一个。只要时间合理,作者之前所说的会引发的问题并不会出现。 grizzly框架的作者曾经提到NIO框架不应该使用selection key的attach功能(链接)。理由是如果attach到了selection ...

    ruicbAndroid 评论0 收藏0
  • JAVA新生】kilim版的协程

    摘要:试用了一下,发现它是基于的主要是。于是拿的代码改了一个纯协程的版本出来。两个都是以提供和为主要,把协程的隐藏在下面。为了搞一个更简单的,纯协程来玩,把里无关的代码都给删了。结果在这里和官方的版本的不同在于,添加了一个方法相当于的。 试用了一下 http://docs.paralleluniverse.co/quasar/,发现它是基于JDK 1.7的(主要是fork join pool...

    Barry_Ng 评论0 收藏0
  • PHP回顾之协程

    摘要:本文先回顾生成器,然后过渡到协程编程。其作用主要体现在三个方面数据生成生产者,通过返回数据数据消费消费者,消费传来的数据实现协程。解决回调地狱的方式主要有两种和协程。重点应当关注控制权转让的时机,以及协程的运作方式。 转载请注明文章出处: https://tlanyan.me/php-review... PHP回顾系列目录 PHP基础 web请求 cookie web响应 sess...

    Java3y 评论0 收藏0

发表评论

0条评论

singerye

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<