资讯专栏INFORMATION COLUMN

spring cloud gateway 源码解析(2)动态路由

刘永祥 / 648人阅读

摘要:先看看是怎么获取我们配置的路由在启动时,帮我们注册了一系列这里注入所有,我们在配置文件里配置的路由就是通过子类来完成的,可以参考实现自己的的存储,会在后面转换成接着看类里的获取路由定义方法即轮训所有的市现率调用,这样就把所有整合到一起了接着

先看看gateway是怎么获取我们配置的路由:
在gateway启动时,GatewayAutoConfiguration帮我们注册了一系列beans

    @Bean
    @Primary
    public RouteDefinitionLocator routeDefinitionLocator(List routeDefinitionLocators) {
        //这里注入所有RouteDefinitionLocator,我们在配置文件里配置的路由就是通过子类PropertiesRouteDefinitionLocator来完成的,
        //可以参考InMemoryRouteDefinitionRepository实现自己的RouteDefinition的存储,RouteDefinition会在后面转换成Route
        return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
    }

    //接着看CompositeRouteDefinitionLocator类里的获取路由定义方法:
    public Flux getRouteDefinitions() {
        //即轮训所有RouteDefinitionLocator的市现率调用getRouteDefinitions,这样就把所有RouteDefinition整合到一起了
        return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
    }
    
    //接着看GatewayAutoConfiguration,刚才的bean RouteDefinitionLocator 作为参数注入到routeDefinitionLocator
    @Bean
    public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
                                                   List GatewayFilters,
                                                   List predicates,
                                                   RouteDefinitionLocator routeDefinitionLocator) {
        return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, GatewayFilters, properties);
    }

 //看RouteDefinitionRouteLocator类:
