资讯专栏INFORMATION COLUMN

dubbo源码解析——cluster

seal_de / 2050人阅读

摘要:简单来说就是应对出错情况采取的策略。由于重试,重试次数过多时,带来时延。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。通常用于通知所有提供者更新缓存或日志等本地资源信息。

我们再来回顾一下官网的对于集群容错的架构设计图

Cluster概述

将 Directory 中的多个 Invoker 伪装成一个 Invoker(伪装过程用到loadBalance),对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。简单来说,就是应对出错情况采取的策略。看看这个接口:

该接口有9个实现类,换个角度来说,就是有9中应对策略,本文介绍几个比较常用的策略

FailoverCluster

失败自动切换,当调用远程服务失败时,自动选择其他服务进行调用。可以通过retries设置重试次数。由于重试,重试次数过多时,带来时延。

/**
 * Failover
 * 当invoker调用失败,打印错误日志,并且重试其他invoker
 * 重试将导致时延
 */
public class FailoverClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
        // 局部引用
        List> copyinvokers = invokers;

        // 参数校验
        checkInvokers(copyinvokers, invocation);

        // 获取方法名称
        String methodName = RpcUtils.getMethodName(invocation);

        // 获取重试次数
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            // 最少要调用1次
            len = 1;
        }

        // 局部引用
        RpcException le = null;
        // 已经调用过的invoker列表
        List> invoked = new ArrayList>(copyinvokers.size());
        // 调用失败的invoker地址
        Set providers = new HashSet(len);

        // i < len 作为循环条件,说明len是多少就循环多少次(len等于 重试次数 + 1)
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 检查invoker是否被销毁
                checkWhetherDestroyed();
                // 重新选择invoker(在重试之前,需要重新选择,以避免候选invoker的改变)
                copyinvokers = list(invocation);
                // 参数检查
                checkInvokers(copyinvokers, invocation);
            }

            /*
             * 这一步就是进入loadBalance负载均衡
             * 因为上述步骤可能筛选出invoker数量大于1,所以再次经过loadBalance的筛选(同时避免获取到已经调用过的invoker)
             */
            Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);

            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 远程方法调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }

                // 正常执行,直接返回结果。否则,如果还有重试次数,则继续重试
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }

        // 能到这里,说明都失败了,providers保存失败的invoker地址
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}
MergeableCluster

这个主要用在分组聚合中,我们来看一下官网的介绍

按组合并返回结果 ,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。

下面补充一下使用方法(网上基本没有使用方法的教程,楼主才疏学浅,花了几个小时才摸索出来):
(1)consumer侧,提供合并merge方法
这里有几个步骤:
a、在resources目录下,新建META-INF及dubbo,新建文本com.alibaba.dubbo.rpc.cluster.Merger
b、映射自定义的merger名称到相应的实现类,如:
myMerger=com.patty.dubbo.consumer.service.MyMerger
c、实现合并函数,需要实现Merger接口,如下:

public class MyMerger implements Merger {

    @Override
    public ModelResult merge(ModelResult... items) {

        ModelResult result = new ModelResult();
        for (ModelResult item : items) {
            // 进行数据合并操作
            result.setResult((String)result.getResult() + (String) item.getResult());
        }

        return result;
    }
}

(2)将reference的cluster属性设置为"mergeable",group设置为“*”,并且设置合并方法,如下:


        
    

(3)同一套代码,分别利用不同的group,把服务发布到注册中心上面。例如:/group1/com.huangyuan.demoService 及 /group2/com.huangyuan.demoService

(3)接下来就可以直接使用了,这边测试得到结果:(这里合并只是简单连接字符串)

接下来再看下源码:

