资讯专栏INFORMATION COLUMN

手把手教你基于Netty实现一个基础的RPC框架(通俗易懂)

番茄西红柿 / 3090人阅读

摘要:是一个分布式服务框架,以及治理方案。手写注意要点手写注意要点基于上文中对于协议的理解,如果我们自己去实现,需要考虑哪些技术呢其实基于图的整个流程应该有一个大概的理解。基于手写实现基于手写实现理解了协议后,我们基于来实现一个通信框架。

阅读这篇文章之前,建议先阅读和这篇文章关联的内容。

[1]详细剖析分布式微服务架构下网络通信的底层实现原理(图解)

[2][年薪60W的技巧]工作了5年,你真的理解Netty以及为什么要用吗?(深度干货)

[3]深度解析Netty中的核心组件(图解+实例)

[4]BAT面试必问细节:关于Netty中的ByteBuf详解

[5]通过大量实战案例分解Netty中是如何解决拆包黏包问题的?

[6]基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

[7]全网最详细最齐全的序列化技术及深度解析与应用实战

在前面的内容中,我们已经由浅入深的理解了Netty的基础知识和实现原理,相信大家已经对Netty有了一个较为全面的理解。那么接下来,我们通过一个手写RPC通信的实战案例来带大家了解Netty的实际应用。

为什么要选择RPC来作为实战呢?因为Netty本身就是解决通信问题,而在实际应用中,RPC协议框架是我们接触得最多的一种,所以这个实战能让大家了解到Netty实际应用之外,还能理解RPC的底层原理。

什么是RPC

RPC全称为(Remote Procedure Call),是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议,简单理解就是让开发者能够像调用本地服务一样调用远程服务。

既然是协议,那么它必然有协议的规范,如图6-1所示。

为了达到“让开发者能够像调用本地服务那样调用远程服务”的目的,RPC协议需像图6-1那样实现远程交互。

  • 客户端调用远程服务时,必须要通过本地动态代理模块来屏蔽网络通信的细节,所以动态代理模块需要负责将请求参数、方法等数据组装成数据包发送到目标服务器
  • 这个数据包在发送时,还需要遵循约定的消息协议以及序列化协议,最终转化为二进制数据流传输
  • 服务端收到数据包后,先按照约定的消息协议解码,得到请求信息。
  • 服务端再根据请求信息路由调用到目标服务,获得结果并返回给客户端。

1567677351249

图6-1

业内主流的RPC框架

凡是满足RPC协议的框架,我们成为RPC框架,在实际开发中,我们可以使用开源且相对成熟的RPC框架解决微服务架构下的远程通信问题,常见的rpc框架:

  1. Thrift:thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。
  2. Dubbo:Dubbo是一个分布式服务框架,以及SOA治理方案。其功能主要包括:高性能NIO通讯及多协议集成,服务动态寻址与路由,软负载均衡与容错,依赖分析与降级等。 Dubbo是阿里巴巴内部的SOA服务化治理方案的核心框架,Dubbo自2011年开源后,已被许多非阿里系公司使用。

手写RPC注意要点

基于上文中对于RPC协议的理解,如果我们自己去实现,需要考虑哪些技术呢? 其实基于图6-1的整个流程应该有一个大概的理解。

  • 通信协议,RPC框架对性能的要求非常高,所以通信协议应该是越简单越好,这样可以减少编解码带来的性能损耗,大部分主流的RPC框架会直接选择TCP、HTTP协议。
  • 序列化和反序列化,数据要进行网络传输,需要对数据进行序列化和反序列化,前面我们说过,所谓的序列化和反序列化是不把对象转化成二进制流以及将二进制流转化成对象的过程。在序列化框架选择上,我们一般会选择高效且通用的算法,比如FastJson、Protobuf、Hessian等。这些序列化技术都要比原生的序列化操作更加高效,压缩比也较高。
  • 动态代理, 客户端调用远程服务时,需要通过动态代理来屏蔽网络通信细节。而动态代理又是在运行过程中生成的,所以动态代理类的生成速度、字节码大小都会影响到RPC整体框架的性能和资源消耗。常见的动态代理技术: Javassist、Cglib、JDK的动态代理等。

基于Netty手写实现RPC

理解了RPC协议后,我们基于Netty来实现一个RPC通信框架。

代码详见附件 netty-rpc-example

image-20210907221358022

图6-2 项目模块组成

