资讯专栏INFORMATION COLUMN

dubbo之timeout超时分析

张率功 / 3127人阅读

摘要:讲到这里,超时原理基本上其实差不多了,这个类还有个地方需要注意,在初始化对象时,会去创建一个超时的延迟任务,延迟时间就是值,在这个延迟任务中也会调用方法唤醒阻塞

背景

在使用dubbo时,通常会遇到timeout这个属性,timeout属性的作用是:给某个服务调用设置超时时间,如果服务在设置的时间内未返回结果,则会抛出调用超时异常:TimeoutException,在使用的过程中,我们有时会对provider和consumer两个配置都会设置timeout值,那么服务调用过程中会以哪个为准?本文主要针对这个问题进行分析和扩展

三种设置方式

以provider配置为例:

方法级别

设置方式如下所示:


   

接口级别

全局级别

优先级选择

在dubbo中如果provider和consumer都配置了相同的一个属性,比如本文分析的timeout,其实是有一个优先级的,优先级:
consumer方法配置 > provider方法配置 > consumer接口配置 > provider接口配置 > consumer全局配置 > provider全局配置。所以对于本文开始的提出的问题就有了结果,会以消费者配置的为准,接下结合源码来进行解析,其实源码很简单,在RegistryDirectory类中将服务列表转换为DubboInvlker方法中进行了处理:

    private Map> toInvokers(List urls) {
        Map> newUrlInvokerMap = new HashMap>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set keys = new HashSet();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 重点就是下面这个方法
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

重点就是上面mergeUrl()方法,将provider和comsumer的url参数进行了整合,在
mergeUrl()方法有会调用ClusterUtils.mergeUrl方法进行整合,因为这个方法比较简单,就是对一些参数进行了整合了,会用consumer参数进行覆盖,咱们这里就不分析了,如果感兴趣的同学可以去研究一下。

超时处理

在配置设置了超时timeout,那么代码中是如何处理的,这里咱们在进行一下扩展,分析一下dubbo中是如何处理超时的,在调用服务方法,最后都会调用DubboInvoker.doInvoke方法,咱们就从这个方法开始分析:

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                // 异步处理
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the async result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 同步处理
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在这个方法中,咱们就以同步模式进行分析,看request方法,request()方法会返回一个DefaultFuture类,在去调用DefaultFuture.get()方法,这里其实涉及到一个在异步中实现同步的技巧,咱们这里不做分析,所以重点就在get()方法里:

    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

在调用get()方法时,会去调用get(timeout)这个方法,在这个方法中会传一个timeout字段,在和timeout就是咱们配置的那个参数,在这个方法中咱们要关注下面一个代码块:

        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 线程阻塞
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 在超时时间里,还没有结果,则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

重点看await()方法,会进行阻塞timeout时间,如果阻塞时间到了,则会唤醒往下执行,超时跳出while循环中,判断是否有结果返回,如果没有(这个地方要注意:只有有结果返回,或超时才跳出循环中),则抛出超时异常。讲到这里,超时原理基本上其实差不多了,DefaultFuture这个类还有个地方需要注意,在初始化DefaultFuture对象时,会去创建一个超时的延迟任务,延迟时间就是timeout值,在这个延迟任务中也会调用signal()方法唤醒阻塞

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74197.html

相关文章

  • Dubbo 源码分析 - 集群容错 Cluster

    摘要:集群用途是将多个服务提供者合并为一个,并将这个暴露给服务消费者。比如发请求,接受服务提供者返回的数据等。如果包含,表明对应的服务提供者可能因网络原因未能成功提供服务。如果不包含,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。 1.简介 为了避免单点故障,现在的应用至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多台服务器。这样,同一环境下的服务提供者数量会大于1...

    denson 评论0 收藏0
  • Dubbo 源码分析20 Dubbo服务提供者、服务消费者并发度控制机制

    摘要:代码根据服务提供者和服务调用方法名,获取。代码根据服务提供者配置的最大并发度,创建该服务该方法对应的信号量对象。总结是控制消费端对单个服务提供者单个服务允许调用的最大并发度。 本文将详细分析< dubbo:service executes=/>与< dubbo:reference actives = />的实现机制,深入探...

    不知名网友 评论0 收藏0
  • dubbo源码解析(二十六)远程调用——http协议

    摘要:前言基于表单的远程调用协议,采用的实现,关于协议就不用多说了吧。后记该部分相关的源码解析地址该文章讲解了远程调用中关于协议的部分,内容比较简单,可以参考着官方文档了解一下。 远程调用——http协议 目标:介绍远程调用中跟http协议相关的设计和实现,介绍dubbo-rpc-http的源码。 前言 基于HTTP表单的远程调用协议,采用 Spring 的HttpInvoker实现,关于h...

    xiyang 评论0 收藏0
  • dubbo源码解析(二十五)远程调用——hessian协议

    摘要:客户端对象字节输出流请求对象响应对象增加协议头发送请求获得请求后的状态码三该类实现了接口,是创建的工厂类。该类的实现跟类类似,但是是标准的接口调用会采用的工厂类,而是的协议调用。 远程调用——hessian协议 目标:介绍远程调用中跟hessian协议相关的设计和实现,介绍dubbo-rpc-hessian的源码。 前言 本文讲解多是dubbo集成的第二种协议,hessian协议,He...

    xzavier 评论0 收藏0
  • 数据库相关异常分析

    摘要:起因最近一段时间,生产系统持续碰到一些数据库异常,导致执行失败。综上,若发生异常,为数据库连接失效,但是失效的原因可能会有多种,大致都与各种参数相关。当时数据量大概多条,然后在批量插入时抛出该异常。 起因 最近一段时间,生产系统持续碰到一些数据库异常,导致 sql 执行失败。 应用环境 Java 1.7 + Mysql 5.6 + spring + ibatis 问题排查 将各种失败的...

    IamDLY 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<