1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.ThreadExecutorMap;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.security.AccessController;
24 import java.security.PrivilegedAction;
25 import java.util.Queue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 /**
35  * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
36  * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
37  * this executor; use a dedicated executor.
38  */

39 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
40
41     private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
42
43     private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
44
45     public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
46
47     final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
48     final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
49             this, Executors.<Void>callable(new Runnable() {
50         @Override
51         public void run() {
52             // NOOP
53         }
54     }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
55
56     // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
57     // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
58     // be sticky about its thread group
59     // visible for testing
60     final ThreadFactory threadFactory;
61     private final TaskRunner taskRunner = new TaskRunner();
62     private final AtomicBoolean started = new AtomicBoolean();
63     volatile Thread thread;
64
65     private final Future<?> terminationFuture = new FailedFuture<Object>(thisnew UnsupportedOperationException());
66
67     private GlobalEventExecutor() {
68         scheduledTaskQueue().add(quietPeriodTask);
69         threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
70                 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
71     }
72
73     /**
74      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
75      *
76      * @return {@code nullif the executor thread has been interrupted or waken up.
77      */

78     Runnable takeTask() {
79         BlockingQueue<Runnable> taskQueue = this.taskQueue;
80         for (;;) {
81             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
82             if (scheduledTask == null) {
83                 Runnable task = null;
84                 try {
85                     task = taskQueue.take();
86                 } catch (InterruptedException e) {
87                     // Ignore
88                 }
89                 return task;
90             } else {
91                 long delayNanos = scheduledTask.delayNanos();
92                 Runnable task = null;
93                 if (delayNanos > 0) {
94                     try {
95                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
96                     } catch (InterruptedException e) {
97                         // Waken up.
98                         return null;
99                     }
100                 }
101                 if (task == null) {
102                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
103                     // scheduled tasks are never executed if there is always one task in the taskQueue.
104                     // This is for example true for the read task of OIO Transport
105                     // See https://github.com/netty/netty/issues/1614
106                     fetchFromScheduledTaskQueue();
107                     task = taskQueue.poll();
108                 }
109
110                 if (task != null) {
111                     return task;
112                 }
113             }
114         }
115     }
116
117     private void fetchFromScheduledTaskQueue() {
118         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
119         Runnable scheduledTask = pollScheduledTask(nanoTime);
120         while (scheduledTask != null) {
121             taskQueue.add(scheduledTask);
122             scheduledTask = pollScheduledTask(nanoTime);
123         }
124     }
125
126     /**
127      * Return the number of tasks that are pending for processing.
128      *
129      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
130      * SingleThreadEventExecutor. So use it was care!</strong>
131      */

132     public int pendingTasks() {
133         return taskQueue.size();
134     }
135
136     /**
137      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
138      * before.
139      */

140     private void addTask(Runnable task) {
141         taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
142     }
143
144     @Override
145     public boolean inEventLoop(Thread thread) {
146         return thread == this.thread;
147     }
148
149     @Override
150     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
151         return terminationFuture();
152     }
153
154     @Override
155     public Future<?> terminationFuture() {
156         return terminationFuture;
157     }
158
159     @Override
160     @Deprecated
161     public void shutdown() {
162         throw new UnsupportedOperationException();
163     }
164
165     @Override
166     public boolean isShuttingDown() {
167         return false;
168     }
169
170     @Override
171     public boolean isShutdown() {
172         return false;
173     }
174
175     @Override
176     public boolean isTerminated() {
177         return false;
178     }
179
180     @Override
181     public boolean awaitTermination(long timeout, TimeUnit unit) {
182         return false;
183     }
184
185     /**
186      * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
187      * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
188      * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
189      * down and there's no chance of submitting a new task afterwards.
190      *
191      * @return {@code trueif and only if the worker thread has been terminated
192      */

193     public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
194         ObjectUtil.checkNotNull(unit, "unit");
195
196         final Thread thread = this.thread;
197         if (thread == null) {
198             throw new IllegalStateException("thread was not started");
199         }
200         thread.join(unit.toMillis(timeout));
201         return !thread.isAlive();
202     }
203
204     @Override
205     public void execute(Runnable task) {
206         addTask(ObjectUtil.checkNotNull(task, "task"));
207         if (!inEventLoop()) {
208             startThread();
209         }
210     }
211
212     private void startThread() {
213         if (started.compareAndSet(falsetrue)) {
214             final Thread t = threadFactory.newThread(taskRunner);
215             // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
216             // classloader.
217             // See:
218             // - https://github.com/netty/netty/issues/7290
219             // - https://bugs.openjdk.java.net/browse/JDK-7008595
220             AccessController.doPrivileged(new PrivilegedAction<Void>() {
221                 @Override
222                 public Void run() {
223                     t.setContextClassLoader(null);
224                     return null;
225                 }
226             });
227
228             // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
229             // an assert error.
230             // See https://github.com/netty/netty/issues/4357
231             thread = t;
232             t.start();
233         }
234     }
235
236     final class TaskRunner implements Runnable {
237         @Override
238         public void run() {
239             for (;;) {
240                 Runnable task = takeTask();
241                 if (task != null) {
242                     try {
243                         task.run();
244                     } catch (Throwable t) {
245                         logger.warn("Unexpected exception from the global event executor: ", t);
246                     }
247
248                     if (task != quietPeriodTask) {
249                         continue;
250                     }
251                 }
252
253                 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
254                 // Terminate if there is no task in the queue (except the noop task).
255                 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
256                     // Mark the current thread as stopped.
257                     // The following CAS must always success and must be uncontended,
258                     // because only one thread should be running at the same time.
259                     boolean stopped = started.compareAndSet(truefalse);
260                     assert stopped;
261
262                     // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
263                     if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
264                         // A) No new task was added and thus there's nothing to handle
265                         //    -> safe to terminate because there's nothing left to do
266                         // B) A new thread started and handled all the new tasks.
267                         //    -> safe to terminate the new thread will take care the rest
268                         break;
269                     }
270
271                     // There are pending tasks added again.
272                     if (!started.compareAndSet(falsetrue)) {
273                         // startThread() started a new thread and set 'started' to true.
274                         // -> terminate this thread so that the new thread reads from taskQueue exclusively.
275                         break;
276                     }
277
278                     // New tasks were added, but this worker was faster to set 'started' to true.
279                     // i.e. a new worker thread was not started by startThread().
280                     // -> keep this thread alive to handle the newly added entries.
281                 }
282             }
283         }
284     }
285 }
286