资讯专栏INFORMATION COLUMN

dubbo源码解析——消费过程

darkbug / 2974人阅读

摘要:上一篇源码解析概要篇中我们了解到中的一些概念及消费端总体调用过程。由于在生成代理实例的时候,在构造函数中赋值了,因此可以只用该进行方法的调用。

上一篇 dubbo源码解析——概要篇中我们了解到dubbo中的一些概念及消费端总体调用过程。本文中,将进入消费端源码解析(具体逻辑会放到代码的注释中)。本文先是对消费过程的总体代码逻辑理一遍,个别需要细讲的点,后面会专门的文章进行解析。

开头进入InvokerInvocationHandler

通过实现InvocationHandler,我们知道dubbo生成代理使用的是JDK动态代理。这个类中主要是对特殊方法进行处理。由于在生成代理实例的时候,在构造函数中赋值了invoker,因此可以只用该invoker进行invoke方法的调用。

/**
 * dubbo使用JDK动态代理,对接口对象进行注入
 * InvokerHandler
 *
 * 程序启动的过程中,在构造函数中,赋值下一个需要调用的invoker,从而形成执行链
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker invoker;

    public InvokerInvocationHandler(Invoker handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 获取方法名称
        String methodName = method.getName();
        // 获取参数类型
        Class[] parameterTypes = method.getParameterTypes();
        // 方法所处的类 是 Object类,则直接调用
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }

        /*
         * toString、hashCode、equals方法比较特殊,如果interface里面定义了这几个方法,并且进行实现,
         * 通过dubbo远程调用是不会执行这些代码实现的。
         */

        /*
         * 方法调用是toString,依次执行MockClusterInvoker、AbstractClusterInvoker的toString方法
         */
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }

        /*
         * interface中含有hashCode方法,直接调用invoker的hashCode
         */
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }

        /*
         * interface中含有equals方法,直接调用invoker的equals
         */
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        /*
         * invocationv包含了远程调用的参数、方法信息
         */
        RpcInvocation invocation;

        /*
         * todo这段代码在最新的dubbo版本中没有
         */
        if (RpcUtils.hasGeneratedFuture(method)) {
            Class clazz = method.getDeclaringClass();
            String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
            Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
            invocation = new RpcInvocation(syncMethod, args);
            invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        } else {
            invocation = new RpcInvocation(method, args);
            if (RpcUtils.hasFutureReturnType(method)) {
                invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
                invocation.setAttachment(Constants.ASYNC_KEY, "true");
            }
        }

        // 继续invoker链式调用
        return invoker.invoke(invocation).recreate();
    }


}
进入MockClusterInvoker

这段代码主要是判断是否需要进行mock调用

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 获取mock参数,从而判断是否需要mock
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();

        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 不需要mock,继续往下调用
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            // 选择mock的invoker
            result = doMockInvoke(invocation, null);
        } else {
            // 正常调用失败,则调用mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
进入AbstractClusterInvoker

进入这段代码,表明开始进入到集群

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        // 检查消费端invoker是否销毁了
        checkWhetherDestroyed();

        // 将参数绑定到invocation
        Map contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        // 获取满足条件的invoker(从Directory获取,并且经过router过滤)
        List> invokers = list(invocation);

        // 初始化loadBalance
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);

        // invocation ID将被添加在异步操作
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        return doInvoke(invocation, invokers, loadbalance);
    }
