1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2017 Red Hat, Inc., and individual contributors
4 * as indicated by the @author tags.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.jboss.threads;
20
21 import static java.lang.Math.max;
22 import static java.lang.Math.min;
23 import static java.lang.Thread.currentThread;
24 import static java.security.AccessController.doPrivileged;
25 import static java.security.AccessController.getContext;
26 import static java.util.concurrent.locks.LockSupport.parkNanos;
27 import static java.util.concurrent.locks.LockSupport.unpark;
28 import static org.jboss.threads.JBossExecutors.unsafe;
29
30 import java.lang.management.ManagementFactory;
31 import java.security.AccessControlContext;
32 import java.security.PrivilegedAction;
33 import java.time.Duration;
34 import java.time.temporal.ChronoUnit;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.Hashtable;
38 import java.util.List;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.RejectedExecutionException;
44 import java.util.concurrent.ThreadFactory;
45 import java.util.concurrent.ThreadLocalRandom;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.LongAdder;
48 import java.util.concurrent.locks.LockSupport;
49
50 import javax.management.ObjectInstance;
51 import javax.management.ObjectName;
52
53 import org.jboss.threads.management.ManageableThreadPoolExecutorService;
54 import org.jboss.threads.management.StandardThreadPoolMXBean;
55 import org.wildfly.common.Assert;
56 import org.wildfly.common.cpu.ProcessorInfo;
57
58 /**
59 * A task-or-thread queue backed thread pool executor service. Tasks are added in a FIFO manner, and consumers in a LIFO manner.
60 * Threads are only ever created in the event that there are no idle threads available to service a task, which, when
61 * combined with the LIFO-based consumer behavior, means that the thread pool will generally trend towards the minimum
62 * necessary size. In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can
63 * be used to further govern the thread pool size.
64 * <p>
65 * New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling
66 * its {@link Builder#build() build()} method.
67 *
68 * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
69 */
70 public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService {
/**
 * A shared, zero-length {@code Thread} array constant.
 */
private static final Thread[] NO_THREADS = new Thread[0];

// Class-initialization hook: both calls run exactly once, when this class is first initialized.
// NOTE(review): presumably these force early loading of the version banner and of the MBean
// unregistration helper class - confirm against Version and MBeanUnregisterAction.
static {
    Version.getVersionString();
    MBeanUnregisterAction.forceInit();
}
77
78 /*
79 ┌──────────────────────────┐
80 │ Explanation of operation │
81 └──────────────────────────┘
82
83 The primary mechanism of this executor is the special linked list/stack. This list has the following properties:
84 • Multiple node types:
85 ◦ Task nodes (TaskNode), which have the following properties:
86 ▪ Strongly connected (no task is ever removed from the middle of the list; tail can always be found by following .next pointers)
87 ▪ FIFO (insertion at tail.next, "removal" at head.next)
88 ▪ Head and tail always refer to TaskNodes; head.next/tail.next are the "true" action points for all node types
89 ▪ At least one dead task node is always in the list, thus the task is cleared after execution to avoid leaks
90 ◦ Waiting consumer thread nodes (PoolThreadNode), which have the following properties:
91 ▪ LIFO/FILO (insertion and removal at tail.next)
92 ▪ Carrier for task handoff between producer and waiting consumer
93 ◦ Waiting termination (awaitTermination) thread nodes (TerminateWaiterNode), which have the following properties:
94 ▪ Append-only (insertion at tail.next.next...next)
95 ▪ All removed at once when termination is complete
96 ▪ If a thread stops waiting, the node remains (but its thread field is cleared to prevent (best-effort) useless unparks)
97 ▪ Once cleared, the thread field can never be reinitialized
98 • Concurrently accessed (multiple threads may read the list and update the head and tail pointers)
99 • TaskNode.next may be any node type or null
100 • PoolThreadNode.next may only be another PoolThreadNode or null
101 • TerminateWaiterNode.next may only be another TerminateWaiterNode or null
102
103 The secondary mechanism is the thread status atomic variable. It is logically a structure with the following fields:
104 • Current thread count (currentSizeOf(), withCurrentSize())
105 • Core pool size (coreSizeOf(), withCoreSize())
106 • Max pool size (maxSizeOf(), withMaxSize())
107 • Shutdown-requested flag (isShutdownRequested(), withShutdownRequested())
108 • Shutdown-with-interrupt requested flag (isShutdownInterrupt(), withShutdownInterrupt())
109 • Shutdown-completed flag (isShutdownComplete(), withShutdownComplete())
110 • The decision to create a new thread is affected by whether the number of active threads is less than the core size;
111 if not, then the growth resistance factor is applied to decide whether the task should be enqueued or a new thread begun.
112 Note: the default growth resistance factor is 0% resistance.
113
114 The final mechanism is the queue status atomic variable. It is logically a structure with the following fields:
115 • Current queue size (currentQueueSizeOf(), withCurrentQueueSize())
116 • Queue size limit (maxQueueSizeOf(), withMaxQueueSize())
117
118 */
119
120 // =======================================================
121 // Optimization control flags
122 // =======================================================
123
124 /**
125 * A global hint which establishes whether it is recommended to disable uses of {@code EnhancedQueueExecutor}.
126 * This hint defaults to {@code false} but can be changed to {@code true} by setting the {@code jboss.threads.eqe.disable}
127 * property to {@code true} before this class is initialized.
128 */
129 public static final boolean DISABLE_HINT = readBooleanPropertyPrefixed("disable", false);
130
131 /**
132 * Update the tail pointer opportunistically.
133 */
134 static final boolean UPDATE_TAIL = readBooleanPropertyPrefixed("update-tail", false);
135 /**
136 * Update the summary statistics.
137 */
138 static final boolean UPDATE_STATISTICS = readBooleanPropertyPrefixed("statistics", false);
139 /**
140 * Maintain an estimate of the number of threads which are currently doing work on behalf of the thread pool.
141 */
142 static final boolean UPDATE_ACTIVE_COUNT =
143 UPDATE_STATISTICS || readBooleanPropertyPrefixed("statistics.active-count", false);
144 /**
145 * Suppress queue limit and size tracking for performance.
146 */
147 static final boolean NO_QUEUE_LIMIT = readBooleanPropertyPrefixed("unlimited-queue", false);
148 /**
149 * Set the default value for whether an mbean is to be auto-registered for the thread pool.
150 */
151 static final boolean REGISTER_MBEAN = readBooleanPropertyPrefixed("register-mbean", true);
152 /**
153 * Set to enable or disable MBean registration.
154 */
155 static final boolean DISABLE_MBEAN = readBooleanPropertyPrefixed("disable-mbean", readProperty("org.graalvm.nativeimage.imagecode", null) != null);
156 /**
157 * The number of times a thread should spin/yield before actually parking.
158 */
159 static final int PARK_SPINS = readIntPropertyPrefixed("park-spins", ProcessorInfo.availableProcessors() == 1 ? 0 : 1 << 7);
160 /**
161 * The yield ratio expressed as the number of spins that should yield.
162 */
163 static final int YIELD_FACTOR = max(min(readIntPropertyPrefixed("park-yields", 1), PARK_SPINS), 0);
164
165 // =======================================================
166 // Constants
167 // =======================================================
168
169 static final Executor DEFAULT_HANDLER = JBossExecutors.rejectingExecutor();
170
171 // =======================================================
172 // Immutable configuration fields
173 // =======================================================
174
175 /**
176 * The thread factory.
177 */
178 private final ThreadFactory threadFactory;
179 /**
180 * The approximate set of pooled threads.
181 */
182 private final Set<Thread> runningThreads = Collections.newSetFromMap(new ConcurrentHashMap<>());
183 /**
184 * The management bean instance.
185 */
186 private final MXBeanImpl mxBean;
187 /**
188 * The MBean registration handle (if any).
189 */
190 private final Object handle;
191 /**
192 * The access control context of the creating thread.
193 */
194 private final AccessControlContext acc;
195
196 // =======================================================
197 // Current state fields
198 // =======================================================
199
200 /**
201 * The linked list of threads waiting for termination of this thread pool.
202 */
203 @SuppressWarnings("unused") // used by field updater
204 volatile Waiter terminationWaiters;
205
206 /**
207 * Queue size:
208 * <ul>
209 * <li>Bit 00..1F: current queue length</li>
210 * <li>Bit 20..3F: queue limit</li>
211 * </ul>
212 */
213 @SuppressWarnings("unused") // used by field updater
214 volatile long queueSize;
215
216 /**
217 * The thread keep-alive timeout value.
218 */
219 volatile long timeoutNanos;
220
221 /**
222 * A resistance factor applied after the core pool is full; values applied here will cause that fraction
223 * of submissions to create new threads when no idle thread is available. A value of {@code 0.0f} implies that
224 * threads beyond the core size should be created as aggressively as threads within it; a value of {@code 1.0f}
225 * implies that threads beyond the core size should never be created.
226 */
227 volatile float growthResistance;
228
229 /**
230 * The handler for tasks which cannot be accepted by the executor.
231 */
232 volatile Executor handoffExecutor;
233
234 /**
235 * The handler for uncaught exceptions which occur during user tasks.
236 */
237 volatile Thread.UncaughtExceptionHandler exceptionHandler;
238
239 /**
240 * The termination task to execute when the thread pool exits.
241 */
242 volatile Runnable terminationTask;
243
244 // =======================================================
245 // Statistics fields and counters
246 // =======================================================
247
248 /**
249 * The peak number of threads ever created by this pool.
250 */
251 @SuppressWarnings("unused") // used by field updater
252 volatile int peakThreadCount;
253 /**
254 * The approximate peak queue size.
255 */
256 @SuppressWarnings("unused") // used by field updater
257 volatile int peakQueueSize;
258
259 private final LongAdder submittedTaskCounter = new LongAdder();
260 private final LongAdder completedTaskCounter = new LongAdder();
261 private final LongAdder rejectedTaskCounter = new LongAdder();
262 private final LongAdder spinMisses = new LongAdder();
263
264 /**
265 * The current active number of threads.
266 */
267 @SuppressWarnings("unused")
268 volatile int activeCount;
269
270 // =======================================================
271 // Updaters
272 // =======================================================
273
274 private static final long terminationWaitersOffset;
275
276 private static final long queueSizeOffset;
277
278 private static final long peakThreadCountOffset;
279 private static final long activeCountOffset;
280 private static final long peakQueueSizeOffset;
281
282 private static final Object sequenceBase;
283 private static final long sequenceOffset;
284
285 static {
286 try {
287 terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters"));
288
289 queueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("queueSize"));
290
291 peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount"));
292 activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount"));
293 peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize"));
294
295 sequenceBase = unsafe.staticFieldBase(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
296 sequenceOffset = unsafe.staticFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
297 } catch (NoSuchFieldException e) {
298 throw new NoSuchFieldError(e.getMessage());
299 }
300 }
301
302 // =======================================================
303 // Thread state field constants
304 // =======================================================
305
306 private static final long TS_THREAD_CNT_MASK = 0b1111_1111_1111_1111_1111L; // 20 bits, can be shifted as needed
307
308 // shift amounts
309 private static final long TS_CURRENT_SHIFT = 0;
310 private static final long TS_CORE_SHIFT = 20;
311 private static final long TS_MAX_SHIFT = 40;
312
313 private static final long TS_ALLOW_CORE_TIMEOUT = 1L << 60;
314 private static final long TS_SHUTDOWN_REQUESTED = 1L << 61;
315 private static final long TS_SHUTDOWN_INTERRUPT = 1L << 62;
316 @SuppressWarnings("NumericOverflow")
317 private static final long TS_SHUTDOWN_COMPLETE = 1L << 63;
318
319 private static final int EXE_OK = 0;
320 private static final int EXE_REJECT_QUEUE_FULL = 1;
321 private static final int EXE_REJECT_SHUTDOWN = 2;
322 private static final int EXE_CREATE_THREAD = 3;
323
324 private static final int AT_YES = 0;
325 private static final int AT_NO = 1;
326 private static final int AT_SHUTDOWN = 2;
327
328 // =======================================================
329 // Marker objects
330 // =======================================================
331
332 static final QNode TERMINATE_REQUESTED = new TerminateWaiterNode();
333 static final QNode TERMINATE_COMPLETE = new TerminateWaiterNode();
334
335 static final Waiter TERMINATE_COMPLETE_WAITER = new Waiter(null);
336
337 static final Runnable WAITING = new NullRunnable();
338 static final Runnable GAVE_UP = new NullRunnable();
339 static final Runnable ACCEPTED = new NullRunnable();
340 static final Runnable EXIT = new NullRunnable();
341
342 // =======================================================
343 // Constructor
344 // =======================================================
345
/**
 * Source of the numeric suffix used for generated MBean names ({@code threadpool-N}); starts at 1.
 */
static volatile int sequence = 1;

/**
 * Construct a new instance from the given builder, capturing the caller's access control context and
 * registering an MBean when MBean registration is neither globally disabled nor disabled on the builder.
 *
 * @param builder the configured builder to read settings from
 */
EnhancedQueueExecutor(final Builder builder) {
    super();
    this.acc = getContext();

    // the core size may never exceed the maximum size
    final int maximumThreads = builder.getMaximumPoolSize();
    final int coreThreads = min(builder.getCorePoolSize(), maximumThreads);

    this.handoffExecutor = builder.getHandoffExecutor();
    this.exceptionHandler = builder.getExceptionHandler();
    this.threadFactory = builder.getThreadFactory();
    this.terminationTask = builder.getTerminationTask();
    this.growthResistance = builder.getGrowthResistance();
    final Duration keepAlive = builder.getKeepAliveTime();

    // initial thread status: core-timeout flag, then maximum size, then core size
    threadStatus = withCoreSize(
        withMaxSize(
            withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()),
            maximumThreads
        ),
        coreThreads
    );
    timeoutNanos = TimeUtil.clampedPositiveNanos(keepAlive);
    // initial queue status: empty queue with the configured limit
    queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize());

    mxBean = new MXBeanImpl();
    if (DISABLE_MBEAN || ! builder.isRegisterMBean()) {
        handle = null;
    } else {
        String mBeanName = builder.getMBeanName();
        if (mBeanName == null) {
            // no explicit name: generate one from the shared sequence counter
            mBeanName = "threadpool-" + unsafe.getAndAddInt(sequenceBase, sequenceOffset, 1);
        }
        handle = doPrivileged(new MBeanRegisterAction(mBeanName, mxBean), acc);
    }
}
373
374 static final class MBeanRegisterAction implements PrivilegedAction<ObjectInstance> {
375 private final String finalName;
376 private final MXBeanImpl mxBean;
377
378 MBeanRegisterAction(final String finalName, final MXBeanImpl mxBean) {
379 this.finalName = finalName;
380 this.mxBean = mxBean;
381 }
382
383 public ObjectInstance run() {
384 try {
385 final Hashtable<String, String> table = new Hashtable<>();
386 table.put("name", ObjectName.quote(finalName));
387 table.put("type", "thread-pool");
388 return ManagementFactory.getPlatformMBeanServer().registerMBean(mxBean, new ObjectName("jboss.threads", table));
389 } catch (Throwable ignored) {
390 }
391 return null;
392 }
393 }
394
395 // =======================================================
396 // Builder
397 // =======================================================
398
399 /**
400 * The builder class for an {@code EnhancedQueueExecutor}. All the fields are initialized to sensible defaults for
401 * a small thread pool.
402 */
403 public static final class Builder {
404 private ThreadFactory threadFactory = Executors.defaultThreadFactory();
405 private Runnable terminationTask = NullRunnable.getInstance();
406 private Executor handoffExecutor = DEFAULT_HANDLER;
407 private Thread.UncaughtExceptionHandler exceptionHandler = JBossExecutors.loggingExceptionHandler();
408 private int coreSize = 16;
409 private int maxSize = 64;
410 private Duration keepAliveTime = Duration.ofSeconds(30);
411 private float growthResistance;
412 private boolean allowCoreTimeOut;
413 private int maxQueueSize = Integer.MAX_VALUE;
414 private boolean registerMBean = REGISTER_MBEAN;
415 private String mBeanName;
416
417 /**
418 * Construct a new instance.
419 */
420 public Builder() {}
421
422 /**
423 * Get the configured thread factory.
424 *
425 * @return the configured thread factory (not {@code null})
426 */
427 public ThreadFactory getThreadFactory() {
428 return threadFactory;
429 }
430
431 /**
432 * Set the configured thread factory.
433 *
434 * @param threadFactory the configured thread factory (must not be {@code null})
435 * @return this builder
436 */
437 public Builder setThreadFactory(final ThreadFactory threadFactory) {
438 Assert.checkNotNullParam("threadFactory", threadFactory);
439 this.threadFactory = threadFactory;
440 return this;
441 }
442
443 /**
444 * Get the termination task. By default, an empty {@code Runnable} is used.
445 *
446 * @return the termination task (not {@code null})
447 */
448 public Runnable getTerminationTask() {
449 return terminationTask;
450 }
451
452 /**
453 * Set the termination task.
454 *
455 * @param terminationTask the termination task (must not be {@code null})
456 * @return this builder
457 */
458 public Builder setTerminationTask(final Runnable terminationTask) {
459 Assert.checkNotNullParam("terminationTask", terminationTask);
460 this.terminationTask = terminationTask;
461 return this;
462 }
463
464 /**
465 * Get the core pool size. This is the size below which new threads will always be created if no idle threads
466 * are available. If the pool size reaches the core size but has not yet reached the maximum size, a resistance
467 * factor will be applied to each task submission which determines whether the task should be queued or a new
468 * thread started.
469 *
470 * @return the core pool size
471 * @see EnhancedQueueExecutor#getCorePoolSize()
472 */
473 public int getCorePoolSize() {
474 return coreSize;
475 }
476
477 /**
478 * Set the core pool size. If the configured maximum pool size is less than the configured core size, the
479 * core size will be reduced to match the maximum size when the thread pool is constructed.
480 *
481 * @param coreSize the core pool size (must be greater than or equal to 0, and less than 2^20)
482 * @return this builder
483 * @see EnhancedQueueExecutor#setCorePoolSize(int)
484 */
485 public Builder setCorePoolSize(final int coreSize) {
486 Assert.checkMinimumParameter("coreSize", 0, coreSize);
487 Assert.checkMaximumParameter("coreSize", TS_THREAD_CNT_MASK, coreSize);
488 this.coreSize = coreSize;
489 return this;
490 }
491
492 /**
493 * Get the maximum pool size. This is the absolute upper limit to the size of the thread pool.
494 *
495 * @return the maximum pool size
496 * @see EnhancedQueueExecutor#getMaximumPoolSize()
497 */
498 public int getMaximumPoolSize() {
499 return maxSize;
500 }
501
502 /**
503 * Set the maximum pool size. If the configured maximum pool size is less than the configured core size, the
504 * core size will be reduced to match the maximum size when the thread pool is constructed.
505 *
506 * @param maxSize the maximum pool size (must be greater than or equal to 0, and less than 2^20)
507 * @return this builder
508 * @see EnhancedQueueExecutor#setMaximumPoolSize(int)
509 */
510 public Builder setMaximumPoolSize(final int maxSize) {
511 Assert.checkMinimumParameter("maxSize", 0, maxSize);
512 Assert.checkMaximumParameter("maxSize", TS_THREAD_CNT_MASK, maxSize);
513 this.maxSize = maxSize;
514 return this;
515 }
516
517 /**
518 * Get the thread keep-alive time. This is the amount of time (in the configured time unit) that idle threads
519 * will wait for a task before exiting.
520 *
521 * @return the thread keep-alive time duration
522 */
523 public Duration getKeepAliveTime() {
524 return keepAliveTime;
525 }
526
527 /**
528 * Get the thread keep-alive time. This is the amount of time (in the configured time unit) that idle threads
529 * will wait for a task before exiting.
530 *
531 * @param keepAliveUnits the time keepAliveUnits of the keep-alive time (must not be {@code null})
532 * @return the thread keep-alive time
533 * @see EnhancedQueueExecutor#getKeepAliveTime(TimeUnit)
534 * @deprecated Use {@link #getKeepAliveTime()} instead.
535 */
536 @Deprecated
537 public long getKeepAliveTime(TimeUnit keepAliveUnits) {
538 Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
539 final long secondsPart = keepAliveUnits.convert(keepAliveTime.getSeconds(), TimeUnit.SECONDS);
540 final long nanoPart = keepAliveUnits.convert(keepAliveTime.getNano(), TimeUnit.NANOSECONDS);
541 final long sum = secondsPart + nanoPart;
542 return sum < 0 ? Long.MAX_VALUE : sum;
543 }
544
545 /**
546 * Set the thread keep-alive time.
547 *
548 * @param keepAliveTime the thread keep-alive time (must not be {@code null})
549 */
550 public Builder setKeepAliveTime(final Duration keepAliveTime) {
551 Assert.checkNotNullParam("keepAliveTime", keepAliveTime);
552 this.keepAliveTime = keepAliveTime;
553 return this;
554 }
555
556 /**
557 * Set the thread keep-alive time.
558 *
559 * @param keepAliveTime the thread keep-alive time (must be greater than 0)
560 * @param keepAliveUnits the time keepAliveUnits of the keep-alive time (must not be {@code null})
561 * @return this builder
562 * @see EnhancedQueueExecutor#setKeepAliveTime(long, TimeUnit)
563 * @deprecated Use {@link #setKeepAliveTime(Duration)} instead.
564 */
565 @Deprecated
566 public Builder setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveUnits) {
567 Assert.checkMinimumParameter("keepAliveTime", 1L, keepAliveTime);
568 Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
569 this.keepAliveTime = Duration.of(keepAliveTime, JDKSpecific.timeToTemporal(keepAliveUnits));
570 return this;
571 }
572
573 /**
574 * Get the thread pool growth resistance. This is the average fraction of submitted tasks that will be enqueued (instead
575 * of causing a new thread to start) when there are no idle threads and the pool size is equal to or greater than
576 * the core size (but still less than the maximum size). A value of {@code 0.0} indicates that tasks should
577 * not be enqueued until the pool is completely full; a value of {@code 1.0} indicates that tasks should always
578 * be enqueued until the queue is completely full.
579 *
580 * @return the thread pool growth resistance
581 * @see EnhancedQueueExecutor#getGrowthResistance()
582 */
583 public float getGrowthResistance() {
584 return growthResistance;
585 }
586
587 /**
588 * Set the thread pool growth resistance.
589 *
590 * @param growthResistance the thread pool growth resistance (must be in the range {@code 0.0f ≤ n ≤ 1.0f})
591 * @return this builder
592 * @see #getGrowthResistance()
593 * @see EnhancedQueueExecutor#setGrowthResistance(float)
594 */
595 public Builder setGrowthResistance(final float growthResistance) {
596 Assert.checkMinimumParameter("growthResistance", 0.0f, growthResistance);
597 Assert.checkMaximumParameter("growthResistance", 1.0f, growthResistance);
598 this.growthResistance = growthResistance;
599 return this;
600 }
601
602 /**
603 * Determine whether core threads are allowed to time out. A "core thread" is defined as any thread in the pool
604 * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
605 *
606 * @return {@code true} if core threads are allowed to time out, {@code false} otherwise
607 * @see EnhancedQueueExecutor#allowsCoreThreadTimeOut()
608 */
609 public boolean allowsCoreThreadTimeOut() {
610 return allowCoreTimeOut;
611 }
612
613 /**
614 * Establish whether core threads are allowed to time out. A "core thread" is defined as any thread in the pool
615 * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
616 *
617 * @param allowCoreTimeOut {@code true} if core threads are allowed to time out, {@code false} otherwise
618 * @return this builder
619 * @see EnhancedQueueExecutor#allowCoreThreadTimeOut(boolean)
620 */
621 public Builder allowCoreThreadTimeOut(final boolean allowCoreTimeOut) {
622 this.allowCoreTimeOut = allowCoreTimeOut;
623 return this;
624 }
625
626 /**
627 * Get the maximum queue size. If the queue is full and a task cannot be immediately accepted, rejection will result.
628 *
629 * @return the maximum queue size
630 * @see EnhancedQueueExecutor#getMaximumQueueSize()
631 */
632 public int getMaximumQueueSize() {
633 return maxQueueSize;
634 }
635
636 /**
637 * Set the maximum queue size.
638 * This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set.
639 *
640 * @param maxQueueSize the maximum queue size (must be ≥ 0)
641 * @return this builder
642 * @see EnhancedQueueExecutor#setMaximumQueueSize(int)
643 */
644 public Builder setMaximumQueueSize(final int maxQueueSize) {
645 Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize);
646 Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize);
647 this.maxQueueSize = maxQueueSize;
648 return this;
649 }
650
651 /**
652 * Get the handoff executor.
653 *
654 * @return the handoff executor (not {@code null})
655 */
656 public Executor getHandoffExecutor() {
657 return handoffExecutor;
658 }
659
660 /**
661 * Set the handoff executor.
662 *
663 * @param handoffExecutor the handoff executor (must not be {@code null})
664 * @return this builder
665 */
666 public Builder setHandoffExecutor(final Executor handoffExecutor) {
667 Assert.checkNotNullParam("handoffExecutor", handoffExecutor);
668 this.handoffExecutor = handoffExecutor;
669 return this;
670 }
671
672 /**
673 * Get the uncaught exception handler.
674 *
675 * @return the uncaught exception handler (not {@code null})
676 */
677 public Thread.UncaughtExceptionHandler getExceptionHandler() {
678 return exceptionHandler;
679 }
680
681 /**
682 * Set the uncaught exception handler.
683 *
684 * @param exceptionHandler the uncaught exception handler (must not be {@code null})
685 * @return this builder
686 */
687 public Builder setExceptionHandler(final Thread.UncaughtExceptionHandler exceptionHandler) {
688 this.exceptionHandler = exceptionHandler;
689 return this;
690 }
691
692 /**
693 * Construct the executor from the configured parameters.
694 *
695 * @return the executor, which will be active and ready to accept tasks (not {@code null})
696 */
697 public EnhancedQueueExecutor build() {
698 return new EnhancedQueueExecutor(this);
699 }
700
701 /**
702 * Determine whether an MBean should automatically be registered for this pool.
703 *
704 * @return {@code true} if an MBean is to be auto-registered; {@code false} otherwise
705 */
706 public boolean isRegisterMBean() {
707 return registerMBean;
708 }
709
710 /**
711 * Establish whether an MBean should automatically be registered for this pool.
712 *
713 * @param registerMBean {@code true} if an MBean is to be auto-registered; {@code false} otherwise
714 * @return this builder
715 */
716 public Builder setRegisterMBean(final boolean registerMBean) {
717 this.registerMBean = registerMBean;
718 return this;
719 }
720
721 /**
722 * Get the overridden MBean name.
723 *
724 * @return the overridden MBean name, or {@code null} if a default name should be generated
725 */
726 public String getMBeanName() {
727 return mBeanName;
728 }
729
730 /**
731 * Set the overridden MBean name.
732 *
733 * @param mBeanName the overridden MBean name, or {@code null} if a default name should be generated
734 * @return this builder
735 */
736 public Builder setMBeanName(final String mBeanName) {
737 this.mBeanName = mBeanName;
738 return this;
739 }
740 }
741
742 // =======================================================
743 // ExecutorService
744 // =======================================================
745
746 /**
747 * Execute a task.
748 *
749 * @param runnable the task to execute (must not be {@code null})
750 */
751 public void execute(Runnable runnable) {
752 Assert.checkNotNullParam("runnable", runnable);
753 final Runnable realRunnable = JBossExecutors.classLoaderPreservingTaskUnchecked(runnable);
754 int result;
755 result = tryExecute(realRunnable);
756 boolean ok = false;
757 if (result == EXE_OK) {
758 // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition
759 if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) {
760 deallocateThread();
761 }
762 if (UPDATE_STATISTICS) submittedTaskCounter.increment();
763 return;
764 } else if (result == EXE_CREATE_THREAD) try {
765 ok = doStartThread(realRunnable);
766 } finally {
767 if (! ok) deallocateThread();
768 } else {
769 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
770 if (result == EXE_REJECT_SHUTDOWN) {
771 rejectShutdown(realRunnable);
772 } else {
773 assert result == EXE_REJECT_QUEUE_FULL;
774 rejectQueueFull(realRunnable);
775 }
776 }
777 }
778
779 /**
780 * Request that shutdown be initiated for this thread pool. This is equivalent to calling
781 * {@link #shutdown(boolean) shutdown(false)}; see that method for more information.
782 */
783 public void shutdown() {
784 shutdown(false);
785 }
786
787 /**
788 * Attempt to stop the thread pool immediately by interrupting all running threads and de-queueing all pending
789 * tasks. The thread pool might not be fully stopped when this method returns, if a currently running task
790 * does not respect interruption.
791 *
792 * @return a list of pending tasks (not {@code null})
793 */
794 public List<Runnable> shutdownNow() {
795 shutdown(true);
796 final ArrayList<Runnable> list = new ArrayList<>();
797 TaskNode head = this.head;
798 QNode headNext;
799 for (;;) {
800 headNext = head.getNext();
801 if (headNext instanceof TaskNode) {
802 TaskNode taskNode = (TaskNode) headNext;
803 if (compareAndSetHead(head, taskNode)) {
804 if (! NO_QUEUE_LIMIT) decreaseQueueSize();
805 head = taskNode;
806 list.add(taskNode.task);
807 }
808 // retry
809 } else {
810 // no more tasks;
811 return list;
812 }
813 }
814 }
815
816 /**
817 * Determine whether shutdown was requested on this thread pool.
818 *
819 * @return {@code true} if shutdown was requested, {@code false} otherwise
820 */
821 public boolean isShutdown() {
822 return isShutdownRequested(threadStatus);
823 }
824
825 /**
826 * Determine whether shutdown has completed on this thread pool.
827 *
828 * @return {@code true} if shutdown has completed, {@code false} otherwise
829 */
830 public boolean isTerminated() {
831 return isShutdownComplete(threadStatus);
832 }
833
834 /**
835 * Wait for the thread pool to complete termination. If the timeout expires before the thread pool is terminated,
836 * {@code false} is returned. If the calling thread is interrupted before the thread pool is terminated, then
837 * an {@link InterruptedException} is thrown.
838 *
839 * @param timeout the amount of time to wait (must be ≥ 0)
840 * @param unit the unit of time to use for waiting (must not be {@code null})
841 * @return {@code true} if the thread pool terminated within the given timeout, {@code false} otherwise
842 * @throws InterruptedException if the calling thread was interrupted before either the time period elapsed or the pool terminated successfully
843 */
844 public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
845 Assert.checkMinimumParameter("timeout", 0, timeout);
846 Assert.checkNotNullParam("unit", unit);
847 if (timeout > 0) {
848 final Thread thread = Thread.currentThread();
849 if (runningThreads.contains(thread)) {
850 throw Messages.msg.cannotAwaitWithin();
851 }
852 Waiter waiters = this.terminationWaiters;
853 if (waiters == TERMINATE_COMPLETE_WAITER) {
854 return true;
855 }
856 final Waiter waiter = new Waiter(waiters);
857 waiter.setThread(currentThread());
858 while (! compareAndSetTerminationWaiters(waiters, waiter)) {
859 waiters = this.terminationWaiters;
860 if (waiters == TERMINATE_COMPLETE_WAITER) {
861 return true;
862 }
863 waiter.setNext(waiters);
864 }
865 try {
866 parkNanos(this, unit.toNanos(timeout));
867 } finally {
868 // prevent future spurious unparks without sabotaging the queue's integrity
869 waiter.setThread(null);
870 }
871 }
872 if (Thread.interrupted()) throw new InterruptedException();
873 return isTerminated();
874 }
875
876 // =======================================================
877 // Management
878 // =======================================================
879
880 public StandardThreadPoolMXBean getThreadPoolMXBean() {
881 return mxBean;
882 }
883
884 // =======================================================
885 // Other public API
886 // =======================================================
887
888 /**
889 * Initiate shutdown of this thread pool. After this method is called, no new tasks will be accepted. Once
890 * the last task is complete, the thread pool will be terminated and its
891 * {@linkplain Builder#setTerminationTask(Runnable) termination task}
892 * will be called. Calling this method more than once has no additional effect, unless all previous calls
893 * had the {@code interrupt} parameter set to {@code false} and the subsequent call sets it to {@code true}, in
894 * which case all threads in the pool will be interrupted.
895 *
896 * @param interrupt {@code true} to request that currently-running tasks be interrupted; {@code false} otherwise
897 */
898 public void shutdown(boolean interrupt) {
899 long oldStatus, newStatus;
900 // state change sh1:
901 // threadStatus ← threadStatus | shutdown
902 // succeeds: -
903 // preconditions:
904 // none (thread status may be in any state)
905 // postconditions (succeed):
906 // (always) ShutdownRequested set in threadStatus
907 // (maybe) ShutdownInterrupted set in threadStatus (depends on parameter)
908 // (maybe) ShutdownComplete set in threadStatus (if currentSizeOf() == 0)
909 // (maybe) exit if no change
910 // postconditions (fail): -
911 // post-actions (fail):
912 // repeat state change until success or return
913 do {
914 oldStatus = threadStatus;
915 newStatus = withShutdownRequested(oldStatus);
916 if (interrupt) newStatus = withShutdownInterrupt(newStatus);
917 if (currentSizeOf(oldStatus) == 0) newStatus = withShutdownComplete(newStatus);
918 if (newStatus == oldStatus) return;
919 } while (! compareAndSetThreadStatus(oldStatus, newStatus));
920 assert oldStatus != newStatus;
921 if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) {
922 assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared
923 // we initiated shutdown
924 // clear out all consumers and append a dummy waiter node
925 TaskNode tail = this.tail;
926 QNode tailNext;
927 // a marker to indicate that termination was requested
928 for (;;) {
929 tailNext = tail.getNext();
930 if (tailNext instanceof TaskNode) {
931 tail = (TaskNode) tailNext;
932 } else if (tailNext instanceof PoolThreadNode || tailNext == null) {
933 // remove the entire chain from this point
934 PoolThreadNode node = (PoolThreadNode) tailNext;
935 // state change sh2:
936 // tail.next ← terminateNode(null)
937 // succeeds: sh1
938 // preconditions:
939 // threadStatus contains shutdown
940 // tail(snapshot) is a task node
941 // tail(snapshot).next is a (list of) pool thread node(s) or null
942 // postconditions (succeed):
943 // tail(snapshot).next is TERMINATE_REQUESTED
944 if (tail.compareAndSetNext(node, TERMINATE_REQUESTED)) {
945 // got it!
946 // state change sh3:
947 // node.task ← EXIT
948 // preconditions:
949 // none (node task may be in any state)
950 // postconditions (succeed):
951 // task is EXIT
952 // postconditions (fail):
953 // no change (repeat loop)
954 while (node != null) {
955 node.compareAndSetTask(WAITING, EXIT);
956 node.unpark();
957 node = node.getNext();
958 }
959 // success; exit loop
960 break;
961 }
962 // repeat loop (failed CAS)
963 } else if (tailNext instanceof TerminateWaiterNode) {
964 // theoretically impossible, but it means we have nothing else to do
965 break;
966 } else {
967 throw Assert.unreachableCode();
968 }
969 }
970 }
971 if (isShutdownInterrupt(newStatus) != isShutdownInterrupt(oldStatus)) {
972 assert ! isShutdownInterrupt(oldStatus); // because it can only ever be set, not cleared
973 // we initiated interrupt, so interrupt all currently active threads
974 for (Thread thread : runningThreads) {
975 thread.interrupt();
976 }
977 }
978 if (isShutdownComplete(newStatus) != isShutdownComplete(oldStatus)) {
979 assert ! isShutdownComplete(oldStatus); // because it can only ever be set, not cleared
980 completeTermination();
981 }
982 }
983
984 /**
985 * Determine if this thread pool is in the process of terminating but has not yet completed.
986 *
987 * @return {@code true} if the thread pool is terminating, or {@code false} if the thread pool is not terminating or has completed termination
988 */
989 public boolean isTerminating() {
990 final long threadStatus = this.threadStatus;
991 return isShutdownRequested(threadStatus) && ! isShutdownComplete(threadStatus);
992 }
993
994 /**
995 * Start an idle core thread. Normally core threads are only begun when a task was submitted to the executor
996 * but no thread is immediately available to handle the task.
997 *
998 * @return {@code true} if a core thread was started, or {@code false} if all core threads were already running or
999 * if the thread failed to be created for some other reason
1000 */
1001 public boolean prestartCoreThread() {
1002 if (tryAllocateThread(1.0f) != AT_YES) return false;
1003 if (doStartThread(null)) return true;
1004 deallocateThread();
1005 return false;
1006 }
1007
1008 /**
1009 * Start all core threads. Calls {@link #prestartCoreThread()} in a loop until it returns {@code false}.
1010 *
1011 * @return the number of core threads created
1012 */
1013 public int prestartAllCoreThreads() {
1014 int cnt = 0;
1015 while (prestartCoreThread()) cnt ++;
1016 return cnt;
1017 }
1018
1019 // =======================================================
1020 // Tuning & configuration parameters (run time)
1021 // =======================================================
1022
1023 /**
1024 * Get the thread pool growth resistance. This is the average fraction of submitted tasks that will be enqueued (instead
1025 * of causing a new thread to start) when there are no idle threads and the pool size is equal to or greater than
1026 * the core size (but still less than the maximum size). A value of {@code 0.0} indicates that tasks should
1027 * not be enqueued until the pool is completely full; a value of {@code 1.0} indicates that tasks should always
1028 * be enqueued until the queue is completely full.
1029 *
1030 * @return the configured growth resistance factor
1031 * @see Builder#getGrowthResistance() Builder.getGrowthResistance()
1032 */
1033 public float getGrowthResistance() {
1034 return growthResistance;
1035 }
1036
1037 /**
1038 * Set the growth resistance factor.
1039 *
1040 * @param growthResistance the thread pool growth resistance (must be in the range {@code 0.0f ≤ n ≤ 1.0f})
1041 * @see Builder#setGrowthResistance(float) Builder.setGrowthResistance()
1042 */
1043 public void setGrowthResistance(final float growthResistance) {
1044 Assert.checkMinimumParameter("growthResistance", 0.0f, growthResistance);
1045 Assert.checkMaximumParameter("growthResistance", 1.0f, growthResistance);
1046 this.growthResistance = growthResistance;
1047 }
1048
1049 /**
1050 * Get the core pool size. This is the size below which new threads will always be created if no idle threads
1051 * are available. If the pool size reaches the core size but has not yet reached the maximum size, a resistance
1052 * factor will be applied to each task submission which determines whether the task should be queued or a new
1053 * thread started.
1054 *
1055 * @return the core pool size
1056 * @see Builder#getCorePoolSize() Builder.getCorePoolSize()
1057 */
1058 public int getCorePoolSize() {
1059 return coreSizeOf(threadStatus);
1060 }
1061
1062 /**
1063 * Set the core pool size. If the new core pool size is greater than the current maximum pool size, the
1064 * maximum pool size will be automatically increased to match the new core size.
1065 *
1066 * @param corePoolSize the core pool size (must be greater than or equal to 0, and less than 2^20)
1067 * @see Builder#setCorePoolSize(int) Builder.setCorePoolSize()
1068 */
1069 public void setCorePoolSize(final int corePoolSize) {
1070 Assert.checkMinimumParameter("corePoolSize", 0, corePoolSize);
1071 Assert.checkMaximumParameter("corePoolSize", TS_THREAD_CNT_MASK, corePoolSize);
1072 long oldVal, newVal;
1073 do {
1074 oldVal = threadStatus;
1075 if (corePoolSize > maxSizeOf(oldVal)) {
1076 // automatically bump up max size to match
1077 newVal = withCoreSize(withMaxSize(oldVal, corePoolSize), corePoolSize);
1078 } else {
1079 newVal = withCoreSize(oldVal, corePoolSize);
1080 }
1081 } while (! compareAndSetThreadStatus(oldVal, newVal));
1082 if (maxSizeOf(newVal) < maxSizeOf(oldVal) || coreSizeOf(newVal) < coreSizeOf(oldVal)) {
1083 // poke all the threads to try to terminate any excess eagerly
1084 for (Thread activeThread : runningThreads) {
1085 unpark(activeThread);
1086 }
1087 }
1088 }
1089
1090 /**
1091 * Get the maximum pool size. This is the absolute upper limit to the size of the thread pool.
1092 *
1093 * @return the maximum pool size
1094 * @see Builder#getMaximumPoolSize() Builder.getMaximumPoolSize()
1095 */
1096 public int getMaximumPoolSize() {
1097 return maxSizeOf(threadStatus);
1098 }
1099
1100 /**
1101 * Set the maximum pool size. If the new maximum pool size is less than the current core pool size, the
1102 * core size will be automatically reduced to match the new maximum size.
1103 *
1104 * @param maxPoolSize the maximum pool size (must be greater than or equal to 0, and less than 2^20)
1105 * @see Builder#setMaximumPoolSize(int) Builder.setMaximumPoolSize()
1106 */
1107 public void setMaximumPoolSize(final int maxPoolSize) {
1108 Assert.checkMinimumParameter("maxPoolSize", 0, maxPoolSize);
1109 Assert.checkMaximumParameter("maxPoolSize", TS_THREAD_CNT_MASK, maxPoolSize);
1110 long oldVal, newVal;
1111 do {
1112 oldVal = threadStatus;
1113 if (maxPoolSize < coreSizeOf(oldVal)) {
1114 // automatically bump down core size to match
1115 newVal = withCoreSize(withMaxSize(oldVal, maxPoolSize), maxPoolSize);
1116 } else {
1117 newVal = withMaxSize(oldVal, maxPoolSize);
1118 }
1119 } while (! compareAndSetThreadStatus(oldVal, newVal));
1120 if (maxSizeOf(newVal) < maxSizeOf(oldVal) || coreSizeOf(newVal) < coreSizeOf(oldVal)) {
1121 // poke all the threads to try to terminate any excess eagerly
1122 for (Thread activeThread : runningThreads) {
1123 unpark(activeThread);
1124 }
1125 }
1126 }
1127
1128 /**
1129 * Determine whether core threads are allowed to time out. A "core thread" is defined as any thread in the pool
1130 * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
1131 *
1132 * @return {@code true} if core threads are allowed to time out, {@code false} otherwise
1133 * @see Builder#allowsCoreThreadTimeOut() Builder.allowsCoreThreadTimeOut()
1134 */
1135 public boolean allowsCoreThreadTimeOut() {
1136 return isAllowCoreTimeout(threadStatus);
1137 }
1138
1139 /**
1140 * Establish whether core threads are allowed to time out. A "core thread" is defined as any thread in the pool
1141 * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
1142 *
1143 * @param value {@code true} if core threads are allowed to time out, {@code false} otherwise
1144 * @see Builder#allowCoreThreadTimeOut(boolean) Builder.allowCoreThreadTimeOut()
1145 */
1146 public void allowCoreThreadTimeOut(boolean value) {
1147 long oldVal, newVal;
1148 do {
1149 oldVal = threadStatus;
1150 newVal = withAllowCoreTimeout(oldVal, value);
1151 if (oldVal == newVal) return;
1152 } while (! compareAndSetThreadStatus(oldVal, newVal));
1153 if (value) {
1154 // poke all the threads to try to terminate any excess eagerly
1155 for (Thread activeThread : runningThreads) {
1156 unpark(activeThread);
1157 }
1158 }
1159 }
1160
1161 /**
1162 * Get the thread keep-alive time. This is the minimum length of time that idle threads should remain until they exit.
1163 * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1164 * limit.
1165 *
1166 * @param keepAliveUnits the unit in which the result should be expressed (must not be {@code null})
1167 * @return the amount of time (will be greater than zero)
1168 * @see Builder#getKeepAliveTime(TimeUnit) Builder.getKeepAliveTime()
1169 * @deprecated Use {@link #getKeepAliveTime()} instead.
1170 */
1171 @Deprecated
1172 public long getKeepAliveTime(TimeUnit keepAliveUnits) {
1173 Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
1174 return keepAliveUnits.convert(timeoutNanos, TimeUnit.NANOSECONDS);
1175 }
1176
1177 /**
1178 * Get the thread keep-alive time. This is the minimum length of time that idle threads should remain until they exit.
1179 * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1180 * limit.
1181 *
1182 * @return the amount of time (will be greater than zero)
1183 * @see Builder#getKeepAliveTime() Builder.getKeepAliveTime()
1184 */
1185 public Duration getKeepAliveTime() {
1186 return Duration.of(timeoutNanos, ChronoUnit.NANOS);
1187 }
1188
1189 /**
1190 * Set the thread keep-alive time. This is the minimum length of time that idle threads should remain until they exit.
1191 * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1192 * limit.
1193 *
1194 * @param keepAliveTime the thread keep-alive time (must be > 0)
1195 * @param keepAliveUnits the unit in which the value is expressed (must not be {@code null})
1196 * @see Builder#setKeepAliveTime(long, TimeUnit) Builder.setKeepAliveTime()
1197 * @deprecated Use {@link #setKeepAliveTime(Duration)} instead.
1198 */
1199 @Deprecated
1200 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveUnits) {
1201 Assert.checkMinimumParameter("keepAliveTime", 1L, keepAliveTime);
1202 Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
1203 timeoutNanos = max(1L, keepAliveUnits.toNanos(keepAliveTime));
1204 }
1205
1206 /**
1207 * Set the thread keep-alive time. This is the minimum length of time that idle threads should remain until they exit.
1208 * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1209 * limit.
1210 *
1211 * @param keepAliveTime the thread keep-alive time (must not be {@code null})
1212 * @see Builder#setKeepAliveTime(Duration) Builder.setKeepAliveTime()
1213 */
1214 public void setKeepAliveTime(final Duration keepAliveTime) {
1215 Assert.checkNotNullParam("keepAliveTime", keepAliveTime);
1216 timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime);
1217 }
1218
1219 /**
1220 * Get the maximum queue size. If the queue is full and a task cannot be immediately accepted, rejection will result.
1221 *
1222 * @return the maximum queue size
1223 * @see Builder#getMaximumQueueSize() Builder.getMaximumQueueSize()
1224 */
1225 public int getMaximumQueueSize() {
1226 return maxQueueSizeOf(queueSize);
1227 }
1228
1229 /**
1230 * Set the maximum queue size. If the new maximum queue size is smaller than the current queue size, there is no
1231 * effect other than preventing tasks from being enqueued until the size decreases below the maximum again.
1232 *
1233 * @param maxQueueSize the maximum queue size (must be ≥ 0)
1234 * @see Builder#setMaximumQueueSize(int) Builder.setMaximumQueueSize()
1235 */
1236 public void setMaximumQueueSize(final int maxQueueSize) {
1237 Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize);
1238 Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize);
1239 if (NO_QUEUE_LIMIT) return;
1240 long oldVal;
1241 do {
1242 oldVal = queueSize;
1243 } while (! compareAndSetQueueSize(oldVal, withMaxQueueSize(oldVal, maxQueueSize)));
1244 }
1245
1246 /**
1247 * Get the executor to delegate to in the event of task rejection.
1248 *
1249 * @return the executor to delegate to in the event of task rejection (not {@code null})
1250 */
1251 public Executor getHandoffExecutor() {
1252 return handoffExecutor;
1253 }
1254
1255 /**
1256 * Set the executor to delegate to in the event of task rejection.
1257 *
1258 * @param handoffExecutor the executor to delegate to in the event of task rejection (must not be {@code null})
1259 */
1260 public void setHandoffExecutor(final Executor handoffExecutor) {
1261 Assert.checkNotNullParam("handoffExecutor", handoffExecutor);
1262 this.handoffExecutor = handoffExecutor;
1263 }
1264
1265 /**
1266 * Get the exception handler to use for uncaught exceptions.
1267 *
1268 * @return the exception handler to use for uncaught exceptions (not {@code null})
1269 */
1270 public Thread.UncaughtExceptionHandler getExceptionHandler() {
1271 return exceptionHandler;
1272 }
1273
1274 /**
1275 * Set the exception handler to use for uncaught exceptions.
1276 *
1277 * @param exceptionHandler the exception handler to use for uncaught exceptions (must not be {@code null})
1278 */
1279 public void setExceptionHandler(final Thread.UncaughtExceptionHandler exceptionHandler) {
1280 Assert.checkNotNullParam("exceptionHandler", exceptionHandler);
1281 this.exceptionHandler = exceptionHandler;
1282 }
1283
1284 /**
1285 * Set the termination task, overwriting any previous setting.
1286 *
1287 * @param terminationTask the termination task, or {@code null} to perform no termination task
1288 */
1289 public void setTerminationTask(final Runnable terminationTask) {
1290 this.terminationTask = terminationTask;
1291 }
1292
1293 // =======================================================
1294 // Statistics & metrics API
1295 // =======================================================
1296
1297 /**
1298 * Get an estimate of the current queue size.
1299 *
1300 * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled
1301 */
1302 public int getQueueSize() {
1303 return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(queueSize);
1304 }
1305
1306 /**
1307 * Get an estimate of the peak number of threads that the pool has ever held.
1308 *
1309 * @return an estimate of the peak number of threads that the pool has ever held or -1 when {@code jboss.threads.eqe.statistics} is disabled
1310 */
1311 public int getLargestPoolSize() {
1312 return UPDATE_STATISTICS ? peakThreadCount : -1;
1313 }
1314
1315 /**
1316 * Get an estimate of the number of threads which are currently doing work on behalf of the thread pool.
1317 *
1318 * @return the active count estimate or -1 when {@code jboss.threads.eqe.statistics.active-count} is disabled
1319 */
1320 public int getActiveCount() {
1321 return UPDATE_ACTIVE_COUNT ? activeCount : -1;
1322 }
1323
1324 /**
1325 * Get an estimate of the peak size of the queue.
1326 *
1327 * @return an estimate of the peak size of the queue or -1 when {@code jboss.threads.eqe.statistics}
1328 * is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled
1329 */
1330 public int getLargestQueueSize() {
1331 return UPDATE_STATISTICS && !NO_QUEUE_LIMIT ? peakQueueSize : -1;
1332 }
1333
1334 /**
1335 * Get an estimate of the total number of tasks ever submitted to this thread pool.
1336 *
1337 * @return an estimate of the total number of tasks ever submitted to this thread pool
1338 * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1339 */
1340 public long getSubmittedTaskCount() {
1341 return UPDATE_STATISTICS ? submittedTaskCounter.longValue() : -1;
1342 }
1343
1344 /**
1345 * Get an estimate of the total number of tasks ever rejected by this thread pool for any reason.
1346 *
1347 * @return an estimate of the total number of tasks ever rejected by this thread pool
1348 * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1349 */
1350 public long getRejectedTaskCount() {
1351 return UPDATE_STATISTICS ? rejectedTaskCounter.longValue() : -1;
1352 }
1353
1354 /**
1355 * Get an estimate of the number of tasks completed by this thread pool.
1356 *
1357 * @return an estimate of the number of tasks completed by this thread pool
1358 * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1359 */
1360 public long getCompletedTaskCount() {
1361 return UPDATE_STATISTICS ? completedTaskCounter.longValue() : -1;
1362 }
1363
1364 /**
1365 * Get an estimate of the current number of active threads in the pool.
1366 *
1367 * @return an estimate of the current number of active threads in the pool
1368 */
1369 public int getPoolSize() {
1370 return currentSizeOf(threadStatus);
1371 }
1372
1373 /**
1374 * Get an array containing an approximate snapshot of the currently running threads in
1375 * this executor.
1376 *
1377 * @return an array of running (unterminated) threads (not {@code null})
1378 */
1379 public Thread[] getRunningThreads() {
1380 return runningThreads.toArray(NO_THREADS);
1381 }
1382
1383 static class MBeanUnregisterAction implements PrivilegedAction<Void> {
1384 static void forceInit() {
1385 }
1386
1387 private final Object handle;
1388
1389 MBeanUnregisterAction(final Object handle) {
1390 this.handle = handle;
1391 }
1392
1393 public Void run() {
1394 try {
1395 ManagementFactory.getPlatformMBeanServer().unregisterMBean(((ObjectInstance) handle).getObjectName());
1396 } catch (Throwable ignored) {
1397 }
1398 return null;
1399 }
1400 }
1401
1402 // =======================================================
1403 // Pooled thread body
1404 // =======================================================
1405
1406 final class ThreadBody implements Runnable {
1407 private Runnable initialTask;
1408
1409 ThreadBody(final Runnable initialTask) {
1410 this.initialTask = initialTask;
1411 }
1412
1413 /**
1414 * Execute the body of the thread. On entry the thread is added to the {@link #runningThreads} set, and on
1415 * exit it is removed.
1416 */
1417 public void run() {
1418 final Thread currentThread = Thread.currentThread();
1419 final LongAdder spinMisses = EnhancedQueueExecutor.this.spinMisses;
1420 runningThreads.add(currentThread);
1421
1422 // run the initial task
1423 doRunTask(getAndClearInitialTask());
1424
1425 // Eagerly allocate a PoolThreadNode for the next time it's needed
1426 PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread);
1427 // main loop
1428 QNode node;
1429 processingQueue: for (;;) {
1430 node = getOrAddNode(nextPoolThreadNode);
1431 if (node instanceof TaskNode) {
1432 // task node was removed
1433 doRunTask(((TaskNode) node).getAndClearTask());
1434 continue;
1435 } else if (node == nextPoolThreadNode) {
1436 // pool thread node was added
1437 final PoolThreadNode newNode = nextPoolThreadNode;
1438 // nextPoolThreadNode has been added to the queue, a new node is required for next time.
1439 nextPoolThreadNode = new PoolThreadNode(currentThread);
1440 // at this point, we are registered into the queue
1441 long start = System.nanoTime();
1442 long elapsed = 0L;
1443 waitingForTask: for (;;) {
1444 Runnable task = newNode.getTask();
1445 assert task != ACCEPTED && task != GAVE_UP;
1446 if (task != WAITING && task != EXIT) {
1447 if (newNode.compareAndSetTask(task, ACCEPTED)) {
1448 // we have a task to run, so run it and then abandon the node
1449 doRunTask(task);
1450 // rerun outer
1451 continue processingQueue;
1452 }
1453 // we had a task to run, but we failed to CAS it for some reason, so retry
1454 if (UPDATE_STATISTICS) spinMisses.increment();
1455 continue waitingForTask;
1456 } else {
1457 final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;
1458 long oldVal = threadStatus;
1459 if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) {
1460 // try to exit this thread, if we are allowed
1461 if (task == EXIT ||
1462 isShutdownRequested(oldVal) ||
1463 isAllowCoreTimeout(oldVal) ||
1464 currentSizeOf(oldVal) > coreSizeOf(oldVal)
1465 ) {
1466 if (newNode.compareAndSetTask(task, GAVE_UP)) {
1467 for (;;) {
1468 if (tryDeallocateThread(oldVal)) {
1469 // clear to exit.
1470 runningThreads.remove(currentThread);
1471 return;
1472 }
1473 if (UPDATE_STATISTICS) spinMisses.increment();
1474 oldVal = threadStatus;
1475 }
1476 //throw Assert.unreachableCode();
1477 }
1478 continue waitingForTask;
1479 } else {
1480 if (elapsed >= timeoutNanos) {
1481 newNode.park(EnhancedQueueExecutor.this);
1482 } else {
1483 newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
1484 }
1485 Thread.interrupted();
1486 elapsed = System.nanoTime() - start;
1487 // retry inner
1488 continue waitingForTask;
1489 }
1490 //throw Assert.unreachableCode();
1491 } else {
1492 assert task == WAITING;
1493 newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
1494 Thread.interrupted();
1495 elapsed = System.nanoTime() - start;
1496 // retry inner
1497 continue waitingForTask;
1498 }
1499 //throw Assert.unreachableCode();
1500 }
1501 //throw Assert.unreachableCode();
1502 } // :waitingForTask
1503 //throw Assert.unreachableCode();
1504 } else {
1505 assert node instanceof TerminateWaiterNode;
1506 // we're shutting down!
1507 runningThreads.remove(currentThread);
1508 deallocateThread();
1509 return;
1510 }
1511 //throw Assert.unreachableCode();
1512 } // :processingQueue
1513 //throw Assert.unreachableCode();
1514 }
1515
1516 private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) {
1517 TaskNode head;
1518 QNode headNext;
1519 for (;;) {
1520 head = EnhancedQueueExecutor.this.head;
1521 headNext = head.getNext();
1522 if (headNext instanceof TaskNode) {
1523 TaskNode taskNode = (TaskNode) headNext;
1524 if (compareAndSetHead(head, taskNode)) {
1525 if (! NO_QUEUE_LIMIT) decreaseQueueSize();
1526 return taskNode;
1527 }
1528 } else if (headNext instanceof PoolThreadNode || headNext == null) {
1529 nextPoolThreadNode.setNextRelaxed(headNext);
1530 if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
1531 return nextPoolThreadNode;
1532 }
1533 } else {
1534 assert headNext instanceof TerminateWaiterNode;
1535 return headNext;
1536 }
1537 if (UPDATE_STATISTICS) spinMisses.increment();
1538 JDKSpecific.onSpinWait();
1539 }
1540 }
1541
1542 private Runnable getAndClearInitialTask() {
1543 try {
1544 return initialTask;
1545 } finally {
1546 this.initialTask = null;
1547 }
1548 }
1549
1550 void doRunTask(final Runnable task) {
1551 if (task != null) {
1552 if (isShutdownInterrupt(threadStatus)) {
1553 Thread.currentThread().interrupt();
1554 } else {
1555 Thread.interrupted();
1556 }
1557 if (UPDATE_ACTIVE_COUNT) incrementActiveCount();
1558 safeRun(task);
1559 if (UPDATE_ACTIVE_COUNT) {
1560 decrementActiveCount();
1561 if (UPDATE_STATISTICS) {
1562 completedTaskCounter.increment();
1563 }
1564 }
1565 }
1566 }
1567 }
1568
1569 // =======================================================
1570 // Thread starting
1571 // =======================================================
1572
1573 /**
1574 * Allocate a new thread.
1575 *
1576 * @param growthResistance the growth resistance to apply
1577 * @return {@code AT_YES} if a thread is allocated; {@code AT_NO} if a thread was not allocated; {@code AT_SHUTDOWN} if the pool is being shut down
1578 */
1579 int tryAllocateThread(final float growthResistance) {
1580 int oldSize;
1581 long oldStat;
1582 for (;;) {
1583 oldStat = threadStatus;
1584 if (isShutdownRequested(oldStat)) {
1585 return AT_SHUTDOWN;
1586 }
1587 oldSize = currentSizeOf(oldStat);
1588 if (oldSize >= maxSizeOf(oldStat)) {
1589 // max threads already reached
1590 return AT_NO;
1591 }
1592 if (oldSize >= coreSizeOf(oldStat) && oldSize > 0) {
1593 // core threads are reached; check resistance factor (if no threads are running, then always start a thread)
1594 if (growthResistance != 0.0f && (growthResistance == 1.0f || ThreadLocalRandom.current().nextFloat() < growthResistance)) {
1595 // do not create a thread this time
1596 return AT_NO;
1597 }
1598 }
1599 // try to increase
1600 final int newSize = oldSize + 1;
1601 // state change ex3:
1602 // threadStatus.size ← threadStatus(snapshot).size + 1
1603 // cannot succeed: sh1
1604 // succeeds: -
1605 // preconditions:
1606 // ! threadStatus(snapshot).shutdownRequested
1607 // threadStatus(snapshot).size < threadStatus(snapshot).maxSize
1608 // threadStatus(snapshot).size < threadStatus(snapshot).coreSize || random < growthResistance
1609 // post-actions (fail):
1610 // retry whole loop
1611 if (compareAndSetThreadStatus(oldStat, withCurrentSize(oldStat, newSize))) {
1612 // increment peak thread count
1613 if (UPDATE_STATISTICS) {
1614 int oldVal;
1615 do {
1616 oldVal = peakThreadCount;
1617 if (oldVal >= newSize) break;
1618 } while (! compareAndSetPeakThreadCount(oldVal, newSize));
1619 }
1620 return AT_YES;
1621 }
1622 if (UPDATE_STATISTICS) spinMisses.increment();
1623 }
1624 }
1625
1626 /**
1627 * Roll back a thread allocation, possibly terminating the pool. Only call after {@link #tryAllocateThread(float)} returns {@link #AT_YES}.
1628 */
1629 void deallocateThread() {
1630 long oldStat;
1631 do {
1632 oldStat = threadStatus;
1633 } while (! tryDeallocateThread(oldStat));
1634 }
1635
1636 /**
1637 * Try to roll back a thread allocation, possibly running the termination task if the pool would be terminated
1638 * by last thread exit.
1639 *
1640 * @param oldStat the {@code threadStatus} to CAS
1641 * @return {@code true} if the thread was deallocated, or {@code false} to retry with a new {@code oldStat}
1642 */
1643 boolean tryDeallocateThread(long oldStat) {
1644 // roll back our thread allocation attempt
1645 // state change ex4:
1646 // threadStatus.size ← threadStatus.size - 1
1647 // succeeds: ex3
1648 // preconditions:
1649 // threadStatus.size > 0
1650 long newStat = withCurrentSize(oldStat, currentSizeOf(oldStat) - 1);
1651 if (currentSizeOf(newStat) == 0 && isShutdownRequested(oldStat)) {
1652 newStat = withShutdownComplete(newStat);
1653 }
1654 if (! compareAndSetThreadStatus(oldStat, newStat)) return false;
1655 if (isShutdownComplete(newStat)) {
1656 completeTermination();
1657 }
1658 return true;
1659 }
1660
1661 /**
1662 * Start an allocated thread.
1663 *
1664 * @param runnable the task or {@code null}
1665 * @return {@code true} if the thread was started, {@code false} otherwise
1666 * @throws RejectedExecutionException if {@code runnable} is not {@code null} and the thread could not be created or started
1667 */
1668 boolean doStartThread(Runnable runnable) throws RejectedExecutionException {
1669 Thread thread;
1670 try {
1671 thread = threadFactory.newThread(new ThreadBody(runnable));
1672 } catch (Throwable t) {
1673 if (runnable != null) {
1674 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1675 rejectException(runnable, t);
1676 }
1677 return false;
1678 }
1679 if (thread == null) {
1680 if (runnable != null) {
1681 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1682 rejectNoThread(runnable);
1683 }
1684 return false;
1685 }
1686 try {
1687 thread.start();
1688 } catch (Throwable t) {
1689 if (runnable != null) {
1690 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1691 rejectException(runnable, t);
1692 }
1693 return false;
1694 }
1695 return true;
1696 }
1697
1698 // =======================================================
1699 // Task submission
1700 // =======================================================
1701
1702 private int tryExecute(final Runnable runnable) {
1703 QNode tailNext;
1704 if (TAIL_LOCK) lockTail();
1705 TaskNode tail = this.tail;
1706 TaskNode node = null;
1707 for (;;) {
1708 tailNext = tail.getNext();
1709 if (tailNext instanceof TaskNode) {
1710 TaskNode tailNextTaskNode;
1711 do {
1712 if (UPDATE_STATISTICS) spinMisses.increment();
1713 tailNextTaskNode = (TaskNode) tailNext;
1714 // retry
1715 tail = tailNextTaskNode;
1716 tailNext = tail.getNext();
1717 } while (tailNext instanceof TaskNode);
1718 // opportunistically update for the possible benefit of other threads
1719 if (UPDATE_TAIL) compareAndSetTail(tail, tailNextTaskNode);
1720 }
1721 // we've progressed to the first non-task node, as far as we can see
1722 assert ! (tailNext instanceof TaskNode);
1723 if (tailNext instanceof PoolThreadNode) {
1724 final QNode tailNextNext = tailNext.getNext();
1725 // state change ex1:
1726 // tail(snapshot).next ← tail(snapshot).next(snapshot).next(snapshot)
1727 // succeeds: -
1728 // cannot succeed: sh2
1729 // preconditions:
1730 // tail(snapshot) is a dead TaskNode
1731 // tail(snapshot).next is PoolThreadNode
1732 // tail(snapshot).next.next* is PoolThreadNode or null
1733 // additional success postconditions: -
1734 // failure postconditions: -
1735 // post-actions (succeed):
1736 // run state change ex2
1737 // post-actions (fail):
1738 // retry with new tail(snapshot)
1739 if (tail.compareAndSetNext(tailNext, tailNextNext)) {
1740 assert tail instanceof TaskNode && tail.task == null;
1741 PoolThreadNode consumerNode = (PoolThreadNode) tailNext;
1742 // state change ex2:
1743 // tail(snapshot).next(snapshot).task ← runnable
1744 // succeeds: ex1
1745 // preconditions:
1746 // tail(snapshot).next(snapshot).task = WAITING
1747 // post-actions (succeed):
1748 // unpark thread and return
1749 // post-actions (fail):
1750 // retry outer with new tail(snapshot)
1751 if (consumerNode.compareAndSetTask(WAITING, runnable)) {
1752 if (TAIL_LOCK) unlockTail();
1753 consumerNode.unpark();
1754 return EXE_OK;
1755 }
1756 // otherwise the consumer gave up or was exited already, so fall out and...
1757 }
1758 if (UPDATE_STATISTICS) spinMisses.increment();
1759 // retry with new tail(snapshot) as was foretold
1760 tail = this.tail;
1761 } else if (tailNext == null) {
1762 // no consumers available; maybe we can start one
1763 int tr = tryAllocateThread(growthResistance);
1764 if (tr == AT_YES) {
1765 if (TAIL_LOCK) unlockTail();
1766 return EXE_CREATE_THREAD;
1767 }
1768 if (tr == AT_SHUTDOWN) {
1769 if (TAIL_LOCK) unlockTail();
1770 return EXE_REJECT_SHUTDOWN;
1771 }
1772 assert tr == AT_NO;
1773 // no; try to enqueue
1774 if (! NO_QUEUE_LIMIT && ! increaseQueueSize()) {
1775 // queue is full
1776 // OK last effort to create a thread, disregarding growth limit
1777 tr = tryAllocateThread(0.0f);
1778 if (TAIL_LOCK) unlockTail();
1779 if (tr == AT_YES) {
1780 return EXE_CREATE_THREAD;
1781 }
1782 if (tr == AT_SHUTDOWN) {
1783 return EXE_REJECT_SHUTDOWN;
1784 }
1785 assert tr == AT_NO;
1786 return EXE_REJECT_QUEUE_FULL;
1787 }
1788 // queue size increased successfully; we can add to the list
1789 if (node == null) {
1790 // avoid re-allocating TaskNode instances on subsequent iterations
1791 node = new TaskNode(runnable);
1792 }
1793 // state change ex5:
1794 // tail(snapshot).next ← new task node
1795 // cannot succeed: sh2
1796 // preconditions:
1797 // tail(snapshot).next = null
1798 // ex3 failed precondition
1799 // queue size increased to accommodate node
1800 // postconditions (success):
1801 // tail(snapshot).next = new task node
1802 if (tail.compareAndSetNext(null, node)) {
1803 // try to update tail to the new node; if this CAS fails then tail already points at past the node
1804 // this is because tail can only ever move forward, and the task list is always strongly connected
1805 compareAndSetTail(tail, node);
1806 if (TAIL_LOCK) unlockTail();
1807 return EXE_OK;
1808 }
1809 // we failed; we have to drop the queue size back down again to compensate before we can retry
1810 if (! NO_QUEUE_LIMIT) decreaseQueueSize();
1811 if (UPDATE_STATISTICS) spinMisses.increment();
1812 // retry with new tail(snapshot)
1813 tail = this.tail;
1814 } else {
1815 if (TAIL_LOCK) unlockTail();
1816 // no consumers are waiting and the tail(snapshot).next node is non-null and not a task node, therefore it must be a...
1817 assert tailNext instanceof TerminateWaiterNode;
1818 // shutting down
1819 return EXE_REJECT_SHUTDOWN;
1820 }
1821 }
1822 // not reached
1823 }
1824
1825 // =======================================================
1826 // Termination task
1827 // =======================================================
1828
1829 void completeTermination() {
1830 // be kind and un-interrupt the thread for the termination task
1831 Thread.interrupted();
1832 final Runnable terminationTask = this.terminationTask;
1833 this.terminationTask = null;
1834 safeRun(terminationTask);
1835 // notify all waiters
1836 Waiter waiters = getAndSetTerminationWaiters(TERMINATE_COMPLETE_WAITER);
1837 while (waiters != null) {
1838 unpark(waiters.getThread());
1839 waiters = waiters.getNext();
1840 }
1841 tail.setNext(TERMINATE_COMPLETE);
1842 if (! DISABLE_MBEAN) {
1843 final Object handle = this.handle;
1844 if (handle != null) {
1845 doPrivileged(new MBeanUnregisterAction(handle), acc);
1846 }
1847 }
1848 }
1849
1850 // =======================================================
1851 // Compare-and-set operations
1852 // =======================================================
1853
1854 void incrementActiveCount() {
1855 unsafe.getAndAddInt(this, activeCountOffset, 1);
1856 }
1857
1858 void decrementActiveCount() {
1859 unsafe.getAndAddInt(this, activeCountOffset, -1);
1860 }
1861
1862 boolean compareAndSetPeakThreadCount(final int expect, final int update) {
1863 return unsafe.compareAndSwapInt(this, peakThreadCountOffset, expect, update);
1864 }
1865
1866 boolean compareAndSetPeakQueueSize(final int expect, final int update) {
1867 return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update);
1868 }
1869
1870 boolean compareAndSetQueueSize(final long expect, final long update) {
1871 return unsafe.compareAndSwapLong(this, queueSizeOffset, expect, update);
1872 }
1873
1874 boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) {
1875 return unsafe.compareAndSwapObject(this, terminationWaitersOffset, expect, update);
1876 }
1877
1878 Waiter getAndSetTerminationWaiters(final Waiter update) {
1879 return (Waiter) unsafe.getAndSetObject(this, terminationWaitersOffset, update);
1880 }
1881
1882 // =======================================================
1883 // Queue size operations
1884 // =======================================================
1885
1886 boolean increaseQueueSize() {
1887 long oldVal = queueSize;
1888 int oldSize = currentQueueSizeOf(oldVal);
1889 if (oldSize >= maxQueueSizeOf(oldVal)) {
1890 // reject
1891 return false;
1892 }
1893 int newSize = oldSize + 1;
1894 while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, newSize))) {
1895 if (UPDATE_STATISTICS) spinMisses.increment();
1896 oldVal = queueSize;
1897 oldSize = currentQueueSizeOf(oldVal);
1898 if (oldSize >= maxQueueSizeOf(oldVal)) {
1899 // reject
1900 return false;
1901 }
1902 newSize = oldSize + 1;
1903 }
1904 if (UPDATE_STATISTICS) {
1905 do {
1906 // oldSize now represents the old peak size
1907 oldSize = peakQueueSize;
1908 if (newSize <= oldSize) break;
1909 } while (! compareAndSetPeakQueueSize(oldSize, newSize));
1910 }
1911 return true;
1912 }
1913
1914 void decreaseQueueSize() {
1915 long oldVal;
1916 oldVal = queueSize;
1917 assert currentQueueSizeOf(oldVal) > 0;
1918 while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, currentQueueSizeOf(oldVal) - 1))) {
1919 if (UPDATE_STATISTICS) spinMisses.increment();
1920 oldVal = queueSize;
1921 assert currentQueueSizeOf(oldVal) > 0;
1922 }
1923 }
1924
1925 // =======================================================
1926 // Inline Functions
1927 // =======================================================
1928
1929 static int currentQueueSizeOf(long queueSize) {
1930 return (int) (queueSize & 0x7fff_ffff);
1931 }
1932
1933 static long withCurrentQueueSize(long queueSize, int current) {
1934 assert current >= 0;
1935 return queueSize & 0xffff_ffff_0000_0000L | current;
1936 }
1937
1938 static int maxQueueSizeOf(long queueSize) {
1939 return (int) (queueSize >>> 32 & 0x7fff_ffff);
1940 }
1941
1942 static long withMaxQueueSize(long queueSize, int max) {
1943 assert max >= 0;
1944 return queueSize & 0xffff_ffffL | (long)max << 32;
1945 }
1946
1947 static int coreSizeOf(long status) {
1948 return (int) (status >>> TS_CORE_SHIFT & TS_THREAD_CNT_MASK);
1949 }
1950
1951 static int maxSizeOf(long status) {
1952 return (int) (status >>> TS_MAX_SHIFT & TS_THREAD_CNT_MASK);
1953 }
1954
1955 static int currentSizeOf(long status) {
1956 return (int) (status >>> TS_CURRENT_SHIFT & TS_THREAD_CNT_MASK);
1957 }
1958
1959 static long withCoreSize(long status, int newCoreSize) {
1960 assert 0 <= newCoreSize && newCoreSize <= TS_THREAD_CNT_MASK;
1961 return status & ~(TS_THREAD_CNT_MASK << TS_CORE_SHIFT) | (long)newCoreSize << TS_CORE_SHIFT;
1962 }
1963
1964 static long withCurrentSize(long status, int newCurrentSize) {
1965 assert 0 <= newCurrentSize && newCurrentSize <= TS_THREAD_CNT_MASK;
1966 return status & ~(TS_THREAD_CNT_MASK << TS_CURRENT_SHIFT) | (long)newCurrentSize << TS_CURRENT_SHIFT;
1967 }
1968
1969 static long withMaxSize(long status, int newMaxSize) {
1970 assert 0 <= newMaxSize && newMaxSize <= TS_THREAD_CNT_MASK;
1971 return status & ~(TS_THREAD_CNT_MASK << TS_MAX_SHIFT) | (long)newMaxSize << TS_MAX_SHIFT;
1972 }
1973
1974 static long withShutdownRequested(final long status) {
1975 return status | TS_SHUTDOWN_REQUESTED;
1976 }
1977
1978 static long withShutdownComplete(final long status) {
1979 return status | TS_SHUTDOWN_COMPLETE;
1980 }
1981
1982 static long withShutdownInterrupt(final long status) {
1983 return status | TS_SHUTDOWN_INTERRUPT;
1984 }
1985
1986 static long withAllowCoreTimeout(final long status, final boolean allowed) {
1987 return allowed ? status | TS_ALLOW_CORE_TIMEOUT : status & ~TS_ALLOW_CORE_TIMEOUT;
1988 }
1989
1990 static boolean isShutdownRequested(final long status) {
1991 return (status & TS_SHUTDOWN_REQUESTED) != 0;
1992 }
1993
1994 static boolean isShutdownComplete(final long status) {
1995 return (status & TS_SHUTDOWN_COMPLETE) != 0;
1996 }
1997
1998 static boolean isShutdownInterrupt(final long threadStatus) {
1999 return (threadStatus & TS_SHUTDOWN_INTERRUPT) != 0;
2000 }
2001
2002 static boolean isAllowCoreTimeout(final long oldVal) {
2003 return (oldVal & TS_ALLOW_CORE_TIMEOUT) != 0;
2004 }
2005
2006 // =======================================================
2007 // Static configuration
2008 // =======================================================
2009
2010 // =======================================================
2011 // Utilities
2012 // =======================================================
2013
2014 void safeRun(final Runnable task) {
2015 if (task == null) return;
2016 final Thread currentThread = Thread.currentThread();
2017 JBossExecutors.clearContextClassLoader(currentThread);
2018 try {
2019 task.run();
2020 } catch (Throwable t) {
2021 try {
2022 exceptionHandler.uncaughtException(Thread.currentThread(), t);
2023 } catch (Throwable ignored) {
2024 // nothing else we can safely do here
2025 }
2026 } finally {
2027 JBossExecutors.clearContextClassLoader(currentThread);
2028 // clear interrupt status
2029 Thread.interrupted();
2030 }
2031 }
2032
2033 void rejectException(final Runnable task, final Throwable cause) {
2034 try {
2035 handoffExecutor.execute(task);
2036 } catch (Throwable t) {
2037 t.addSuppressed(cause);
2038 throw t;
2039 }
2040 }
2041
2042 void rejectNoThread(final Runnable task) {
2043 try {
2044 handoffExecutor.execute(task);
2045 } catch (Throwable t) {
2046 t.addSuppressed(new RejectedExecutionException("No threads available"));
2047 throw t;
2048 }
2049 }
2050
2051 void rejectQueueFull(final Runnable task) {
2052 try {
2053 handoffExecutor.execute(task);
2054 } catch (Throwable t) {
2055 t.addSuppressed(new RejectedExecutionException("Queue is full"));
2056 throw t;
2057 }
2058 }
2059
2060 void rejectShutdown(final Runnable task) {
2061 try {
2062 handoffExecutor.execute(task);
2063 } catch (Throwable t) {
2064 t.addSuppressed(new RejectedExecutionException("Executor is being shut down"));
2065 throw t;
2066 }
2067 }
2068
2069 // =======================================================
2070 // Node classes
2071 // =======================================================
2072
2073 abstract static class QNode {
2074 private static final long nextOffset;
2075
2076 static {
2077 try {
2078 nextOffset = unsafe.objectFieldOffset(QNode.class.getDeclaredField("next"));
2079 } catch (NoSuchFieldException e) {
2080 throw new NoSuchFieldError(e.getMessage());
2081 }
2082 }
2083
2084 @SuppressWarnings("unused")
2085 private volatile QNode next;
2086
2087 boolean compareAndSetNext(QNode expect, QNode update) {
2088 return unsafe.compareAndSwapObject(this, nextOffset, expect, update);
2089 }
2090
2091 QNode getNext() {
2092 return next;
2093 }
2094
2095 void setNext(final QNode node) {
2096 next = node;
2097 }
2098
2099 void setNextRelaxed(final QNode node) {
2100 unsafe.putObject(this, nextOffset, node);
2101 }
2102 }
2103
2104 /** Padding between PoolThreadNode task and parked fields and QNode.next */
2105 static abstract class PoolThreadNodeBase extends QNode {
2106 /**
2107 * Padding fields.
2108 */
2109 @SuppressWarnings("unused")
2110 int p00, p01, p02, p03,
2111 p04, p05, p06, p07,
2112 p08, p09, p0A, p0B,
2113 p0C, p0D, p0E, p0F;
2114
2115 PoolThreadNodeBase() {}
2116 }
2117
2118 static final class PoolThreadNode extends PoolThreadNodeBase {
2119
2120 /**
2121 * Thread is running normally.
2122 */
2123 private static final int STATE_NORMAL = 0;
2124
2125 /**
2126 * Thread is parked (or about to park), and unpark call is necessary to wake the thread
2127 */
2128 private static final int STATE_PARKED = 1;
2129
2130 /**
2131 * The thread has been unparked, any thread that is spinning or about to park will return,
2132 * if not thread is currently spinning the next thread that attempts to spin will immediately return
2133 */
2134 private static final int STATE_UNPARKED = 2;
2135
2136
2137 private static final long taskOffset;
2138 private static final long parkedOffset;
2139
2140 static {
2141 try {
2142 taskOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("task"));
2143 parkedOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("parked"));
2144 } catch (NoSuchFieldException e) {
2145 throw new NoSuchFieldError(e.getMessage());
2146 }
2147 }
2148
2149 private final Thread thread;
2150
2151 @SuppressWarnings("unused")
2152 private volatile Runnable task;
2153
2154 /**
2155 * The park state, see the STATE_ constants for the meanings of each value
2156 */
2157 @SuppressWarnings("unused")
2158 private volatile int parked;
2159
2160 PoolThreadNode(final Thread thread) {
2161 this.thread = thread;
2162 task = WAITING;
2163 }
2164
2165 boolean compareAndSetTask(final Runnable expect, final Runnable update) {
2166 return task == expect && unsafe.compareAndSwapObject(this, taskOffset, expect, update);
2167 }
2168
2169 Runnable getTask() {
2170 return task;
2171 }
2172
2173 PoolThreadNode getNext() {
2174 return (PoolThreadNode) super.getNext();
2175 }
2176
2177 void park(EnhancedQueueExecutor enhancedQueueExecutor) {
2178 int spins = PARK_SPINS;
2179 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2180 return;
2181 }
2182 while (spins > 0) {
2183 if (spins < YIELD_FACTOR) {
2184 Thread.yield();
2185 } else {
2186 JDKSpecific.onSpinWait();
2187 }
2188 spins--;
2189 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2190 return;
2191 }
2192 }
2193 if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
2194 LockSupport.park(enhancedQueueExecutor);
2195 } finally {
2196 // parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
2197 // if it's STATE_PARKED, we'd go back to NORMAL since we're not parking anymore.
2198 // if it's STATE_UNPARKED, we can still go back to NORMAL because all of our preconditions will be rechecked anyway.
2199 parked = STATE_NORMAL;
2200 }
2201 }
2202
2203 void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
2204 long remaining;
2205 int spins = PARK_SPINS;
2206 if (spins > 0) {
2207 long start = System.nanoTime();
2208 //note that we don't check the nanotime while spinning
2209 //as spin time is short and for our use cases it does not matter if the time
2210 //overruns a bit (as the nano time is for thread timeout) we just spin then check
2211 //to keep performance consistent between the two versions.
2212 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2213 return;
2214 }
2215 while (spins > 0) {
2216 if (spins < YIELD_FACTOR) {
2217 Thread.yield();
2218 } else {
2219 JDKSpecific.onSpinWait();
2220 }
2221 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2222 return;
2223 }
2224 spins--;
2225 }
2226 remaining = nanos - (System.nanoTime() - start);
2227 if (remaining < 0) {
2228 return;
2229 }
2230 } else {
2231 remaining = nanos;
2232 }
2233 if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
2234 LockSupport.parkNanos(enhancedQueueExecutor, remaining);
2235 } finally {
2236 // parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
2237 // if it's STATE_PARKED, we'd go back to NORMAL since we're not parking anymore.
2238 // if it's STATE_UNPARKED, we can still go back to NORMAL because all of our preconditions will be rechecked anyway.
2239 parked = STATE_NORMAL;
2240 }
2241 }
2242
2243 void unpark() {
2244 if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_UNPARKED)) {
2245 return;
2246 }
2247 LockSupport.unpark(thread);
2248 }
2249 }
2250
2251 static final class TerminateWaiterNode extends QNode {
2252 }
2253
2254 static final class TaskNode extends QNode {
2255 volatile Runnable task;
2256
2257 TaskNode(final Runnable task) {
2258 // we always start task nodes with a {@code null} next
2259 this.task = task;
2260 }
2261
2262 Runnable getAndClearTask() {
2263 try {
2264 return task;
2265 } finally {
2266 this.task = null;
2267 }
2268 }
2269 }
2270
2271 // =======================================================
2272 // Management bean implementation
2273 // =======================================================
2274
2275 final class MXBeanImpl implements StandardThreadPoolMXBean {
2276 MXBeanImpl() {
2277 }
2278
2279 public float getGrowthResistance() {
2280 return EnhancedQueueExecutor.this.getGrowthResistance();
2281 }
2282
2283 public void setGrowthResistance(final float value) {
2284 EnhancedQueueExecutor.this.setGrowthResistance(value);
2285 }
2286
2287 public boolean isGrowthResistanceSupported() {
2288 return true;
2289 }
2290
2291 public int getCorePoolSize() {
2292 return EnhancedQueueExecutor.this.getCorePoolSize();
2293 }
2294
2295 public void setCorePoolSize(final int corePoolSize) {
2296 EnhancedQueueExecutor.this.setCorePoolSize(corePoolSize);
2297 }
2298
2299 public boolean isCorePoolSizeSupported() {
2300 return true;
2301 }
2302
2303 public boolean prestartCoreThread() {
2304 return EnhancedQueueExecutor.this.prestartCoreThread();
2305 }
2306
2307 public int prestartAllCoreThreads() {
2308 return EnhancedQueueExecutor.this.prestartAllCoreThreads();
2309 }
2310
2311 public boolean isCoreThreadPrestartSupported() {
2312 return true;
2313 }
2314
2315 public int getMaximumPoolSize() {
2316 return EnhancedQueueExecutor.this.getMaximumPoolSize();
2317 }
2318
2319 public void setMaximumPoolSize(final int maxPoolSize) {
2320 EnhancedQueueExecutor.this.setMaximumPoolSize(maxPoolSize);
2321 }
2322
2323 public int getPoolSize() {
2324 return EnhancedQueueExecutor.this.getPoolSize();
2325 }
2326
2327 public int getLargestPoolSize() {
2328 return EnhancedQueueExecutor.this.getLargestPoolSize();
2329 }
2330
2331 public int getActiveCount() {
2332 return EnhancedQueueExecutor.this.getActiveCount();
2333 }
2334
2335 public boolean isAllowCoreThreadTimeOut() {
2336 return EnhancedQueueExecutor.this.allowsCoreThreadTimeOut();
2337 }
2338
2339 public void setAllowCoreThreadTimeOut(final boolean value) {
2340 EnhancedQueueExecutor.this.allowCoreThreadTimeOut(value);
2341 }
2342
2343 public long getKeepAliveTimeSeconds() {
2344 return EnhancedQueueExecutor.this.getKeepAliveTime().getSeconds();
2345 }
2346
2347 public void setKeepAliveTimeSeconds(final long seconds) {
2348 EnhancedQueueExecutor.this.setKeepAliveTime(Duration.of(seconds, ChronoUnit.SECONDS));
2349 }
2350
2351 public int getMaximumQueueSize() {
2352 return EnhancedQueueExecutor.this.getMaximumQueueSize();
2353 }
2354
2355 public void setMaximumQueueSize(final int size) {
2356 EnhancedQueueExecutor.this.setMaximumQueueSize(size);
2357 }
2358
2359 public int getQueueSize() {
2360 return EnhancedQueueExecutor.this.getQueueSize();
2361 }
2362
2363 public int getLargestQueueSize() {
2364 return EnhancedQueueExecutor.this.getLargestQueueSize();
2365 }
2366
2367 public boolean isQueueBounded() {
2368 return ! NO_QUEUE_LIMIT;
2369 }
2370
2371 public boolean isQueueSizeModifiable() {
2372 return ! NO_QUEUE_LIMIT;
2373 }
2374
2375 public boolean isShutdown() {
2376 return EnhancedQueueExecutor.this.isShutdown();
2377 }
2378
2379 public boolean isTerminating() {
2380 return EnhancedQueueExecutor.this.isTerminating();
2381 }
2382
2383 public boolean isTerminated() {
2384 return EnhancedQueueExecutor.this.isTerminated();
2385 }
2386
2387 public long getSubmittedTaskCount() {
2388 return EnhancedQueueExecutor.this.getSubmittedTaskCount();
2389 }
2390
2391 public long getRejectedTaskCount() {
2392 return EnhancedQueueExecutor.this.getRejectedTaskCount();
2393 }
2394
2395 public long getCompletedTaskCount() {
2396 return EnhancedQueueExecutor.this.getCompletedTaskCount();
2397 }
2398
2399 public long getSpinMissCount() {
2400 return EnhancedQueueExecutor.this.spinMisses.longValue();
2401 }
2402 }
2403 }
2404