资讯专栏INFORMATION COLUMN

聊聊Elasticsearch的RoundRobinSupplier

baoxl / 3274人阅读

摘要:序本文主要研究一下的实现了接口,其方法使用来选择数组的下标,然后返回该下标的值的构造器创建了两个,分别是及方法执行的是方法执行的是的及方法都接收参数,通过该来选取小结实现了接口,其方法使用来选择数组的下标,然后返回该下标的值的构造器创

本文主要研究一下Elasticsearch的RoundRobinSupplier

RoundRobinSupplier

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java

final class RoundRobinSupplier implements Supplier {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}

RoundRobinSupplier实现了Supplier接口,其get方法使用counter.getAndIncrement() % selectors.length来选择selectors数组的下标,然后返回该下标的值

NioSelectorGroup

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

public class NioSelectorGroup implements NioGroup {

    private final List dedicatedAcceptors;
    private final RoundRobinSupplier acceptorSupplier;

    private final List selectors;
    private final RoundRobinSupplier selectorSupplier;

    private final AtomicBoolean isOpen = new AtomicBoolean(true);

    //......

    public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                            int selectorCount, Function, EventHandler> eventHandlerFunction) throws IOException {
        dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
        selectors = new ArrayList<>(selectorCount);

        try {
            List> suppliersToSet = new ArrayList<>(selectorCount);
            for (int i = 0; i < selectorCount; ++i) {
                RoundRobinSupplier supplier = new RoundRobinSupplier<>();
                suppliersToSet.add(supplier);
                NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                selectors.add(selector);
            }
            for (RoundRobinSupplier supplierToSet : suppliersToSet) {
                supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
            }

            for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                dedicatedAcceptors.add(acceptor);
            }

            if (dedicatedAcceptorCount != 0) {
                acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
            } else {
                acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            }
            selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
            assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

            startSelectors(selectors, selectorThreadFactory);
            startSelectors(dedicatedAcceptors, acceptorThreadFactory);
        } catch (Exception e) {
            try {
                close();
            } catch (Exception e1) {
                e.addSuppressed(e1);
            }
            throw e;
        }
    }

    public  S bindServerChannel(InetSocketAddress address, ChannelFactory factory)
        throws IOException {
        ensureOpen();
        return factory.openNioServerSocketChannel(address, acceptorSupplier);
    }

    @Override
    public  S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException {
        ensureOpen();
        return factory.openNioChannel(address, selectorSupplier);
    }    

    //......
}

NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)

ChannelFactory

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

public abstract class ChannelFactory {
    //......

    public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier supplier) throws IOException {
        ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
        NioSelector selector = supplier.get();
        ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
        scheduleServerChannel(serverChannel, selector);
        return serverChannel;
    }

    public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier supplier) throws IOException {
        SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
        NioSelector selector = supplier.get();
        Socket channel = internalCreateChannel(selector, rawChannel);
        scheduleChannel(channel, selector);
        return channel;
    }

    //......
}

ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier参数,通过该supplier来选取NioSelector

小结

RoundRobinSupplier实现了Supplier接口,其get方法使用counter.getAndIncrement() % selectors.length来选择selectors数组的下标,然后返回该下标的值

NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)

ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier参数,通过该supplier来选取NioSelector

doc

RoundRobinSupplier

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/34479.html

相关文章

  • 聊聊ElasticsearchExponentiallyWeightedMovingAverage

    摘要:序本文主要研究一下的实现了,它是线程安全的其构造器要求输入及越大表示新数据权重越大旧数据权重越小返回的是的值,不过它存储的是的形式,返回的时候使用转换会方法使用计算新值,然后使用方法来实现原子更新实例方法测试算法的计算逻辑测试 序 本文主要研究一下Elasticsearch的ExponentiallyWeightedMovingAverage ExponentiallyWeighted...

    Tony_Zby 评论0 收藏0
  • 聊聊ElasticsearchReleasables

    摘要:序本文主要研究一下的继承了接口提供静态方法用于更方便地使用实例在中使用关闭了小结提供静态方法用于更方便地使用 序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java publ...

    null1145 评论0 收藏0
  • 聊聊ElasticsearchConcurrentMapLong

    摘要:序本文主要研究一下的继承了接口,并指定类型为实现了接口,它内部使用实现提供了及两个静态方法用于创建其中方法创建为,为,为的小结继承了接口,并指定类型为实现了接口,它内部使用实现提供了及两个静态方法用于创建其中方法创建为,为,为的 序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1...

    lidashuang 评论0 收藏0
  • 聊聊ElasticsearchBootstrapCheck

    摘要:序本文主要研究一下的接口定义了方法,该方法返回,另外还定义了一个方法,默认返回的方法返回了一系列,其中包括等要求不得小于要求是对开启的话要求是以后,避免小结接口定义了方法,该方法返回,另外还定义了一个方法,默认返回的方法返回了一 序 本文主要研究一下Elasticsearch的BootstrapCheck BootstrapCheck elasticsearch-7.0.1/serve...

    Alex 评论0 收藏0
  • 聊聊ElasticsearchRunOnce

    摘要:序本文主要研究一下的实现了接口,它的构造器要求输入,同时构造了变量方法会先使用将由设置为,如果成功则执行代理的的方法方法则返回值实例方法验证了顺序多次执行的场景方法则验证了并发多次执行的场景则验证了使用作为的场景小结实现了接口,它的构 序 本文主要研究一下Elasticsearch的RunOnce RunOnce elasticsearch-7.0.1/server/src/main/...

    lindroid 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<