1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.UnstableApi;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45
46 /**
47  * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
48  *
49  */

50 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51
52     static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
53             SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
54
55     private static final InternalLogger logger =
56             InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
57
58     private static final int ST_NOT_STARTED = 1;
59     private static final int ST_STARTED = 2;
60     private static final int ST_SHUTTING_DOWN = 3;
61     private static final int ST_SHUTDOWN = 4;
62     private static final int ST_TERMINATED = 5;
63
64     private static final Runnable NOOP_TASK = new Runnable() {
65         @Override
66         public void run() {
67             // Do nothing.
68         }
69     };
70
71     private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
72             AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class"state");
73     private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
74             AtomicReferenceFieldUpdater.newUpdater(
75                     SingleThreadEventExecutor.class, ThreadProperties.class"threadProperties");
76
77     private final Queue<Runnable> taskQueue;
78
79     private volatile Thread thread;
80     @SuppressWarnings("unused")
81     private volatile ThreadProperties threadProperties;
82     private final Executor executor;
83     private volatile boolean interrupted;
84
85     private final CountDownLatch threadLock = new CountDownLatch(1);
86     private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
87     private final boolean addTaskWakesUp;
88     private final int maxPendingTasks;
89     private final RejectedExecutionHandler rejectedExecutionHandler;
90
91     private long lastExecutionTime;
92
93     @SuppressWarnings({ "FieldMayBeFinal""unused" })
94     private volatile int state = ST_NOT_STARTED;
95
96     private volatile long gracefulShutdownQuietPeriod;
97     private volatile long gracefulShutdownTimeout;
98     private long gracefulShutdownStartTime;
99
100     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101
102     /**
103      * Create a new instance
104      *
105      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
106      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
107      * @param addTaskWakesUp    {@code trueif and only if invocation of {@link #addTask(Runnable)} will wake up the
108      *                          executor thread
109      */

110     protected SingleThreadEventExecutor(
111             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
112         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
113     }
114
115     /**
116      * Create a new instance
117      *
118      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
119      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
120      * @param addTaskWakesUp    {@code trueif and only if invocation of {@link #addTask(Runnable)} will wake up the
121      *                          executor thread
122      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
123      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
124      */

125     protected SingleThreadEventExecutor(
126             EventExecutorGroup parent, ThreadFactory threadFactory,
127             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
128         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
129     }
130
131     /**
132      * Create a new instance
133      *
134      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
135      * @param executor          the {@link Executor} which will be used for executing
136      * @param addTaskWakesUp    {@code trueif and only if invocation of {@link #addTask(Runnable)} will wake up the
137      *                          executor thread
138      */

139     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
140         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
141     }
142
143     /**
144      * Create a new instance
145      *
146      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
147      * @param executor          the {@link Executor} which will be used for executing
148      * @param addTaskWakesUp    {@code trueif and only if invocation of {@link #addTask(Runnable)} will wake up the
149      *                          executor thread
150      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
151      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
152      */

153     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154                                         boolean addTaskWakesUp, int maxPendingTasks,
155                                         RejectedExecutionHandler rejectedHandler) {
156         super(parent);
157         this.addTaskWakesUp = addTaskWakesUp;
158         this.maxPendingTasks = Math.max(16, maxPendingTasks);
159         this.executor = ThreadExecutorMap.apply(executor, this);
160         taskQueue = newTaskQueue(this.maxPendingTasks);
161         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162     }
163
164     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
165                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
166                                         RejectedExecutionHandler rejectedHandler) {
167         super(parent);
168         this.addTaskWakesUp = addTaskWakesUp;
169         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
170         this.executor = ThreadExecutorMap.apply(executor, this);
171         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
172         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
173     }
174
175     /**
176      * @deprecated Please use and override {@link #newTaskQueue(int)}.
177      */

