资讯专栏INFORMATION COLUMN

Cobar源码解析(二)

pkwenda / 3235人阅读

摘要:如果数据库检测到是连续的,则表明没有串包,如果不连续,则表示串包,数据库会直接丢弃该连接。源码分析上一节我们分析到,当一个前端连接过来,并不是直接和绑定,而是先插入到线程的注册队列中这样能释放的压力处理更多前端连接。

报文格式

这一节我们来讲Cobar Handshake的过程。

MySQL服务端和客户端交互的所有的包格式都是统一的,报文格式如下图:

MySQL报文的消息头共有4个字节,前3字节表示的是实际数据的长度(不包含消息头),并且字节是按照小端模式排放的。

第四个字节MySQL为了防止串包用的,其原理是每收到一个报文,都在sequence id上加1。如果数据库检测到sequence id是连续的,则表明没有串包,如果不连续,则表示串包,数据库会直接丢弃该连接。

小端模式就是低位字节排放在内存的低地址端,高位字节排放在内存的高地址端。

大端模式则相反。

下面是Handshake包的结构,括号内表示该字段的字节数:

seed部分是加密种子,分为前后两个部分,通过随机数生成。

源码分析

上一节我们分析到,当一个前端连接过来,并不是直接和selector绑定,而是先插入到R线程的注册队列中,这样能释放NIOAcceptor的压力,处理更多前端连接。所以,连接和selector的绑定过程是在R线程中进行的,由register方法实现,代码如下:

private void register(Selector selector) {
            NIOConnection c = null;
            while ((c = registerQueue.poll()) != null) {
                try {
                    c.register(selector);
                } catch (Throwable e) {
                    c.error(ErrorCode.ERR_REGISTER, e);
                }
            }
        }

实际的绑定操作是由NIOConnectionregister方法实现的,NIOConnection接口的抽象类是AbstractConnection,我们来看它实现的register方法:

@Override
    public void register(Selector selector) throws IOException {
        try {
            // 该连接只监听socket可读事件
            processKey = channel.register(selector, SelectionKey.OP_READ, this);
            isRegistered = true;
        } finally {
            if (isClosed.get()) {
                clearSelectionKey();
            }
        }
    }

我们发现,前端连接注册选择器时,只监听了可读事件。这是考虑到,JavaNIO属于水平触发LT(只要满足条件,就触发一个事件),使用水平触发时,如果应用程序不需要写就不要关注socket可写的事件,否则就会无限次地立即返回write ready notification,长期关注socket可写事件会出现CPU打满的情况,所以在使用JDK的NIO编程时,如果没有数据往外写,就取消写事件,有数据往外写时再注册写事件。

FrontendConnection继承了AbstractConnection,它又重新实现了register方法,代码如下:

@Override
    public void register(Selector selector) throws IOException {
        // 调用父类的register方法
        super.register(selector);
        if (!isClosed.get()) {
            // 生成认证数据
            byte[] rand1 = RandomUtil.randomBytes(8);
            byte[] rand2 = RandomUtil.randomBytes(12);

            // 保存认证数据
            byte[] seed = new byte[rand1.length + rand2.length];
            System.arraycopy(rand1, 0, seed, 0, rand1.length);
            System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
            this.seed = seed;

            // 发送握手数据包
            HandshakePacket hs = new HandshakePacket();
            hs.packetId = 0;
            hs.protocolVersion = Versions.PROTOCOL_VERSION;
            hs.serverVersion = Versions.SERVER_VERSION;
            hs.threadId = id;
            hs.seed = rand1;
            hs.serverCapabilities = getServerCapabilities();
            hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
            hs.serverStatus = 2;
            hs.restOfScrambleBuff = rand2;
            // 异步写入Handshake包
            hs.write(this);
        }
    }

该方法生成了HandShake包,和上面结构图相一致,关键是最后异步写入HandShake包的write方法,代码如下:

