1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2017 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.jboss.threads;
20
21 import static java.lang.Math.max;
22 import static java.lang.Math.min;
23 import static java.lang.Thread.currentThread;
24 import static java.security.AccessController.doPrivileged;
25 import static java.security.AccessController.getContext;
26 import static java.util.concurrent.locks.LockSupport.parkNanos;
27 import static java.util.concurrent.locks.LockSupport.unpark;
28 import static org.jboss.threads.JBossExecutors.unsafe;
29
30 import java.lang.management.ManagementFactory;
31 import java.security.AccessControlContext;
32 import java.security.PrivilegedAction;
33 import java.time.Duration;
34 import java.time.temporal.ChronoUnit;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.Hashtable;
38 import java.util.List;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.RejectedExecutionException;
44 import java.util.concurrent.ThreadFactory;
45 import java.util.concurrent.ThreadLocalRandom;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.LongAdder;
48 import java.util.concurrent.locks.LockSupport;
49
50 import javax.management.ObjectInstance;
51 import javax.management.ObjectName;
52
53 import org.jboss.threads.management.ManageableThreadPoolExecutorService;
54 import org.jboss.threads.management.StandardThreadPoolMXBean;
55 import org.wildfly.common.Assert;
56 import org.wildfly.common.cpu.ProcessorInfo;
57
58 /**
59  * A task-or-thread queue backed thread pool executor service.  Tasks are added in a FIFO manner, and consumers in a LIFO manner.
60  * Threads are only ever created in the event that there are no idle threads available to service a task, which, when
61  * combined with the LIFO-based consumer behavior, means that the thread pool will generally trend towards the minimum
62  * necessary size.  In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can
63  * be used to further govern the thread pool size.
64  * <p>
65  * New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling
66  * its {@link Builder#build() build()} method.
67  *
68  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
69  */

