
Kafka Network Layer Explained

CoderStudy

Abstract: The client creates a SocketChannel, configures it as non-blocking, sets the relevant socket options, and calls connect, which initiates a connection to the remote end. Because the channel is non-blocking, the connection is not necessarily established (i.e., the TCP three-way handshake completed) by the time the call returns.

As we know, Kafka communicates over TCP. Unlike much other middleware, it does not use Netty as its TCP server; instead it implements its own networking stack on top of Java NIO. (Why Kafka chose not to use Netty is discussed in an article linked from the original post.)

Readers not yet familiar with Java NIO may want to read the following two articles first; this post assumes some knowledge of NIO.

https://segmentfault.com/a/11...

https://www.jianshu.com/p/0d4...

More articles can be found on my blog: https://github.com/farmerjohn...

Key Classes

First, an overview of the Kafka client's network-layer architecture (the diagram, which comes from the article linked in the original post, is not reproduced here).

This post focuses on the Network layer.

The Network layer has two important classes: Selector and KafkaChannel.

These two classes play roles similar to java.nio.channels.Selector and java.nio.channels.Channel in Java NIO.

The key fields of Selector are as follows:


    // the Selector from JDK NIO
    java.nio.channels.Selector nioSelector;
    // all connections known to this Selector, keyed by connection id
    Map<String, KafkaChannel> channels;
    // sends that have been fully written out
    List<Send> completedSends;
    // requests that have been fully received
    List<NetworkReceive> completedReceives;
    // requests not yet fully received; invisible to the upper layer
    Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // client-side connections for which connect() returned true immediately
    Set<SelectionKey> immediatelyConnectedKeys;
    // ids of connections that have completed connecting
    List<String> connected;
    // maximum size of a single receive
    int maxReceiveSize;

From the network layer's point of view, Kafka divides into a client side (producers and consumers; a broker also acts as a client when fetching from another broker) and a server side (brokers). This post examines how the client establishes connections and sends and receives data. The server relies on the same Selector and KafkaChannel for network transport, so at the Network layer the two sides differ little.

Establishing a Connection

When the Kafka client starts, it calls Selector#connect (below, unless otherwise noted, Selector means org.apache.kafka.common.network.Selector) to establish connections.


    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);
        // create a SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // put it in non-blocking mode
        socketChannel.configureBlocking(false);
        // create the socket and set its options
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            // SocketChannel#connect initiates the TCP connection to the remote end.
            // Because the channel is non-blocking, the connection is not necessarily
            // established (i.e., the 3-way handshake completed) when this returns:
            // it returns true if the connection is already established, false otherwise.
            // When server and client run on the same machine, it may return true.
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        // register for the CONNECT event
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        KafkaChannel channel;
        try {
            // build a KafkaChannel
            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        } catch (Exception e) {
            ...
        }
        // attach the KafkaChannel to the SelectionKey
        key.attach(channel);
        // store it in the map; id is the name of the remote node
        this.channels.put(id, channel);
        // connected == true means this connection will never fire a CONNECT event,
        // so it has to be handled separately
        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            // record it in a separate set
            immediatelyConnectedKeys.add(key);
            // stop listening for CONNECT events on this key
            key.interestOps(0);
        }
    }

This flow is close to the standard NIO pattern. The case worth a separate mention is socketChannel#connect returning true, which the method's Javadoc describes:


    * If this channel is in non-blocking mode then an invocation of this
    * method initiates a non-blocking connection operation. If the connection
    * is established immediately, as can happen with a local connection, then
    * this method returns true. Otherwise this method returns false and the
    * connection operation must later be completed by invoking the
    * {@link #finishConnect finishConnect} method.

In other words, in non-blocking mode a local connection may be established right away, in which case the method returns true and no CONNECT event will ever fire for that channel. Kafka therefore records these special connections in a separate set, immediatelyConnectedKeys, and gives them special handling in the steps that follow. The sketch below shows this case in isolation.
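
Here is a minimal, self-contained sketch of that behavior (plain NIO, not Kafka code; the loopback address and port 9092 are made-up examples and assume a listener is running there):

    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;

    public class ImmediateConnectDemo {
        public static void main(String[] args) throws Exception {
            SocketChannel ch = SocketChannel.open();
            ch.configureBlocking(false);
            // For a loopback peer, connect() may return true right away;
            // in that case OP_CONNECT will never fire for this channel.
            boolean connected = ch.connect(new InetSocketAddress("127.0.0.1", 9092));
            if (connected) {
                System.out.println("immediately connected; no CONNECT event will follow");
            } else {
                // Normal path: wait for OP_CONNECT, then call finishConnect().
                // A real event loop would block in Selector#select instead of spinning.
                while (!ch.finishConnect())
                    Thread.sleep(10);
                System.out.println("connected via finishConnect()");
            }
            ch.close();
        }
    }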

The client then calls poll to listen for network events:


    public void poll(long timeout) throws IOException {
        ...
        // select() is a thin wrapper around java.nio.channels.Selector#select
        int readyKeys = select(timeout);
        ...
        // if any events are ready, or immediatelyConnectedKeys is non-empty
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            // handle the ready events; the 2nd argument is false
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            // handle immediatelyConnectedKeys; the 2nd argument is true
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        addToCompletedReceives();
        ...
    }

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        // iterate over the keys
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // remove the current element, or the next poll would process it again
            iterator.remove();
            // get the KafkaChannel created during connect
            KafkaChannel channel = channel(key);
            ...
            try {
                // either we are processing an element of immediatelyConnectedKeys,
                // or this is a CONNECT event
                if (isImmediatelyConnected || key.isConnectable()) {
                    // finishConnect also registers interest in READ events
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        ...
                    } else
                        continue;
                }
                // SSL connections need some extra steps
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
                // READ event
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
                // WRITE event
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                // if the key is no longer valid, close the connection
                if (!key.isValid())
                    close(channel, true);
            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel, true);
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

Because connections in immediatelyConnectedKeys never trigger a CONNECT event, poll calls finishConnect on their channels separately. In plaintext mode this lands in PlaintextTransportLayer#finishConnect, implemented as follows:


    public boolean finishConnect() throws IOException {
        // true means the connection is now fully established
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // stop listening for CONNECT events, start listening for READ events
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }
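
The interestOps expression above is plain bit arithmetic. A small illustration (not Kafka code; the numeric values are the JDK's actual constants, shown only for clarity):

    import java.nio.channels.SelectionKey;

    public class InterestOpsDemo {
        public static void main(String[] args) {
            int ops = SelectionKey.OP_CONNECT;      // 8 (binary 1000): only CONNECT registered
            ops = ops & ~SelectionKey.OP_CONNECT    // clear the CONNECT bit -> 0
                    | SelectionKey.OP_READ;         // set the READ bit -> 1 (binary 0001)
            System.out.println(ops == SelectionKey.OP_READ);  // prints true
        }
    }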

More detail on immediatelyConnectedKeys can be found in the discussion linked from the original post.

Sending Data

Kafka sends data in two steps:

1. Call Selector#send to stash the data to be sent in the corresponding KafkaChannel. This method performs no actual network I/O.


    // Selector#send
    public void send(Send send) {
        String connectionId = send.destination();
        // if the connection is already being closed, record it in failedSends
        if (closingChannels.containsKey(connectionId))
            this.failedSends.add(connectionId);
        else {
            KafkaChannel channel = channelOrFail(connectionId, false);
            try {
                channel.setSend(send);
            } catch (CancelledKeyException e) {
                this.failedSends.add(connectionId);
                close(channel, false);
            }
        }
    }

    // KafkaChannel#setSend
    public void setSend(Send send) {
        // fail if previous data has not been fully sent yet
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        // stash the Send
        this.send = send;
        // register interest in WRITE events
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
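
One consequence of the guard in setSend, as the code above shows, is that each connection allows only a single in-flight Send; only CancelledKeyException is caught, so the IllegalStateException propagates to the caller. A hypothetical misuse fragment (assumptions: selector is a connected org.apache.kafka.common.network.Selector, "node1" is an established connection id, and the NetworkSend(String, ByteBuffer) constructor from the same package frames the payload):

    // hypothetical fragment, not Kafka source
    selector.send(new NetworkSend("node1", ByteBuffer.wrap(firstRequestBytes)));
    // No selector.poll(...) in between, so the first Send is still pending;
    // setSend's guard now throws IllegalStateException: "Attempt to begin a
    // send operation with prior send operation still in progress."
    selector.send(new NetworkSend("node1", ByteBuffer.wrap(secondRequestBytes)));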

2. Call Selector#poll. Step 1 registered interest in WRITE events on the channel, so when the channel becomes writable, pollSelectionKeys is invoked and actually sends the data.


    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        // iterate over the keys
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // remove the current element, or the next poll would process it again
            iterator.remove();
            // get the KafkaChannel created during connect
            KafkaChannel channel = channel(key);
            ...
            try {
                ...
                // WRITE event
                if (channel.ready() && key.isWritable()) {
                    // the actual network write
                    Send send = channel.write();
                    // one Send may take several writes to go out fully;
                    // a non-null return means a whole Send has completed
                    if (send != null) {
                        // completedSends holds the sends that have finished
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                ...
            } catch (Exception e) {
                ...
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

When the channel is writable, KafkaChannel#write is called, and this is where the actual network I/O happens:


    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        // ultimately calls SocketChannel#write to do the actual write
        send.writeTo(transportLayer);
        if (send.completed())
            // if fully written, deregister interest in WRITE events
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }
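
Putting the two steps together, a condensed client-side sketch might look like this (a sketch only, not the real NetworkClient, which additionally handles metadata, timeouts, and retries; selector, requestBytes, and the connection id "node1" are assumptions as above):

    // step 1: queue the data; no network I/O happens here
    selector.send(new NetworkSend("node1", ByteBuffer.wrap(requestBytes)));
    // step 2: poll; the WRITE event fires and KafkaChannel#write does the socket write
    selector.poll(1000);
    for (Send s : selector.completedSends())
        System.out.println("send to " + s.destination() + " completed");
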
Receiving Data

If the remote end sends data over, the next call to poll will process the received bytes.


    public void poll(long timeout) throws IOException {
        ...
        // select() is a thin wrapper around java.nio.channels.Selector#select
        int readyKeys = select(timeout);
        ...
        // if any events are ready, or immediatelyConnectedKeys is non-empty
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            // handle the ready events; the 2nd argument is false
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            // handle immediatelyConnectedKeys; the 2nd argument is true
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        addToCompletedReceives();
        ...
    }

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        // iterate over the keys
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // remove the current element, or the next poll would process it again
            iterator.remove();
            // get the KafkaChannel created during connect
            KafkaChannel channel = channel(key);
            ...
            try {
                ...
                // READ event
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    // read() pulls data off the network, but a single call may get
                    // only part of a request; it returns non-null only once a
                    // complete request has been read
                    while ((networkReceive = channel.read()) != null)
                        // store each complete request in stagedReceives
                        addToStagedReceives(channel, networkReceive);
                }
                ...
            } catch (Exception e) {
                ...
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<>());
        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }
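
A note on why channel.read() can return null for partial data: the Kafka protocol frames each request with a 4-byte length prefix, and NetworkReceive accumulates bytes across poll calls until the whole frame has arrived. A simplified sketch of that idea (a stand-in for the real NetworkReceive, written under that framing assumption; EOF handling omitted):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class SimpleReceive {
        private final ByteBuffer size = ByteBuffer.allocate(4); // 4-byte length prefix
        private ByteBuffer payload;

        /** Returns the complete payload, or null if more bytes are still needed. */
        ByteBuffer readFrom(ReadableByteChannel ch) throws IOException {
            if (size.hasRemaining()) {
                ch.read(size);
                if (size.hasRemaining())
                    return null;                     // length header incomplete
                size.flip();
                payload = ByteBuffer.allocate(size.getInt());
            }
            ch.read(payload);
            if (payload.hasRemaining())
                return null;                         // body incomplete
            payload.flip();
            return payload;
        }
    }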

Afterwards, the addToCompletedReceives method processes the stagedReceives collection.


    private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                // on the client side isMute() returns false; the server side
                // relies on it to keep requests ordered
                if (!channel.isMute()) {
                    Deque<NetworkReceive> deque = entry.getValue();
                    addToCompletedReceives(channel, deque);
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }

    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        // move each channel's first NetworkReceive into completedReceives
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
    }

After data is read it first goes into stagedReceives; addToCompletedReceives then takes at most one NetworkReceive per channel (if any) from stagedReceives and moves it into completedReceives.

There are two reasons for this design:

First, for an SSL connection the payload on the wire is encrypted, so the exact number of bytes needed for one request cannot be determined up front. The reader therefore pulls as many bytes as it can, which may be more than one request's worth. If no further data then arrives on that channel, the selector never fires for it again and the extra buffered bytes would go unprocessed.

Second, Kafka must ensure that requests on a channel are processed in the order in which they were sent. For each channel, the upper layer therefore sees at most one request per poll; only after that request has been handled are further requests surfaced. On the server side, the channel is muted after each poll (no more data is read from it) and unmuted once processing completes, after which reading resumes. On the client side this is controlled by InFlightRequests#canSendMore. A toy model of the one-receive-per-poll rule is sketched below.
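
The toy model (not Kafka code) uses plain strings in place of NetworkReceive objects; each round of the loop models one poll:

    import java.util.*;

    public class StagedReceivesDemo {
        public static void main(String[] args) {
            Map<String, Deque<String>> staged = new HashMap<>();
            staged.put("chan-A", new ArrayDeque<>(Arrays.asList("req1", "req2", "req3")));
            staged.put("chan-B", new ArrayDeque<>(Arrays.asList("reqX")));

            int round = 0;
            while (!staged.isEmpty()) {
                // one "poll": take at most one receive per channel
                List<String> completed = new ArrayList<>();
                Iterator<Map.Entry<String, Deque<String>>> it = staged.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Deque<String>> e = it.next();
                    completed.add(e.getKey() + ":" + e.getValue().poll());
                    if (e.getValue().isEmpty())
                        it.remove();
                }
                System.out.println("poll " + (++round) + " -> " + completed);
            }
            // per-channel order is preserved: chan-A yields req1, req2, req3
            // across three successive polls
        }
    }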

The comment in the Kafka source describing this logic reads:


    /* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
     * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
     * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
     * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
     * application buffer size. This means we might be reading additional bytes than the requested size.
     * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
     * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
     * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
     * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
     * and pop response and add to the completedReceives.
     * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
     * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
     * by SocketServer to the request queue may be processed by different request handler threads, requests on each
     * channel must be processed one-at-a-time to guarantee ordering.
     */
End

This post has walked through the implementation of Kafka's network layer. When reading the Kafka source, it is easy to get lost without a clear picture of this layer, for example around the mechanism that guarantees request/response ordering, or the fact that the actual network I/O happens not in send but during poll.
