资讯专栏INFORMATION COLUMN

Netty实现心跳检测与断线重连

RobinQu / 2657人阅读

摘要:使用实现心跳机制代码环境和具体思路如下使用提供的来检测读写操作的空闲时间使用序列化客户端空闲后向服务端发送一个心跳包服务端空闲后心跳丢失计数器丢失的心跳包数量当丢失的心跳包数量超过个时,主动断开该客户端的断开连接后,客户端之后重新连接代码已

使用Netty实现心跳机制 代码环境:JDK1.8和Netty4.x 具体思路如下:

使用Netty提供的IdleStateHandler来检测读写操作的空闲时间

使用Protocol Buffer序列化

客户端write空闲5s后向服务端发送一个心跳包

服务端read空闲6s后心跳丢失计数器+1(丢失的心跳包数量)

当丢失的心跳包数量超过3个时,主动断开该客户端的channel

断开连接后,客户端10s之后重新连接

代码已上传至GitHub:完整代码地址 代码实现: 数据包结构(proto文件)

</>复制代码

  1. option java_outer_classname = "PacketProto";
  2. message Packet {
  3. // 包的类型
  4. enum PacketType {
  5. // 心跳包
  6. HEARTBEAT = 1;
  7. // 非心跳包
  8. DATA = 2;
  9. }
  10. // 包类型
  11. required PacketType packetType = 1;
  12. // 数据部分(可选,心跳包不包含数据部分)
  13. optional string data = 2;
  14. }
ClientHeartbeatHandler类

</>复制代码

  1. public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. System.out.println("--- Server is active ---");
  5. }
  6. @Override
  7. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  8. System.out.println("--- Server is inactive ---");
  9. // 10s 之后尝试重新连接服务器
  10. System.out.println("10s 之后尝试重新连接服务器...");
  11. Thread.sleep(10 * 1000);
  12. Client.doConnect();
  13. }
  14. @Override
  15. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  16. if (evt instanceof IdleStateEvent) {
  17. // 不管是读事件空闲还是写事件空闲都向服务器发送心跳包
  18. sendHeartbeatPacket(ctx);
  19. }
  20. }
  21. @Override
  22. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  23. System.out.println("连接出现异常");
  24. }
  25. /**
  26. * 发送心跳包
  27. *
  28. * @param ctx
  29. */
  30. private void sendHeartbeatPacket(ChannelHandlerContext ctx) {
  31. Packet.Builder builder = newBuilder();
  32. builder.setPacketType(Packet.PacketType.HEARTBEAT);
  33. Packet packet = builder.build();
  34. ctx.writeAndFlush(packet);
  35. }
  36. }
Client类

</>复制代码

  1. public class Client {
  2. private static Channel ch;
  3. private static Bootstrap bootstrap;
  4. public static void main(String[] args) {
  5. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  6. try {
  7. bootstrap = new Bootstrap();
  8. bootstrap
  9. .group(workGroup)
  10. .channel(NioSocketChannel.class)
  11. .handler(new ChannelInitializer() {
  12. @Override
  13. protected void initChannel(SocketChannel ch) throws Exception {
  14. ChannelPipeline pipeline = ch.pipeline();
  15. pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
  16. pipeline.addLast(new ProtobufEncoder());
  17. pipeline.addLast(new IdleStateHandler(0, 5, 0));
  18. pipeline.addLast(new ClientHeartbeatHandler());
  19. }
  20. });
  21. // 连接服务器
  22. doConnect();
  23. // 模拟不定时发送向服务器发送数据的过程
  24. Random random = new Random();
  25. while (true) {
  26. int num = random.nextInt(21);
  27. Thread.sleep(num * 1000);
  28. PacketProto.Packet.Builder builder = newBuilder();
  29. builder.setPacketType(PacketProto.Packet.PacketType.DATA);
  30. builder.setData("我是数据包(非心跳包) " + num);
  31. PacketProto.Packet packet = builder.build();
  32. ch.writeAndFlush(packet);
  33. }
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. } finally {
  37. workGroup.shutdownGracefully();
  38. }
  39. }
  40. /**
  41. * 抽取出该方法 (断线重连时使用)
  42. *
  43. * @throws InterruptedException
  44. */
  45. public static void doConnect() throws InterruptedException {
  46. ch = bootstrap.connect("127.0.0.1", 20000).sync().channel();
  47. }
  48. }
ServerHeartbeatHandler类