需要引入的jar包:

    org.springframework.boot    spring-boot-starter    org.projectlombok    lombok    com.alibaba    fastjson    1.2.72    io.netty    netty-all

模块依赖关系:

  • provider依赖 netty-rpc-protocol和netty-rpc-api

  • cosumer依赖 netty-rpc-protocol和netty-rpc-api

netty-rpc-api模块

image-20210907223045613

图6-3 netty-rpc-api模块组成

IUserService

public interface IUserService {    String saveUser(String name);}

netty-rpc-provider模块

image-20210907223111784

图6-4 netty-rpc-provider模块组成

UserServiceImpl

@Service@Slf4jpublic class UserServiceImpl implements IUserService {    @Override    public String saveUser(String name) {        log.info("begin saveUser:"+name);        return "Save User Success!";    }}

NettyRpcProviderMain

注意,在当前步骤中,描述了case的部分,暂时先不用加,后续再加上

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //case1(后续再加上)@SpringBootApplicationpublic class NettyRpcProviderMain {    public static void main(String[] args) throws Exception {        SpringApplication.run(NettyRpcProviderMain.class, args);        new NettyServer("127.0.0.1",8080).startNettyServer();   //case2(后续再加上)    }}

netty-rpc-protocol

开始写通信协议模块,这个模块主要做几个事情

  • 定义消息协议
  • 定义序列化反序列化方法
  • 建立netty通信

图6-5

定义消息协议

之前我们讲过自定义消息协议,我们在这里可以按照下面这个协议格式来定义好。

    /*    +----------------------------------------------+    | 魔数 2byte | 序列化算法 1byte | 请求类型 1byte  |    +----------------------------------------------+    | 消息 ID 8byte     |      数据长度 4byte       |    +----------------------------------------------+    */