70 public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService {
71     private static final Thread[] NO_THREADS = new Thread[0];
72
73     static {
74         Version.getVersionString();
75         MBeanUnregisterAction.forceInit();
76     }
77
78     /*
79     ┌──────────────────────────┐
80     │ Explanation of operation │
81     └──────────────────────────┘
82
83     The primary mechanism of this executor is the special linked list/stack.  This list has the following properties:
84       • Multiple node types:
85         ◦ Task nodes (TaskNode), which have the following properties:
86           ▪ Strongly connected (no task is ever removed from the middle of the list; tail can always be found by following .next pointers)
87           ▪ FIFO (insertion at tail.next, "removal" at head.next)
88           ▪ Head and tail always refer to TaskNodes; head.next/tail.next are the "true" action points for all node types
89           ▪ At least one dead task node is always in the list, thus the task is cleared after execution to avoid leaks
90         ◦ Waiting consumer thread nodes (PoolThreadNode), which have the following properties:
91           ▪ LIFO/FILO (insertion and removal at tail.next)
92           ▪ Carrier for task handoff between producer and waiting consumer
93         ◦ Waiting termination (awaitTermination) thread nodes (TerminateWaiterNode), which have the following properties:
94           ▪ Append-only (insertion at tail.next.next...next)
95           ▪ All removed at once when termination is complete
96           ▪ If a thread stops waiting, the node remains (but its thread field is cleared to prevent (best-effort) useless unparks)
97           ▪ Once cleared, the thread field can never be reinitialized
98       • Concurrently accessed (multiple threads may read the list and update the head and tail pointers)
99       • TaskNode.next may be any node type or null
100       • PoolThreadNode.next may only be another PoolThreadNode or null
101       • TerminateWaiterNode.next may only be another TerminateWaiterNode or null
102
103     The secondary mechanism is the thread status atomic variable.  It is logically a structure with the following fields:
104       • Current thread count (currentSizeOf(), withCurrentSize())
105       • Core pool size (coreSizeOf(), withCoreSize())
106       • Max pool size (maxSizeOf(), withMaxSize())
107       • Shutdown-requested flag (isShutdownRequested(), withShutdownRequested())
108       • Shutdown-with-interrupt requested flag (isShutdownInterrupt(), withShutdownInterrupt())
109       • Shutdown-completed flag (isShutdownComplete(), withShutdownComplete())
110       • The decision to create a new thread is affected by whether the number of active threads is less than the core size;
111         if not, then the growth resistance factor is applied to decide whether the task should be enqueued or a new thread begun.
112         Note: the default growth resistance factor is 0% resistance.
113
114     The final mechanism is the queue status atomic variable.  It is logically a structure with the following fields:
115       • Current queue size (currentQueueSizeOf(), withCurrentQueueSize())
116       • Queue size limit (maxQueueSizeOf(), withMaxQueueSize())
117
118      */

119
120     // =======================================================
121     // Optimization control flags
122     // =======================================================
123
124     /**
125      * A global hint which establishes whether it is recommended to disable uses of {@code EnhancedQueueExecutor}.
126      * This hint defaults to {@code false} but can be changed to {@code true} by setting the {@code jboss.threads.eqe.disable}
127      * property to {@code true} before this class is initialized.
128      */

129     public static final boolean DISABLE_HINT = readBooleanPropertyPrefixed("disable"false);
130
131     /**
132      * Update the tail pointer opportunistically.
133      */

134     static final boolean UPDATE_TAIL = readBooleanPropertyPrefixed("update-tail"false);
135     /**
136      * Update the summary statistics.
137      */

138     static final boolean UPDATE_STATISTICS = readBooleanPropertyPrefixed("statistics"false);
139     /**
140      * Maintain an estimate of the number of threads which are currently doing work on behalf of the thread pool.
141      */

142     static final boolean UPDATE_ACTIVE_COUNT =
143             UPDATE_STATISTICS || readBooleanPropertyPrefixed("statistics.active-count"false);
144     /**
145      * Suppress queue limit and size tracking for performance.
146      */

147     static final boolean NO_QUEUE_LIMIT = readBooleanPropertyPrefixed("unlimited-queue"false);
148     /**
149      * Set the default value for whether an mbean is to be auto-registered for the thread pool.
150      */

151     static final boolean REGISTER_MBEAN = readBooleanPropertyPrefixed("register-mbean"true);
152     /**
153      * Set to enable or disable MBean registration.
154      */

155     static final boolean DISABLE_MBEAN = readBooleanPropertyPrefixed("disable-mbean", readProperty("org.graalvm.nativeimage.imagecode"null) != null);
156     /**
157      * The number of times a thread should spin/yield before actually parking.
158      */

159     static final int PARK_SPINS = readIntPropertyPrefixed("park-spins", ProcessorInfo.availableProcessors() == 1 ? 0 : 1 << 7);
160     /**
161      * The yield ratio expressed as the number of spins that should yield.
162      */

163     static final int YIELD_FACTOR = max(min(readIntPropertyPrefixed("park-yields", 1), PARK_SPINS), 0);
164
165     // =======================================================
166     // Constants
167     // =======================================================
168
169     static final Executor DEFAULT_HANDLER = JBossExecutors.rejectingExecutor();
170
171     // =======================================================
172     // Immutable configuration fields
173     // =======================================================
174
175     /**
176      * The thread factory.
177      */

178     private final ThreadFactory threadFactory;
179     /**
180      * The approximate set of pooled threads.
181      */

182     private final Set<Thread> runningThreads = Collections.newSetFromMap(new ConcurrentHashMap<>());
183     /**
184      * The management bean instance.
185      */

186     private final MXBeanImpl mxBean;
187     /**
188      * The MBean registration handle (if any).
189      */

190     private final Object handle;
191     /**
192      * The access control context of the creating thread.
193      */

194     private final AccessControlContext acc;
195
196     // =======================================================
197     // Current state fields
198     // =======================================================
199
200     /**
201      * The linked list of threads waiting for termination of this thread pool.
202      */

203     @SuppressWarnings("unused"// used by field updater
204     volatile Waiter terminationWaiters;
205
206     /**
207      * Queue size:
208      * <ul>
209      *     <li>Bit 00..1F: current queue length</li>
210      *     <li>Bit 20..3F: queue limit</li>
211      * </ul>
212      */

213     @SuppressWarnings("unused"// used by field updater
214     volatile long queueSize;
215
216     /**
217      * The thread keep-alive timeout value.
218      */

219     volatile long timeoutNanos;
220
221     /**
222      * A resistance factor applied after the core pool is full; values applied here will cause that fraction
223      * of submissions to create new threads when no idle thread is available.   A value of {@code 0.0f} implies that
224      * threads beyond the core size should be created as aggressively as threads within it; a value of {@code 1.0f}
225      * implies that threads beyond the core size should never be created.
226      */

227     volatile float growthResistance;
228
229     /**
230      * The handler for tasks which cannot be accepted by the executor.
231      */

232     volatile Executor handoffExecutor;
233
234     /**
235      * The handler for uncaught exceptions which occur during user tasks.
236      */

237     volatile Thread.UncaughtExceptionHandler exceptionHandler;
238
239     /**
240      * The termination task to execute when the thread pool exits.
241      */

242     volatile Runnable terminationTask;
243
244     // =======================================================
245     // Statistics fields and counters
246     // =======================================================
247
248     /**
249      * The peak number of threads ever created by this pool.
250      */

251     @SuppressWarnings("unused"// used by field updater
252     volatile int peakThreadCount;
253     /**
254      * The approximate peak queue size.
255      */

256     @SuppressWarnings("unused"// used by field updater
257     volatile int peakQueueSize;
258
259     private final LongAdder submittedTaskCounter = new LongAdder();
260     private final LongAdder completedTaskCounter = new LongAdder();
261     private final LongAdder rejectedTaskCounter = new LongAdder();
262     private final LongAdder spinMisses = new LongAdder();
263
264     /**
265      * The current active number of threads.
266      */

267     @SuppressWarnings("unused")
268     volatile int activeCount;
269
270     // =======================================================
271     // Updaters
272     // =======================================================
273
274     private static final long terminationWaitersOffset;
275
276     private static final long queueSizeOffset;
277
278     private static final long peakThreadCountOffset;
279     private static final long activeCountOffset;
280     private static final long peakQueueSizeOffset;
281
282     private static final Object sequenceBase;
283     private static final long sequenceOffset;
284
285     static {
286         try {
287             terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters"));
288
289             queueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("queueSize"));
290
291             peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount"));
292             activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount"));
293             peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize"));
294
295             sequenceBase = unsafe.staticFieldBase(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
296             sequenceOffset = unsafe.staticFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
297         } catch (NoSuchFieldException e) {
298             throw new NoSuchFieldError(e.getMessage());
299         }
300     }
301
302     // =======================================================
303     // Thread state field constants
304     // =======================================================
305
306     private static final long TS_THREAD_CNT_MASK = 0b1111_1111_1111_1111_1111L; // 20 bits, can be shifted as needed
307
308     // shift amounts
309     private static final long TS_CURRENT_SHIFT   = 0;
310     private static final long TS_CORE_SHIFT      = 20;
311     private static final long TS_MAX_SHIFT       = 40;
312
313     private static final long TS_ALLOW_CORE_TIMEOUT = 1L << 60;
314     private static final long TS_SHUTDOWN_REQUESTED = 1L << 61;
315     private static final long TS_SHUTDOWN_INTERRUPT = 1L << 62;
316     @SuppressWarnings("NumericOverflow")
317     private static final long TS_SHUTDOWN_COMPLETE = 1L << 63;
318
319     private static final int EXE_OK = 0;
320     private static final int EXE_REJECT_QUEUE_FULL = 1;
321     private static final int EXE_REJECT_SHUTDOWN = 2;
322     private static final int EXE_CREATE_THREAD = 3;
323
324     private static final int AT_YES = 0;
325     private static final int AT_NO = 1;
326     private static final int AT_SHUTDOWN = 2;
327
328     // =======================================================
329     // Marker objects
330     // =======================================================
331
332     static final QNode TERMINATE_REQUESTED = new TerminateWaiterNode();
333     static final QNode TERMINATE_COMPLETE = new TerminateWaiterNode();
334
335     static final Waiter TERMINATE_COMPLETE_WAITER = new Waiter(null);
336
337     static final Runnable WAITING = new NullRunnable();
338     static final Runnable GAVE_UP = new NullRunnable();
339     static final Runnable ACCEPTED = new NullRunnable();
340     static final Runnable EXIT = new NullRunnable();
341
342     // =======================================================
343     // Constructor
344     // =======================================================
345
346     static volatile int sequence = 1;
347
348     EnhancedQueueExecutor(final Builder builder) {
349         super();
350         this.acc = getContext();
351         int maxSize = builder.getMaximumPoolSize();
352         int coreSize = min(builder.getCorePoolSize(), maxSize);
353         this.handoffExecutor = builder.getHandoffExecutor();
354         this.exceptionHandler = builder.getExceptionHandler();
355         this.threadFactory = builder.getThreadFactory();
356         this.terminationTask = builder.getTerminationTask();
357         this.growthResistance = builder.getGrowthResistance();
358         final Duration keepAliveTime = builder.getKeepAliveTime();
359         // initial dead node
360         // thread stat
361         threadStatus = withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize);
362         timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime);
363         queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize());
364         mxBean = new MXBeanImpl();
365         if (! DISABLE_MBEAN && builder.isRegisterMBean()) {
366             final String configuredName = builder.getMBeanName();
367             final String finalName = configuredName != null ? configuredName : "threadpool-" + unsafe.getAndAddInt(sequenceBase, sequenceOffset, 1);
368             handle = doPrivileged(new MBeanRegisterAction(finalName, mxBean), acc);
369         } else {
370             handle = null;
371         }
372     }
373
374     static final class MBeanRegisterAction implements PrivilegedAction<ObjectInstance> {
375         private final String finalName;
376         private final MXBeanImpl mxBean;
377
378         MBeanRegisterAction(final String finalName, final MXBeanImpl mxBean) {
379             this.finalName = finalName;
380             this.mxBean = mxBean;
381         }
382
383         public ObjectInstance run() {
384             try {
385                 final Hashtable<String, String> table = new Hashtable<>();
386                 table.put("name", ObjectName.quote(finalName));
387                 table.put("type""thread-pool");
388                 return ManagementFactory.getPlatformMBeanServer().registerMBean(mxBean, new ObjectName("jboss.threads", table));
389             } catch (Throwable ignored) {
390             }
391             return null;
392         }
393     }
394
395     // =======================================================
396     // Builder
397     // =======================================================
398
399     /**
400      * The builder class for an {@code EnhancedQueueExecutor}.  All the fields are initialized to sensible defaults for
401      * a small thread pool.
402      */

403     public static final class Builder {
404         private ThreadFactory threadFactory = Executors.defaultThreadFactory();
405         private Runnable terminationTask = NullRunnable.getInstance();
406         private Executor handoffExecutor = DEFAULT_HANDLER;
407         private Thread.UncaughtExceptionHandler exceptionHandler = JBossExecutors.loggingExceptionHandler();
408         private int coreSize = 16;
409         private int maxSize = 64;
410         private Duration keepAliveTime = Duration.ofSeconds(30);
411         private float growthResistance;
412         private boolean allowCoreTimeOut;
413         private int maxQueueSize = Integer.MAX_VALUE;
414         private boolean registerMBean = REGISTER_MBEAN;
415         private String mBeanName;
416
417         /**
418          * Construct a new instance.
419          */

420         public Builder() {}
421
422         /**
423          * Get the configured thread factory.
424          *
425          * @return the configured thread factory (not {@code null})
426          */

427         public ThreadFactory getThreadFactory() {
428             return threadFactory;
429         }
430
431         /**
432          * Set the configured thread factory.
433          *
434          * @param threadFactory the configured thread factory (must not be {@code null})
435          * @return this builder
436          */

437         public Builder setThreadFactory(final ThreadFactory threadFactory) {
438             Assert.checkNotNullParam("threadFactory", threadFactory);
439             this.threadFactory = threadFactory;
440             return this;
441         }
442
443         /**
444          * Get the termination task.  By default, an empty {@code Runnable} is used.
445          *
446          * @return the termination task (not {@code null})
447          */

448         public Runnable getTerminationTask() {
449             return terminationTask;
450         }
451
452         /**
453          * Set the termination task.
454          *
455          * @param terminationTask the termination task (must not be {@code null})
456          * @return this builder
457          */

458         public Builder setTerminationTask(final Runnable terminationTask) {
459             Assert.checkNotNullParam("terminationTask", terminationTask);
460             this.terminationTask = terminationTask;
461             return this;
462         }
463
464         /**
465          * Get the core pool size.  This is the size below which new threads will always be created if no idle threads
466          * are available.  If the pool size reaches the core size but has not yet reached the maximum size, a resistance
467          * factor will be applied to each task submission which determines whether the task should be queued or a new
468          * thread started.
469          *
470          * @return the core pool size
471          * @see EnhancedQueueExecutor#getCorePoolSize()
472          */

473         public int getCorePoolSize() {
474             return coreSize;
475         }
476
477         /**
478          * Set the core pool size.  If the configured maximum pool size is less than the configured core size, the
479          * core size will be reduced to match the maximum size when the thread pool is constructed.
480          *
481          * @param coreSize the core pool size (must be greater than or equal to 0, and less than 2^20)
482          * @return this builder
483          * @see EnhancedQueueExecutor#setCorePoolSize(int)
484          */

485         public Builder setCorePoolSize(final int coreSize) {
486             Assert.checkMinimumParameter("coreSize", 0, coreSize);
487             Assert.checkMaximumParameter("coreSize", TS_THREAD_CNT_MASK, coreSize);
488             this.coreSize = coreSize;
489             return this;
490         }
491
492         /**
493          * Get the maximum pool size.  This is the absolute upper limit to the size of the thread pool.
494          *
495          * @return the maximum pool size
496          * @see EnhancedQueueExecutor#getMaximumPoolSize()
497          */

498         public int getMaximumPoolSize() {
499             return maxSize;
500         }
501
502         /**
503          * Set the maximum pool size.  If the configured maximum pool size is less than the configured core size, the
504          * core size will be reduced to match the maximum size when the thread pool is constructed.
505          *
506          * @param maxSize the maximum pool size (must be greater than or equal to 0, and less than 2^20)
507          * @return this builder
508          * @see EnhancedQueueExecutor#setMaximumPoolSize(int)
509          */

510         public Builder setMaximumPoolSize(final int maxSize) {
511             Assert.checkMinimumParameter("maxSize", 0, maxSize);
512             Assert.checkMaximumParameter("maxSize", TS_THREAD_CNT_MASK, maxSize);
513             this.maxSize = maxSize;
514             return this;
515         }
516
517         /**
518          * Get the thread keep-alive time.  This is the amount of time (in the configured time unit) that idle threads
519          * will wait for a task before exiting.
520          *
521          * @return the thread keep-alive time duration
522          */

523         public Duration getKeepAliveTime() {
524             return keepAliveTime;
525         }
526
527         /**
528          * Get the thread keep-alive time.  This is the amount of time (in the configured time unit) that idle threads
529          * will wait for a task before exiting.
530          *
531          * @param keepAliveUnits the time keepAliveUnits of the keep-alive time (must not be {@code null})
532          * @return the thread keep-alive time
533          * @see EnhancedQueueExecutor#getKeepAliveTime(TimeUnit)
534          * @deprecated Use {@link #getKeepAliveTime()} instead.
535          */

536         @Deprecated
537         public long getKeepAliveTime(TimeUnit keepAliveUnits) {
538             Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
539             final long secondsPart = keepAliveUnits.convert(keepAliveTime.getSeconds(), TimeUnit.SECONDS);
540             final long nanoPart = keepAliveUnits.convert(keepAliveTime.getNano(), TimeUnit.NANOSECONDS);
541             final long sum = secondsPart + nanoPart;
542             return sum < 0 ? Long.MAX_VALUE : sum;
543         }
544
545         /**
546          * Set the thread keep-alive time.
547          *
548          * @param keepAliveTime the thread keep-alive time (must not be {@code null})
549          */

550         public Builder setKeepAliveTime(final Duration keepAliveTime) {
551             Assert.checkNotNullParam("keepAliveTime", keepAliveTime);
552             this.keepAliveTime = keepAliveTime;
553             return this;
554         }
555
556         /**
557          * Set the thread keep-alive time.
558          *
559          * @param keepAliveTime the thread keep-alive time (must be greater than 0)
560          * @param keepAliveUnits the time keepAliveUnits of the keep-alive time (must not be {@code null})
561          * @return this builder
562          * @see EnhancedQueueExecutor#setKeepAliveTime(long, TimeUnit)
563          * @deprecated Use {@link #setKeepAliveTime(Duration)} instead.
564          */

565         @Deprecated
566         public Builder setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveUnits) {
567             Assert.checkMinimumParameter("keepAliveTime", 1L, keepAliveTime);
568             Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
569             this.keepAliveTime = Duration.of(keepAliveTime, JDKSpecific.timeToTemporal(keepAliveUnits));
570             return this;
571         }
572
573         /**
574          * Get the thread pool growth resistance.  This is the average fraction of submitted tasks that will be enqueued (instead
575          * of causing a new thread to start) when there are no idle threads and the pool size is equal to or greater than
576          * the core size (but still less than the maximum size).  A value of {@code 0.0} indicates that tasks should
577          * not be enqueued until the pool is completely full; a value of {@code 1.0} indicates that tasks should always
578          * be enqueued until the queue is completely full.
579          *
580          * @return the thread pool growth resistance
581          * @see EnhancedQueueExecutor#getGrowthResistance()
582          */

583         public float getGrowthResistance() {
584             return growthResistance;
585         }
586
587         /**
588          * Set the thread pool growth resistance.
589          *
590          * @param growthResistance the thread pool growth resistance (must be in the range {@code 0.0f ≤ n ≤ 1.0f})
591          * @return this builder
592          * @see #getGrowthResistance()
593          * @see EnhancedQueueExecutor#setGrowthResistance(float)
594          */

595         public Builder setGrowthResistance(final float growthResistance) {
596             Assert.checkMinimumParameter("growthResistance", 0.0f, growthResistance);
597             Assert.checkMaximumParameter("growthResistance", 1.0f, growthResistance);
598             this.growthResistance = growthResistance;
599             return this;
600         }
601
602         /**
603          * Determine whether core threads are allowed to time out.  A "core thread" is defined as any thread in the pool
604          * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
605          *
606          * @return {@code trueif core threads are allowed to time out, {@code false} otherwise
607          * @see EnhancedQueueExecutor#allowsCoreThreadTimeOut()
608          */

609         public boolean allowsCoreThreadTimeOut() {
610             return allowCoreTimeOut;
611         }
612
613         /**
614          * Establish whether core threads are allowed to time out.  A "core thread" is defined as any thread in the pool
615          * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
616          *
617          * @param allowCoreTimeOut {@code trueif core threads are allowed to time out, {@code false} otherwise
618          * @return this builder
619          * @see EnhancedQueueExecutor#allowCoreThreadTimeOut(boolean)
620          */

621         public Builder allowCoreThreadTimeOut(final boolean allowCoreTimeOut) {
622             this.allowCoreTimeOut = allowCoreTimeOut;
623             return this;
624         }
625
626         /**
627          * Get the maximum queue size.  If the queue is full and a task cannot be immediately accepted, rejection will result.
628          *
629          * @return the maximum queue size
630          * @see EnhancedQueueExecutor#getMaximumQueueSize()
631          */

632         public int getMaximumQueueSize() {
633             return maxQueueSize;
634         }
635
636         /**
637          * Set the maximum queue size.
638          * This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set.
639          *
640          * @param maxQueueSize the maximum queue size (must be ≥ 0)
641          * @return this builder
642          * @see EnhancedQueueExecutor#setMaximumQueueSize(int)
643          */

644         public Builder setMaximumQueueSize(final int maxQueueSize) {
645             Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize);
646             Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize);
647             this.maxQueueSize = maxQueueSize;
648             return this;
649         }
650
651         /**
652          * Get the handoff executor.
653          *
654          * @return the handoff executor (not {@code null})
655          */

