资讯专栏INFORMATION COLUMN

【源起Netty 正传】Netty Channel

jindong / 2682人阅读

摘要:搞懂了这部分后,我们将明白在世界中扮演的角色进击的此图展示的已经算是优化后的了用到了线程池。多线程将这种处理操作分隔出来,非型操作业务操作配备以线程池,进化成多线程模型这样的架构,系统瓶颈转移至部分。

Channel定位

注意:如无特别说明,文中的Channel都指的是Netty Channel(io.netty.channel)

一周时间的Channel家族学习,一度让我怀疑人生——研究这个方法有没有用?学习Netty是不是有点儿下了高速走乡间小路的意思?我为啥要读源码?
之所以产生这些疑问,除了我本身心理活动丰富以外,主要病因在于没搞清楚Channel在Netty体系中的定位。而没能清晰理解Netty的定位,也默默的送出了一记助攻。

作些本质思考:Netty是一个NIO框架,是一个嫁接在java NIO基础上的框架

宏观上可以这么理解,见下图:

先不急着聊Channel,回顾下IO演进过程,重点关注IO框架的结构变化。搞懂了这部分后,我们将明白Channel在IO世界中扮演的角色!

进击的IO BIO

此图展示的已经算是优化后的BIO了——用到了线程池。显然,每一个client都需要server端付出一个Thread的代价,即使你通过线程池做了优化,由于受到线程个数的制约,激增的客户端依旧表现的“欲求不满”。

NIO

Acceptor注册Selector,监听accept事件

当客户端连接后,触发accept事件

服务器构建对应的Channel,并在其上注册Selector,监听读写事件

当发生读写事件后,进行相应的读写处理

Reactor单线程

与NIO模型相似,当然也就有和NIO同样的问题:selector/reactor单个线程处理多个channel的各种操作,如果其中一个channel的事件处理延缓了,将影响其它channel。

Reactor多线程

将read/write这种io处理操作分隔出来,非io型操作(业务操作)配备以线程池,进化成reactor多线程模型:

这样的架构,系统瓶颈转移至Reactor部分。而目前劳苦功高的Reactor作了两件事:
1.接收客户端链接请求
2.处理IO型读写操作

主从Reactor

将接收client链接的功能再次拆分出来:

Netty恰恰就是主从Reactor模型的实践者,想想服务端创建时的代码:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

...

从nio时代的模型图上开始出现channel(java channel),它的定位就是进行诸如connect、write、read、close等底层交互。概括一下,java channel是上承selector下连socket的存在。而netty channel,则把java channel当作了底层

源码分析 类结构

清楚了Channel的定位,接下来对其常用api进行分析。

首先拍出类图:

其实Channel内部还有一套体系,Unsafe家族:

Unsafe是Channel的内置类(接口),与java channel交互的重任最终会落到Unsafe身上。

write方法

write只是将数据写入到了ChannelOutboundBuffer中,并没有真正的发送出去,到flush方法调用时,才写入到java channel中发送给对方。

下面列出AbstractChannel的write方法,值得关注的地方已打上中文注释:

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);   //作消息的包装,转换成ByteBuf等
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);    //msg消息写入ChannelOutboundBuffer
}

上述代码最后一行,msg写入了ChannelOutboundBuffer的尾节点tailEntry,同时将unflushedEntry赋值暂存。代码展开如下:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {    //注释一、标记成“未刷新”的数据
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(entry.pendingSize, false);
}
ChannelOutboundBuffer类

这里对ChannelOutboundBuffer类进行简单说明,按惯例先看类注释。

/**
 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 *
 *  省略...
 */

前文提到过,write方法将消息写到ChannelOutboundBuffer,算是数据暂存;之后的flush再将消息刷到java channel乃至客户端。

来张示意图,方便理解:

图中列出的三个属性,在write->ChannelOutboundBuffer->flush的数据流转过程中比较关键。Entry是啥?ChannelOutboundBuffer的静态内部类,典型的链表结构数据:

static final class Entry {

        Entry next;
        
        // 省略...
}

write方法的最后部分(注释一位置)调用outboundBuffer.addMessage(msg, size, promise),已将封装msg的Entry赋值给tailEntry和unflushedEntry;而flush方法,通过调用outboundBuffer.addFlush()(下文,注释二位置),将unflushedEntry间接赋值给了flushedEntry

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
flush方法

直接从AbstractChannel的flush方法开始(若以Channel的flush为开端会经pipeline,将有很长调用链,省略):

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();    //注释二、标记成“已刷新”数据
    flush0();    //数据处理
}

outboundBuffer.addFlush()方法已经分析过了,跟踪调用链flush0->doWrite,我们看下AbstractNioByteChanneldoWrite方法:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();    //自旋计数,限制循环次数,默认16
    do {
        Object msg = in.current();    //flushedEntry的msg
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}

