资讯专栏INFORMATION COLUMN

聊聊lettuce的sentinel连接

shusen / 3524人阅读

摘要:而部署多个来实现高可用,假设一个挂了,则端使用下一个来获取地址

本文主要研究一下lettuce的sentinel连接

RedisClient.connectSentinel

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.java

    private  StatefulRedisSentinelConnection connectSentinel(RedisCodec codec, RedisURI redisURI,
            Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
        connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
        connectionBuilder.clientResources(clientResources);

        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);

        StatefulRedisSentinelConnectionImpl connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout);

        logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());

        connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
                .connection(connection);
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            connectionBuilder.enablePingBeforeConnect();
        }

        if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
            channelType(connectionBuilder, redisURI);
            try {
                getConnection(initializeChannelAsync(connectionBuilder));
            } catch (RuntimeException e) {
                connection.close();
                throw e;
            }
        } else {

            boolean connected = false;
            boolean first = true;
            Exception causingException = null;
            validateUrisAreOfSameConnectionType(redisURI.getSentinels());

            for (RedisURI uri : redisURI.getSentinels()) {
                if (first) {
                    channelType(connectionBuilder, uri);
                    first = false;
                }
                connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));

                if (logger.isDebugEnabled()) {
                    SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver());
                    logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
                }
                try {
                    getConnection(initializeChannelAsync(connectionBuilder));
                    connected = true;
                    break;
                } catch (Exception e) {
                    logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString());
                    causingException = e;
                }
            }

            if (!connected) {
                connection.close();
                throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),
                        causingException);
            }
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            connection.setClientName(redisURI.getClientName());
        }

        return connection;
    }

connectSentinel方法,会遍历sentinel,挨个取master获取连接,如果连接不上或抛异常则继续用下一个sentinel获取

如果遍历完sentinel都抛异常,则最后抛出RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),causingException)

这里会调用AbstractRedisClient的initializeChannelAsync方法

AbstractRedisClient.initializeChannelAsync

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/AbstractRedisClient.java

    /**
     * Connect and initialize a channel from {@link ConnectionBuilder}.
     *
     * @param connectionBuilder must not be {@literal null}.
     * @return the {@link ConnectionFuture} to synchronize the connection process.
     * @since 4.4
     */
    @SuppressWarnings("unchecked")
    protected > ConnectionFuture initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {

        SocketAddress redisAddress = connectionBuilder.socketAddress();

        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }

        logger.debug("Connecting to Redis at {}", redisAddress);

        CompletableFuture channelReadyFuture = new CompletableFuture<>();
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();

        RedisChannelInitializer initializer = connectionBuilder.build();
        redisBootstrap.handler(initializer);

        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        CompletableFuture initFuture = initializer.channelInitialized();
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

        connectFuture.addListener(future -> {

            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }

            initFuture.whenComplete((success, throwable) -> {

                if (throwable == null) {
                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    RedisChannelHandler connection = connectionBuilder.connection();
                    connection.registerCloseables(closeableResources, connection);
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }

                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure;

                if (throwable instanceof RedisConnectionException) {
                    failure = throwable;
                } else if (throwable instanceof TimeoutException) {
                    failure = new RedisConnectionException("Could not initialize channel within "
                            + connectionBuilder.getTimeout(), throwable);
                } else {
                    failure = throwable;
                }
                channelReadyFuture.completeExceptionally(failure);

                CompletableFuture response = new CompletableFuture<>();
                response.completeExceptionally(failure);

            });
        });

        return new DefaultConnectionFuture(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
                .connection()));
    }

这里initializeChannelAsync的时候,会调用connectionBuilder.socketAddress()方法,进而调用RedisClient的getSocketAddress方法

RedisClient.getSocketAddress

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.java

    protected SocketAddress getSocketAddress(RedisURI redisURI) throws InterruptedException, TimeoutException,
            ExecutionException {
        SocketAddress redisAddress;

        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            redisAddress = lookupRedis(redisURI);

            if (redisAddress == null) {
                throw new RedisConnectionException("Cannot provide redisAddress using sentinel for masterId "
                        + redisURI.getSentinelMasterId());
            }

        } else {
            redisAddress = SocketAddressResolver.resolve(redisURI, clientResources.dnsResolver());
        }
        return redisAddress;
    }

    private SocketAddress lookupRedis(RedisURI sentinelUri) throws InterruptedException, TimeoutException, ExecutionException {
        try (StatefulRedisSentinelConnection connection = connectSentinel(sentinelUri)) {
            return connection.async().getMasterAddrByName(sentinelUri.getSentinelMasterId())
                    .get(timeout.toNanos(), TimeUnit.NANOSECONDS);
        }
    }

getSocketAddress方法会调用lookupRedis方法,而lookupRedis方法则调用getMasterAddrByName方法,通过sentinel来获取master的ip地址

小结

redis的sentinel类似于一个master的服务发现中心,假设master有故障,则通过sentinel获取新的master实现failover。

而sentinel部署多个来实现高可用,假设一个sentinel挂了,则client端使用下一个sentinel来获取master地址

doc

lettuce Redis-Sentinel

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/36835.html

相关文章

  • 聊聊spring-boot-starter-data-redis配置变更

    摘要:序本文主要研究一下的配置变更配置变更以前是的版本的为版本,最近切到版本为版本,发现配置有变更。 序 本文主要研究一下spring-boot-starter-data-redis的配置变更 配置变更 以前是spring-boot的1.4.x版本的(spring-data-redis为1.7.x版本),最近切到2.0.4.RELEASEB版本(spring-data-redis为2.0.5...

    Labradors 评论0 收藏0
  • 聊聊spring-data-redis连接校验

    摘要:序本文主要研究一下的连接池的校验这里调用来创建连接池这里使用了这里继承了,重写了等方法,这里是通过来判断是通过字段来判断的,而在或者的时候变为,初始化以及在的时候变为可以看到对于这种造成的,这种方式 序 本文主要研究一下spring-data-redis的连接池的校验 lettuce LettucePoolingConnectionProvider spring-data-redis/...

    vibiu 评论0 收藏0
  • [case37]聊聊lettuceshareNativeConnection参数

    摘要:序本文主要研究一下的参数可以看到这里的默认为,表示多个将共享一个如果该值为,则及方法使用的是在为的时候,调用的是要注意这里维护了,第一个为的时候,才调用去获取另外要注意,这里的,默认是的,也就是说只要不为,就不会归还,每次 序 本文主要研究一下lettuce的shareNativeConnection参数 LettuceConnectionFactory spring-data-red...

    kgbook 评论0 收藏0
  • 聊聊redisTemplate对lettuce封装

    摘要:序本文主要研究一下对的封装内部有诸多方法,这里以为例该方法内部是先获取,然后调用方法默认这里的是最后这里还会调用来释放连接通过获取该类实际是委托给来执行对于的类库来说,这个是以方法为例,这是调用方法 序 本文主要研究一下redisTemplate对lettuce的封装 RedisTemplate spring-data-redis-2.0.10.RELEASE-sources.jar!...

    mylxsw 评论0 收藏0
  • docker搭建redis哨兵集群并且整合springboot实现

    一 创建redis和sentinel因为我们要存放docker-compose.yml文件,所以需要创建redis和sentinel两个文件夹二 创建docker-compose.ymldocker-compose.ymlversion:"3" services: master: image:redis:latest container_name:my_redis_master...

    3119555200 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<