656         public Executor getHandoffExecutor() {
657             return handoffExecutor;
658         }
659
660         /**
661          * Set the handoff executor.
662          *
663          * @param handoffExecutor the handoff executor (must not be {@code null})
664          * @return this builder
665          */

666         public Builder setHandoffExecutor(final Executor handoffExecutor) {
667             Assert.checkNotNullParam("handoffExecutor", handoffExecutor);
668             this.handoffExecutor = handoffExecutor;
669             return this;
670         }
671
672         /**
673          * Get the uncaught exception handler.
674          *
675          * @return the uncaught exception handler (not {@code null})
676          */

677         public Thread.UncaughtExceptionHandler getExceptionHandler() {
678             return exceptionHandler;
679         }
680
681         /**
682          * Set the uncaught exception handler.
683          *
684          * @param exceptionHandler the uncaught exception handler (must not be {@code null})
685          * @return this builder
686          */

687         public Builder setExceptionHandler(final Thread.UncaughtExceptionHandler exceptionHandler) {
688             this.exceptionHandler = exceptionHandler;
689             return this;
690         }
691
692         /**
693          * Construct the executor from the configured parameters.
694          *
695          * @return the executor, which will be active and ready to accept tasks (not {@code null})
696          */

697         public EnhancedQueueExecutor build() {
698             return new EnhancedQueueExecutor(this);
699         }
700
701         /**
702          * Determine whether an MBean should automatically be registered for this pool.
703          *
704          * @return {@code trueif an MBean is to be auto-registered; {@code false} otherwise
705          */

706         public boolean isRegisterMBean() {
707             return registerMBean;
708         }
709
710         /**
711          * Establish whether an MBean should automatically be registered for this pool.
712          *
713          * @param registerMBean {@code trueif an MBean is to be auto-registered; {@code false} otherwise
714          * @return this builder
715          */

716         public Builder setRegisterMBean(final boolean registerMBean) {
717             this.registerMBean = registerMBean;
718             return this;
719         }
720
721         /**
722          * Get the overridden MBean name.
723          *
724          * @return the overridden MBean name, or {@code nullif a default name should be generated
725          */

726         public String getMBeanName() {
727             return mBeanName;
728         }
729
730         /**
731          * Set the overridden MBean name.
732          *
733          * @param mBeanName the overridden MBean name, or {@code nullif a default name should be generated
734          * @return this builder
735          */

736         public Builder setMBeanName(final String mBeanName) {
737             this.mBeanName = mBeanName;
738             return this;
739         }
740     }
741
742     // =======================================================
743     // ExecutorService
744     // =======================================================
745
746     /**
747      * Execute a task.
748      *
749      * @param runnable the task to execute (must not be {@code null})
750      */

751     public void execute(Runnable runnable) {
752         Assert.checkNotNullParam("runnable", runnable);
753         final Runnable realRunnable = JBossExecutors.classLoaderPreservingTaskUnchecked(runnable);
754         int result;
755         result = tryExecute(realRunnable);
756         boolean ok = false;
757         if (result == EXE_OK) {
758             // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition
759             if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) {
760                 deallocateThread();
761             }
762             if (UPDATE_STATISTICS) submittedTaskCounter.increment();
763             return;
764         } else if (result == EXE_CREATE_THREAD) try {
765             ok = doStartThread(realRunnable);
766         } finally {
767             if (! ok) deallocateThread();
768         } else {
769             if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
770             if (result == EXE_REJECT_SHUTDOWN) {
771                 rejectShutdown(realRunnable);
772             } else {
773                 assert result == EXE_REJECT_QUEUE_FULL;
774                 rejectQueueFull(realRunnable);
775             }
776         }
777     }
778
779     /**
780      * Request that shutdown be initiated for this thread pool.  This is equivalent to calling
781      * {@link #shutdown(boolean) shutdown(false)}; see that method for more information.
782      */

783     public void shutdown() {
784         shutdown(false);
785     }
786
787     /**
788      * Attempt to stop the thread pool immediately by interrupting all running threads and de-queueing all pending
789      * tasks.  The thread pool might not be fully stopped when this method returns, if a currently running task
790      * does not respect interruption.
791      *
792      * @return a list of pending tasks (not {@code null})
793      */

794     public List<Runnable> shutdownNow() {
795         shutdown(true);
796         final ArrayList<Runnable> list = new ArrayList<>();
797         TaskNode head = this.head;
798         QNode headNext;
799         for (;;) {
800             headNext = head.getNext();
801             if (headNext instanceof TaskNode) {
802                 TaskNode taskNode = (TaskNode) headNext;
803                 if (compareAndSetHead(head, taskNode)) {
804                     if (! NO_QUEUE_LIMIT) decreaseQueueSize();
805                     head = taskNode;
806                     list.add(taskNode.task);
807                 }
808                 // retry
809             } else {
810                 // no more tasks;
811                 return list;
812             }
813         }
814     }
815
816     /**
817      * Determine whether shutdown was requested on this thread pool.
818      *
819      * @return {@code trueif shutdown was requested, {@code false} otherwise
820      */

821     public boolean isShutdown() {
822         return isShutdownRequested(threadStatus);
823     }
824
825     /**
826      * Determine whether shutdown has completed on this thread pool.
827      *
828      * @return {@code trueif shutdown has completed, {@code false} otherwise
829      */

830     public boolean isTerminated() {
831         return isShutdownComplete(threadStatus);
832     }
833
834     /**
835      * Wait for the thread pool to complete termination.  If the timeout expires before the thread pool is terminated,
836      * {@code false} is returned.  If the calling thread is interrupted before the thread pool is terminated, then
837      * an {@link InterruptedException} is thrown.
838      *
839      * @param timeout the amount of time to wait (must be ≥ 0)
840      * @param unit the unit of time to use for waiting (must not be {@code null})
841      * @return {@code trueif the thread pool terminated within the given timeout, {@code false} otherwise
842      * @throws InterruptedException if the calling thread was interrupted before either the time period elapsed or the pool terminated successfully
843      */

844     public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
845         Assert.checkMinimumParameter("timeout", 0, timeout);
846         Assert.checkNotNullParam("unit", unit);
847         if (timeout > 0) {
848             final Thread thread = Thread.currentThread();
849             if (runningThreads.contains(thread)) {
850                 throw Messages.msg.cannotAwaitWithin();
851             }
852             Waiter waiters = this.terminationWaiters;
853             if (waiters == TERMINATE_COMPLETE_WAITER) {
854                 return true;
855             }
856             final Waiter waiter = new Waiter(waiters);
857             waiter.setThread(currentThread());
858             while (! compareAndSetTerminationWaiters(waiters, waiter)) {
859                 waiters = this.terminationWaiters;
860                 if (waiters == TERMINATE_COMPLETE_WAITER) {
861                     return true;
862                 }
863                 waiter.setNext(waiters);
864             }
865             try {
866                 parkNanos(this, unit.toNanos(timeout));
867             } finally {
868                 // prevent future spurious unparks without sabotaging the queue's integrity
869                 waiter.setThread(null);
870             }
871         }
872         if (Thread.interrupted()) throw new InterruptedException();
873         return isTerminated();
874     }
875
876     // =======================================================
877     // Management
878     // =======================================================
879
880     public StandardThreadPoolMXBean getThreadPoolMXBean() {
881         return mxBean;
882     }
883
884     // =======================================================
885     // Other public API
886     // =======================================================
887
888     /**
889      * Initiate shutdown of this thread pool.  After this method is called, no new tasks will be accepted.  Once
890      * the last task is complete, the thread pool will be terminated and its
891      * {@linkplain Builder#setTerminationTask(Runnable) termination task}
892      * will be called.  Calling this method more than once has no additional effect, unless all previous calls
893      * had the {@code interrupt} parameter set to {@code false} and the subsequent call sets it to {@code true}, in
894      * which case all threads in the pool will be interrupted.
895      *
896      * @param interrupt {@code true} to request that currently-running tasks be interrupted; {@code false} otherwise
897      */