178     @Deprecated
179     protected Queue<Runnable> newTaskQueue() {
180         return newTaskQueue(maxPendingTasks);
181     }
182
183     /**
184      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
185      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
186      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
187      * implementation that does not support blocking operations at all.
188      */

189     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191     }
192
193     /**
194      * Interrupt the current running {@link Thread}.
195      */

196     protected void interruptThread() {
197         Thread currentThread = thread;
198         if (currentThread == null) {
199             interrupted = true;
200         } else {
201             currentThread.interrupt();
202         }
203     }
204
205     /**
206      * @see Queue#poll()
207      */

208     protected Runnable pollTask() {
209         assert inEventLoop();
210         return pollTaskFrom(taskQueue);
211     }
212
213     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214         for (;;) {
215             Runnable task = taskQueue.poll();
216             if (task != WAKEUP_TASK) {
217                 return task;
218             }
219         }
220     }
221
222     /**
223      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
224      * <p>
225      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
226      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
227      * </p>
228      *
229      * @return {@code nullif the executor thread has been interrupted or waken up.
230      */

231     protected Runnable takeTask() {
232         assert inEventLoop();
233         if (!(taskQueue instanceof BlockingQueue)) {
234             throw new UnsupportedOperationException();
235         }
236
237         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238         for (;;) {
239             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240             if (scheduledTask == null) {
241                 Runnable task = null;
242                 try {
243                     task = taskQueue.take();
244                     if (task == WAKEUP_TASK) {
245                         task = null;
246                     }
247                 } catch (InterruptedException e) {
248                     // Ignore
249                 }
250                 return task;
251             } else {
252                 long delayNanos = scheduledTask.delayNanos();
253                 Runnable task = null;
254                 if (delayNanos > 0) {
255                     try {
256                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257                     } catch (InterruptedException e) {
258                         // Waken up.
259                         return null;
260                     }
261                 }
262                 if (task == null) {
263                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
264                     // scheduled tasks are never executed if there is always one task in the taskQueue.
265                     // This is for example true for the read task of OIO Transport
266                     // See https://github.com/netty/netty/issues/1614
267                     fetchFromScheduledTaskQueue();
268                     task = taskQueue.poll();
269                 }
270
271                 if (task != null) {
272                     return task;
273                 }
274             }
275         }
276     }
277
278     private boolean fetchFromScheduledTaskQueue() {
279         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
280             return true;
281         }
282         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
283         for (;;) {
284             Runnable scheduledTask = pollScheduledTask(nanoTime);
285             if (scheduledTask == null) {
286                 return true;
287             }
288             if (!taskQueue.offer(scheduledTask)) {
289                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
290                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
291                 return false;
292             }
293         }
294     }
295
296     /**
297      * @return {@code trueif at least one scheduled task was executed.
298      */

299     private boolean executeExpiredScheduledTasks() {
300         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
301             return false;
302         }
303         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
304         Runnable scheduledTask = pollScheduledTask(nanoTime);
305         if (scheduledTask == null) {
306             return false;
307         }
308         do {
309             safeExecute(scheduledTask);
310         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
311         return true;
312     }
313
314     /**
315      * @see Queue#peek()
316      */

317     protected Runnable peekTask() {
318         assert inEventLoop();
319         return taskQueue.peek();
320     }
321
322     /**
323      * @see Queue#isEmpty()
324      */

325     protected boolean hasTasks() {
326         assert inEventLoop();
327         return !taskQueue.isEmpty();
328     }
329
330     /**
331      * Return the number of tasks that are pending for processing.
332      *
333      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
334      * SingleThreadEventExecutor. So use it with care!</strong>
335      */

336     public int pendingTasks() {
337         return taskQueue.size();
338     }
339
340     /**
341      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
342      * before.
343      */

344     protected void addTask(Runnable task) {
345         ObjectUtil.checkNotNull(task, "task");
346         if (!offerTask(task)) {
347             reject(task);
348         }
349     }
350
351     final boolean offerTask(Runnable task) {
352         if (isShutdown()) {
353             reject();
354         }
355         return taskQueue.offer(task);
356     }
357
358     /**
359      * @see Queue#remove(Object)
360      */

