摘要:序本文主要研究一下的继承了,其类型为,它定义及两个属性其方法使用创建,同时还创建了,最后返回包含二者的判断是否小于,小于则使用创建,否则创建最后创建的是实例的构造器给分配了小结继承了,其类型为,它定义及两个属性其方
序
本文主要研究一下Elasticsearch的FixedExecutorBuilder
FixedExecutorBuilderelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java
public final class FixedExecutorBuilder extends ExecutorBuilder{ private final Setting sizeSetting; private final Setting queueSizeSetting; /** * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. * * @param settings the node-level settings * @param name the name of the executor * @param size the fixed number of threads * @param queueSize the size of the backing queue, -1 for unbounded */ FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) { this(settings, name, size, queueSize, "thread_pool." + name); } /** * Construct a fixed executor builder. * * @param settings the node-level settings * @param name the name of the executor * @param size the fixed number of threads * @param queueSize the size of the backing queue, -1 for unbounded * @param prefix the prefix for the settings keys */ public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) { super(name); final String sizeKey = settingsKey(prefix, "size"); this.sizeSetting = new Setting<>( sizeKey, s -> Integer.toString(size), s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), Setting.Property.NodeScope); final String queueSizeKey = settingsKey(prefix, "queue_size"); this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); } @Override public List > getRegisteredSettings() { return Arrays.asList(sizeSetting, queueSizeSetting); } @Override FixedExecutorSettings getSettings(Settings settings) { final String nodeName = Node.NODE_NAME_SETTING.get(settings); final int size = sizeSetting.get(settings); final int queueSize = queueSizeSetting.get(settings); return new FixedExecutorSettings(nodeName, size, queueSize); } @Override ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) { int size = settings.size; int queueSize = settings.queueSize; final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); final ExecutorService executor = EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext); final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); return new ThreadPool.ExecutorHolder(executor, info); } @Override String formatInfo(ThreadPool.Info info) { return String.format( Locale.ROOT, "name [%s], size [%d], queue size [%s]", info.getName(), info.getMax(), info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); } static class FixedExecutorSettings extends ExecutorBuilder.ExecutorSettings { private final int size; private final int queueSize; FixedExecutorSettings(final String nodeName, final int size, final int queueSize) { super(nodeName); this.size = size; this.queueSize = queueSize; } } }
FixedExecutorBuilder继承了ExecutorBuilder,其ExecutorSettings类型为FixedExecutorSettings,它定义size及queueSize两个属性;其build方法使用EsExecutors.newFixed创建ExecutorService,同时还创建了ThreadPool.Info,最后返回包含二者的ThreadPool.ExecutorHolder
EsExecutors.newFixedelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
public class EsExecutors { //...... public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueuequeue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections. newBlockingQueue(), queueCapacity); } return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } //...... }
EsExecutors.newFixed判断queueCapacity是否小于0,小于0则使用ConcurrentCollections.newBlockingQueue()创建BlockingQueue,否则创建SizeBlockingQueue;最后创建的是EsThreadPoolExecutor
实例elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
public class ThreadPool implements Scheduler, Closeable { //...... public ThreadPool(final Settings settings, final ExecutorBuilder>... customBuilders) { assert Node.NODE_NAME_SETTING.exists(settings); final Mapbuilders = new HashMap<>(); final int availableProcessors = EsExecutors.numberOfProcessors(settings); final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder> builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); } builders.put(builder.name(), builder); } this.builders = Collections.unmodifiableMap(builders); threadContext = new ThreadContext(settings); final Map executors = new HashMap<>(); for (final Map.Entry entry : builders.entrySet()) { final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); if (executors.containsKey(executorHolder.info.getName())) { throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered"); } logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); executors.put(entry.getKey(), executorHolder); } executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); this.executors = unmodifiableMap(executors); final List infos = executors .values() .stream() .filter(holder -> holder.info.getName().equals("same") == false) .map(holder -> holder.info) .collect(Collectors.toList()); this.threadPoolInfo = new ThreadPoolInfo(infos); this.scheduler = Scheduler.initScheduler(settings); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); this.cachedTimeThread.start(); } //...... }
ThreadPool的构造器给Names.WRITE、Names.GET、Names.ANALYZE、Names.LISTENER、Names.FORCE_MERGE分配了FixedExecutorBuilder
小结FixedExecutorBuilder继承了ExecutorBuilder,其ExecutorSettings类型为FixedExecutorSettings,它定义size及queueSize两个属性;其build方法使用EsExecutors.newFixed创建ExecutorService,同时还创建了ThreadPool.Info,最后返回包含二者的ThreadPool.ExecutorHolder
docFixedExecutorBuilder
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/34507.html
摘要:序本文主要研究一下的实现了,它是线程安全的其构造器要求输入及越大表示新数据权重越大旧数据权重越小返回的是的值,不过它存储的是的形式,返回的时候使用转换会方法使用计算新值,然后使用方法来实现原子更新实例方法测试算法的计算逻辑测试 序 本文主要研究一下Elasticsearch的ExponentiallyWeightedMovingAverage ExponentiallyWeighted...
摘要:序本文主要研究一下的继承了接口提供静态方法用于更方便地使用实例在中使用关闭了小结提供静态方法用于更方便地使用 序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java publ...
摘要:序本文主要研究一下的继承了接口,并指定类型为实现了接口,它内部使用实现提供了及两个静态方法用于创建其中方法创建为,为,为的小结继承了接口,并指定类型为实现了接口,它内部使用实现提供了及两个静态方法用于创建其中方法创建为,为,为的 序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1...
摘要:序本文主要研究一下的接口定义了方法,该方法返回,另外还定义了一个方法,默认返回的方法返回了一系列,其中包括等要求不得小于要求是对开启的话要求是以后,避免小结接口定义了方法,该方法返回,另外还定义了一个方法,默认返回的方法返回了一 序 本文主要研究一下Elasticsearch的BootstrapCheck BootstrapCheck elasticsearch-7.0.1/serve...
摘要:序本文主要研究一下的实现了接口,其方法使用来选择数组的下标,然后返回该下标的值的构造器创建了两个,分别是及方法执行的是方法执行的是的及方法都接收参数,通过该来选取小结实现了接口,其方法使用来选择数组的下标,然后返回该下标的值的构造器创 序 本文主要研究一下Elasticsearch的RoundRobinSupplier RoundRobinSupplier elasticsearch-...
阅读 2450·2021-11-25 09:43
阅读 409·2021-11-12 10:36
阅读 4157·2021-11-08 13:18
阅读 2015·2021-09-06 15:00
阅读 2848·2019-08-30 15:56
阅读 790·2019-08-30 13:57
阅读 1880·2019-08-30 13:48
阅读 1288·2019-08-30 11:13