898     public void shutdown(boolean interrupt) {
899         long oldStatus, newStatus;
900         // state change sh1:
901         //   threadStatus ← threadStatus | shutdown
902         // succeeds: -
903         // preconditions:
904         //   none (thread status may be in any state)
905         // postconditions (succeed):
906         //   (always) ShutdownRequested set in threadStatus
907         //   (maybe) ShutdownInterrupted set in threadStatus (depends on parameter)
908         //   (maybe) ShutdownComplete set in threadStatus (if currentSizeOf() == 0)
909         //   (maybe) exit if no change
910         // postconditions (fail): -
911         // post-actions (fail):
912         //   repeat state change until success or return
913         do {
914             oldStatus = threadStatus;
915             newStatus = withShutdownRequested(oldStatus);
916             if (interrupt) newStatus = withShutdownInterrupt(newStatus);
917             if (currentSizeOf(oldStatus) == 0) newStatus = withShutdownComplete(newStatus);
918             if (newStatus == oldStatus) return;
919         } while (! compareAndSetThreadStatus(oldStatus, newStatus));
920         assert oldStatus != newStatus;
921         if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) {
922             assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared
923             // we initiated shutdown
924             // clear out all consumers and append a dummy waiter node
925             TaskNode tail = this.tail;
926             QNode tailNext;
927             // a marker to indicate that termination was requested
928             for (;;) {
929                 tailNext = tail.getNext();
930                 if (tailNext instanceof TaskNode) {
931                     tail = (TaskNode) tailNext;
932                 } else if (tailNext instanceof PoolThreadNode || tailNext == null) {
933                     // remove the entire chain from this point
934                     PoolThreadNode node = (PoolThreadNode) tailNext;
935                     // state change sh2:
936                     //   tail.next ← terminateNode(null)
937                     // succeeds: sh1
938                     // preconditions:
939                     //   threadStatus contains shutdown
940                     //   tail(snapshot) is a task node
941                     //   tail(snapshot).next is a (list of) pool thread node(s) or null
942                     // postconditions (succeed):
943                     //   tail(snapshot).next is TERMINATE_REQUESTED
944                     if (tail.compareAndSetNext(node, TERMINATE_REQUESTED)) {
945                         // got it!
946                         // state change sh3:
947                         //   node.task ← EXIT
948                         // preconditions:
949                         //   none (node task may be in any state)
950                         // postconditions (succeed):
951                         //   task is EXIT
952                         // postconditions (fail):
953                         //   no change (repeat loop)
954                         while (node != null) {
955                             node.compareAndSetTask(WAITING, EXIT);
956                             node.unpark();
957                             node = node.getNext();
958                         }
959                         // success; exit loop
960                         break;
961                     }
962                     // repeat loop (failed CAS)
963                 } else if (tailNext instanceof TerminateWaiterNode) {
964                     // theoretically impossible, but it means we have nothing else to do
965                     break;
966                 } else {
967                     throw Assert.unreachableCode();
968                 }
969             }
970         }
971         if (isShutdownInterrupt(newStatus) != isShutdownInterrupt(oldStatus)) {
972             assert ! isShutdownInterrupt(oldStatus); // because it can only ever be set, not cleared
973             // we initiated interrupt, so interrupt all currently active threads
974             for (Thread thread : runningThreads) {
975                 thread.interrupt();
976             }
977         }
978         if (isShutdownComplete(newStatus) != isShutdownComplete(oldStatus)) {
979             assert ! isShutdownComplete(oldStatus);  // because it can only ever be set, not cleared
980             completeTermination();
981         }
982     }
983
984     /**
985      * Determine if this thread pool is in the process of terminating but has not yet completed.
986      *
987      * @return {@code trueif the thread pool is terminating, or {@code falseif the thread pool is not terminating or has completed termination
988      */

989     public boolean isTerminating() {
990         final long threadStatus = this.threadStatus;
991         return isShutdownRequested(threadStatus) && ! isShutdownComplete(threadStatus);
992     }
993
994     /**
995      * Start an idle core thread.  Normally core threads are only begun when a task was submitted to the executor
996      * but no thread is immediately available to handle the task.
997      *
998      * @return {@code trueif a core thread was started, or {@code falseif all core threads were already running or
999      *   if the thread failed to be created for some other reason
1000      */

1001     public boolean prestartCoreThread() {
1002         if (tryAllocateThread(1.0f) != AT_YES) return false;
1003         if (doStartThread(null)) return true;
1004         deallocateThread();
1005         return false;
1006     }
1007
1008     /**
1009      * Start all core threads.  Calls {@link #prestartCoreThread()} in a loop until it returns {@code false}.
1010      *
1011      * @return the number of core threads created
1012      */

1013     public int prestartAllCoreThreads() {
1014         int cnt = 0;
1015         while (prestartCoreThread()) cnt ++;
1016         return cnt;
1017     }
1018
1019     // =======================================================
1020     // Tuning & configuration parameters (run time)
1021     // =======================================================
1022
1023     /**
1024      * Get the thread pool growth resistance.  This is the average fraction of submitted tasks that will be enqueued (instead
1025      * of causing a new thread to start) when there are no idle threads and the pool size is equal to or greater than
1026      * the core size (but still less than the maximum size).  A value of {@code 0.0} indicates that tasks should
1027      * not be enqueued until the pool is completely full; a value of {@code 1.0} indicates that tasks should always
1028      * be enqueued until the queue is completely full.
1029      *
1030      * @return the configured growth resistance factor
1031      * @see Builder#getGrowthResistance() Builder.getGrowthResistance()
1032      */

1033     public float getGrowthResistance() {
1034         return growthResistance;
1035     }
1036
1037     /**
1038      * Set the growth resistance factor.
1039      *
1040      * @param growthResistance the thread pool growth resistance (must be in the range {@code 0.0f ≤ n ≤ 1.0f})
1041      * @see Builder#setGrowthResistance(float) Builder.setGrowthResistance()
1042      */

1043     public void setGrowthResistance(final float growthResistance) {
1044         Assert.checkMinimumParameter("growthResistance", 0.0f, growthResistance);
1045         Assert.checkMaximumParameter("growthResistance", 1.0f, growthResistance);
1046         this.growthResistance = growthResistance;
1047     }
1048
1049     /**
1050      * Get the core pool size.  This is the size below which new threads will always be created if no idle threads
1051      * are available.  If the pool size reaches the core size but has not yet reached the maximum size, a resistance
1052      * factor will be applied to each task submission which determines whether the task should be queued or a new
1053      * thread started.
1054      *
1055      * @return the core pool size
1056      * @see Builder#getCorePoolSize() Builder.getCorePoolSize()
1057      */

1058     public int getCorePoolSize() {
1059         return coreSizeOf(threadStatus);
1060     }
1061
1062     /**
1063      * Set the core pool size.  If the configured maximum pool size is less than the configured core size, the
1064      * core size will be reduced to match the maximum size when the thread pool is constructed.
1065      *
1066      * @param corePoolSize the core pool size (must be greater than or equal to 0, and less than 2^20)
1067      * @see Builder#setCorePoolSize(int) Builder.setCorePoolSize()
1068      */

1069     public void setCorePoolSize(final int corePoolSize) {
1070         Assert.checkMinimumParameter("corePoolSize", 0, corePoolSize);
1071         Assert.checkMaximumParameter("corePoolSize", TS_THREAD_CNT_MASK, corePoolSize);
1072         long oldVal, newVal;
1073         do {
1074             oldVal = threadStatus;
1075             if (corePoolSize > maxSizeOf(oldVal)) {
1076                 // automatically bump up max size to match
1077                 newVal = withCoreSize(withMaxSize(oldVal, corePoolSize), corePoolSize);
1078             } else {
1079                 newVal = withCoreSize(oldVal, corePoolSize);
1080             }
1081         } while (! compareAndSetThreadStatus(oldVal, newVal));
1082         if (maxSizeOf(newVal) < maxSizeOf(oldVal) || coreSizeOf(newVal) < coreSizeOf(oldVal)) {
1083             // poke all the threads to try to terminate any excess eagerly
1084             for (Thread activeThread : runningThreads) {
1085                 unpark(activeThread);
1086             }
1087         }
1088     }
1089
1090     /**
1091      * Get the maximum pool size.  This is the absolute upper limit to the size of the thread pool.
1092      *
1093      * @return the maximum pool size
1094      * @see Builder#getMaximumPoolSize() Builder.getMaximumPoolSize()
1095      */

1096     public int getMaximumPoolSize() {
1097         return maxSizeOf(threadStatus);
1098     }
1099
1100     /**
1101      * Set the maximum pool size.  If the configured maximum pool size is less than the configured core size, the
1102      * core size will be reduced to match the maximum size when the thread pool is constructed.
1103      *
1104      * @param maxPoolSize the maximum pool size (must be greater than or equal to 0, and less than 2^20)
1105      * @see Builder#setMaximumPoolSize(int) Builder.setMaximumPoolSize()
1106      */

1107     public void setMaximumPoolSize(final int maxPoolSize) {
1108         Assert.checkMinimumParameter("maxPoolSize", 0, maxPoolSize);
1109         Assert.checkMaximumParameter("maxPoolSize", TS_THREAD_CNT_MASK, maxPoolSize);
1110         long oldVal, newVal;
1111         do {
1112             oldVal = threadStatus;
1113             if (maxPoolSize < coreSizeOf(oldVal)) {
1114                 // automatically bump down core size to match
1115                 newVal = withCoreSize(withMaxSize(oldVal, maxPoolSize), maxPoolSize);
1116             } else {
1117                 newVal = withMaxSize(oldVal, maxPoolSize);
1118             }
1119         } while (! compareAndSetThreadStatus(oldVal, newVal));
1120         if (maxSizeOf(newVal) < maxSizeOf(oldVal) || coreSizeOf(newVal) < coreSizeOf(oldVal)) {
1121             // poke all the threads to try to terminate any excess eagerly
1122             for (Thread activeThread : runningThreads) {
1123                 unpark(activeThread);
1124             }
1125         }
1126     }
1127
1128     /**
1129      * Determine whether core threads are allowed to time out.  A "core thread" is defined as any thread in the pool
1130      * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
1131      *
1132      * @return {@code trueif core threads are allowed to time out, {@code false} otherwise
1133      * @see Builder#allowsCoreThreadTimeOut() Builder.allowsCoreThreadTimeOut()
1134      */

1135     public boolean allowsCoreThreadTimeOut() {
1136         return isAllowCoreTimeout(threadStatus);
1137     }
1138
1139     /**
1140      * Establish whether core threads are allowed to time out.  A "core thread" is defined as any thread in the pool
1141      * when the pool size is below the pool's {@linkplain #getCorePoolSize() core pool size}.
1142      *
1143      * @param value {@code trueif core threads are allowed to time out, {@code false} otherwise
1144      * @see Builder#allowCoreThreadTimeOut(boolean) Builder.allowCoreThreadTimeOut()
1145      */

1146     public void allowCoreThreadTimeOut(boolean value) {
1147         long oldVal, newVal;
1148         do {
1149             oldVal = threadStatus;
1150             newVal = withAllowCoreTimeout(oldVal, value);
1151             if (oldVal == newVal) return;
1152         } while (! compareAndSetThreadStatus(oldVal, newVal));
1153         if (value) {
1154             // poke all the threads to try to terminate any excess eagerly
1155             for (Thread activeThread : runningThreads) {
1156                 unpark(activeThread);
1157             }
1158         }
1159     }
1160
1161     /**
1162      * Get the thread keep-alive time.  This is the minimum length of time that idle threads should remain until they exit.
1163      * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1164      * limit.
1165      *
1166      * @param keepAliveUnits the unit in which the result should be expressed (must not be {@code null})
1167      * @return the amount of time (will be greater than zero)
1168      * @see Builder#getKeepAliveTime(TimeUnit) Builder.getKeepAliveTime()
1169      * @deprecated Use {@link #getKeepAliveTime()} instead.
1170      */

1171     @Deprecated
1172     public long getKeepAliveTime(TimeUnit keepAliveUnits) {
1173         Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
1174         return keepAliveUnits.convert(timeoutNanos, TimeUnit.NANOSECONDS);
1175     }
1176
1177     /**
1178      * Get the thread keep-alive time.  This is the minimum length of time that idle threads should remain until they exit.
1179      * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1180      * limit.
1181      *
1182      * @return the amount of time (will be greater than zero)
1183      * @see Builder#getKeepAliveTime() Builder.getKeepAliveTime()
1184      */