361     protected boolean removeTask(Runnable task) {
362         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
363     }
364
365     /**
366      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
367      *
368      * @return {@code trueif and only if at least one task was run
369      */

370     protected boolean runAllTasks() {
371         assert inEventLoop();
372         boolean fetchedAll;
373         boolean ranAtLeastOne = false;
374
375         do {
376             fetchedAll = fetchFromScheduledTaskQueue();
377             if (runAllTasksFrom(taskQueue)) {
378                 ranAtLeastOne = true;
379             }
380         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
381
382         if (ranAtLeastOne) {
383             lastExecutionTime = ScheduledFutureTask.nanoTime();
384         }
385         afterRunningAllTasks();
386         return ranAtLeastOne;
387     }
388
389     /**
390      * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
391      * or {@code maxDrainAttempts} has been exceeded.
392      * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
393      *                         continuous task execution and scheduling from preventing the EventExecutor thread to
394      *                         make progress and return to the selector mechanism to process inbound I/O events.
395      * @return {@code trueif at least one task was run.
396      */

397     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
398         assert inEventLoop();
399         boolean ranAtLeastOneTask;
400         int drainAttempt = 0;
401         do {
402             // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
403             // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
404             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
405         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
406
407         if (drainAttempt > 0) {
408             lastExecutionTime = ScheduledFutureTask.nanoTime();
409         }
410         afterRunningAllTasks();
411
412         return drainAttempt > 0;
413     }
414
415     /**
416      * Runs all tasks from the passed {@code taskQueue}.
417      *
418      * @param taskQueue To poll and execute all tasks.
419      *
420      * @return {@code trueif at least one task was executed.
421      */

422     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
423         Runnable task = pollTaskFrom(taskQueue);
424         if (task == null) {
425             return false;
426         }
427         for (;;) {
428             safeExecute(task);
429             task = pollTaskFrom(taskQueue);
430             if (task == null) {
431                 return true;
432             }
433         }
434     }
435
436     /**
437      * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
438      * @param taskQueue the task queue to drain.
439      * @return {@code trueif at least {@link Runnable#run()} was called.
440      */

441     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
442         Runnable task = pollTaskFrom(taskQueue);
443         if (task == null) {
444             return false;
445         }
446         int remaining = Math.min(maxPendingTasks, taskQueue.size());
447         safeExecute(task);
448         // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
449         // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
450         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
451             safeExecute(task);
452         }
453         return true;
454     }
455
456     /**
457      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
458      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
459      */

460     protected boolean runAllTasks(long timeoutNanos) {
461         fetchFromScheduledTaskQueue();
462         Runnable task = pollTask();
463         if (task == null) {
464             afterRunningAllTasks();
465             return false;
466         }
467
468         final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
469         long runTasks = 0;
470         long lastExecutionTime;
471         for (;;) {
472             safeExecute(task);
473
474             runTasks ++;
475
476             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
477             // XXX: Hard-coded value - will make it configurable if it is really a problem.
478             if ((runTasks & 0x3F) == 0) {
479                 lastExecutionTime = ScheduledFutureTask.nanoTime();
480                 if (lastExecutionTime >= deadline) {
481                     break;
482                 }
483             }
484
485             task = pollTask();
486             if (task == null) {
487                 lastExecutionTime = ScheduledFutureTask.nanoTime();
488                 break;
489             }
490         }
491
492         afterRunningAllTasks();
493         this.lastExecutionTime = lastExecutionTime;
494         return true;
495     }
496
497     /**
498      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
499      */

500     @UnstableApi
501     protected void afterRunningAllTasks() { }
502
503     /**
504      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
505      */