public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
    ······

    public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
                                       List predicates,
                                       List gatewayFilterFactories,
                                       GatewayProperties gatewayProperties) {
        this.routeDefinitionLocator = routeDefinitionLocator;
        //RoutePredicateFactory放到map里,去掉RoutePredicateFactory后缀,所以配置时就可以写成
        //predicates:
        //    - Path=/abc/*
        //的形式而不用写完整的PathRoutePredicateFactory    
        initFactories(predicates);
        //把bean名字里的GatewayFilterFactory去掉,所以我们配置文件里配置时也要把GatewayFilterFactory去掉,不然会找不到
        gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory));
        this.gatewayProperties = gatewayProperties;
    }
    ······ 

    @Override
    public Flux getRoutes() {
        return this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute)//获取所有的RouteDefinition后转为Route
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("RouteDefinition matched: " + route.getId());
                    }
                    return route;
                });
 
    }

    private Route convertToRoute(RouteDefinition routeDefinition) {
        //获取所有匹配规则,所有的规则都满足才走对应Route
        AsyncPredicate predicate = combinePredicates(routeDefinition);
        //获取全部过滤器
        List gatewayFilters = getFilters(routeDefinition);

        return Route.async(routeDefinition)
                .asyncPredicate(predicate)
                .replaceFilters(gatewayFilters)
                .build();
    }
 

    private List getFilters(RouteDefinition routeDefinition) {
        List filters = new ArrayList<>();

        //把全局过滤器添加到当前路由,loadGatewayFilters调用GatewayFilterFactory里的apply配置类里面的静态Config类,并且把没实现Ordered接口的类用OrderedGatewayFilter代理一下,方便下面排序
        if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
            filters.addAll(loadGatewayFilters("defaultFilters",
                    this.gatewayProperties.getDefaultFilters()));
        }
        //添加本条路由定义的过滤器
        if (!routeDefinition.getFilters().isEmpty()) {
            filters.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters()));
        }
        //排序
        AnnotationAwareOrderComparator.sort(filters);
        return filters;
    }

    private AsyncPredicate combinePredicates(RouteDefinition routeDefinition) { 
        //获取路由的判断条件,比如我们大多是根据url判断,用的是PathRoutePredicateFactory
        List predicates = routeDefinition.getPredicates();
        AsyncPredicate predicate = lookup(routeDefinition, predicates.get(0));

        for (PredicateDefinition andPredicate : predicates.subList(1, predicates.size())) {
            AsyncPredicate found = lookup(routeDefinition, andPredicate);
            predicate = predicate.and(found);//如果有多个匹配规则就要满足所有的才可以
        }
        //返回的结果会在RoutePredicateHandlerMapping类里使用apply方法调用 
        return predicate;
    }

    @SuppressWarnings("unchecked")
    private AsyncPredicate lookup(RouteDefinition route, PredicateDefinition predicate) {
        RoutePredicateFactory factory = this.predicates.get(predicate.getName());
        if (factory == null) {
            throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name " + predicate.getName());
        }
        Map args = predicate.getArgs();
        if (logger.isDebugEnabled()) {
            logger.debug("RouteDefinition " + route.getId() + " applying "
                    + args + " to " + predicate.getName());
        }

        Map properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
        //获取RoutePredicateFactory实现类里的静态Config
        Object config = factory.newConfig();
        ConfigurationUtils.bind(config, properties,
                factory.shortcutFieldPrefix(), predicate.getName(), validator);
        if (this.publisher != null) {
            this.publisher.publishEvent(new PredicateArgsEvent(this, route.getId(), properties));
        }
        //这里实际调用的是ServerWebExchangeUtils类里的toAsyncPredicate方法,并配置的config
        return factory.applyAsync(config);
    }
}


  // 把刚才的bean RouteDefinitionRouteLocator作为参数注入到routeLocators
    @Bean
    @Primary 
    public RouteLocator cachedCompositeRouteLocator(List routeLocators) {
        //这里CompositeRouteLocator与上面的CompositeRouteDefinitionLocator类似,也是遍历routeLocators调用getRoutes方法整合所有Route
        return new CachingRouteLocator(new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
    }
    //接着看CachingRouteLocator类
    public class CachingRouteLocator implements RouteLocator {

    private final RouteLocator delegate;
    private final Flux routes;
    private final Map cache = new HashMap<>();

    public CachingRouteLocator(RouteLocator delegate) {
        this.delegate = delegate;
        routes = CacheFlux.lookup(cache, "routes", Route.class)
                 //每次请求都会获取routes,每当拿缓存又找不到的时候(即我们修改了路由),就会调用上面CompositeRouteLocator的getRoutes方法重新获取路由
                .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
    }

    ·······

    public Flux refresh() {
        this.cache.clear();//清掉路由缓存
        return this.routes;
    }

    @EventListener(RefreshRoutesEvent.class)
    /* for testing */ void handleRefresh() {//更新路由用的是spring事件机制
        refresh();
    }
}

 //接着看GatewayAutoConfiguration,刚才的bean CachingRouteLocator作为参数注入到RouteLocator
    @Bean
    public RoutePredicateHandlerMapping routePredicateHandlerMapping(
            FilteringWebHandler webHandler, RouteLocator routeLocator,
            GlobalCorsProperties globalCorsProperties, Environment environment) {
        return new RoutePredicateHandlerMapping(webHandler, routeLocator,
                globalCorsProperties, environment);
    }

//接着看RoutePredicateHandlerMapping

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

    private final FilteringWebHandler webHandler;
    private final RouteLocator routeLocator;
    private final Integer managmentPort;

    public RoutePredicateHandlerMapping(FilteringWebHandler webHandler, RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
        this.webHandler = webHandler;
        this.routeLocator = routeLocator;

        if (environment.containsProperty("management.server.port")) {
            managmentPort = new Integer(environment.getProperty("management.server.port"));
        } else {
            managmentPort = null;
        }
        //这里设置了顺序,使得网关路由的匹配在requestmapping和handlefunction之后
        setOrder(1);        
        setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
    }

    @Override//方法在源码解析(1)中说的AbstractHandlerMapping类中获取Handler时调用
    protected Mono getHandlerInternal(ServerWebExchange exchange) {
        // don"t handle requests on the management port if set
        if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
            //不处理特定配置端口的路由,返回empty即不用网关路由的handler,AbstractHandlerMapping继续找下一个
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)//查找路由
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    //把Route放到exchange的属性了,后面有几个过滤器用到了
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    //返回构造该类时传入的FilteringWebHandler,之后的流程就会跟上一篇里说的走下去了
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                   //找不到匹配的路由的话进到这里,之后基本就是404了
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

     ······

    protected Mono lookupRoute(ServerWebExchange exchange) {
       //调用上面提到的CachingRouteLocator的getRoutes方法获取所有Routes
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {//过滤出符合的路由
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            //调用上面说的ServerWebExchangeUtils类里的toAsyncPredicate方法返回的lambda 
                            //里面会调用Predicate的实现类(都是lambda)的test方法返回true或者false
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.empty().log("noroute"))
                .next()//过滤出一个后就停止了
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });
 
    }

   ·····
}

