1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.ThreadExecutorMap;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.security.AccessController;
24 import java.security.PrivilegedAction;
25 import java.util.Queue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 /**
35 * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
36 * task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
37 * this executor; use a dedicated executor.
38 */
39 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
40
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
42
43 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
44
45 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
46
47 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
48 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
49 this, Executors.<Void>callable(new Runnable() {
50 @Override
51 public void run() {
52 // NOOP
53 }
54 }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
55
56 // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
57 // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
58 // be sticky about its thread group
59 // visible for testing
60 final ThreadFactory threadFactory;
61 private final TaskRunner taskRunner = new TaskRunner();
62 private final AtomicBoolean started = new AtomicBoolean();
63 volatile Thread thread;
64
65 private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
66
67 private GlobalEventExecutor() {
68 scheduledTaskQueue().add(quietPeriodTask);
69 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
70 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
71 }
72
73 /**
74 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
75 *
76 * @return {@code null} if the executor thread has been interrupted or waken up.
77 */
78 Runnable takeTask() {
79 BlockingQueue<Runnable> taskQueue = this.taskQueue;
80 for (;;) {
81 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
82 if (scheduledTask == null) {
83 Runnable task = null;
84 try {
85 task = taskQueue.take();
86 } catch (InterruptedException e) {
87 // Ignore
88 }
89 return task;
90 } else {
91 long delayNanos = scheduledTask.delayNanos();
92 Runnable task = null;
93 if (delayNanos > 0) {
94 try {
95 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
96 } catch (InterruptedException e) {
97 // Waken up.
98 return null;
99 }
100 }
101 if (task == null) {
102 // We need to fetch the scheduled tasks now as otherwise there may be a chance that
103 // scheduled tasks are never executed if there is always one task in the taskQueue.
104 // This is for example true for the read task of OIO Transport
105 // See https://github.com/netty/netty/issues/1614
106 fetchFromScheduledTaskQueue();
107 task = taskQueue.poll();
108 }
109
110 if (task != null) {
111 return task;
112 }
113 }
114 }
115 }
116
117 private void fetchFromScheduledTaskQueue() {
118 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
119 Runnable scheduledTask = pollScheduledTask(nanoTime);
120 while (scheduledTask != null) {
121 taskQueue.add(scheduledTask);
122 scheduledTask = pollScheduledTask(nanoTime);
123 }
124 }
125
126 /**
127 * Return the number of tasks that are pending for processing.
128 *
129 * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
130 * SingleThreadEventExecutor. So use it was care!</strong>
131 */
132 public int pendingTasks() {
133 return taskQueue.size();
134 }
135
136 /**
137 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
138 * before.
139 */
140 private void addTask(Runnable task) {
141 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
142 }
143
144 @Override
145 public boolean inEventLoop(Thread thread) {
146 return thread == this.thread;
147 }
148
149 @Override
150 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
151 return terminationFuture();
152 }
153
154 @Override
155 public Future<?> terminationFuture() {
156 return terminationFuture;
157 }
158
159 @Override
160 @Deprecated
161 public void shutdown() {
162 throw new UnsupportedOperationException();
163 }
164
165 @Override
166 public boolean isShuttingDown() {
167 return false;
168 }
169
170 @Override
171 public boolean isShutdown() {
172 return false;
173 }
174
175 @Override
176 public boolean isTerminated() {
177 return false;
178 }
179
180 @Override
181 public boolean awaitTermination(long timeout, TimeUnit unit) {
182 return false;
183 }
184
185 /**
186 * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
187 * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
188 * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
189 * down and there's no chance of submitting a new task afterwards.
190 *
191 * @return {@code true} if and only if the worker thread has been terminated
192 */
193 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
194 ObjectUtil.checkNotNull(unit, "unit");
195
196 final Thread thread = this.thread;
197 if (thread == null) {
198 throw new IllegalStateException("thread was not started");
199 }
200 thread.join(unit.toMillis(timeout));
201 return !thread.isAlive();
202 }
203
204 @Override
205 public void execute(Runnable task) {
206 addTask(ObjectUtil.checkNotNull(task, "task"));
207 if (!inEventLoop()) {
208 startThread();
209 }
210 }
211
212 private void startThread() {
213 if (started.compareAndSet(false, true)) {
214 final Thread t = threadFactory.newThread(taskRunner);
215 // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
216 // classloader.
217 // See:
218 // - https://github.com/netty/netty/issues/7290
219 // - https://bugs.openjdk.java.net/browse/JDK-7008595
220 AccessController.doPrivileged(new PrivilegedAction<Void>() {
221 @Override
222 public Void run() {
223 t.setContextClassLoader(null);
224 return null;
225 }
226 });
227
228 // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
229 // an assert error.
230 // See https://github.com/netty/netty/issues/4357
231 thread = t;
232 t.start();
233 }
234 }
235
236 final class TaskRunner implements Runnable {
237 @Override
238 public void run() {
239 for (;;) {
240 Runnable task = takeTask();
241 if (task != null) {
242 try {
243 task.run();
244 } catch (Throwable t) {
245 logger.warn("Unexpected exception from the global event executor: ", t);
246 }
247
248 if (task != quietPeriodTask) {
249 continue;
250 }
251 }
252
253 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
254 // Terminate if there is no task in the queue (except the noop task).
255 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
256 // Mark the current thread as stopped.
257 // The following CAS must always success and must be uncontended,
258 // because only one thread should be running at the same time.
259 boolean stopped = started.compareAndSet(true, false);
260 assert stopped;
261
262 // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
263 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
264 // A) No new task was added and thus there's nothing to handle
265 // -> safe to terminate because there's nothing left to do
266 // B) A new thread started and handled all the new tasks.
267 // -> safe to terminate the new thread will take care the rest
268 break;
269 }
270
271 // There are pending tasks added again.
272 if (!started.compareAndSet(false, true)) {
273 // startThread() started a new thread and set 'started' to true.
274 // -> terminate this thread so that the new thread reads from taskQueue exclusively.
275 break;
276 }
277
278 // New tasks were added, but this worker was faster to set 'started' to true.
279 // i.e. a new worker thread was not started by startThread().
280 // -> keep this thread alive to handle the newly added entries.
281 }
282 }
283 }
284 }
285 }
286