506     protected long delayNanos(long currentTimeNanos) {
507         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
508         if (scheduledTask == null) {
509             return SCHEDULE_PURGE_INTERVAL;
510         }
511
512         return scheduledTask.delayNanos(currentTimeNanos);
513     }
514
515     /**
516      * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the the next
517      * closest scheduled task should run.
518      */

519     @UnstableApi
520     protected long deadlineNanos() {
521         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522         if (scheduledTask == null) {
523             return nanoTime() + SCHEDULE_PURGE_INTERVAL;
524         }
525         return scheduledTask.deadlineNanos();
526     }
527
528     /**
529      * Updates the internal timestamp that tells when a submitted task was executed most recently.
530      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
531      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
532      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
533      * checks.
534      */

535     protected void updateLastExecutionTime() {
536         lastExecutionTime = ScheduledFutureTask.nanoTime();
537     }
538
539     /**
540      * Run the tasks in the {@link #taskQueue}
541      */

542     protected abstract void run();
543
544     /**
545      * Do nothing, sub-classes may override
546      */

547     protected void cleanup() {
548         // NOOP
549     }
550
551     protected void wakeup(boolean inEventLoop) {
552         if (!inEventLoop) {
553             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
554             // is already something in the queue.
555             taskQueue.offer(WAKEUP_TASK);
556         }
557     }
558
559     @Override
560     public boolean inEventLoop(Thread thread) {
561         return thread == this.thread;
562     }
563
564     /**
565      * Add a {@link Runnable} which will be executed on shutdown of this instance
566      */

567     public void addShutdownHook(final Runnable task) {
568         if (inEventLoop()) {
569             shutdownHooks.add(task);
570         } else {
571             execute(new Runnable() {
572                 @Override
573                 public void run() {
574                     shutdownHooks.add(task);
575                 }
576             });
577         }
578     }
579
580     /**
581      * Remove a previous added {@link Runnable} as a shutdown hook
582      */

583     public void removeShutdownHook(final Runnable task) {
584         if (inEventLoop()) {
585             shutdownHooks.remove(task);
586         } else {
587             execute(new Runnable() {
588                 @Override
589                 public void run() {
590                     shutdownHooks.remove(task);
591                 }
592             });
593         }
594     }
595
596     private boolean runShutdownHooks() {
597         boolean ran = false;
598         // Note shutdown hooks can add / remove shutdown hooks.
599         while (!shutdownHooks.isEmpty()) {
600             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601             shutdownHooks.clear();
602             for (Runnable task: copy) {
603                 try {
604                     task.run();
605                 } catch (Throwable t) {
606                     logger.warn("Shutdown hook raised an exception.", t);
607                 } finally {
608                     ran = true;
609                 }
610             }
611         }
612
613         if (ran) {
614             lastExecutionTime = ScheduledFutureTask.nanoTime();
615         }
616
617         return ran;
618     }
619
620     @Override
621     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623         if (timeout < quietPeriod) {
624             throw new IllegalArgumentException(
625                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626         }
627         ObjectUtil.checkNotNull(unit, "unit");
628
629         if (isShuttingDown()) {
630             return terminationFuture();
631         }
632
633         boolean inEventLoop = inEventLoop();
634         boolean wakeup;
635         int oldState;
636         for (;;) {
637             if (isShuttingDown()) {
638                 return terminationFuture();
639             }
640             int newState;
641             wakeup = true;
642             oldState = state;
643             if (inEventLoop) {
644                 newState = ST_SHUTTING_DOWN;
645             } else {
646                 switch (oldState) {
647                     case ST_NOT_STARTED:
648                     case ST_STARTED:
649                         newState = ST_SHUTTING_DOWN;
650                         break;
651                     default:
652                         newState = oldState;
653                         wakeup = false;
654                 }
655             }
656             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657                 break;
658             }
659         }
660         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661         gracefulShutdownTimeout = unit.toNanos(timeout);
662
663         if (ensureThreadStarted(oldState)) {
664             return terminationFuture;
665         }
666
667         if (wakeup) {
668             taskQueue.offer(WAKEUP_TASK);
669             if (!addTaskWakesUp) {
670                 wakeup(inEventLoop);
671             }
672         }
673
674         return terminationFuture();
675     }
676
677     @Override
678     public Future<?> terminationFuture() {
679         return terminationFuture;
680     }
681
682     @Override
683     @Deprecated
684     public void shutdown() {
685         if (isShutdown()) {
686             return;
687         }
688
689         boolean inEventLoop = inEventLoop();
690         boolean wakeup;
691         int oldState;
692         for (;;) {
693             if (isShuttingDown()) {
694                 return;
695             }
696             int newState;
697             wakeup = true;
698             oldState = state;
699             if (inEventLoop) {
700                 newState = ST_SHUTDOWN;
701             } else {
702                 switch (oldState) {
703                     case ST_NOT_STARTED:
704                     case ST_STARTED:
705                     case ST_SHUTTING_DOWN:
706                         newState = ST_SHUTDOWN;
707                         break;
708                     default:
709                         newState = oldState;
710                         wakeup = false;
711                 }
712             }
713             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714                 break;
715             }
716         }
717
718         if (ensureThreadStarted(oldState)) {
719             return;
720         }
721
722         if (wakeup) {
723             taskQueue.offer(WAKEUP_TASK);
724             if (!addTaskWakesUp) {
725                 wakeup(inEventLoop);
726             }
727         }
728     }
729
730     @Override
731     public boolean isShuttingDown() {
732         return state >= ST_SHUTTING_DOWN;
733     }
734
735     @Override
736     public boolean isShutdown() {
737         return state >= ST_SHUTDOWN;
738     }
739
740     @Override
741     public boolean isTerminated() {
742         return state == ST_TERMINATED;
743     }
744
745     /**
746      * Confirm that the shutdown if the instance should be done now!
747      */

