1
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.UnstableApi;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45
46
50 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51
52 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
53 SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
54
55 private static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
57
58 private static final int ST_NOT_STARTED = 1;
59 private static final int ST_STARTED = 2;
60 private static final int ST_SHUTTING_DOWN = 3;
61 private static final int ST_SHUTDOWN = 4;
62 private static final int ST_TERMINATED = 5;
63
64 private static final Runnable NOOP_TASK = new Runnable() {
65 @Override
66 public void run() {
67
68 }
69 };
70
71 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
72 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
73 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(
75 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
76
77 private final Queue<Runnable> taskQueue;
78
79 private volatile Thread thread;
80 @SuppressWarnings("unused")
81 private volatile ThreadProperties threadProperties;
82 private final Executor executor;
83 private volatile boolean interrupted;
84
85 private final CountDownLatch threadLock = new CountDownLatch(1);
86 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
87 private final boolean addTaskWakesUp;
88 private final int maxPendingTasks;
89 private final RejectedExecutionHandler rejectedExecutionHandler;
90
91 private long lastExecutionTime;
92
93 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
94 private volatile int state = ST_NOT_STARTED;
95
96 private volatile long gracefulShutdownQuietPeriod;
97 private volatile long gracefulShutdownTimeout;
98 private long gracefulShutdownStartTime;
99
100 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101
102
/**
 * Creates an instance whose thread is supplied by {@code threadFactory}.
 * The factory is wrapped in a {@link ThreadPerTaskExecutor} and the work is
 * delegated to the {@link Executor}-based constructor.
 *
 * @param parent         the group this executor belongs to (passed to the superclass)
 * @param threadFactory  factory used to create the executor thread
 * @param addTaskWakesUp when {@code true}, this class does not call {@link #wakeup(boolean)}
 *                       after queueing a task or a shutdown request
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
114
115
/**
 * Creates an instance whose thread is supplied by {@code threadFactory}, with an
 * explicit queue bound and rejection policy. The factory is wrapped in a
 * {@link ThreadPerTaskExecutor} before delegating.
 *
 * @param parent          the group this executor belongs to (passed to the superclass)
 * @param threadFactory   factory used to create the executor thread
 * @param addTaskWakesUp  when {@code true}, this class does not call {@link #wakeup(boolean)}
 *                        after queueing a task or a shutdown request
 * @param maxPendingTasks requested task-queue capacity (the delegate constructor raises it to at least 16)
 * @param rejectedHandler handler invoked when a task cannot be queued; must not be {@code null}
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp,
        int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
130
131
/**
 * Creates an instance backed by {@code executor}, using
 * {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} as queue bound and
 * {@link RejectedExecutionHandlers#reject()} as rejection policy.
 *
 * @param parent         the group this executor belongs to (passed to the superclass)
 * @param executor       executor that runs the event-loop runnable
 * @param addTaskWakesUp when {@code true}, this class does not call {@link #wakeup(boolean)}
 *                       after queueing a task or a shutdown request
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
142
143
153 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154 boolean addTaskWakesUp, int maxPendingTasks,
155 RejectedExecutionHandler rejectedHandler) {
156 super(parent);
157 this.addTaskWakesUp = addTaskWakesUp;
158 this.maxPendingTasks = Math.max(16, maxPendingTasks);
159 this.executor = ThreadExecutorMap.apply(executor, this);
160 taskQueue = newTaskQueue(this.maxPendingTasks);
161 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162 }
163
164 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
165 boolean addTaskWakesUp, Queue<Runnable> taskQueue,
166 RejectedExecutionHandler rejectedHandler) {
167 super(parent);
168 this.addTaskWakesUp = addTaskWakesUp;
169 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
170 this.executor = ThreadExecutorMap.apply(executor, this);
171 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
172 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
173 }
174
175
178 @Deprecated
179 protected Queue<Runnable> newTaskQueue() {
180 return newTaskQueue(maxPendingTasks);
181 }
182
183
189 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191 }
192
193
196 protected void interruptThread() {
197 Thread currentThread = thread;
198 if (currentThread == null) {
199 interrupted = true;
200 } else {
201 currentThread.interrupt();
202 }
203 }
204
205
208 protected Runnable pollTask() {
209 assert inEventLoop();
210 return pollTaskFrom(taskQueue);
211 }
212
213 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214 for (;;) {
215 Runnable task = taskQueue.poll();
216 if (task != WAKEUP_TASK) {
217 return task;
218 }
219 }
220 }
221
222
231 protected Runnable takeTask() {
232 assert inEventLoop();
233 if (!(taskQueue instanceof BlockingQueue)) {
234 throw new UnsupportedOperationException();
235 }
236
237 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238 for (;;) {
239 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240 if (scheduledTask == null) {
241 Runnable task = null;
242 try {
243 task = taskQueue.take();
244 if (task == WAKEUP_TASK) {
245 task = null;
246 }
247 } catch (InterruptedException e) {
248
249 }
250 return task;
251 } else {
252 long delayNanos = scheduledTask.delayNanos();
253 Runnable task = null;
254 if (delayNanos > 0) {
255 try {
256 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257 } catch (InterruptedException e) {
258
259 return null;
260 }
261 }
262 if (task == null) {
263
264
265
266
267 fetchFromScheduledTaskQueue();
268 task = taskQueue.poll();
269 }
270
271 if (task != null) {
272 return task;
273 }
274 }
275 }
276 }
277
278 private boolean fetchFromScheduledTaskQueue() {
279 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
280 return true;
281 }
282 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
283 for (;;) {
284 Runnable scheduledTask = pollScheduledTask(nanoTime);
285 if (scheduledTask == null) {
286 return true;
287 }
288 if (!taskQueue.offer(scheduledTask)) {
289
290 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
291 return false;
292 }
293 }
294 }
295
296
299 private boolean executeExpiredScheduledTasks() {
300 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
301 return false;
302 }
303 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
304 Runnable scheduledTask = pollScheduledTask(nanoTime);
305 if (scheduledTask == null) {
306 return false;
307 }
308 do {
309 safeExecute(scheduledTask);
310 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
311 return true;
312 }
313
314
317 protected Runnable peekTask() {
318 assert inEventLoop();
319 return taskQueue.peek();
320 }
321
322
325 protected boolean hasTasks() {
326 assert inEventLoop();
327 return !taskQueue.isEmpty();
328 }
329
330
336 public int pendingTasks() {
337 return taskQueue.size();
338 }
339
340
344 protected void addTask(Runnable task) {
345 ObjectUtil.checkNotNull(task, "task");
346 if (!offerTask(task)) {
347 reject(task);
348 }
349 }
350
351 final boolean offerTask(Runnable task) {
352 if (isShutdown()) {
353 reject();
354 }
355 return taskQueue.offer(task);
356 }
357
358
361 protected boolean removeTask(Runnable task) {
362 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
363 }
364
365
370 protected boolean runAllTasks() {
371 assert inEventLoop();
372 boolean fetchedAll;
373 boolean ranAtLeastOne = false;
374
375 do {
376 fetchedAll = fetchFromScheduledTaskQueue();
377 if (runAllTasksFrom(taskQueue)) {
378 ranAtLeastOne = true;
379 }
380 } while (!fetchedAll);
381
382 if (ranAtLeastOne) {
383 lastExecutionTime = ScheduledFutureTask.nanoTime();
384 }
385 afterRunningAllTasks();
386 return ranAtLeastOne;
387 }
388
389
397 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
398 assert inEventLoop();
399 boolean ranAtLeastOneTask;
400 int drainAttempt = 0;
401 do {
402
403
404 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
405 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
406
407 if (drainAttempt > 0) {
408 lastExecutionTime = ScheduledFutureTask.nanoTime();
409 }
410 afterRunningAllTasks();
411
412 return drainAttempt > 0;
413 }
414
415
422 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
423 Runnable task = pollTaskFrom(taskQueue);
424 if (task == null) {
425 return false;
426 }
427 for (;;) {
428 safeExecute(task);
429 task = pollTaskFrom(taskQueue);
430 if (task == null) {
431 return true;
432 }
433 }
434 }
435
436
441 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
442 Runnable task = pollTaskFrom(taskQueue);
443 if (task == null) {
444 return false;
445 }
446 int remaining = Math.min(maxPendingTasks, taskQueue.size());
447 safeExecute(task);
448
449
450 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
451 safeExecute(task);
452 }
453 return true;
454 }
455
456
460 protected boolean runAllTasks(long timeoutNanos) {
461 fetchFromScheduledTaskQueue();
462 Runnable task = pollTask();
463 if (task == null) {
464 afterRunningAllTasks();
465 return false;
466 }
467
468 final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
469 long runTasks = 0;
470 long lastExecutionTime;
471 for (;;) {
472 safeExecute(task);
473
474 runTasks ++;
475
476
477
478 if ((runTasks & 0x3F) == 0) {
479 lastExecutionTime = ScheduledFutureTask.nanoTime();
480 if (lastExecutionTime >= deadline) {
481 break;
482 }
483 }
484
485 task = pollTask();
486 if (task == null) {
487 lastExecutionTime = ScheduledFutureTask.nanoTime();
488 break;
489 }
490 }
491
492 afterRunningAllTasks();
493 this.lastExecutionTime = lastExecutionTime;
494 return true;
495 }
496
497
500 @UnstableApi
501 protected void afterRunningAllTasks() { }
502
503
506 protected long delayNanos(long currentTimeNanos) {
507 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
508 if (scheduledTask == null) {
509 return SCHEDULE_PURGE_INTERVAL;
510 }
511
512 return scheduledTask.delayNanos(currentTimeNanos);
513 }
514
515
519 @UnstableApi
520 protected long deadlineNanos() {
521 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522 if (scheduledTask == null) {
523 return nanoTime() + SCHEDULE_PURGE_INTERVAL;
524 }
525 return scheduledTask.deadlineNanos();
526 }
527
528
535 protected void updateLastExecutionTime() {
536 lastExecutionTime = ScheduledFutureTask.nanoTime();
537 }
538
539
542 protected abstract void run();
543
544
547 protected void cleanup() {
548
549 }
550
551 protected void wakeup(boolean inEventLoop) {
552 if (!inEventLoop) {
553
554
555 taskQueue.offer(WAKEUP_TASK);
556 }
557 }
558
559 @Override
560 public boolean inEventLoop(Thread thread) {
561 return thread == this.thread;
562 }
563
564
567 public void addShutdownHook(final Runnable task) {
568 if (inEventLoop()) {
569 shutdownHooks.add(task);
570 } else {
571 execute(new Runnable() {
572 @Override
573 public void run() {
574 shutdownHooks.add(task);
575 }
576 });
577 }
578 }
579
580
583 public void removeShutdownHook(final Runnable task) {
584 if (inEventLoop()) {
585 shutdownHooks.remove(task);
586 } else {
587 execute(new Runnable() {
588 @Override
589 public void run() {
590 shutdownHooks.remove(task);
591 }
592 });
593 }
594 }
595
596 private boolean runShutdownHooks() {
597 boolean ran = false;
598
599 while (!shutdownHooks.isEmpty()) {
600 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601 shutdownHooks.clear();
602 for (Runnable task: copy) {
603 try {
604 task.run();
605 } catch (Throwable t) {
606 logger.warn("Shutdown hook raised an exception.", t);
607 } finally {
608 ran = true;
609 }
610 }
611 }
612
613 if (ran) {
614 lastExecutionTime = ScheduledFutureTask.nanoTime();
615 }
616
617 return ran;
618 }
619
620 @Override
621 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623 if (timeout < quietPeriod) {
624 throw new IllegalArgumentException(
625 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626 }
627 ObjectUtil.checkNotNull(unit, "unit");
628
629 if (isShuttingDown()) {
630 return terminationFuture();
631 }
632
633 boolean inEventLoop = inEventLoop();
634 boolean wakeup;
635 int oldState;
636 for (;;) {
637 if (isShuttingDown()) {
638 return terminationFuture();
639 }
640 int newState;
641 wakeup = true;
642 oldState = state;
643 if (inEventLoop) {
644 newState = ST_SHUTTING_DOWN;
645 } else {
646 switch (oldState) {
647 case ST_NOT_STARTED:
648 case ST_STARTED:
649 newState = ST_SHUTTING_DOWN;
650 break;
651 default:
652 newState = oldState;
653 wakeup = false;
654 }
655 }
656 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657 break;
658 }
659 }
660 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661 gracefulShutdownTimeout = unit.toNanos(timeout);
662
663 if (ensureThreadStarted(oldState)) {
664 return terminationFuture;
665 }
666
667 if (wakeup) {
668 taskQueue.offer(WAKEUP_TASK);
669 if (!addTaskWakesUp) {
670 wakeup(inEventLoop);
671 }
672 }
673
674 return terminationFuture();
675 }
676
677 @Override
678 public Future<?> terminationFuture() {
679 return terminationFuture;
680 }
681
682 @Override
683 @Deprecated
684 public void shutdown() {
685 if (isShutdown()) {
686 return;
687 }
688
689 boolean inEventLoop = inEventLoop();
690 boolean wakeup;
691 int oldState;
692 for (;;) {
693 if (isShuttingDown()) {
694 return;
695 }
696 int newState;
697 wakeup = true;
698 oldState = state;
699 if (inEventLoop) {
700 newState = ST_SHUTDOWN;
701 } else {
702 switch (oldState) {
703 case ST_NOT_STARTED:
704 case ST_STARTED:
705 case ST_SHUTTING_DOWN:
706 newState = ST_SHUTDOWN;
707 break;
708 default:
709 newState = oldState;
710 wakeup = false;
711 }
712 }
713 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714 break;
715 }
716 }
717
718 if (ensureThreadStarted(oldState)) {
719 return;
720 }
721
722 if (wakeup) {
723 taskQueue.offer(WAKEUP_TASK);
724 if (!addTaskWakesUp) {
725 wakeup(inEventLoop);
726 }
727 }
728 }
729
730 @Override
731 public boolean isShuttingDown() {
732 return state >= ST_SHUTTING_DOWN;
733 }
734
735 @Override
736 public boolean isShutdown() {
737 return state >= ST_SHUTDOWN;
738 }
739
740 @Override
741 public boolean isTerminated() {
742 return state == ST_TERMINATED;
743 }
744
745
748 protected boolean confirmShutdown() {
749 if (!isShuttingDown()) {
750 return false;
751 }
752
753 if (!inEventLoop()) {
754 throw new IllegalStateException("must be invoked from an event loop");
755 }
756
757 cancelScheduledTasks();
758
759 if (gracefulShutdownStartTime == 0) {
760 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
761 }
762
763 if (runAllTasks() || runShutdownHooks()) {
764 if (isShutdown()) {
765
766 return true;
767 }
768
769
770
771
772 if (gracefulShutdownQuietPeriod == 0) {
773 return true;
774 }
775 taskQueue.offer(WAKEUP_TASK);
776 return false;
777 }
778
779 final long nanoTime = ScheduledFutureTask.nanoTime();
780
781 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782 return true;
783 }
784
785 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786
787
788 taskQueue.offer(WAKEUP_TASK);
789 try {
790 Thread.sleep(100);
791 } catch (InterruptedException e) {
792
793 }
794
795 return false;
796 }
797
798
799
800 return true;
801 }
802
803 @Override
804 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805 ObjectUtil.checkNotNull(unit, "unit");
806 if (inEventLoop()) {
807 throw new IllegalStateException("cannot await termination of the current thread");
808 }
809
810 threadLock.await(timeout, unit);
811
812 return isTerminated();
813 }
814
815 @Override
816 public void execute(Runnable task) {
817 ObjectUtil.checkNotNull(task, "task");
818 execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
819 }
820
821 @Override
822 public void lazyExecute(Runnable task) {
823 execute(ObjectUtil.checkNotNull(task, "task"), false);
824 }
825
826 private void execute(Runnable task, boolean immediate) {
827 boolean inEventLoop = inEventLoop();
828 addTask(task);
829 if (!inEventLoop) {
830 startThread();
831 if (isShutdown()) {
832 boolean reject = false;
833 try {
834 if (removeTask(task)) {
835 reject = true;
836 }
837 } catch (UnsupportedOperationException e) {
838
839
840
841 }
842 if (reject) {
843 reject();
844 }
845 }
846 }
847
848 if (!addTaskWakesUp && immediate) {
849 wakeup(inEventLoop);
850 }
851 }
852
853 @Override
854 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
855 throwIfInEventLoop("invokeAny");
856 return super.invokeAny(tasks);
857 }
858
859 @Override
860 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
861 throws InterruptedException, ExecutionException, TimeoutException {
862 throwIfInEventLoop("invokeAny");
863 return super.invokeAny(tasks, timeout, unit);
864 }
865
866 @Override
867 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
868 throws InterruptedException {
869 throwIfInEventLoop("invokeAll");
870 return super.invokeAll(tasks);
871 }
872
873 @Override
874 public <T> List<java.util.concurrent.Future<T>> invokeAll(
875 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
876 throwIfInEventLoop("invokeAll");
877 return super.invokeAll(tasks, timeout, unit);
878 }
879
880 private void throwIfInEventLoop(String method) {
881 if (inEventLoop()) {
882 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
883 }
884 }
885
886
891 public final ThreadProperties threadProperties() {
892 ThreadProperties threadProperties = this.threadProperties;
893 if (threadProperties == null) {
894 Thread thread = this.thread;
895 if (thread == null) {
896 assert !inEventLoop();
897 submit(NOOP_TASK).syncUninterruptibly();
898 thread = this.thread;
899 assert thread != null;
900 }
901
902 threadProperties = new DefaultThreadProperties(thread);
903 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
904 threadProperties = this.threadProperties;
905 }
906 }
907
908 return threadProperties;
909 }
910
911
914 @Deprecated
915 protected interface NonWakeupRunnable extends LazyRunnable { }
916
917
921 protected boolean wakesUpForTask(Runnable task) {
922 return true;
923 }
924
925 protected static void reject() {
926 throw new RejectedExecutionException("event executor terminated");
927 }
928
929
934 protected final void reject(Runnable task) {
935 rejectedExecutionHandler.rejected(task, this);
936 }
937
938
939
940 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
941
942 private void startThread() {
943 if (state == ST_NOT_STARTED) {
944 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
945 boolean success = false;
946 try {
947 doStartThread();
948 success = true;
949 } finally {
950 if (!success) {
951 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
952 }
953 }
954 }
955 }
956 }
957
958 private boolean ensureThreadStarted(int oldState) {
959 if (oldState == ST_NOT_STARTED) {
960 try {
961 doStartThread();
962 } catch (Throwable cause) {
963 STATE_UPDATER.set(this, ST_TERMINATED);
964 terminationFuture.tryFailure(cause);
965
966 if (!(cause instanceof Exception)) {
967
968 PlatformDependent.throwException(cause);
969 }
970 return true;
971 }
972 }
973 return false;
974 }
975
976 private void doStartThread() {
977 assert thread == null;
978 executor.execute(new Runnable() {
979 @Override
980 public void run() {
981 thread = Thread.currentThread();
982 if (interrupted) {
983 thread.interrupt();
984 }
985
986 boolean success = false;
987 updateLastExecutionTime();
988 try {
989 SingleThreadEventExecutor.this.run();
990 success = true;
991 } catch (Throwable t) {
992 logger.warn("Unexpected exception from an event executor: ", t);
993 } finally {
994 for (;;) {
995 int oldState = state;
996 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
997 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
998 break;
999 }
1000 }
1001
1002
1003 if (success && gracefulShutdownStartTime == 0) {
1004 if (logger.isErrorEnabled()) {
1005 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1006 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1007 "be called before run() implementation terminates.");
1008 }
1009 }
1010
1011 try {
1012
1013
1014
1015 for (;;) {
1016 if (confirmShutdown()) {
1017 break;
1018 }
1019 }
1020
1021
1022
1023 for (;;) {
1024 int oldState = state;
1025 if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1026 SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1027 break;
1028 }
1029 }
1030
1031
1032
1033 confirmShutdown();
1034 } finally {
1035 try {
1036 cleanup();
1037 } finally {
1038
1039
1040
1041
1042 FastThreadLocal.removeAll();
1043
1044 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1045 threadLock.countDown();
1046 int numUserTasks = drainTasks();
1047 if (numUserTasks > 0 && logger.isWarnEnabled()) {
1048 logger.warn("An event executor terminated with " +
1049 "non-empty task queue (" + numUserTasks + ')');
1050 }
1051 terminationFuture.setSuccess(null);
1052 }
1053 }
1054 }
1055 }
1056 });
1057 }
1058
1059 final int drainTasks() {
1060 int numTasks = 0;
1061 for (;;) {
1062 Runnable runnable = taskQueue.poll();
1063 if (runnable == null) {
1064 break;
1065 }
1066
1067
1068 if (WAKEUP_TASK != runnable) {
1069 numTasks++;
1070 }
1071 }
1072 return numTasks;
1073 }
1074
1075 private static final class DefaultThreadProperties implements ThreadProperties {
1076 private final Thread t;
1077
1078 DefaultThreadProperties(Thread t) {
1079 this.t = t;
1080 }
1081
1082 @Override
1083 public State state() {
1084 return t.getState();
1085 }
1086
1087 @Override
1088 public int priority() {
1089 return t.getPriority();
1090 }
1091
1092 @Override
1093 public boolean isInterrupted() {
1094 return t.isInterrupted();
1095 }
1096
1097 @Override
1098 public boolean isDaemon() {
1099 return t.isDaemon();
1100 }
1101
1102 @Override
1103 public String name() {
1104 return t.getName();
1105 }
1106
1107 @Override
1108 public long id() {
1109 return t.getId();
1110 }
1111
1112 @Override
1113 public StackTraceElement[] stackTrace() {
1114 return t.getStackTrace();
1115 }
1116
1117 @Override
1118 public boolean isAlive() {
1119 return t.isAlive();
1120 }
1121 }
1122 }
1123