资讯专栏INFORMATION COLUMN

【源起Netty 正传】升级版卡车——ByteBuf

Jason_Geng / 2814人阅读

摘要:之所以称它为卡车,只因编程思想中有段比喻我们可以把它想象成一个煤矿,通道是一个包含煤层数据的矿藏,而缓冲器则是派送到矿藏中的卡车。那么升级版卡车,自然指的就是。结构和功能之所以再次打造了升级版的缓冲器,显然是不满中的某些弊端。

卡车

卡车指的是java原生类ByteBuffer,这兄弟在NIO界大名鼎鼎,与Channel、Selector的铁三角组合构筑了NIO的核心。之所以称它为卡车,只因《编程思想》中有段比喻:

我们可以把它想象成一个煤矿,通道(Channel)是一个包含煤层(数据)的矿藏,而缓冲器(ByteBuffer)则是派送到矿藏中的卡车。卡车满载煤炭而归,我们再从卡车上获得煤炭。也就是说,我们并没有直接和通道交互;我们只是和缓冲器交互,并把缓冲器派送到通道。

那么升级版卡车,自然指的就是ByteBuf

结构和功能

Netty之所以再次打造了升级版的缓冲器,显然是不满ByteBuffer中的某些弊端。

ByteBuffer长度固定

使用者经常需要调用flip()、rewind()方法调整position的位置,不方便

API功能有限

ByteBuffer中有三个重要的位置属性:position、limit、capacity,一个写操作之后大概是这样的

如若想进行读操作,那么flip()的调用是少不了的,从图中不难看出,目前position到limit啥也没有。
调用flip()之后则不一样了(我们不一样~):

而ByteBuf的人设则不相同,它的两个位置属性readIndexwriteIndex,分别和读操作、写操作相对应。“写”不操作readIndex,“读”不操作writeIndex,两者不会相互干扰。这里盗几张图说明下好了:

初始状态

写入N个字节

读取M个(M

释放已读缓存discardReadBytes

重点在于ByteBuf的read和write相关方法,已经封装好了对readIndex、writeIndex位置索引的操作,不需要使用者繁琐的flip()。且write()方法中,ByteBuf设计了自动扩容,这一点后续章节会进行详细说明。

功能方面,主要关注两点:

Derived buffers,类似于数据库视图。ByteBuf提供了多个接口用于创建某ByteBuf的视图或复制ByteBuf:

duplicate:返回当前ByteBuf的复制对象,缓冲区内容共享(修改复制的ByteBuf,原来的ByteBuf内容也随之改变),索引独立维护。

copy:内容和索引都独立。

slice:返回当前ByteBuf的可读子缓冲区,内容共享,索引独立。

转换成ByteBuffer
nio的SocketChanel进行网络操作,还是操作的java原生的ByteBuffer,所以ByteBuf转换成ByteBuffer的需求还是有市场的。

ByteBuffer nioBuffer():当前ByteBuf的可读缓冲区转换成ByteBuffer,缓冲区内容共享,索引独立。需要指出的是,返回后的ByteBuffer无法感知原ByteBuf的动态扩展操作。

ByteBuf星系

称之为“星系”,是因为ByteBuf一脉涉及到的类实在太多了,但多而不乱,归功于类关系结构的设计。

类关系结构

依然盗图:

从内存分配角度,ByteBuf可分为两类

堆内存HeapByteBuf字节缓冲区

直接内存DirectByteBuf字节缓冲区

从内存回收角度,ByteBuf也可分为两类:

普通缓冲区UnpooledByteBuf

池化缓冲区PooledByteBuf

纵观该关继承节构,给我留下的印象就是每层各司其职:读操作以及其它的一些公共功能由父类实现,差异化功能由子类实现。

下面聊下笔者感兴趣的几个点……

AbstractByteBuf的写操作簇

AbstractByteBuf的写操作有很多,这里以writeBytes(byte[] src, int srcIndex, int length)方法为例

@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);    //一、确保可写,对边界进行验证
    setBytes(writerIndex, src, srcIndex, length);    //二、写入操作,不同类型的子类实现方式不同
    writerIndex += length;
    return this;
}