1185     public Duration getKeepAliveTime() {
1186         return Duration.of(timeoutNanos, ChronoUnit.NANOS);
1187     }
1188
1189     /**
1190      * Set the thread keep-alive time.  This is the minimum length of time that idle threads should remain until they exit.
1191      * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1192      * limit.
1193      *
1194      * @param keepAliveTime the thread keep-alive time (must be &gt; 0)
1195      * @param keepAliveUnits the unit in which the value is expressed (must not be {@code null})
1196      * @see Builder#setKeepAliveTime(long, TimeUnit) Builder.setKeepAliveTime()
1197      * @deprecated Use {@link #setKeepAliveTime(Duration)} instead.
1198      */

1199     @Deprecated
1200     public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveUnits) {
1201         Assert.checkMinimumParameter("keepAliveTime", 1L, keepAliveTime);
1202         Assert.checkNotNullParam("keepAliveUnits", keepAliveUnits);
1203         timeoutNanos = max(1L, keepAliveUnits.toNanos(keepAliveTime));
1204     }
1205
1206     /**
1207      * Set the thread keep-alive time.  This is the minimum length of time that idle threads should remain until they exit.
1208      * Unless core threads are allowed to time out, threads will only exit if the current thread count exceeds the core
1209      * limit.
1210      *
1211      * @param keepAliveTime the thread keep-alive time (must not be {@code null})
1212      * @see Builder#setKeepAliveTime(Duration) Builder.setKeepAliveTime()
1213      */

1214     public void setKeepAliveTime(final Duration keepAliveTime) {
1215         Assert.checkNotNullParam("keepAliveTime", keepAliveTime);
1216         timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime);
1217     }
1218
1219     /**
1220      * Get the maximum queue size.  If the queue is full and a task cannot be immediately accepted, rejection will result.
1221      *
1222      * @return the maximum queue size
1223      * @see Builder#getMaximumQueueSize() Builder.getMaximumQueueSize()
1224      */

1225     public int getMaximumQueueSize() {
1226         return maxQueueSizeOf(queueSize);
1227     }
1228
1229     /**
1230      * Set the maximum queue size.  If the new maximum queue size is smaller than the current queue size, there is no
1231      * effect other than preventing tasks from being enqueued until the size decreases below the maximum again.
1232      *
1233      * @param maxQueueSize the maximum queue size (must be ≥ 0)
1234      * @see Builder#setMaximumQueueSize(int) Builder.setMaximumQueueSize()
1235      */

1236     public void setMaximumQueueSize(final int maxQueueSize) {
1237         Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize);
1238         Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize);
1239         if (NO_QUEUE_LIMIT) return;
1240         long oldVal;
1241         do {
1242             oldVal = queueSize;
1243         } while (! compareAndSetQueueSize(oldVal, withMaxQueueSize(oldVal, maxQueueSize)));
1244     }
1245
1246     /**
1247      * Get the executor to delegate to in the event of task rejection.
1248      *
1249      * @return the executor to delegate to in the event of task rejection (not {@code null})
1250      */

1251     public Executor getHandoffExecutor() {
1252         return handoffExecutor;
1253     }
1254
1255     /**
1256      * Set the executor to delegate to in the event of task rejection.
1257      *
1258      * @param handoffExecutor the executor to delegate to in the event of task rejection (must not be {@code null})
1259      */

1260     public void setHandoffExecutor(final Executor handoffExecutor) {
1261         Assert.checkNotNullParam("handoffExecutor", handoffExecutor);
1262         this.handoffExecutor = handoffExecutor;
1263     }
1264
1265     /**
1266      * Get the exception handler to use for uncaught exceptions.
1267      *
1268      * @return the exception handler to use for uncaught exceptions (not {@code null})
1269      */

1270     public Thread.UncaughtExceptionHandler getExceptionHandler() {
1271         return exceptionHandler;
1272     }
1273
1274     /**
1275      * Set the exception handler to use for uncaught exceptions.
1276      *
1277      * @param exceptionHandler the exception handler to use for uncaught exceptions (must not be {@code null})
1278      */

1279     public void setExceptionHandler(final Thread.UncaughtExceptionHandler exceptionHandler) {
1280         Assert.checkNotNullParam("exceptionHandler", exceptionHandler);
1281         this.exceptionHandler = exceptionHandler;
1282     }
1283
1284     /**
1285      * Set the termination task, overwriting any previous setting.
1286      *
1287      * @param terminationTask the termination task, or {@code null} to perform no termination task
1288      */

1289     public void setTerminationTask(final Runnable terminationTask) {
1290         this.terminationTask = terminationTask;
1291     }
1292
1293     // =======================================================
1294     // Statistics & metrics API
1295     // =======================================================
1296
1297     /**
1298      * Get an estimate of the current queue size.
1299      *
1300      * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled
1301      */

1302     public int getQueueSize() {
1303         return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(queueSize);
1304     }
1305
1306     /**
1307      * Get an estimate of the peak number of threads that the pool has ever held.
1308      *
1309      * @return an estimate of the peak number of threads that the pool has ever held
1310      */

1311     public int getLargestPoolSize() {
1312         return UPDATE_STATISTICS ? peakThreadCount : -1;
1313     }
1314
1315     /**
1316      * Get an estimate of the number of threads which are currently doing work on behalf of the thread pool.
1317      *
1318      * @return the active count estimate or -1 when {@code jboss.threads.eqe.statistics.active-count} is disabled
1319      */

1320     public int getActiveCount() {
1321         return UPDATE_ACTIVE_COUNT ? activeCount : -1;
1322     }
1323
1324     /**
1325      * Get an estimate of the peak size of the queue.
1326      *
1327      * return an estimate of the peak size of the queue or -1 when {@code jboss.threads.eqe.statistics}
1328      * is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled
1329      */

1330     public int getLargestQueueSize() {
1331         return UPDATE_STATISTICS && !NO_QUEUE_LIMIT ? peakQueueSize : -1;
1332     }
1333
1334     /**
1335      * Get an estimate of the total number of tasks ever submitted to this thread pool.
1336      *
1337      * @return an estimate of the total number of tasks ever submitted to this thread pool
1338      * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1339      */

1340     public long getSubmittedTaskCount() {
1341         return UPDATE_STATISTICS ? submittedTaskCounter.longValue() : -1;
1342     }
1343
1344     /**
1345      * Get an estimate of the total number of tasks ever rejected by this thread pool for any reason.
1346      *
1347      * @return an estimate of the total number of tasks ever rejected by this thread pool
1348      * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1349      */

1350     public long getRejectedTaskCount() {
1351         return UPDATE_STATISTICS ? rejectedTaskCounter.longValue() : -1;
1352     }
1353
1354     /**
1355      * Get an estimate of the number of tasks completed by this thread pool.
1356      *
1357      * @return an estimate of the number of tasks completed by this thread pool
1358      * or -1 when {@code jboss.threads.eqe.statistics} is disabled
1359      */

1360     public long getCompletedTaskCount() {
1361         return UPDATE_STATISTICS ? completedTaskCounter.longValue() : -1;
1362     }
1363
1364     /**
1365      * Get an estimate of the current number of active threads in the pool.
1366      *
1367      * @return an estimate of the current number of active threads in the pool
1368      */

1369     public int getPoolSize() {
1370         return currentSizeOf(threadStatus);
1371     }
1372
1373     /**
1374      * Get an array containing an approximate snapshot of the currently running threads in
1375      * this executor.
1376      *
1377      * @return an array of running (unterminated) threads (not {@code null})
1378      */