@AllArgsConstructor@Datapublic class Header implements Serializable {    /*    +----------------------------------------------+    | 魔数 2byte | 序列化算法 1byte | 请求类型 1byte  |    +----------------------------------------------+    | 消息 ID 8byte     |      数据长度 4byte       |    +----------------------------------------------+    */    private short magic; //魔数-用来验证报文的身份(2个字节)    private byte serialType; //序列化类型(1个字节)    private byte reqType; //操作类型(1个字节)    private long requestId; //请求id(8个字节)    private int length; //数据长度(4个字节)}

RpcRequest

@Datapublic class RpcRequest implements Serializable {    private String className;    private String methodName;    private Object[] params;    private Class[] parameterTypes;}

RpcResponse

@Datapublic class RpcResponse implements Serializable {    private Object data;    private String msg;}

RpcProtocol

@Datapublic class RpcProtocol implements Serializable {    private Header header;    private T content;}

定义相关常量

上述消息协议定义中,涉及到几个枚举相关的类,定义如下

ReqType

消息类型

public enum ReqType {    REQUEST((byte)1),    RESPONSE((byte)2),    HEARTBEAT((byte)3);    private byte code;    private ReqType(byte code) {        this.code=code;    }    public byte code(){        return this.code;    }    public static ReqType findByCode(int code) {        for (ReqType msgType : ReqType.values()) {            if (msgType.code() == code) {                return msgType;            }        }        return null;    }}

SerialType

序列化类型

public enum SerialType {    JSON_SERIAL((byte)0),    JAVA_SERIAL((byte)1);    private byte code;    SerialType(byte code) {        this.code=code;    }    public byte code(){        return this.code;    }}

RpcConstant

public class RpcConstant {    //header部分的总字节数    public final static int HEAD_TOTAL_LEN=16;    //魔数    public final static short MAGIC=0xca;}

定义序列化相关实现

这里演示两种,一种是JSON方式,另一种是Java原生的方式

ISerializer

public interface ISerializer {     byte[] serialize(T obj);     T deserialize(byte[] data,Class clazz);    byte getType();}

JavaSerializer

public class JavaSerializer implements ISerializer{    @Override    public  byte[] serialize(T obj) {        ByteArrayOutputStream byteArrayOutputStream=                new ByteArrayOutputStream();        try {            ObjectOutputStream outputStream=                    new ObjectOutputStream(byteArrayOutputStream);            outputStream.writeObject(obj);            return  byteArrayOutputStream.toByteArray();        } catch (IOException e) {            e.printStackTrace();        }        return new byte[0];    }    @Override    public  T deserialize(byte[] data, Class clazz) {        ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data);        try {            ObjectInputStream objectInputStream=                    new ObjectInputStream(byteArrayInputStream);            return (T) objectInputStream.readObject();        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        }        return null;    }    @Override    public byte getType() {        return SerialType.JAVA_SERIAL.code();    }}

JsonSerializer

public class JsonSerializer implements ISerializer{    @Override    public  byte[] serialize(T obj) {        return JSON.toJSONString(obj).getBytes();    }    @Override    public  T deserialize(byte[] data, Class clazz) {        return JSON.parseObject(new String(data),clazz);    }    @Override    public byte getType() {        return SerialType.JSON_SERIAL.code();    }}

SerializerManager

实现对序列化机制的管理

public class SerializerManager {    private final static ConcurrentHashMap serializers=new ConcurrentHashMap();    static {        ISerializer jsonSerializer=new JsonSerializer();        ISerializer javaSerializer=new JavaSerializer();        serializers.put(jsonSerializer.getType(),jsonSerializer);        serializers.put(javaSerializer.getType(),javaSerializer);    }    public static ISerializer getSerializer(byte key){        ISerializer serializer=serializers.get(key);        if(serializer==null){            return new JavaSerializer();        }        return serializer;    }}

定义编码和解码实现

由于自定义了消息协议,所以 需要自己实现编码和解码,代码如下

RpcDecoder

@Slf4jpublic class RpcDecoder extends ByteToMessageDecoder {    /*    +----------------------------------------------+    | 魔数 2byte | 序列化算法 1byte | 请求类型 1byte  |    +----------------------------------------------+    | 消息 ID 8byte     |      数据长度 4byte       |    +----------------------------------------------+    */    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        log.info("==========begin RpcDecoder ==============");        if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){            //消息长度不够,不需要解析            return;        }        in.markReaderIndex();//标记一个读取数据的索引,后续用来重置。        short magic=in.readShort(); //读取magic        if(magic!=RpcConstant.MAGIC){            throw new IllegalArgumentException("Illegal request parameter magic,"+magic);        }        byte serialType=in.readByte(); //读取序列化算法类型        byte reqType=in.readByte(); //请求类型        long requestId=in.readLong(); //请求消息id        int dataLength=in.readInt(); //请求数据长度        //可读区域的字节数小于实际数据长度        if(in.readableBytes() reqProtocol=new RpcProtocol<>();                reqProtocol.setHeader(header);                reqProtocol.setContent(request);                out.add(reqProtocol);                break;            case RESPONSE:                RpcResponse response=serializer.deserialize(content,RpcResponse.class);                RpcProtocol resProtocol=new RpcProtocol<>();                resProtocol.setHeader(header);                resProtocol.setContent(response);                out.add(resProtocol);                break;            case HEARTBEAT:                break;            default:                break;        }    }}

RpcEncoder

@Slf4jpublic class RpcEncoder extends MessageToByteEncoder> {    /*    +----------------------------------------------+    | 魔数 2byte | 序列化算法 1byte | 请求类型 1byte  |    +----------------------------------------------+    | 消息 ID 8byte     |      数据长度 4byte       |    +----------------------------------------------+    */    @Override    protected void encode(ChannelHandlerContext ctx, RpcProtocol msg, ByteBuf out) throws Exception {        log.info("=============begin RpcEncoder============");        Header header=msg.getHeader();        out.writeShort(header.getMagic()); //写入魔数        out.writeByte(header.getSerialType()); //写入序列化类型        out.writeByte(header.getReqType());//写入请求类型        out.writeLong(header.getRequestId()); //写入请求id        ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());        byte[] data=serializer.serialize(msg.getContent()); //序列化        header.setLength(data.length);        out.writeInt(data.length); //写入消息长度        out.writeBytes(data);    }}

NettyServer

实现NettyServer构建。

@Slf4jpublic class NettyServer{    private String serverAddress; //地址    private int serverPort; //端口    public NettyServer(String serverAddress, int serverPort) {        this.serverAddress = serverAddress;        this.serverPort = serverPort;    }    public void startNettyServer() throws Exception {        log.info("begin start Netty Server");        EventLoopGroup bossGroup=new NioEventLoopGroup();        EventLoopGroup workGroup=new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workGroup)                .channel(NioServerSocketChannel.class)                .childHandler(new RpcServerInitializer());            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();            log.info("Server started Success on Port:{}", this.serverPort);            channelFuture.channel().closeFuture().sync();        }catch (Exception e){            log.error("Rpc Server Exception",e);        }finally {            workGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }}

RpcServerInitializer

public class RpcServerInitializer extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ch.pipeline()            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))            .addLast(new RpcDecoder())            .addLast(new RpcEncoder())            .addLast(new RpcServerHandler());    }}

RpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol msg) throws Exception {        RpcProtocol resProtocol=new RpcProtocol<>();        Header header=msg.getHeader();        header.setReqType(ReqType.RESPONSE.code());        Object result=invoke(msg.getContent());        resProtocol.setHeader(header);        RpcResponse response=new RpcResponse();        response.setData(result);        response.setMsg("success");        resProtocol.setContent(response);        ctx.writeAndFlush(resProtocol);    }    private Object invoke(RpcRequest request){        try {            Class clazz=Class.forName(request.getClassName());            Object bean= SpringBeansManager.getBean(clazz); //获取实例对象(CASE)            Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());            return declaredMethod.invoke(bean,request.getParams());        } catch (ClassNotFoundException | NoSuchMethodException e) {            e.printStackTrace();        } catch (IllegalAccessException e) {            e.printStackTrace();        } catch (InvocationTargetException e) {            e.printStackTrace();        }        return null;    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        super.exceptionCaught(ctx, cause);    }}

SpringBeansManager

@Componentpublic class SpringBeansManager implements ApplicationContextAware {    private static ApplicationContext applicationContext;    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        SpringBeansManager.applicationContext=applicationContext;    }    public static  T getBean(Class clazz){        return applicationContext.getBean(clazz);    }}

需要注意,这个类的构建好之后,需要在netty-rpc-provider模块的main方法中增加compone-scan进行扫描

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //修改这里@SpringBootApplicationpublic class NettyRpcProviderMain {    public static void main(String[] args) throws Exception {        SpringApplication.run(NettyRpcProviderMain.class, args);        new NettyServer("127.0.0.1",8080).startNettyServer();  // 修改这里    }}

netty-rpc-consumer

接下来开始实现消费端

RpcClientProxy

public class RpcClientProxy {        public  T clientProxy(final Class interfaceCls,final String host,final int port){        return (T) Proxy.newProxyInstance                (interfaceCls.getClassLoader(),                        new Class[]{interfaceCls},                        new RpcInvokerProxy(host,port));    }}

RpcInvokerProxy

@Slf4jpublic class RpcInvokerProxy implements InvocationHandler {    private String serviceAddress;    private int servicePort;    public RpcInvokerProxy(String serviceAddress, int servicePort) {        this.serviceAddress = serviceAddress;        this.servicePort = servicePort;    }    @Override    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        log.info("begin invoke target server");        //组装参数        RpcProtocol protocol=new RpcProtocol<>();        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);        protocol.setHeader(header);        RpcRequest request=new RpcRequest();        request.setClassName(method.getDeclaringClass().getName());        request.setMethodName(method.getName());        request.setParameterTypes(method.getParameterTypes());        request.setParams(args);        protocol.setContent(request);        //发送请求        NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);        //构建异步数据处理        RpcFuture future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));        RequestHolder.REQUEST_MAP.put(requestId,future);        nettyClient.sendRequest(protocol);        return future.getPromise().get().getData();    }}

定义客户端连接

在netty-rpc-protocol这个模块的protocol包路径下,创建NettyClient

@Slf4jpublic class NettyClient {    private final Bootstrap bootstrap;    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();    private String serviceAddress;    private int servicePort;    public NettyClient(String serviceAddress,int servicePort){        log.info("begin init NettyClient");        bootstrap=new Bootstrap();        bootstrap.group(eventLoopGroup)                .channel(NioSocketChannel.class)                .handler(new RpcClientInitializer());        this.serviceAddress=serviceAddress;        this.servicePort=servicePort;    }    public void sendRequest(RpcProtocol protocol) throws InterruptedException {        ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync();        future.addListener(listener->{            if(future.isSuccess()){                log.info("connect rpc server {} success.",this.serviceAddress);            }else{                log.error("connect rpc server {} failed .",this.serviceAddress);                future.cause().printStackTrace();                eventLoopGroup.shutdownGracefully();            }        });        log.info("begin transfer data");        future.channel().writeAndFlush(protocol);    }}

RpcClientInitializer

@Slf4jpublic class RpcClientInitializer extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        log.info("begin initChannel");        ch.pipeline()                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))                .addLast(new LoggingHandler())                .addLast(new RpcEncoder())                .addLast(new RpcDecoder())                .addLast(new RpcClientHandler());    }}