进入AbstractDirectory
@Override
    public List> list(Invocation invocation) throws RpcException {
        // 判断Directory是否销毁
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // 从methodInvokerMap中取出满足条件的invoker
        List> invokers = doList(invocation);

        // 根据路由列表,筛选出满足条件的invoker
        List localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

这里主要是从Directory中获取invoker,并且经过router路由的筛选,获得满足条件的invoker。
在AbstractDirectory中,有一个关键的方法com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#doList,这是一个抽象方法,子类RegistryDirectory的有具体实现,并且调用RegistryDirectory的doList方法。(这里应该是用到了模板方法模式)。后面的文字中会详细讲下doList方法中做了啥。

进入FailoverClusterInvoker

经过从Directory中获取invoker,然后router筛选出满足条件的invoker之后,进入到FailoverClusterInvoker。为什么会到这里呢?

根据官网的描述:
在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
所以这个时候是到了FailoverClusterInvoker类,但是如果你配置的是Failfast Cluster(快速失败),Failsafe Cluster(失败安全),Failback Cluster(失败自动恢复),Forking Cluster(并行调用多个服务器,只要一个成功即返回),Broadcast Cluster(广播调用所有提供者,逐个调用,任意一台报错则报错),他也会到达相应的类。

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
        // 局部引用
        List> copyinvokers = invokers;

        // 参数校验(这种封装方法我在工作中借鉴,个人感觉比较好)
        checkInvokers(copyinvokers, invocation);

        // 获取方法名称
        String methodName = RpcUtils.getMethodName(invocation);

        // 获取重试次数
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            // 最少要调用1次
            len = 1;
        }

        // 局部引用
        RpcException le = null;
        List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers.
        Set providers = new HashSet(len);

        // i < len 作为循环条件,说明len是多少就循环多少次
        for (int i = 0; i < len; i++) {
            // 在重试之前,需要重新选择,以避免候选invoker的改变
            if (i > 0) {
                // 检查invoker是否被销毁
                checkWhetherDestroyed();
                // 重新选择invoker
                copyinvokers = list(invocation);
                // 参数检查
                checkInvokers(copyinvokers, invocation);
            }

            /*
             * 这一步就是进入loadBalance负载均衡
             * 因为上述步骤可能筛选出invoker数量大于1,所以再次经过loadBalance的筛选
             */
            Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);

            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 远程方法调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

到达终点站.我们回忆总结一下,文初提到的三个关键词,在这个集群容错的整体架构过程中,dubbo究竟做了什么.其实也就是三件事

(1)在Directory中找出本次集群中的全部invokers
(2)在Router中,将上一步的全部invokers挑选出能正常执行的invokers
(3)在LoadBalance中,将上一步的能正常的执行invokers中,根据配置的负载均衡策略,挑选出需要执行的invoker

后面的文章中,对上述的一些细节进行解析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72100.html

相关文章

  • dubbo源码解析(四十八)异步化改造

    摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...

    lijinke666 评论0 收藏0
  • dubbo源码解析——概要篇

    摘要:服务提供者代码上面这个类会被封装成为一个实例,并新生成一个实例。这样当网络通讯层收到一个请求后,会找到对应的实例,并调用它所对应的实例,从而真正调用了服务提供者的代码。 这次源码解析借鉴《肥朝》前辈的dubbo源码解析,进行源码学习。总结起来就是先总体,后局部.也就是先把需要注意的概念先抛出来,把整体架构图先画出来.让读者拿着地图跟着我的脚步,并且每一步我都提醒,现在我们在哪,我们下一...

    Meathill 评论0 收藏0
  • Dubbo 源码分析 - 集群容错之 Router

    摘要:源码分析条件路由规则有两个条件组成,分别用于对服务消费者和提供者进行匹配。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务。此时第六次循环分隔符,,。第二个和第三个参数来自方法的参数列表,这两个参数分别为服务提供者和服务消费者。 1. 简介 上一篇文章分析了集群容错的第一部分 -- 服务目录 Directory。服务目录在刷新 Invoker 列表的过程中,会通过 Router...

    jcc 评论0 收藏0
  • dubbo源码解析(四十六)消费端发送请求过程

    摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...

    fish 评论0 收藏0
  • dubbo源码解析(四十五)服务引用过程

    摘要:服务引用过程目标从源码的角度分析服务引用过程。并保留服务提供者的部分配置,比如版本,,时间戳等最后将合并后的配置设置为查询字符串中。的可以参考源码解析二十三远程调用的一的源码分析。 dubbo服务引用过程 目标:从源码的角度分析服务引用过程。 前言 前面服务暴露过程的文章讲解到,服务引用有两种方式,一种就是直连,也就是直接指定服务的地址来进行引用,这种方式更多的时候被用来做服务测试,不...

    xiaowugui666 评论0 收藏0

发表评论

0条评论

darkbug

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<