1379     public Thread[] getRunningThreads() {
1380         return runningThreads.toArray(NO_THREADS);
1381     }
1382
1383     static class MBeanUnregisterAction implements PrivilegedAction<Void> {
1384         static void forceInit() {
1385         }
1386
1387         private final Object handle;
1388
1389         MBeanUnregisterAction(final Object handle) {
1390             this.handle = handle;
1391         }
1392
1393         public Void run() {
1394             try {
1395                 ManagementFactory.getPlatformMBeanServer().unregisterMBean(((ObjectInstance) handle).getObjectName());
1396             } catch (Throwable ignored) {
1397             }
1398             return null;
1399         }
1400     }
1401
1402     // =======================================================
1403     // Pooled thread body
1404     // =======================================================
1405
1406     final class ThreadBody implements Runnable {
1407         private Runnable initialTask;
1408
1409         ThreadBody(final Runnable initialTask) {
1410             this.initialTask = initialTask;
1411         }
1412
1413         /**
1414          * Execute the body of the thread.  On entry the thread is added to the {@link #runningThreads} set, and on
1415          * exit it is removed.
1416          */

1417         public void run() {
1418             final Thread currentThread = Thread.currentThread();
1419             final LongAdder spinMisses = EnhancedQueueExecutor.this.spinMisses;
1420             runningThreads.add(currentThread);
1421
1422             // run the initial task
1423             doRunTask(getAndClearInitialTask());
1424
1425             // Eagerly allocate a PoolThreadNode for the next time it's needed
1426             PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread);
1427             // main loop
1428             QNode node;
1429             processingQueue: for (;;) {
1430                 node = getOrAddNode(nextPoolThreadNode);
1431                 if (node instanceof TaskNode) {
1432                     // task node was removed
1433                     doRunTask(((TaskNode) node).getAndClearTask());
1434                     continue;
1435                 } else if (node == nextPoolThreadNode) {
1436                     // pool thread node was added
1437                     final PoolThreadNode newNode = nextPoolThreadNode;
1438                     // nextPoolThreadNode has been added to the queue, a new node is required for next time.
1439                     nextPoolThreadNode = new PoolThreadNode(currentThread);
1440                     // at this point, we are registered into the queue
1441                     long start = System.nanoTime();
1442                     long elapsed = 0L;
1443                     waitingForTask: for (;;) {
1444                         Runnable task = newNode.getTask();
1445                         assert task != ACCEPTED && task != GAVE_UP;
1446                         if (task != WAITING && task != EXIT) {
1447                             if (newNode.compareAndSetTask(task, ACCEPTED)) {
1448                                 // we have a task to run, so run it and then abandon the node
1449                                 doRunTask(task);
1450                                 // rerun outer
1451                                 continue processingQueue;
1452                             }
1453                             // we had a task to run, but we failed to CAS it for some reason, so retry
1454                             if (UPDATE_STATISTICS) spinMisses.increment();
1455                             continue waitingForTask;
1456                         } else {
1457                             final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;
1458                             long oldVal = threadStatus;
1459                             if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) {
1460                                 // try to exit this thread, if we are allowed
1461                                 if (task == EXIT ||
1462                                         isShutdownRequested(oldVal) ||
1463                                         isAllowCoreTimeout(oldVal) ||
1464                                         currentSizeOf(oldVal) > coreSizeOf(oldVal)
1465                                         ) {
1466                                     if (newNode.compareAndSetTask(task, GAVE_UP)) {
1467                                         for (;;) {
1468                                             if (tryDeallocateThread(oldVal)) {
1469                                                 // clear to exit.
1470                                                 runningThreads.remove(currentThread);
1471                                                 return;
1472                                             }
1473                                             if (UPDATE_STATISTICS) spinMisses.increment();
1474                                             oldVal = threadStatus;
1475                                         }
1476                                         //throw Assert.unreachableCode();
1477                                     }
1478                                     continue waitingForTask;
1479                                 } else {
1480                                     if (elapsed >= timeoutNanos) {
1481                                         newNode.park(EnhancedQueueExecutor.this);
1482                                     } else {
1483                                         newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
1484                                     }
1485                                     Thread.interrupted();
1486                                     elapsed = System.nanoTime() - start;
1487                                     // retry inner
1488                                     continue waitingForTask;
1489                                 }
1490                                 //throw Assert.unreachableCode();
1491                             } else {
1492                                 assert task == WAITING;
1493                                 newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
1494                                 Thread.interrupted();
1495                                 elapsed = System.nanoTime() - start;
1496                                 // retry inner
1497                                 continue waitingForTask;
1498                             }
1499                             //throw Assert.unreachableCode();
1500                         }
1501                         //throw Assert.unreachableCode();
1502                     } // :waitingForTask
1503                     //throw Assert.unreachableCode();
1504                 } else {
1505                     assert node instanceof TerminateWaiterNode;
1506                     // we're shutting down!
1507                     runningThreads.remove(currentThread);
1508                     deallocateThread();
1509                     return;
1510                 }
1511                 //throw Assert.unreachableCode();
1512             } // :processingQueue
1513             //throw Assert.unreachableCode();
1514         }
1515
1516         private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) {
1517             TaskNode head;
1518             QNode headNext;
1519             for (;;) {
1520                 head = EnhancedQueueExecutor.this.head;
1521                 headNext = head.getNext();
1522                 if (headNext instanceof TaskNode) {
1523                     TaskNode taskNode = (TaskNode) headNext;
1524                     if (compareAndSetHead(head, taskNode)) {
1525                         if (! NO_QUEUE_LIMIT) decreaseQueueSize();
1526                         return taskNode;
1527                     }
1528                 } else if (headNext instanceof PoolThreadNode || headNext == null) {
1529                     nextPoolThreadNode.setNextRelaxed(headNext);
1530                     if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
1531                         return nextPoolThreadNode;
1532                     }
1533                 } else {
1534                     assert headNext instanceof TerminateWaiterNode;
1535                     return headNext;
1536                 }
1537                 if (UPDATE_STATISTICS) spinMisses.increment();
1538                 JDKSpecific.onSpinWait();
1539             }
1540         }
1541
1542         private Runnable getAndClearInitialTask() {
1543             try {
1544                 return initialTask;
1545             } finally {
1546                 this.initialTask = null;
1547             }
1548         }
1549
1550         void doRunTask(final Runnable task) {
1551             if (task != null) {
1552                 if (isShutdownInterrupt(threadStatus)) {
1553                     Thread.currentThread().interrupt();
1554                 } else {
1555                     Thread.interrupted();
1556                 }
1557                 if (UPDATE_ACTIVE_COUNT) incrementActiveCount();
1558                 safeRun(task);
1559                 if (UPDATE_ACTIVE_COUNT) {
1560                     decrementActiveCount();
1561                     if (UPDATE_STATISTICS) {
1562                         completedTaskCounter.increment();
1563                     }
1564                 }
1565             }
1566         }
1567     }
1568
1569     // =======================================================
1570     // Thread starting
1571     // =======================================================
1572
1573     /**
1574      * Allocate a new thread.
1575      *
1576      * @param growthResistance the growth resistance to apply
1577      * @return {@code AT_YES} if a thread is allocated; {@code AT_NO} if a thread was not allocated; {@code AT_SHUTDOWN} if the pool is being shut down
1578      */

1579     int tryAllocateThread(final float growthResistance) {
1580         int oldSize;
1581         long oldStat;
1582         for (;;) {
1583             oldStat = threadStatus;
1584             if (isShutdownRequested(oldStat)) {
1585                 return AT_SHUTDOWN;
1586             }
1587             oldSize = currentSizeOf(oldStat);
1588             if (oldSize >= maxSizeOf(oldStat)) {
1589                 // max threads already reached
1590                 return AT_NO;
1591             }
1592             if (oldSize >= coreSizeOf(oldStat) && oldSize > 0) {
1593                 // core threads are reached; check resistance factor (if no threads are running, then always start a thread)
1594                 if (growthResistance != 0.0f && (growthResistance == 1.0f || ThreadLocalRandom.current().nextFloat() < growthResistance)) {
1595                     // do not create a thread this time
1596                     return AT_NO;
1597                 }
1598             }
1599             // try to increase
1600             final int newSize = oldSize + 1;
1601             // state change ex3:
1602             //   threadStatus.size ← threadStatus(snapshot).size + 1
1603             // cannot succeed: sh1
1604             // succeeds: -
1605             // preconditions:
1606             //   ! threadStatus(snapshot).shutdownRequested
1607             //   threadStatus(snapshot).size < threadStatus(snapshot).maxSize
1608             //   threadStatus(snapshot).size < threadStatus(snapshot).coreSize || random < growthResistance
1609             // post-actions (fail):
1610             //   retry whole loop
1611             if (compareAndSetThreadStatus(oldStat, withCurrentSize(oldStat, newSize))) {
1612                 // increment peak thread count
1613                 if (UPDATE_STATISTICS) {
1614                     int oldVal;
1615                     do {
1616                         oldVal = peakThreadCount;
1617                         if (oldVal >= newSize) break;
1618                     } while (! compareAndSetPeakThreadCount(oldVal, newSize));
1619                 }
1620                 return AT_YES;
1621             }
1622             if (UPDATE_STATISTICS) spinMisses.increment();
1623         }
1624     }
1625
1626     /**
1627      * Roll back a thread allocation, possibly terminating the pool.  Only call after {@link #tryAllocateThread(float)} returns {@link #AT_YES}.
1628      */

1629     void deallocateThread() {
1630         long oldStat;
1631         do {
1632             oldStat = threadStatus;
1633         } while (! tryDeallocateThread(oldStat));
1634     }
1635
1636     /**
1637      * Try to roll back a thread allocation, possibly running the termination task if the pool would be terminated
1638      * by last thread exit.
1639      *
1640      * @param oldStat the {@code threadStatus} to CAS
1641      * @return {@code trueif the thread was deallocated, or {@code false} to retry with a new {@code oldStat}
1642      */

1643     boolean tryDeallocateThread(long oldStat) {
1644         // roll back our thread allocation attempt
1645         // state change ex4:
1646         //   threadStatus.size ← threadStatus.size - 1
1647         // succeeds: ex3
1648         // preconditions:
1649         //   threadStatus.size > 0
1650         long newStat = withCurrentSize(oldStat, currentSizeOf(oldStat) - 1);
1651         if (currentSizeOf(newStat) == 0 && isShutdownRequested(oldStat)) {
1652             newStat = withShutdownComplete(newStat);
1653         }
1654         if (! compareAndSetThreadStatus(oldStat, newStat)) return false;
1655         if (isShutdownComplete(newStat)) {
1656             completeTermination();
1657         }
1658         return true;
1659     }
1660
1661     /**
1662      * Start an allocated thread.
1663      *
1664      * @param runnable the task or {@code null}
1665      * @return {@code trueif the thread was started, {@code false} otherwise
1666      * @throws RejectedExecutionException if {@code runnable} is not {@code null} and the thread could not be created or started
1667      */