注释部分分别展开看下。

注释一、确保可写,对边界进行验证

跟调用栈ensureWritable -> ensureWritable0,观察ensureWritable0方法

final void ensureWritable0(int minWritableBytes) {
    ensureAccessible();    //确保对象可用
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    // 三、计算扩容量
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);    //四、内存分配
}

比较

先对要写入的字节数minWritableBytes进行判断:如果minWritableBytes < capacity - writeIndex,那么很好,不需要扩容;如果minWritableBytes > maxCapacity - writerIndex,也就是要写入字节数超过了允许的最大字节数,直接抛出越界异常IndexOutOfBoundsException。

眼尖的朋友可能发现了,两次用来判断的上界并不相同——capacity / maxCapacity。maxCapacity是AbstractByteBuf的属性,而capacity设定在其子类中。简单看下UnpooledDirectByteBuf的构造函数:

public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);    //为AbstractByteBuf的maxCapacity属性赋值
    
    /**
     *    ……
     *    省略无关部分
     */
     
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));    //capacity赋值
}

也就是说,ByteBuf的结构,可看成这样:

扩容计算

@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }

    /** 
     *  设置阀值为4MB
     *  1.如果扩展的容量大于阀值,对扩张后的内存和最大内存进行比较:大于最大长度使用最大长度,否则步进4M
     *  2.如果需要扩展的容量小于阀值,以64进行计数倍增:64->128->256;为防止倍增过猛,最后与最大值再次进行比较
     */
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

具体的扩容策略,已拍入注释中,尽可查看!

注释二、写入操作,不同类型的子类实现方式不同

对比下UnpooledDirectByteBufUnpooledHeapByteBuf的实现

UnpooledDirectByteBuf

@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    ByteBuffer tmpBuf = internalNioBuffer();    //分配
    tmpBuf.clear().position(index).limit(index + length);
    tmpBuf.put(src, srcIndex, length);
    return this;
}

UnpooledHeapByteBuf

@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length); //分配
    return this;
}

篇幅有限,不展开说了,结论就是:
UnpooledDirectByteBuf的底层实现为ByteBuffer.allocateDirect,分配时复制体通过buffer.duplicate()获取复制体;而UnpooledHeapByteBuf的底层实现为byte[],分配时通过System.arraycopy方法拷贝副本。

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf的名字就挺有意思——“引用计数”,一副JVM垃圾回收的即视感。而事实上,也差不多一个意思。

看下类属性:

private static final AtomicIntegerFieldUpdater refCntUpdater =
          AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

private volatile int refCnt;

以原子方式更新属性的AtomicIntegerFieldUpdater起了关键作用,将会对volatile修饰的refCnt进行更新,见retain方法(下面展示的是retain的关键部分retain0):

private ByteBuf retain0(final int increment) {
    int oldRef = refCntUpdater.getAndAdd(this, increment);
    if (oldRef <= 0 || oldRef + increment < oldRef) {
        // Ensure we don"t resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        refCntUpdater.getAndAdd(this, -increment);
        throw new IllegalReferenceCountException(oldRef, increment);
    }
    return this;
}

源码阅读很有意思的一点就是能看到些自己不熟悉的类,比如AtomicIntegerFieldUpdater我以前就没接触过!

内存池

内存池可有效的提升效率,道理和线程池、数据库连接池相通,即省去了重复创建销毁的过程

到目前为止,看到的都是ByteBuf中的各Unpooled实现,而池化版的ByteBuf没怎么提过。为何如此?因为池化的实现较复杂,以我现在的功力尚不能完全掌握透彻。