说了那么多终于把网关匹配路由的流程说完了,如果上面都看明白的话,动态路由就好办了
网关已经把InMemoryRouteDefinitionRepository注册成bean(也可以参考这个类自己实现RouteDefinitionRepository接口),我们把它当作个service注入到controller,
前端把RouteDefinition用json的格式post过来,我们调用InMemoryRouteDefinitionRepository的save或者delete方法,再用spring的事件触发RefreshRoutesEvent事件来刷新路由就行了,等下次请求的时候就可以拿到新的路由配置了
顺序是:
1.RoutePredicateHandlerMapping 的lookupRoute方法,由于路由刷新事件把路由缓存清了,所以重新获取
2.CompositeRouteLocator 的getRoutes方法遍历所有RouteLocator实现类的getRoutes方法
3.RouteDefinitionRouteLocator 的getRoutes方法里重新获取了所有的路由定义,也就把我们刚刚用事件添加的路由也获取了

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77399.html

相关文章

  • Spring Cloud Gateway 扩展支持动态限流

    摘要:以流量为切入点,从流量控制熔断降级系统负载保护等多个维度保护服务的稳定性分布式系统的流量防卫兵。欢迎关注我们获得更多的好玩实践 之前分享过 一篇 《Spring Cloud Gateway 原生的接口限流该怎么玩》, 核心是依赖Spring Cloud Gateway 默认提供的限流过滤器来实现 原生RequestRateLimiter 的不足 配置方式 spring: clou...

    妤锋シ 评论0 收藏0
  • Spring Cloud Gateway 扩展支持动态限流

    摘要:以流量为切入点,从流量控制熔断降级系统负载保护等多个维度保护服务的稳定性分布式系统的流量防卫兵。欢迎关注我们获得更多的好玩实践 之前分享过 一篇 《Spring Cloud Gateway 原生的接口限流该怎么玩》, 核心是依赖Spring Cloud Gateway 默认提供的限流过滤器来实现 原生RequestRateLimiter 的不足 配置方式 spring: clou...

    beanlam 评论0 收藏0
  • spring cloud gateway 源码解析(1)整体流程

    摘要:公司要做自己的网关,于是先把的过了一遍,然后把源码在看了一遍,这样公司的需求就搞定了。包括动态路由,多纬度限流,记录请求参数及返回参数也可修改。至此,流程就走完了。 公司要做自己的网关,于是先把github的issue过了一遍,然后把gateway源码在看了一遍,这样公司的需求就搞定了。包括动态路由,多纬度限流,记录请求参数及返回参数(也可修改)。先从请求进入网关说起吧: 请求先进...

    miqt 评论0 收藏0
  • 如何使用SpringCloud进行灰度发布

    摘要:灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。如何使用进行灰度发布呢将分一下四步第一,设置网关权重路由设置中提供了去实现根据分组设置权重进行路由,因此使用起来相对比较简单,有兴趣的可以阅读源码。 灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那...

    Jackwoo 评论0 收藏0

发表评论

0条评论

刘永祥

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<