writeSpinCount是个自旋计数,类似于自旋锁的设定,防止当前IO线程由于网络等原因无尽执行写操作,而使得线程假死,造成资源浪费

观察doWriteInternal方法,关键处依旧中文注释伺候:

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {    //writerIndex - readerIndex >0 ? true: flase
            in.remove();
            return 0;
        }

        final int localFlushedAmount = doWriteBytes(buf);   //返回实际写入到java channel的字节数
        if (localFlushedAmount > 0) {   //写入成功
            in.progress(localFlushedAmount);
            /**
             * 1.已经全部写完,执行in.remove()
             * 2.“写半包”场景,直接返回1。
             *   外层方法的自旋变量writeSpinCount递减成15,轮询再次执行本方法
             */
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
    
        //“文件型”消息处理逻辑省略..
        
    } else {
        // Should not reach here.
        throw new Error();
    }
    return WRITE_STATUS_SNDBUF_FULL;    //发送缓冲区满,值=Integer.MAX_VALUE
}

回到doWrite方法,最后执行了incompleteWrite(writeSpinCount < 0)

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

这里的设定挺有意思:

如果 setOpWrite = writeSpinCount < 0 = true,即 doWriteInternal方法返回值 = WRITE_STATUS_SNDBUF_FULL(发送缓冲区满)时,设置写操作位:

protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}

其实就是设置SelectionKey的OP_WRITE操作位,在selector/reactor下次轮询的时候,将再次执行写操作

如果 setOpWrite = writeSpinCount < 0 = false,即 doWriteInternal方法返回值 = 1,16次写半包仍旧没将消息发送出去,则通过定时器再次执行flush:

public Channel flush() {
    pipeline.flush();
    return this;
}

结论:前者由于发送缓冲区满,已无法写入数据,于是继希望于selector的下次轮询;后者则可能只是因为自旋次数少,引起的数据发送不完全,直接将任务再次放入pipeline,而无需等待selector。
这无疑是种优化,细节之处,功力尽显!

感谢

高性能Server---Reactor模型

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68810.html

相关文章

  • 源起Netty 正传】升级版卡车——ByteBuf

    摘要:之所以称它为卡车,只因编程思想中有段比喻我们可以把它想象成一个煤矿,通道是一个包含煤层数据的矿藏,而缓冲器则是派送到矿藏中的卡车。那么升级版卡车,自然指的就是。结构和功能之所以再次打造了升级版的缓冲器,显然是不满中的某些弊端。 卡车 卡车指的是java原生类ByteBuffer,这兄弟在NIO界大名鼎鼎,与Channel、Selector的铁三角组合构筑了NIO的核心。之所以称它为卡车...

    Jason_Geng 评论0 收藏0
  • 源起Netty 前传】Linux网络模型小记

    摘要:非阻塞模型这种也很好理解,由阻塞的死等系统响应进化成多次调用查看数据就绪状态。复用模型,以及它的增强版就属于该种模型。此时用户进程阻塞在事件上,数据就绪系统予以通知。信号驱动模型应用进程建立信号处理程序时,是非阻塞的。 引言 之前的两篇文章 FastThreadLocal怎么Fast?、ScheduledThreadPoolExecutor源码解读 搞的我心力交瘁,且读源码过程中深感功...

    Null 评论0 收藏0
  • 源起Netty 外传】ServiceLoader详解

    摘要:答曰摸索直译为服务加载器,最终目的是获取的实现类。代码走起首先,要有一个接口形状接口介绍然后,要有该接口的实现类。期具体实现依靠的内部类,感性趣的朋友可以自己看一下。总结重点在于可跨越包获取,这一点笔者通过多模块项目亲测延时加载特性 前戏 netty源码注释有云: ... If a provider class has been installed in a jar file tha...

    MoAir 评论0 收藏0
  • 源起Netty 外传】System.getPropert()详解

    摘要:阅读源码时,发现很多,理所当然会想翻阅资料后,该技能,姿势如下环境中的全部属性全部属性注意如果将本行代码放在自定义属性之后,会不会打出把自定义属性也给获取到可以结论会获取目前环境中全部的属性值,无论系统提供还是个人定义系统提供属性代码中定义 阅读源码时,发现很多System.getProperty(xxx),理所当然会想:whats fucking this? 翻阅资料后,Get该技能...

    lixiang 评论0 收藏0
  • 源起Netty 外传】FastThreadLocal怎么Fast?

    摘要:实现原理浅谈帮助理解的示意图中有一属性,类型是的静态内部类。刚刚说过,是一个中的静态内部类,则是的内部节点。这个会在线程中,作为其属性初始是一个数组的索引,达成与类似的效果。的方法被调用时,会根据记录的槽位信息进行大扫除。 概述 FastThreadLocal的类名本身就充满了对ThreadLocal的挑衅,快男FastThreadLocal是怎么快的?源码中类注释坦白如下: /** ...

    gxyz 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<