</>复制代码

  1. public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
  2. // 心跳丢失计数器
  3. private int counter;
  4. @Override
  5. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  6. System.out.println("--- Client is active ---");
  7. }
  8. @Override
  9. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  10. System.out.println("--- Client is inactive ---");
  11. }
  12. @Override
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  14. // 判断接收到的包类型
  15. if (msg instanceof Packet) {
  16. Packet packet = (Packet) msg;
  17. switch (packet.getPacketType()) {
  18. case HEARTBEAT:
  19. handleHeartbreat(ctx, packet);
  20. break;
  21. case DATA:
  22. handleData(ctx, packet);
  23. break;
  24. default:
  25. break;
  26. }
  27. }
  28. }
  29. @Override
  30. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  31. if (evt instanceof IdleStateEvent) {
  32. // 空闲6s之后触发 (心跳包丢失)
  33. if (counter >= 3) {
  34. // 连续丢失3个心跳包 (断开连接)
  35. ctx.channel().close().sync();
  36. System.out.println("已与Client断开连接");
  37. } else {
  38. counter++;
  39. System.out.println("丢失了第 " + counter + " 个心跳包");
  40. }
  41. }
  42. }
  43. @Override
  44. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  45. System.out.println("连接出现异常");
  46. }
  47. /**
  48. * 处理心跳包
  49. *
  50. * @param ctx
  51. * @param packet
  52. */
  53. private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) {
  54. // 将心跳丢失计数器置为0
  55. counter = 0;
  56. System.out.println("收到心跳包");
  57. ReferenceCountUtil.release(packet);
  58. }
  59. /**
  60. * 处理数据包
  61. *
  62. * @param ctx
  63. * @param packet
  64. */
  65. private void handleData(ChannelHandlerContext ctx, Packet packet) {
  66. // 将心跳丢失计数器置为0
  67. counter = 0;
  68. String data = packet.getData();
  69. System.out.println(data);
  70. ReferenceCountUtil.release(packet);
  71. }
  72. }
Server类

</>复制代码

  1. public class Server {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
  4. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap bootstrap = new ServerBootstrap();
  7. bootstrap
  8. .group(acceptorGroup, workerGroup)
  9. .channel(NioServerSocketChannel.class)
  10. .childHandler(new ChannelInitializer() {
  11. @Override
  12. protected void initChannel(SocketChannel ch) throws Exception {
  13. ChannelPipeline pipeline = ch.pipeline();
  14. pipeline.addLast(new ProtobufVarint32FrameDecoder());
  15. pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance()));
  16. pipeline.addLast(new IdleStateHandler(6, 0, 0));
  17. pipeline.addLast(new ServerHeartbeatHandler());
  18. }
  19. });
  20. Channel ch = bootstrap.bind(20000).sync().channel();
  21. System.out.println("Server has started...");
  22. ch.closeFuture().sync();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. } finally {
  26. acceptorGroup.shutdownGracefully();
  27. workerGroup.shutdownGracefully();
  28. }
  29. }
  30. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65289.html

相关文章

  • 浅析 Netty 实现心跳机制断线重连

    摘要:基础何为心跳顾名思义所谓心跳即在长连接中客户端和服务器之间定期发送的一种特殊的数据包通知对方自己还在线以确保连接的有效性为什么需要心跳因为网络的不可靠性有可能在保持长连接的过程中由于某些突发情况例如网线被拔出突然掉电等会造成服务器和客户端的 基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 ...

    waterc 评论0 收藏0
  • 使用Netty,我们到底在开发些什么?

    摘要:比如面向连接的功能包发送接收数量包发送接收速率错误计数连接重连次数调用延迟连接状态等。你要处理的,就是心跳超时的逻辑,比如延迟重连。发生异常后,可以根据不同的类型选择断线重连比如一些二进制协议的编解码紊乱问题,或者调度到其他节点。 在java界,netty无疑是开发网络应用的拿手菜。你不需要太多关注复杂的nio模型和底层网络的细节,使用其丰富的接口,可以很容易的实现复杂的通讯功能。 和...

    DesGemini 评论0 收藏0
  • 使用Netty,我们到底在开发些什么?

    摘要:比如面向连接的功能包发送接收数量包发送接收速率错误计数连接重连次数调用延迟连接状态等。你要处理的,就是心跳超时的逻辑,比如延迟重连。发生异常后,可以根据不同的类型选择断线重连比如一些二进制协议的编解码紊乱问题,或者调度到其他节点。 在java界,netty无疑是开发网络应用的拿手菜。你不需要太多关注复杂的nio模型和底层网络的细节,使用其丰富的接口,可以很容易的实现复杂的通讯功能。 和...

    MSchumi 评论0 收藏0
  • 长连接的心跳重连设计

    摘要:超过后则认为服务端出现故障,需要重连。同时在每次心跳时候都用当前时间和之前服务端响应绑定到上的时间相减判断是否需要重连即可。客户端检测到某个服务端迟迟没有响应心跳也能重连获取一个新的连接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 说道心跳这个词大家都不陌生,当然不是指男女...

    dreamGong 评论0 收藏0

发表评论

0条评论

RobinQu

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<