public Result invoke(final Invocation invocation) throws RpcException {
        // 获取Directory中的invoker
        List> invokers = directory.list(invocation);

        // 获取合并方法的名称
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) {
            for (final Invoker invoker : invokers) {
                // 如果没有合并方法,只调动其中一个分组
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }

        // 获取返回值类型
        Class returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        Map> results = new HashMap>();
        for (final Invoker invoker : invokers) {
            Future future = executor.submit(new Callable() {
                @Override
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            // 保留future(未真正执行远程调用)
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        // 结果列表
        List resultList = new ArrayList(results.size());

        // 超时时间
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

        //
        for (Map.Entry> entry : results.entrySet()) {
            Future future = entry.getValue();
            try {
                // 执行远程调用
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                    " failed: " + r.getException().getMessage(), 
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }

        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            // 只有一个结果,直接返回了
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        if (merger.startsWith(".")) {
            /*
             * 配置的方法名称,以"."开头
             * 这种方式,入参固定只有一个,没有达到合并的效果,不建议使用
             */
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                        returnType.getClass().getName() + " ]");
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {
            /*
             * 建议使用Merger扩展的方式
             */
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List rets = new ArrayList(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }

PS:其实合并方法还有另外一个使用方式,使用".方法名称",并且合并方法只能写在结果类中,这种方式有一个很大的弊端,就是源码中入参固定只有一个,所以达不到合并效果,故不推荐使用。


        
    
AvailableCluster
public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public  Invoker join(Directory directory) throws RpcException {

        return new AbstractClusterInvoker(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker invoker : invokers) {
                    if (invoker.isAvailable()) {
                        // 仅仅执行可只用的invoker
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };

    }

}

遍历所有的Invokers判断invoker.isAvalible,只要一个有为true直接调用返回,否则就抛出异常.

ForkingCluster

引用官网的介绍

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。

我们来看看源码的实现

FailfastCluster

快速失败

Failfast可以理解为只发起一次调用,若失败则立即报错

通常用于非幂等写操作

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 成功直接往下执行
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 失败抛出异常,不做别的处理
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
BroadcastCluster

广播调用

广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        for (Invoker invoker : invokers) {
            try {
                // 循环调用invoker
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }
FailbackClusterInvoker

失败自动重试

当失败了,记录失败的请求,按照一定的间隔定时重试

特别适用于通知服务

这个相对比较复杂,先了解一些基础概念

Delayed

延迟接口,用于标记在给定延迟之后应该被作用的对象

ScheduledFuture

实现Delayed、Future接口,能够获取未来调度的结果

演示一些上面ScheduledFuture的用法

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author huangy on 2018/11/12
 */
public class ScheduledFutureTest {

    // 延迟调用,获取未来调度结果的对象
    private volatile ScheduledFuture retryFuture;

    // 指定时间间隔 重发执行一次
    private static final long RETRY_FAILED_PERIOD = 1 * 1000;

    // ScheduledExecutorService的主要作用就是可以将定时任务与线程池功能结合使用
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

    public void func() {
        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                System.out.println("retry");
            }
            // 延迟第一次执行的时间    每次的延迟           时间单位(现在填的是毫秒)
        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        new ScheduledFutureTest().func();
    }
}

结果如下:

其实看完这个例子,再看failbackCluster就挺简单了

public class FailbackClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    // 5s 重发一次
    private static final long RETRY_FAILED_PERIOD = 5 * 1000;

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
            new NamedInternalThreadFactory("failback-cluster-timer", true));

    // 保存需要重新执行的invoker
    private final ConcurrentMap> failed = new ConcurrentHashMap>();

    // 延迟调用,获取未来调度结果的对象
    private volatile ScheduledFuture retryFuture;

    public FailbackClusterInvoker(Directory directory) {
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker router) {
        if (retryFuture == null) {
            // 避免同时调度
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        @Override
                        public void run() {
                            // collect retry statistics
                            try {
                                // 隔一段时间重新执行
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        // 没有需要重新执行的invoker
        if (failed.size() == 0) {
            return;
        }

        // 逐个调用之前失败的invoker
        for (Map.Entry> entry : new HashMap>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            // 正常执行,则直接返回
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            // 记录失败的请求
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }

}
FailsafeCluster

调用实例失败后,如果有报错,则忽略掉异常,返回一个正常的空结果

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72098.html

相关文章

  • dubbo源码解析——消费过程

    摘要:上一篇源码解析概要篇中我们了解到中的一些概念及消费端总体调用过程。由于在生成代理实例的时候,在构造函数中赋值了,因此可以只用该进行方法的调用。 上一篇 dubbo源码解析——概要篇中我们了解到dubbo中的一些概念及消费端总体调用过程。本文中,将进入消费端源码解析(具体逻辑会放到代码的注释中)。本文先是对消费过程的总体代码逻辑理一遍,个别需要细讲的点,后面会专门的文章进行解析。...

    darkbug 评论0 收藏0
  • dubbo源码解析(一)Hello,Dubbo

    摘要:英文全名为,也叫远程过程调用,其实就是一个计算机通信协议,它是一种通过网络从远程计算机程序上请求服务而不需要了解底层网络技术的协议。 Hello,Dubbo 你好,dubbo,初次见面,我想和你交个朋友。 Dubbo你到底是什么? 先给出一套官方的说法:Apache Dubbo是一款高性能、轻量级基于Java的RPC开源框架。 那么什么是RPC? 文档地址:http://dubbo.a...

    evin2016 评论0 收藏0
  • dubbo源码解析(四十五)服务引用过程

    摘要:服务引用过程目标从源码的角度分析服务引用过程。并保留服务提供者的部分配置,比如版本,,时间戳等最后将合并后的配置设置为查询字符串中。的可以参考源码解析二十三远程调用的一的源码分析。 dubbo服务引用过程 目标:从源码的角度分析服务引用过程。 前言 前面服务暴露过程的文章讲解到,服务引用有两种方式,一种就是直连,也就是直接指定服务的地址来进行引用,这种方式更多的时候被用来做服务测试,不...

    xiaowugui666 评论0 收藏0
  • dubbo源码解析——概要篇

    摘要:服务提供者代码上面这个类会被封装成为一个实例,并新生成一个实例。这样当网络通讯层收到一个请求后,会找到对应的实例,并调用它所对应的实例,从而真正调用了服务提供者的代码。 这次源码解析借鉴《肥朝》前辈的dubbo源码解析,进行源码学习。总结起来就是先总体,后局部.也就是先把需要注意的概念先抛出来,把整体架构图先画出来.让读者拿着地图跟着我的脚步,并且每一步我都提醒,现在我们在哪,我们下一...

    Meathill 评论0 收藏0
  • dubbo源码解析(三十五)集群——cluster

    摘要:失败安全,出现异常时,直接忽略。失败自动恢复,在调用失败后,返回一个空结果给服务提供者。源码分析一该类实现了接口,是集群的抽象类。 集群——cluster 目标:介绍dubbo中集群容错的几种模式,介绍dubbo-cluster下support包的源码。 前言 集群容错还是很好理解的,就是当你调用失败的时候所作出的措施。先来看看有哪些模式: showImg(https://segmen...

    gself 评论0 收藏0

发表评论

0条评论

seal_de

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<