748     protected boolean confirmShutdown() {
749         if (!isShuttingDown()) {
750             return false;
751         }
752
753         if (!inEventLoop()) {
754             throw new IllegalStateException("must be invoked from an event loop");
755         }
756
757         cancelScheduledTasks();
758
759         if (gracefulShutdownStartTime == 0) {
760             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
761         }
762
763         if (runAllTasks() || runShutdownHooks()) {
764             if (isShutdown()) {
765                 // Executor shut down - no new tasks anymore.
766                 return true;
767             }
768
769             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
770             // terminate if the quiet period is 0.
771             // See https://github.com/netty/netty/issues/4241
772             if (gracefulShutdownQuietPeriod == 0) {
773                 return true;
774             }
775             taskQueue.offer(WAKEUP_TASK);
776             return false;
777         }
778
779         final long nanoTime = ScheduledFutureTask.nanoTime();
780
781         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782             return true;
783         }
784
785         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786             // Check if any tasks were added to the queue every 100ms.
787             // TODO: Change the behavior of takeTask() so that it returns on timeout.
788             taskQueue.offer(WAKEUP_TASK);
789             try {
790                 Thread.sleep(100);
791             } catch (InterruptedException e) {
792                 // Ignore
793             }
794
795             return false;
796         }
797
798         // No tasks were added for last quiet period - hopefully safe to shut down.
799         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
800         return true;
801     }
802
803     @Override
804     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805         ObjectUtil.checkNotNull(unit, "unit");
806         if (inEventLoop()) {
807             throw new IllegalStateException("cannot await termination of the current thread");
808         }
809
810         threadLock.await(timeout, unit);
811
812         return isTerminated();
813     }
814
815     @Override
816     public void execute(Runnable task) {
817         ObjectUtil.checkNotNull(task, "task");
818         execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
819     }
820
821     @Override
822     public void lazyExecute(Runnable task) {
823         execute(ObjectUtil.checkNotNull(task, "task"), false);
824     }
825
826     private void execute(Runnable task, boolean immediate) {
827         boolean inEventLoop = inEventLoop();
828         addTask(task);
829         if (!inEventLoop) {
830             startThread();
831             if (isShutdown()) {
832                 boolean reject = false;
833                 try {
834                     if (removeTask(task)) {
835                         reject = true;
836                     }
837                 } catch (UnsupportedOperationException e) {
838                     // The task queue does not support removal so the best thing we can do is to just move on and
839                     // hope we will be able to pick-up the task before its completely terminated.
840                     // In worst case we will log on termination.
841                 }
842                 if (reject) {
843                     reject();
844                 }
845             }
846         }
847
848         if (!addTaskWakesUp && immediate) {
849             wakeup(inEventLoop);
850         }
851     }
852
853     @Override
854     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
855         throwIfInEventLoop("invokeAny");
856         return super.invokeAny(tasks);
857     }
858
859     @Override
860     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
861             throws InterruptedException, ExecutionException, TimeoutException {
862         throwIfInEventLoop("invokeAny");
863         return super.invokeAny(tasks, timeout, unit);
864     }
865
866     @Override
867     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
868             throws InterruptedException {
869         throwIfInEventLoop("invokeAll");
870         return super.invokeAll(tasks);
871     }
872
873     @Override
874     public <T> List<java.util.concurrent.Future<T>> invokeAll(
875             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
876         throwIfInEventLoop("invokeAll");
877         return super.invokeAll(tasks, timeout, unit);
878     }
879
880     private void throwIfInEventLoop(String method) {
881         if (inEventLoop()) {
882             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
883         }
884     }
885
886     /**
887      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
888      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until
889      * it is fully started.
890      */