public void write(FrontendConnection c) {
        // 分配缓存
        ByteBuffer buffer = c.allocate();
        
        // 将HandShake包写入缓存
        BufferUtil.writeUB3(buffer, calcPacketSize());
        buffer.put(packetId);
        buffer.put(protocolVersion);
        BufferUtil.writeWithNull(buffer, serverVersion);
        BufferUtil.writeUB4(buffer, threadId);
        BufferUtil.writeWithNull(buffer, seed);
        BufferUtil.writeUB2(buffer, serverCapabilities);
        buffer.put(serverCharsetIndex);
        BufferUtil.writeUB2(buffer, serverStatus);
        buffer.put(FILLER_13);
        // buffer.position(buffer.position() + 13);
        BufferUtil.writeWithNull(buffer, restOfScrambleBuff);
        
        // 将ByteBuffer中的数据异步写入Socket
        c.write(buffer);
    }

我们再来看最后一行的write方法:

@Override
    public void write(ByteBuffer buffer) {
        // 检查连接是否关闭,若关闭则将缓存回收
        if (isClosed.get()) {
            processor.getBufferPool().recycle(buffer);
            return;
        }
        if (isRegistered) {
            try {
                // 将缓存先插入对队列中,其实就是一个循环数组,如数组已满,则 wait;
                // 这个队列是AbstractConnection的一个成员变量
                writeQueue.put(buffer);
            } catch (InterruptedException e) {
                error(ErrorCode.ERR_PUT_WRITE_QUEUE, e);
                return;
            }
            // 插入队列后,调用NIOProcessor的postWrite方法,其实就是NIOReacor的postWrite方法
            processor.postWrite(this);
        } else {
            // 若连接未注册,也回收缓存
            processor.getBufferPool().recycle(buffer);
            close();
        }
    }

我们看NIOReactor的postWrite方法:

final void postWrite(NIOConnection c) {
        reactorW.writeQueue.offer(c);
    }

其实是将连接插入到W线程的writeQueue阻塞队列中,我们再来看W线程的run方法,

@Override
        public void run() {
            NIOConnection c = null;
            for (;;) {
                try {
                    if ((c = writeQueue.take()) != null) {
                        write(c);
                    }
                } catch (Throwable e) {
                    LOGGER.warn(name, e);
                }
            }
        }
        
private void write(NIOConnection c) {
            try {
                c.writeByQueue();
            } catch (Throwable e) {
                c.error(ErrorCode.ERR_WRITE_BY_QUEUE, e);
            }
        }

轮询阻塞队列,若队列不为空,则取出连接,基于队列写方法writeByQueue将缓存中的数据写入socket,下一节再分析writeByQueue方法。

总结

阅读源码后,发现Cobar从前端连接的accept并注册selector到发送Handshake包都是异步,本质是将连接插入到R线程和W线程的阻塞队列中,不立即进行注册和写操作,从而实现整个过程的异步化,提高了Cobar的吞吐量。

以上。

原文链接

https://segmentfault.com/a/11...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70582.html

相关文章

  • Cobar源码解析(一)

    摘要:的使用方法就不多介绍了,本文的主要内容是剖析的源代码。而又有一个私有的静态变量,以及获取这个私有静态变量的静态方法,显然,这是一个单例设计模式,使程序运行的时候全局只有一个对象。 简介 当业务的数据量和访问量急剧增加的情况下,我们需要对数据进行水平拆分,从而降低单库的压力,并且数据的水平拆分需要对业务透明,屏蔽掉水平拆分的细节。并且,前端业务的高并发会导致后端的数据库连接过多,从而DB...

    jiekechoo 评论0 收藏0
  • 【深度】| 值得收藏的阿里开源技术

    摘要:淘宝定制基于,是国内第一个优化定制且开源的服务器版虚拟机。数据库开源数据库是基于官方版本的一个分支,由阿里云数据库团队维护,目前也应用于阿里巴巴集团业务以及阿里云数据库服务。淘宝服务器是由淘宝网发起的服务器项目。 Java JAVA 研发框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速构建金融...

    econi 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<