RpcClientHandler

需要注意,Netty的通信过程是基于入站出站分离的,所以在获取结果时,我们需要借助一个Future对象来完成。

@Slf4jpublic class RpcClientHandler extends SimpleChannelInboundHandler> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol msg) throws Exception {        log.info("receive rpc server result");        long requestId=msg.getHeader().getRequestId();        RpcFuture future=RequestHolder.REQUEST_MAP.remove(requestId);        future.getPromise().setSuccess(msg.getContent()); //返回结果    }}

Future的实现

在netty-rpc-protocol模块中添加rpcFuture实现

RpcFuture

@Datapublic class RpcFuture {    //Promise是可写的 Future, Future自身并没有写操作相关的接口,    // Netty通过 Promise对 Future进行扩展,用于设置IO操作的结果    private Promise promise;    public RpcFuture(Promise promise) {        this.promise = promise;    }}

RequestHolder

保存requestid和future的对应结果

public class RequestHolder {    public static final AtomicLong REQUEST_ID=new AtomicLong();    public static final Map REQUEST_MAP=new ConcurrentHashMap<>();}

需要源码的同学,请关注公众号[跟着Mic学架构],回复关键字[rpc],即可获得

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/123637.html

相关文章

  • #yyds干货盘点#学不懂Netty?看不懂源码?不存在,这篇文章把手带你阅读Netty源码