891     public final ThreadProperties threadProperties() {
892         ThreadProperties threadProperties = this.threadProperties;
893         if (threadProperties == null) {
894             Thread thread = this.thread;
895             if (thread == null) {
896                 assert !inEventLoop();
897                 submit(NOOP_TASK).syncUninterruptibly();
898                 thread = this.thread;
899                 assert thread != null;
900             }
901
902             threadProperties = new DefaultThreadProperties(thread);
903             if (!PROPERTIES_UPDATER.compareAndSet(thisnull, threadProperties)) {
904                 threadProperties = this.threadProperties;
905             }
906         }
907
908         return threadProperties;
909     }
910
911     /**
912      * @deprecated use {@link AbstractEventExecutor.LazyRunnable}
913      */

914     @Deprecated
915     protected interface NonWakeupRunnable extends LazyRunnable { }
916
917     /**
918      * Can be overridden to control which tasks require waking the {@link EventExecutor} thread
919      * if it is waiting so that they can be run immediately.
920      */

921     protected boolean wakesUpForTask(Runnable task) {
922         return true;
923     }
924
925     protected static void reject() {
926         throw new RejectedExecutionException("event executor terminated");
927     }
928
929     /**
930      * Offers the task to the associated {@link RejectedExecutionHandler}.
931      *
932      * @param task to reject.
933      */

