1 /*
2 * Copyright 2014 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.EventLoop;
19 import io.netty.channel.EventLoopGroup;
20 import io.netty.channel.EventLoopTaskQueueFactory;
21 import io.netty.channel.SelectStrategy;
22 import io.netty.channel.SingleThreadEventLoop;
23 import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.IovArray;
26 import io.netty.util.IntSupplier;
27 import io.netty.util.collection.IntObjectHashMap;
28 import io.netty.util.collection.IntObjectMap;
29 import io.netty.util.concurrent.RejectedExecutionHandler;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.util.Queue;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import static java.lang.Math.min;
41
42 /**
43 * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
44 */
45 class EpollEventLoop extends SingleThreadEventLoop {
46 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
47
48 static {
49 // Ensure JNI is initialized by the time this class is loaded by this time!
50 // We use unix-common methods in this class which are backed by JNI methods.
51 Epoll.ensureAvailability();
52 }
53
54 private final FileDescriptor epollFd;
55 private final FileDescriptor eventFd;
56 private final FileDescriptor timerFd;
57 private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
58 private final boolean allowGrowing;
59 private final EpollEventArray events;
60
61 // These are initialized on first use
62 private IovArray iovArray;
63 private NativeDatagramPacketArray datagramPacketArray;
64
65 private final SelectStrategy selectStrategy;
66 private final IntSupplier selectNowSupplier = new IntSupplier() {
67 @Override
68 public int get() throws Exception {
69 return epollWaitNow();
70 }
71 };
72
73 private static final long AWAKE = -1L;
74 private static final long NONE = Long.MAX_VALUE;
75
76 // nextWakeupNanos is:
77 // AWAKE when EL is awake
78 // NONE when EL is waiting with no wakeup scheduled
79 // other value T when EL is waiting with wakeup scheduled at time T
80 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
81 private boolean pendingWakeup;
82 private volatile int ioRatio = 50;
83
84 // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
85 private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
86
87 EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
88 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
89 EventLoopTaskQueueFactory queueFactory) {
90 super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
91 rejectedExecutionHandler);
92 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
93 if (maxEvents == 0) {
94 allowGrowing = true;
95 events = new EpollEventArray(4096);
96 } else {
97 allowGrowing = false;
98 events = new EpollEventArray(maxEvents);
99 }
100 boolean success = false;
101 FileDescriptor epollFd = null;
102 FileDescriptor eventFd = null;
103 FileDescriptor timerFd = null;
104 try {
105 this.epollFd = epollFd = Native.newEpollCreate();
106 this.eventFd = eventFd = Native.newEventFd();
107 try {
108 // It is important to use EPOLLET here as we only want to get the notification once per
109 // wakeup and don't call eventfd_read(...).
110 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
111 } catch (IOException e) {
112 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
113 }
114 this.timerFd = timerFd = Native.newTimerFd();
115 try {
116 // It is important to use EPOLLET here as we only want to get the notification once per
117 // wakeup and don't call read(...).
118 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
119 } catch (IOException e) {
120 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
121 }
122 success = true;
123 } finally {
124 if (!success) {
125 if (epollFd != null) {
126 try {
127 epollFd.close();
128 } catch (Exception e) {
129 // ignore
130 }
131 }
132 if (eventFd != null) {
133 try {
134 eventFd.close();
135 } catch (Exception e) {
136 // ignore
137 }
138 }
139 if (timerFd != null) {
140 try {
141 timerFd.close();
142 } catch (Exception e) {
143 // ignore
144 }
145 }
146 }
147 }
148 }
149
150 private static Queue<Runnable> newTaskQueue(
151 EventLoopTaskQueueFactory queueFactory) {
152 if (queueFactory == null) {
153 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
154 }
155 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
156 }
157
158 /**
159 * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
160 */
161 IovArray cleanIovArray() {
162 if (iovArray == null) {
163 iovArray = new IovArray();
164 } else {
165 iovArray.clear();
166 }
167 return iovArray;
168 }
169
170 /**
171 * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
172 */
173 NativeDatagramPacketArray cleanDatagramPacketArray() {
174 if (datagramPacketArray == null) {
175 datagramPacketArray = new NativeDatagramPacketArray();
176 } else {
177 datagramPacketArray.clear();
178 }
179 return datagramPacketArray;
180 }
181
182 @Override
183 protected void wakeup(boolean inEventLoop) {
184 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
185 // write to the evfd which will then wake-up epoll_wait(...)
186 Native.eventFdWrite(eventFd.intValue(), 1L);
187 }
188 }
189
190 @Override
191 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
192 // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
193 return deadlineNanos < nextWakeupNanos.get();
194 }
195
196 @Override
197 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
198 // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
199 return deadlineNanos < nextWakeupNanos.get();
200 }
201
202 /**
203 * Register the given epoll with this {@link EventLoop}.
204 */
205 void add(AbstractEpollChannel ch) throws IOException {
206 assert inEventLoop();
207 int fd = ch.socket.intValue();
208 Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
209 AbstractEpollChannel old = channels.put(fd, ch);
210
211 // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
212 // closed.
213 assert old == null || !old.isOpen();
214 }
215
216 /**
217 * The flags of the given epoll was modified so update the registration
218 */
219 void modify(AbstractEpollChannel ch) throws IOException {
220 assert inEventLoop();
221 Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
222 }
223
224 /**
225 * Deregister the given epoll from this {@link EventLoop}.
226 */
227 void remove(AbstractEpollChannel ch) throws IOException {
228 assert inEventLoop();
229 int fd = ch.socket.intValue();
230
231 AbstractEpollChannel old = channels.remove(fd);
232 if (old != null && old != ch) {
233 // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
234 channels.put(fd, old);
235
236 // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
237 assert !ch.isOpen();
238 } else if (ch.isOpen()) {
239 // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
240 // removed once the file-descriptor is closed.
241 Native.epollCtlDel(epollFd.intValue(), fd);
242 }
243 }
244
245 @Override
246 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
247 return newTaskQueue0(maxPendingTasks);
248 }
249
250 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
251 // This event loop never calls takeTask()
252 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
253 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
254 }
255
256 /**
257 * Returns the percentage of the desired amount of time spent for I/O in the event loop.
258 */
259 public int getIoRatio() {
260 return ioRatio;
261 }
262
263 /**
264 * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
265 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
266 */
267 public void setIoRatio(int ioRatio) {
268 if (ioRatio <= 0 || ioRatio > 100) {
269 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
270 }
271 this.ioRatio = ioRatio;
272 }
273
274 @Override
275 public int registeredChannels() {
276 return channels.size();
277 }
278
279 private int epollWait(long deadlineNanos) throws IOException {
280 if (deadlineNanos == NONE) {
281 return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
282 }
283 long totalDelay = deadlineToDelayNanos(deadlineNanos);
284 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
285 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
286 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
287 }
288
289 private int epollWaitNoTimerChange() throws IOException {
290 return Native.epollWait(epollFd, events, false);
291 }
292
293 private int epollWaitNow() throws IOException {
294 return Native.epollWait(epollFd, events, true);
295 }
296
297 private int epollBusyWait() throws IOException {
298 return Native.epollBusyWait(epollFd, events);
299 }
300
301 private int epollWaitTimeboxed() throws IOException {
302 // Wait with 1 second "safeguard" timeout
303 return Native.epollWait(epollFd, events, 1000);
304 }
305
306 @Override
307 protected void run() {
308 long prevDeadlineNanos = NONE;
309 for (;;) {
310 try {
311 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
312 switch (strategy) {
313 case SelectStrategy.CONTINUE:
314 continue;
315
316 case SelectStrategy.BUSY_WAIT:
317 strategy = epollBusyWait();
318 break;
319
320 case SelectStrategy.SELECT:
321 if (pendingWakeup) {
322 // We are going to be immediately woken so no need to reset wakenUp
323 // or check for timerfd adjustment.
324 strategy = epollWaitTimeboxed();
325 if (strategy != 0) {
326 break;
327 }
328 // We timed out so assume that we missed the write event due to an
329 // abnormally failed syscall (the write itself or a prior epoll_wait)
330 logger.warn("Missed eventfd write (not seen after > 1 second)");
331 pendingWakeup = false;
332 if (hasTasks()) {
333 break;
334 }
335 // fall-through
336 }
337
338 long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
339 if (curDeadlineNanos == -1L) {
340 curDeadlineNanos = NONE; // nothing on the calendar
341 }
342 nextWakeupNanos.set(curDeadlineNanos);
343 try {
344 if (!hasTasks()) {
345 if (curDeadlineNanos == prevDeadlineNanos) {
346 // No timer activity needed
347 strategy = epollWaitNoTimerChange();
348 } else {
349 // Timerfd needs to be re-armed or disarmed
350 prevDeadlineNanos = curDeadlineNanos;
351 strategy = epollWait(curDeadlineNanos);
352 }
353 }
354 } finally {
355 // Try get() first to avoid much more expensive CAS in the case we
356 // were woken via the wakeup() method (submitted task)
357 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
358 pendingWakeup = true;
359 }
360 }
361 // fallthrough
362 default:
363 }
364
365 final int ioRatio = this.ioRatio;
366 if (ioRatio == 100) {
367 try {
368 if (strategy > 0 && processReady(events, strategy)) {
369 prevDeadlineNanos = NONE;
370 }
371 } finally {
372 // Ensure we always run tasks.
373 runAllTasks();
374 }
375 } else if (strategy > 0) {
376 final long ioStartTime = System.nanoTime();
377 try {
378 if (processReady(events, strategy)) {
379 prevDeadlineNanos = NONE;
380 }
381 } finally {
382 // Ensure we always run tasks.
383 final long ioTime = System.nanoTime() - ioStartTime;
384 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
385 }
386 } else {
387 runAllTasks(0); // This will run the minimum number of tasks
388 }
389 if (allowGrowing && strategy == events.length()) {
390 //increase the size of the array as we needed the whole space for the events
391 events.increase();
392 }
393 } catch (Throwable t) {
394 handleLoopException(t);
395 }
396 // Always handle shutdown even if the loop processing threw an exception.
397 try {
398 if (isShuttingDown()) {
399 closeAll();
400 if (confirmShutdown()) {
401 break;
402 }
403 }
404 } catch (Throwable t) {
405 handleLoopException(t);
406 }
407 }
408 }
409
410 /**
411 * Visible only for testing!
412 */
413 void handleLoopException(Throwable t) {
414 logger.warn("Unexpected exception in the selector loop.", t);
415
416 // Prevent possible consecutive immediate failures that lead to
417 // excessive CPU consumption.
418 try {
419 Thread.sleep(1000);
420 } catch (InterruptedException e) {
421 // Ignore.
422 }
423 }
424
425 private void closeAll() {
426 // Using the intermediate collection to prevent ConcurrentModificationException.
427 // In the `close()` method, the channel is deleted from `channels` map.
428 AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
429
430 for (AbstractEpollChannel ch: localChannels) {
431 ch.unsafe().close(ch.unsafe().voidPromise());
432 }
433 }
434
435 // Returns true if a timerFd event was encountered
436 private boolean processReady(EpollEventArray events, int ready) {
437 boolean timerFired = false;
438 for (int i = 0; i < ready; i ++) {
439 final int fd = events.fd(i);
440 if (fd == eventFd.intValue()) {
441 pendingWakeup = false;
442 } else if (fd == timerFd.intValue()) {
443 timerFired = true;
444 } else {
445 final long ev = events.events(i);
446
447 AbstractEpollChannel ch = channels.get(fd);
448 if (ch != null) {
449 // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
450 // sure about it!
451 // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
452 // past.
453 AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
454
455 // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
456 // to read from the file descriptor.
457 // See https://github.com/netty/netty/issues/3785
458 //
459 // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
460 // In either case epollOutReady() will do the correct thing (finish connecting, or fail
461 // the connection).
462 // See https://github.com/netty/netty/issues/3848
463 if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
464 // Force flush of data as the epoll is writable again
465 unsafe.epollOutReady();
466 }
467
468 // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
469 // See https://github.com/netty/netty/issues/4317.
470 //
471 // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
472 // try to read from the underlying file descriptor and so notify the user about the error.
473 if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
474 // The Channel is still open and there is something to read. Do it now.
475 unsafe.epollInReady();
476 }
477
478 // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
479 // we may close the channel directly or try to read more data depending on the state of the
480 // Channel and als depending on the AbstractEpollChannel subtype.
481 if ((ev & Native.EPOLLRDHUP) != 0) {
482 unsafe.epollRdHupReady();
483 }
484 } else {
485 // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
486 try {
487 Native.epollCtlDel(epollFd.intValue(), fd);
488 } catch (IOException ignore) {
489 // This can happen but is nothing we need to worry about as we only try to delete
490 // the fd from the epoll set as we not found it in our mappings. So this call to
491 // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
492 // deleted before or the file descriptor was closed before.
493 }
494 }
495 }
496 }
497 return timerFired;
498 }
499
500 @Override
501 protected void cleanup() {
502 try {
503 // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
504 while (pendingWakeup) {
505 try {
506 int count = epollWaitTimeboxed();
507 if (count == 0) {
508 // We timed-out so assume that the write we're expecting isn't coming
509 break;
510 }
511 for (int i = 0; i < count; i++) {
512 if (events.fd(i) == eventFd.intValue()) {
513 pendingWakeup = false;
514 break;
515 }
516 }
517 } catch (IOException ignore) {
518 // ignore
519 }
520 }
521 try {
522 eventFd.close();
523 } catch (IOException e) {
524 logger.warn("Failed to close the event fd.", e);
525 }
526 try {
527 timerFd.close();
528 } catch (IOException e) {
529 logger.warn("Failed to close the timer fd.", e);
530 }
531
532 try {
533 epollFd.close();
534 } catch (IOException e) {
535 logger.warn("Failed to close the epoll fd.", e);
536 }
537 } finally {
538 // release native memory
539 if (iovArray != null) {
540 iovArray.release();
541 iovArray = null;
542 }
543 if (datagramPacketArray != null) {
544 datagramPacketArray.release();
545 datagramPacketArray = null;
546 }
547 events.free();
548 }
549 }
550 }
551