    摘要:简单来说就是把注册的动作异步化,当异步执行结束后会把执行结果回填到中抽象类一般就是公共逻辑的处理,而这里的处理主要就是针对一些参数的判断,判断完了之后再调用方法。 阅读这篇文章之前,建议先阅读和这篇文章关联的内容。 1. 详细剖析分布式微服务架构下网络通信的底层实现原理(图解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及为什么要用吗?(深度干货)...

    zsirfs 评论0 收藏0
  • dubbo源码解析(一)Hello,Dubbo

    摘要:英文全名为,也叫远程过程调用,其实就是一个计算机通信协议,它是一种通过网络从远程计算机程序上请求服务而不需要了解底层网络技术的协议。 Hello,Dubbo 你好,dubbo,初次见面,我想和你交个朋友。 Dubbo你到底是什么? 先给出一套官方的说法:Apache Dubbo是一款高性能、轻量级基于Java的RPC开源框架。 那么什么是RPC? 文档地址:http://dubbo.a...

    evin2016 评论0 收藏0
  • 从0到1搭建RPC框架

    摘要:前言此博客所述项目代码已在开源欢迎大家一起贡献点此进入最近一次写博客还是年底谢谢大家持久以来的关注本篇博文将会教大家如何从到搭建一个简单高效且拓展性强的框架什么是相信大家都或多或少使用过框架比如阿里的谷歌的的等等那么究竟什么是翻译成中文 Cool-Rpc 前言 此博客所述项目代码已在github开源,欢迎大家一起贡献! 点此进入:Cool-RPC 最近一次写博客还是17年底,谢谢大家...

    stdying 评论0 收藏0
  • Java深入-框架技巧

    摘要:从使用到原理学习线程池关于线程池的使用,及原理分析分析角度新颖面向切面编程的基本用法基于注解的实现在软件开发中,分散于应用中多出的功能被称为横切关注点如事务安全缓存等。 Java 程序媛手把手教你设计模式中的撩妹神技 -- 上篇 遇一人白首,择一城终老,是多么美好的人生境界,她和他历经风雨慢慢变老,回首走过的点点滴滴,依然清楚的记得当初爱情萌芽的模样…… Java 进阶面试问题列表 -...

    chengtao1633 评论0 收藏0
  • Java深入-框架技巧

    摘要:从使用到原理学习线程池关于线程池的使用,及原理分析分析角度新颖面向切面编程的基本用法基于注解的实现在软件开发中,分散于应用中多出的功能被称为横切关注点如事务安全缓存等。 Java 程序媛手把手教你设计模式中的撩妹神技 -- 上篇 遇一人白首,择一城终老,是多么美好的人生境界,她和他历经风雨慢慢变老,回首走过的点点滴滴,依然清楚的记得当初爱情萌芽的模样…… Java 进阶面试问题列表 -...

    dance 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<