934     protected final void reject(Runnable task) {
935         rejectedExecutionHandler.rejected(task, this);
936     }
937
938     // ScheduledExecutorService implementation
939
940     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
941
942     private void startThread() {
943         if (state == ST_NOT_STARTED) {
944             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
945                 boolean success = false;
946                 try {
947                     doStartThread();
948                     success = true;
949                 } finally {
950                     if (!success) {
951                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
952                     }
953                 }
954             }
955         }
956     }
957
958     private boolean ensureThreadStarted(int oldState) {
959         if (oldState == ST_NOT_STARTED) {
960             try {
961                 doStartThread();
962             } catch (Throwable cause) {
963                 STATE_UPDATER.set(this, ST_TERMINATED);
964                 terminationFuture.tryFailure(cause);
965
966                 if (!(cause instanceof Exception)) {
967                     // Also rethrow as it may be an OOME for example
968                     PlatformDependent.throwException(cause);
969                 }
970                 return true;
971             }
972         }
973         return false;
974     }
975
976     private void doStartThread() {
977         assert thread == null;
978         executor.execute(new Runnable() {
979             @Override
980             public void run() {
981                 thread = Thread.currentThread();
982                 if (interrupted) {
983                     thread.interrupt();
984                 }
985
986                 boolean success = false;
987                 updateLastExecutionTime();
988                 try {
989                     SingleThreadEventExecutor.this.run();
990                     success = true;
991                 } catch (Throwable t) {
992                     logger.warn("Unexpected exception from an event executor: ", t);
993                 } finally {
994                     for (;;) {
995                         int oldState = state;
996                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
997                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
998                             break;
999                         }
1000                     }
1001
1002                     // Check if confirmShutdown() was called at the end of the loop.
1003                     if (success && gracefulShutdownStartTime == 0) {
1004                         if (logger.isErrorEnabled()) {
1005                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1006                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1007                                     "be called before run() implementation terminates.");
1008                         }
1009                     }
1010
1011                     try {
1012                         // Run all remaining tasks and shutdown hooks. At this point the event loop
1013                         // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
1014                         // graceful shutdown with quietPeriod.
1015                         for (;;) {
1016                             if (confirmShutdown()) {
1017                                 break;
1018                             }
1019                         }
1020
1021                         // Now we want to make sure no more tasks can be added from this point. This is
1022                         // achieved by switching the state. Any new tasks beyond this point will be rejected.
1023                         for (;;) {
1024                             int oldState = state;
1025                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1026                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1027                                 break;
1028                             }
1029                         }
1030
1031                         // We have the final set of tasks in the queue now, no more can be added, run all remaining.
1032                         // No need to loop here, this is the final pass.
1033                         confirmShutdown();
1034                     } finally {
1035                         try {
1036                             cleanup();
1037                         } finally {
1038                             // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
1039                             // the future. The user may block on the future and once it unblocks the JVM may terminate
1040                             // and start unloading classes.
1041                             // See https://github.com/netty/netty/issues/6596.
1042                             FastThreadLocal.removeAll();
1043
1044                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1045                             threadLock.countDown();
1046                             int numUserTasks = drainTasks();
1047                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
1048                                 logger.warn("An event executor terminated with " +
1049                                         "non-empty task queue (" + numUserTasks + ')');
1050                             }
1051                             terminationFuture.setSuccess(null);
1052                         }
1053                     }
1054                 }
1055             }
1056         });
1057     }
1058
1059     final int drainTasks() {
1060         int numTasks = 0;
1061         for (;;) {
1062             Runnable runnable = taskQueue.poll();
1063             if (runnable == null) {
1064                 break;
1065             }
1066             // WAKEUP_TASK should be just discarded as these are added internally.
1067             // The important bit is that we not have any user tasks left.
1068             if (WAKEUP_TASK != runnable) {
1069                 numTasks++;
1070             }
1071         }
1072         return numTasks;
1073     }
1074
1075     private static final class DefaultThreadProperties implements ThreadProperties {
1076         private final Thread t;
1077
1078         DefaultThreadProperties(Thread t) {
1079             this.t = t;
1080         }
1081
1082         @Override
1083         public State state() {
1084             return t.getState();
1085         }
1086
1087         @Override
1088         public int priority() {
1089             return t.getPriority();
1090         }
1091
1092         @Override
1093         public boolean isInterrupted() {
1094             return t.isInterrupted();
1095         }
1096
1097         @Override
1098         public boolean isDaemon() {
1099             return t.isDaemon();
1100         }
1101
1102         @Override
1103         public String name() {
1104             return t.getName();
1105         }
1106
1107         @Override
1108         public long id() {
1109             return t.getId();
1110         }
1111
1112         @Override
1113         public StackTraceElement[] stackTrace() {
1114             return t.getStackTrace();
1115         }
1116
1117         @Override
1118         public boolean isAlive() {
1119             return t.isAlive();
1120         }
1121     }
1122 }
1123