1668     boolean doStartThread(Runnable runnable) throws RejectedExecutionException {
1669         Thread thread;
1670         try {
1671             thread = threadFactory.newThread(new ThreadBody(runnable));
1672         } catch (Throwable t) {
1673             if (runnable != null) {
1674                 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1675                 rejectException(runnable, t);
1676             }
1677             return false;
1678         }
1679         if (thread == null) {
1680             if (runnable != null) {
1681                 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1682                 rejectNoThread(runnable);
1683             }
1684             return false;
1685         }
1686         try {
1687             thread.start();
1688         } catch (Throwable t) {
1689             if (runnable != null) {
1690                 if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
1691                 rejectException(runnable, t);
1692             }
1693             return false;
1694         }
1695         return true;
1696     }
1697
1698     // =======================================================
1699     // Task submission
1700     // =======================================================
1701
1702     private int tryExecute(final Runnable runnable) {
1703         QNode tailNext;
1704         if (TAIL_LOCK) lockTail();
1705         TaskNode tail = this.tail;
1706         TaskNode node = null;
1707         for (;;) {
1708             tailNext = tail.getNext();
1709             if (tailNext instanceof TaskNode) {
1710                 TaskNode tailNextTaskNode;
1711                 do {
1712                     if (UPDATE_STATISTICS) spinMisses.increment();
1713                     tailNextTaskNode = (TaskNode) tailNext;
1714                     // retry
1715                     tail = tailNextTaskNode;
1716                     tailNext = tail.getNext();
1717                 } while (tailNext instanceof TaskNode);
1718                 // opportunistically update for the possible benefit of other threads
1719                 if (UPDATE_TAIL) compareAndSetTail(tail, tailNextTaskNode);
1720             }
1721             // we've progressed to the first non-task node, as far as we can see
1722             assert ! (tailNext instanceof TaskNode);
1723             if (tailNext instanceof PoolThreadNode) {
1724                 final QNode tailNextNext = tailNext.getNext();
1725                 // state change ex1:
1726                 //   tail(snapshot).next ← tail(snapshot).next(snapshot).next(snapshot)
1727                 // succeeds: -
1728                 // cannot succeed: sh2
1729                 // preconditions:
1730                 //   tail(snapshot) is a dead TaskNode
1731                 //   tail(snapshot).next is PoolThreadNode
1732                 //   tail(snapshot).next.next* is PoolThreadNode or null
1733                 // additional success postconditions: -
1734                 // failure postconditions: -
1735                 // post-actions (succeed):
1736                 //   run state change ex2
1737                 // post-actions (fail):
1738                 //   retry with new tail(snapshot)
1739                 if (tail.compareAndSetNext(tailNext, tailNextNext)) {
1740                     assert tail instanceof TaskNode && tail.task == null;
1741                     PoolThreadNode consumerNode = (PoolThreadNode) tailNext;
1742                     // state change ex2:
1743                     //   tail(snapshot).next(snapshot).task ← runnable
1744                     // succeeds: ex1
1745                     // preconditions:
1746                     //   tail(snapshot).next(snapshot).task = WAITING
1747                     // post-actions (succeed):
1748                     //   unpark thread and return
1749                     // post-actions (fail):
1750                     //   retry outer with new tail(snapshot)
1751                     if (consumerNode.compareAndSetTask(WAITING, runnable)) {
1752                         if (TAIL_LOCK) unlockTail();
1753                         consumerNode.unpark();
1754                         return EXE_OK;
1755                     }
1756                     // otherwise the consumer gave up or was exited already, so fall out and...
1757                 }
1758                 if (UPDATE_STATISTICS) spinMisses.increment();
1759                 // retry with new tail(snapshot) as was foretold
1760                 tail = this.tail;
1761             } else if (tailNext == null) {
1762                 // no consumers available; maybe we can start one
1763                 int tr = tryAllocateThread(growthResistance);
1764                 if (tr == AT_YES) {
1765                     if (TAIL_LOCK) unlockTail();
1766                     return EXE_CREATE_THREAD;
1767                 }
1768                 if (tr == AT_SHUTDOWN) {
1769                     if (TAIL_LOCK) unlockTail();
1770                     return EXE_REJECT_SHUTDOWN;
1771                 }
1772                 assert tr == AT_NO;
1773                 // no; try to enqueue
1774                 if (! NO_QUEUE_LIMIT && ! increaseQueueSize()) {
1775                     // queue is full
1776                     // OK last effort to create a thread, disregarding growth limit
1777                     tr = tryAllocateThread(0.0f);
1778                     if (TAIL_LOCK) unlockTail();
1779                     if (tr == AT_YES) {
1780                         return EXE_CREATE_THREAD;
1781                     }
1782                     if (tr == AT_SHUTDOWN) {
1783                         return EXE_REJECT_SHUTDOWN;
1784                     }
1785                     assert tr == AT_NO;
1786                     return EXE_REJECT_QUEUE_FULL;
1787                 }
1788                 // queue size increased successfully; we can add to the list
1789                 if (node == null) {
1790                     // avoid re-allocating TaskNode instances on subsequent iterations
1791                     node = new TaskNode(runnable);
1792                 }
1793                 // state change ex5:
1794                 //   tail(snapshot).next ← new task node
1795                 // cannot succeed: sh2
1796                 // preconditions:
1797                 //   tail(snapshot).next = null
1798                 //   ex3 failed precondition
1799                 //   queue size increased to accommodate node
1800                 // postconditions (success):
1801                 //   tail(snapshot).next = new task node
1802                 if (tail.compareAndSetNext(null, node)) {
1803                     // try to update tail to the new node; if this CAS fails then tail already points at past the node
1804                     // this is because tail can only ever move forward, and the task list is always strongly connected
1805                     compareAndSetTail(tail, node);
1806                     if (TAIL_LOCK) unlockTail();
1807                     return EXE_OK;
1808                 }
1809                 // we failed; we have to drop the queue size back down again to compensate before we can retry
1810                 if (! NO_QUEUE_LIMIT) decreaseQueueSize();
1811                 if (UPDATE_STATISTICS) spinMisses.increment();
1812                 // retry with new tail(snapshot)
1813                 tail = this.tail;
1814             } else {
1815                 if (TAIL_LOCK) unlockTail();
1816                 // no consumers are waiting and the tail(snapshot).next node is non-null and not a task node, therefore it must be a...
1817                 assert tailNext instanceof TerminateWaiterNode;
1818                 // shutting down
1819                 return EXE_REJECT_SHUTDOWN;
1820             }
1821         }
1822         // not reached
1823     }
1824
1825     // =======================================================
1826     // Termination task
1827     // =======================================================
1828
1829     void completeTermination() {
1830         // be kind and un-interrupt the thread for the termination task
1831         Thread.interrupted();
1832         final Runnable terminationTask = this.terminationTask;
1833         this.terminationTask = null;
1834         safeRun(terminationTask);
1835         // notify all waiters
1836         Waiter waiters = getAndSetTerminationWaiters(TERMINATE_COMPLETE_WAITER);
1837         while (waiters != null) {
1838             unpark(waiters.getThread());
1839             waiters = waiters.getNext();
1840         }
1841         tail.setNext(TERMINATE_COMPLETE);
1842         if (! DISABLE_MBEAN) {
1843             final Object handle = this.handle;
1844             if (handle != null) {
1845                 doPrivileged(new MBeanUnregisterAction(handle), acc);
1846             }
1847         }
1848     }
1849
1850     // =======================================================
1851     // Compare-and-set operations
1852     // =======================================================
1853
1854     void incrementActiveCount() {
1855         unsafe.getAndAddInt(this, activeCountOffset, 1);
1856     }
1857
1858     void decrementActiveCount() {
1859         unsafe.getAndAddInt(this, activeCountOffset, -1);
1860     }
1861
1862     boolean compareAndSetPeakThreadCount(final int expect, final int update) {
1863         return unsafe.compareAndSwapInt(this, peakThreadCountOffset, expect, update);
1864     }
1865
1866     boolean compareAndSetPeakQueueSize(final int expect, final int update) {
1867         return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update);
1868     }
1869
1870     boolean compareAndSetQueueSize(final long expect, final long update) {
1871         return unsafe.compareAndSwapLong(this, queueSizeOffset, expect, update);
1872     }
1873
1874     boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) {
1875         return unsafe.compareAndSwapObject(this, terminationWaitersOffset, expect, update);
1876     }
1877
1878     Waiter getAndSetTerminationWaiters(final Waiter update) {
1879         return (Waiter) unsafe.getAndSetObject(this, terminationWaitersOffset, update);
1880     }
1881
1882     // =======================================================
1883     // Queue size operations
1884     // =======================================================
1885
1886     boolean increaseQueueSize() {
1887         long oldVal = queueSize;
1888         int oldSize = currentQueueSizeOf(oldVal);
1889         if (oldSize >= maxQueueSizeOf(oldVal)) {
1890             // reject
1891             return false;
1892         }
1893         int newSize = oldSize + 1;
1894         while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, newSize))) {
1895             if (UPDATE_STATISTICS) spinMisses.increment();
1896             oldVal = queueSize;
1897             oldSize = currentQueueSizeOf(oldVal);
1898             if (oldSize >= maxQueueSizeOf(oldVal)) {
1899                 // reject
1900                 return false;
1901             }
1902             newSize = oldSize + 1;
1903         }
1904         if (UPDATE_STATISTICS) {
1905             do {
1906                 // oldSize now represents the old peak size
1907                 oldSize = peakQueueSize;
1908                 if (newSize <= oldSize) break;
1909             } while (! compareAndSetPeakQueueSize(oldSize, newSize));
1910         }
1911         return true;
1912     }
1913
1914     void decreaseQueueSize() {
1915         long oldVal;
1916         oldVal = queueSize;
1917         assert currentQueueSizeOf(oldVal) > 0;
1918         while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, currentQueueSizeOf(oldVal) - 1))) {
1919             if (UPDATE_STATISTICS) spinMisses.increment();
1920             oldVal = queueSize;
1921             assert currentQueueSizeOf(oldVal) > 0;
1922         }
1923     }
1924
1925     // =======================================================
1926     // Inline Functions
1927     // =======================================================
1928
1929     static int currentQueueSizeOf(long queueSize) {
1930         return (int) (queueSize & 0x7fff_ffff);
1931     }
1932
1933     static long withCurrentQueueSize(long queueSize, int current) {
1934         assert current >= 0;
1935         return queueSize & 0xffff_ffff_0000_0000L | current;
1936     }
1937
1938     static int maxQueueSizeOf(long queueSize) {
1939         return (int) (queueSize >>> 32 & 0x7fff_ffff);
1940     }
1941
1942     static long withMaxQueueSize(long queueSize, int max) {
1943         assert max >= 0;
1944         return queueSize & 0xffff_ffffL | (long)max << 32;
1945     }
1946
1947     static int coreSizeOf(long status) {
1948         return (int) (status >>> TS_CORE_SHIFT & TS_THREAD_CNT_MASK);
1949     }
1950
1951     static int maxSizeOf(long status) {
1952         return (int) (status >>> TS_MAX_SHIFT & TS_THREAD_CNT_MASK);
1953     }
1954
1955     static int currentSizeOf(long status) {
1956         return (int) (status >>> TS_CURRENT_SHIFT & TS_THREAD_CNT_MASK);
1957     }
1958
1959     static long withCoreSize(long status, int newCoreSize) {
1960         assert 0 <= newCoreSize && newCoreSize <= TS_THREAD_CNT_MASK;
1961         return status & ~(TS_THREAD_CNT_MASK << TS_CORE_SHIFT) | (long)newCoreSize << TS_CORE_SHIFT;
1962     }
1963
1964     static long withCurrentSize(long status, int newCurrentSize) {
1965         assert 0 <= newCurrentSize && newCurrentSize <= TS_THREAD_CNT_MASK;
1966         return status & ~(TS_THREAD_CNT_MASK << TS_CURRENT_SHIFT) | (long)newCurrentSize << TS_CURRENT_SHIFT;
1967     }
1968
1969     static long withMaxSize(long status, int newMaxSize) {
1970         assert 0 <= newMaxSize && newMaxSize <= TS_THREAD_CNT_MASK;
1971         return status & ~(TS_THREAD_CNT_MASK << TS_MAX_SHIFT) | (long)newMaxSize << TS_MAX_SHIFT;
1972     }
1973
1974     static long withShutdownRequested(final long status) {
1975         return status | TS_SHUTDOWN_REQUESTED;
1976     }
1977
1978     static long withShutdownComplete(final long status) {
1979         return status | TS_SHUTDOWN_COMPLETE;
1980     }
1981
1982     static long withShutdownInterrupt(final long status) {
1983         return status | TS_SHUTDOWN_INTERRUPT;
1984     }
1985
1986     static long withAllowCoreTimeout(final long status, final boolean allowed) {
1987         return allowed ? status | TS_ALLOW_CORE_TIMEOUT : status & ~TS_ALLOW_CORE_TIMEOUT;
1988     }
1989
1990     static boolean isShutdownRequested(final long status) {
1991         return (status & TS_SHUTDOWN_REQUESTED) != 0;
1992     }
1993
1994     static boolean isShutdownComplete(final long status) {
1995         return (status & TS_SHUTDOWN_COMPLETE) != 0;
1996     }
1997
1998     static boolean isShutdownInterrupt(final long threadStatus) {
1999         return (threadStatus & TS_SHUTDOWN_INTERRUPT) != 0;
2000     }
2001
2002     static boolean isAllowCoreTimeout(final long oldVal) {
2003         return (oldVal & TS_ALLOW_CORE_TIMEOUT) != 0;
2004     }
2005
2006     // =======================================================
2007     // Static configuration
2008     // =======================================================
2009
2010     // =======================================================
2011     // Utilities
2012     // =======================================================
2013
2014     void safeRun(final Runnable task) {
2015         if (task == nullreturn;
2016         final Thread currentThread = Thread.currentThread();
2017         JBossExecutors.clearContextClassLoader(currentThread);
2018         try {
2019             task.run();
2020         } catch (Throwable t) {
2021             try {
2022                 exceptionHandler.uncaughtException(Thread.currentThread(), t);
2023             } catch (Throwable ignored) {
2024                 // nothing else we can safely do here
2025             }
2026         } finally {
2027             JBossExecutors.clearContextClassLoader(currentThread);
2028             // clear interrupt status
2029             Thread.interrupted();
2030         }
2031     }
2032
2033     void rejectException(final Runnable task, final Throwable cause) {
2034         try {
2035             handoffExecutor.execute(task);
2036         } catch (Throwable t) {
2037             t.addSuppressed(cause);
2038             throw t;
2039         }
2040     }
2041
2042     void rejectNoThread(final Runnable task) {
2043         try {
2044             handoffExecutor.execute(task);
2045         } catch (Throwable t) {
2046             t.addSuppressed(new RejectedExecutionException("No threads available"));
2047             throw t;
2048         }
2049     }
2050
2051     void rejectQueueFull(final Runnable task) {
2052         try {
2053             handoffExecutor.execute(task);
2054         } catch (Throwable t) {
2055             t.addSuppressed(new RejectedExecutionException("Queue is full"));
2056             throw t;
2057         }
2058     }
2059
2060     void rejectShutdown(final Runnable task) {
2061         try {
2062             handoffExecutor.execute(task);
2063         } catch (Throwable t) {
2064             t.addSuppressed(new RejectedExecutionException("Executor is being shut down"));
2065             throw t;
2066         }
2067     }
2068
2069     // =======================================================
2070     // Node classes
2071     // =======================================================
2072
2073     abstract static class QNode {
2074         private static final long nextOffset;
2075
2076         static {
2077             try {
2078                 nextOffset = unsafe.objectFieldOffset(QNode.class.getDeclaredField("next"));
2079             } catch (NoSuchFieldException e) {
2080                 throw new NoSuchFieldError(e.getMessage());
2081             }
2082         }
2083
2084         @SuppressWarnings("unused")
2085         private volatile QNode next;
2086
2087         boolean compareAndSetNext(QNode expect, QNode update) {
2088             return unsafe.compareAndSwapObject(this, nextOffset, expect, update);
2089         }
2090
2091         QNode getNext() {
2092             return next;
2093         }
2094
2095         void setNext(final QNode node) {
2096             next = node;
2097         }
2098
2099         void setNextRelaxed(final QNode node) {
2100             unsafe.putObject(this, nextOffset, node);
2101         }
2102     }
2103
2104     /** Padding between PoolThreadNode task and parked fields and QNode.next */
2105     static abstract class PoolThreadNodeBase extends QNode {
2106         /**
2107          * Padding fields.
2108          */

2109         @SuppressWarnings("unused")
2110         int p00, p01, p02, p03,
2111             p04, p05, p06, p07,
2112             p08, p09, p0A, p0B,
2113             p0C, p0D, p0E, p0F;
2114
2115         PoolThreadNodeBase() {}
2116     }
2117
2118     static final class PoolThreadNode extends PoolThreadNodeBase {
2119
2120         /**
2121          * Thread is running normally.
2122          */

2123         private static final int STATE_NORMAL = 0;
2124
2125         /**
2126          * Thread is parked (or about to park), and unpark call is necessary to wake the thread
2127          */

2128         private static final int STATE_PARKED = 1;
2129
2130         /**
2131          * The thread has been unparked, any thread that is spinning or about to park will return,
2132          * if not thread is currently spinning the next thread that attempts to spin will immediately return
2133          */

2134         private static final int STATE_UNPARKED = 2;
2135
2136
2137         private static final long taskOffset;
2138         private static final long parkedOffset;
2139
2140         static {
2141             try {
2142                 taskOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("task"));
2143                 parkedOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("parked"));
2144             } catch (NoSuchFieldException e) {
2145                 throw new NoSuchFieldError(e.getMessage());
2146             }
2147         }
2148
2149         private final Thread thread;
2150
2151         @SuppressWarnings("unused")
2152         private volatile Runnable task;
2153
2154         /**
2155          * The park state, see the STATE_ constants for the meanings of each value
2156          */