先聊下内存池的设计思路,涨涨姿势:
为了集中集中管理内存的分配和释放,同事提高分配和释放内存时候的性能,很多框架和应用都会通过预先申请一大块内存,然后通过提供相应的分配和释放接口来使用内存。这样一来,堆内存的管理就被集中到几个类或函数中,由于不再频繁使用系统调用来申请和释放内存,应用或系统的性能也会大大提高。 ——节选自《Netty权威指南》

Netty的ByteBuf内存池也是按照这个思路搞的。首先,看下官方注释:

/**
 * Notation: The following terms are important to understand the code
 * > page  - a page is the smallest unit of memory chunk that can be allocated
 * > chunk - a chunk is a collection of pages
 * > in this code chunkSize = 2^{maxOrder} * pageSize
 */

这里面有两个重要的概念page(页)和chunk(块),chunk管理多个page组成二叉树结构,大概就是这个样子:

选择二叉树是有原因的:

/**
 * To search for the first offset in chunk that has at least requested size available we construct a
 * complete balanced binary tree and store it in an array (just like heaps) - memoryMap
 */

为了在chunk中找到至少可用的size的偏移量offset。
继线性结构后,人们又发明了树形结构的意义在于“提升查询效率”,也同样是这里选择二叉树的原因。

小于一个page的内存,直接在PoolSubpage中分配完成。

某块内存是否分配,将通过状态位进行标识。

后记

一如既往的啰嗦几句,最近工作忙,更新文章较慢,希望自己能坚持,如发现问题望大家指正!
thanks..

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68669.html

相关文章

  • 源起Netty 正传Netty Channel

    摘要:搞懂了这部分后,我们将明白在世界中扮演的角色进击的此图展示的已经算是优化后的了用到了线程池。多线程将这种处理操作分隔出来,非型操作业务操作配备以线程池,进化成多线程模型这样的架构,系统瓶颈转移至部分。 Channel定位 注意:如无特别说明,文中的Channel都指的是Netty Channel(io.netty.channel) 一周时间的Channel家族学习,一度让我怀疑人生——...

    jindong 评论0 收藏0
  • Netty 4.x 官方入门指南 [译]

    摘要:目前为止,我们已经完成了一半的工作,剩下的就是在方法中启动服务器。第一个通常被称为,负责接收已到达的。这两个指针恰好标记着数据的起始终止位置。 前言 本篇翻译自netty官方Get Start教程,一方面能把好的文章分享给各位,另一方面能巩固所学的知识。若有错误和遗漏,欢迎各位指出。 https://netty.io/wiki/user-gu... 面临的问题 我们一般使用专用软件或者...

    刘玉平 评论0 收藏0
  • Netty ByteBuf 谁负责谁释放

    摘要:转发自 转发自 http://netty.io/wiki/referenc... Since Netty version 4, the life cycle of certain objects are managed by their reference counts, so that Netty can return them (or their shared resources)...

    Lyux 评论0 收藏0
  • 对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解

    摘要:根据对的定义即所谓的就是在操作数据时不需要将数据从一个内存区域拷贝到另一个内存区域因为少了一次内存的拷贝因此的效率就得到的提升在层面上的通常指避免在用户态与内核态之间来回拷贝数据例如提供的系统调用它可以将一段用户空间内存映射到内 根据 Wiki 对 Zero-copy 的定义: Zero-copy describes computer operations in which the C...

    ConardLi 评论0 收藏0
  • Netty ByteBuf

    摘要:提供了作为它的字节容器但是这个类使用起来过于复杂而且也有些繁琐的的代替品是的的数据处理通过两个组件暴露下面是的优点它可以被用户自定义的缓冲区类扩展通过内置的复合缓冲区类型实现了透明的零拷贝容量可以按需增长在读和写这两种模式之间雀环不需要调用 Java NIO 提供了 ByteBuffer 作为它的字节容器, 但是这个类使用起来过于复杂, 而且也有些繁琐. Netty 的 ByteBuf...

    whataa 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<