2157         @SuppressWarnings("unused")
2158         private volatile int parked;
2159
2160         PoolThreadNode(final Thread thread) {
2161             this.thread = thread;
2162             task = WAITING;
2163         }
2164
2165         boolean compareAndSetTask(final Runnable expect, final Runnable update) {
2166             return task == expect && unsafe.compareAndSwapObject(this, taskOffset, expect, update);
2167         }
2168
2169         Runnable getTask() {
2170             return task;
2171         }
2172
2173         PoolThreadNode getNext() {
2174             return (PoolThreadNode) super.getNext();
2175         }
2176
2177         void park(EnhancedQueueExecutor enhancedQueueExecutor) {
2178             int spins = PARK_SPINS;
2179             if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2180                 return;
2181             }
2182             while (spins > 0) {
2183                 if (spins < YIELD_FACTOR) {
2184                     Thread.yield();
2185                 } else {
2186                     JDKSpecific.onSpinWait();
2187                 }
2188                 spins--;
2189                 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2190                     return;
2191                 }
2192             }
2193             if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
2194                 LockSupport.park(enhancedQueueExecutor);
2195             } finally {
2196                 // parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
2197                 // if it's STATE_PARKED, we'd go back to NORMAL since we're not parking anymore.
2198                 // if it's STATE_UNPARKED, we can still go back to NORMAL because all of our preconditions will be rechecked anyway.
2199                 parked = STATE_NORMAL;
2200             }
2201         }
2202
2203         void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
2204             long remaining;
2205             int spins = PARK_SPINS;
2206             if (spins > 0) {
2207                 long start = System.nanoTime();
2208                 //note that we don't check the nanotime while spinning
2209                 //as spin time is short and for our use cases it does not matter if the time
2210                 //overruns a bit (as the nano time is for thread timeout) we just spin then check
2211                 //to keep performance consistent between the two versions.
2212                 if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2213                     return;
2214                 }
2215                 while (spins > 0) {
2216                     if (spins < YIELD_FACTOR) {
2217                         Thread.yield();
2218                     } else {
2219                         JDKSpecific.onSpinWait();
2220                     }
2221                     if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
2222                         return;
2223                     }
2224                     spins--;
2225                 }
2226                 remaining = nanos - (System.nanoTime() - start);
2227                 if (remaining < 0) {
2228                     return;
2229                 }
2230             } else {
2231                 remaining = nanos;
2232             }
2233             if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
2234                 LockSupport.parkNanos(enhancedQueueExecutor, remaining);
2235             } finally {
2236                 // parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
2237                 // if it's STATE_PARKED, we'd go back to NORMAL since we're not parking anymore.
2238                 // if it's STATE_UNPARKED, we can still go back to NORMAL because all of our preconditions will be rechecked anyway.
2239                 parked = STATE_NORMAL;
2240             }
2241         }
2242
2243         void unpark() {
2244             if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_UNPARKED)) {
2245                 return;
2246             }
2247             LockSupport.unpark(thread);
2248         }
2249     }
2250
2251     static final class TerminateWaiterNode extends QNode {
2252     }
2253
2254     static final class TaskNode extends QNode {
2255         volatile Runnable task;
2256
2257         TaskNode(final Runnable task) {
2258             // we always start task nodes with a {@code null} next
2259             this.task = task;
2260         }
2261
2262         Runnable getAndClearTask() {
2263             try {
2264                 return task;
2265             } finally {
2266                 this.task = null;
2267             }
2268         }
2269     }
2270
2271     // =======================================================
2272     // Management bean implementation
2273     // =======================================================
2274
2275     final class MXBeanImpl implements StandardThreadPoolMXBean {
2276         MXBeanImpl() {
2277         }
2278
2279         public float getGrowthResistance() {
2280             return EnhancedQueueExecutor.this.getGrowthResistance();
2281         }
2282
2283         public void setGrowthResistance(final float value) {
2284             EnhancedQueueExecutor.this.setGrowthResistance(value);
2285         }
2286
2287         public boolean isGrowthResistanceSupported() {
2288             return true;
2289         }
2290
2291         public int getCorePoolSize() {
2292             return EnhancedQueueExecutor.this.getCorePoolSize();
2293         }
2294
2295         public void setCorePoolSize(final int corePoolSize) {
2296             EnhancedQueueExecutor.this.setCorePoolSize(corePoolSize);
2297         }
2298
2299         public boolean isCorePoolSizeSupported() {
2300             return true;
2301         }
2302
2303         public boolean prestartCoreThread() {
2304             return EnhancedQueueExecutor.this.prestartCoreThread();
2305         }
2306
2307         public int prestartAllCoreThreads() {
2308             return EnhancedQueueExecutor.this.prestartAllCoreThreads();
2309         }
2310
2311         public boolean isCoreThreadPrestartSupported() {
2312             return true;
2313         }
2314
2315         public int getMaximumPoolSize() {
2316             return EnhancedQueueExecutor.this.getMaximumPoolSize();
2317         }
2318
2319         public void setMaximumPoolSize(final int maxPoolSize) {
2320             EnhancedQueueExecutor.this.setMaximumPoolSize(maxPoolSize);
2321         }
2322
2323         public int getPoolSize() {
2324             return EnhancedQueueExecutor.this.getPoolSize();
2325         }
2326
2327         public int getLargestPoolSize() {
2328             return EnhancedQueueExecutor.this.getLargestPoolSize();
2329         }
2330
2331         public int getActiveCount() {
2332             return EnhancedQueueExecutor.this.getActiveCount();
2333         }
2334
2335         public boolean isAllowCoreThreadTimeOut() {
2336             return EnhancedQueueExecutor.this.allowsCoreThreadTimeOut();
2337         }
2338
2339         public void setAllowCoreThreadTimeOut(final boolean value) {
2340             EnhancedQueueExecutor.this.allowCoreThreadTimeOut(value);
2341         }
2342
2343         public long getKeepAliveTimeSeconds() {
2344             return EnhancedQueueExecutor.this.getKeepAliveTime().getSeconds();
2345         }
2346
2347         public void setKeepAliveTimeSeconds(final long seconds) {
2348             EnhancedQueueExecutor.this.setKeepAliveTime(Duration.of(seconds, ChronoUnit.SECONDS));
2349         }
2350
2351         public int getMaximumQueueSize() {
2352             return EnhancedQueueExecutor.this.getMaximumQueueSize();
2353         }
2354
2355         public void setMaximumQueueSize(final int size) {
2356             EnhancedQueueExecutor.this.setMaximumQueueSize(size);
2357         }
2358
2359         public int getQueueSize() {
2360             return EnhancedQueueExecutor.this.getQueueSize();
2361         }
2362
2363         public int getLargestQueueSize() {
2364             return EnhancedQueueExecutor.this.getLargestQueueSize();
2365         }
2366
2367         public boolean isQueueBounded() {
2368             return ! NO_QUEUE_LIMIT;
2369         }
2370
2371         public boolean isQueueSizeModifiable() {
2372             return ! NO_QUEUE_LIMIT;
2373         }
2374
2375         public boolean isShutdown() {
2376             return EnhancedQueueExecutor.this.isShutdown();
2377         }
2378
2379         public boolean isTerminating() {
2380             return EnhancedQueueExecutor.this.isTerminating();
2381         }
2382
2383         public boolean isTerminated() {
2384             return EnhancedQueueExecutor.this.isTerminated();
2385         }
2386
2387         public long getSubmittedTaskCount() {
2388             return EnhancedQueueExecutor.this.getSubmittedTaskCount();
2389         }
2390
2391         public long getRejectedTaskCount() {
2392             return EnhancedQueueExecutor.this.getRejectedTaskCount();
2393         }
2394
2395         public long getCompletedTaskCount() {
2396             return EnhancedQueueExecutor.this.getCompletedTaskCount();
2397         }
2398
2399         public long getSpinMissCount() {
2400             return EnhancedQueueExecutor.this.spinMisses.longValue();
2401         }
2402     }
2403 }
2404