1 /*
2  * Copyright 2014 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel.epoll;
17
18 import io.netty.channel.EventLoop;
19 import io.netty.channel.EventLoopGroup;
20 import io.netty.channel.EventLoopTaskQueueFactory;
21 import io.netty.channel.SelectStrategy;
22 import io.netty.channel.SingleThreadEventLoop;
23 import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.IovArray;
26 import io.netty.util.IntSupplier;
27 import io.netty.util.collection.IntObjectHashMap;
28 import io.netty.util.collection.IntObjectMap;
29 import io.netty.util.concurrent.RejectedExecutionHandler;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.util.Queue;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import static java.lang.Math.min;
41
42 /**
43  * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
44  */

45 class EpollEventLoop extends SingleThreadEventLoop {
46     private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
47
48     static {
49         // Ensure JNI is initialized by the time this class is loaded by this time!
50         // We use unix-common methods in this class which are backed by JNI methods.
51         Epoll.ensureAvailability();
52     }
53
54     private final FileDescriptor epollFd;
55     private final FileDescriptor eventFd;
56     private final FileDescriptor timerFd;
57     private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
58     private final boolean allowGrowing;
59     private final EpollEventArray events;
60
61     // These are initialized on first use
62     private IovArray iovArray;
63     private NativeDatagramPacketArray datagramPacketArray;
64
65     private final SelectStrategy selectStrategy;
66     private final IntSupplier selectNowSupplier = new IntSupplier() {
67         @Override
68         public int get() throws Exception {
69             return epollWaitNow();
70         }
71     };
72
73     private static final long AWAKE = -1L;
74     private static final long NONE = Long.MAX_VALUE;
75
76     // nextWakeupNanos is:
77     //    AWAKE            when EL is awake
78     //    NONE             when EL is waiting with no wakeup scheduled
79     //    other value T    when EL is waiting with wakeup scheduled at time T
80     private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
81     private boolean pendingWakeup;
82     private volatile int ioRatio = 50;
83
84     // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
85     private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
86
87     EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
88                    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
89                    EventLoopTaskQueueFactory queueFactory) {
90         super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
91                 rejectedExecutionHandler);
92         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
93         if (maxEvents == 0) {
94             allowGrowing = true;
95             events = new EpollEventArray(4096);
96         } else {
97             allowGrowing = false;
98             events = new EpollEventArray(maxEvents);
99         }
100         boolean success = false;
101         FileDescriptor epollFd = null;
102         FileDescriptor eventFd = null;
103         FileDescriptor timerFd = null;
104         try {
105             this.epollFd = epollFd = Native.newEpollCreate();
106             this.eventFd = eventFd = Native.newEventFd();
107             try {
108                 // It is important to use EPOLLET here as we only want to get the notification once per
109                 // wakeup and don't call eventfd_read(...).
110                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
111             } catch (IOException e) {
112                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
113             }
114             this.timerFd = timerFd = Native.newTimerFd();
115             try {
116                 // It is important to use EPOLLET here as we only want to get the notification once per
117                 // wakeup and don't call read(...).
118                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
119             } catch (IOException e) {
120                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
121             }
122             success = true;
123         } finally {
124             if (!success) {
125                 if (epollFd != null) {
126                     try {
127                         epollFd.close();
128                     } catch (Exception e) {
129                         // ignore
130                     }
131                 }
132                 if (eventFd != null) {
133                     try {
134                         eventFd.close();
135                     } catch (Exception e) {
136                         // ignore
137                     }
138                 }
139                 if (timerFd != null) {
140                     try {
141                         timerFd.close();
142                     } catch (Exception e) {
143                         // ignore
144                     }
145                 }
146             }
147         }
148     }
149
150     private static Queue<Runnable> newTaskQueue(
151             EventLoopTaskQueueFactory queueFactory) {
152         if (queueFactory == null) {
153             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
154         }
155         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
156     }
157
158     /**
159      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
160      */

161     IovArray cleanIovArray() {
162         if (iovArray == null) {
163             iovArray = new IovArray();
164         } else {
165             iovArray.clear();
166         }
167         return iovArray;
168     }
169
170     /**
171      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
172      */

173     NativeDatagramPacketArray cleanDatagramPacketArray() {
174         if (datagramPacketArray == null) {
175             datagramPacketArray = new NativeDatagramPacketArray();
176         } else {
177             datagramPacketArray.clear();
178         }
179         return datagramPacketArray;
180     }
181
182     @Override
183     protected void wakeup(boolean inEventLoop) {
184         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
185             // write to the evfd which will then wake-up epoll_wait(...)
186             Native.eventFdWrite(eventFd.intValue(), 1L);
187         }
188     }
189
190     @Override
191     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
192         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
193         return deadlineNanos < nextWakeupNanos.get();
194     }
195
196     @Override
197     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
198         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
199         return deadlineNanos < nextWakeupNanos.get();
200     }
201
202     /**
203      * Register the given epoll with this {@link EventLoop}.
204      */

205     void add(AbstractEpollChannel ch) throws IOException {
206         assert inEventLoop();
207         int fd = ch.socket.intValue();
208         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
209         AbstractEpollChannel old = channels.put(fd, ch);
210
211         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
212         // closed.
213         assert old == null || !old.isOpen();
214     }
215
216     /**
217      * The flags of the given epoll was modified so update the registration
218      */

219     void modify(AbstractEpollChannel ch) throws IOException {
220         assert inEventLoop();
221         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
222     }
223
224     /**
225      * Deregister the given epoll from this {@link EventLoop}.
226      */

227     void remove(AbstractEpollChannel ch) throws IOException {
228         assert inEventLoop();
229         int fd = ch.socket.intValue();
230
231         AbstractEpollChannel old = channels.remove(fd);
232         if (old != null && old != ch) {
233             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
234             channels.put(fd, old);
235
236             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
237             assert !ch.isOpen();
238         } else if (ch.isOpen()) {
239             // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
240             // removed once the file-descriptor is closed.
241             Native.epollCtlDel(epollFd.intValue(), fd);
242         }
243     }
244
245     @Override
246     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
247         return newTaskQueue0(maxPendingTasks);
248     }
249
250     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
251         // This event loop never calls takeTask()
252         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
253                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
254     }
255
256     /**
257      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
258      */

259     public int getIoRatio() {
260         return ioRatio;
261     }
262
263     /**
264      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
265      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
266      */

267     public void setIoRatio(int ioRatio) {
268         if (ioRatio <= 0 || ioRatio > 100) {
269             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
270         }
271         this.ioRatio = ioRatio;
272     }
273
274     @Override
275     public int registeredChannels() {
276         return channels.size();
277     }
278
279     private int epollWait(long deadlineNanos) throws IOException {
280         if (deadlineNanos == NONE) {
281             return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
282         }
283         long totalDelay = deadlineToDelayNanos(deadlineNanos);
284         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
285         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
286         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
287     }
288
289     private int epollWaitNoTimerChange() throws IOException {
290         return Native.epollWait(epollFd, events, false);
291     }
292
293     private int epollWaitNow() throws IOException {
294         return Native.epollWait(epollFd, events, true);
295     }
296
297     private int epollBusyWait() throws IOException {
298         return Native.epollBusyWait(epollFd, events);
299     }
300
301     private int epollWaitTimeboxed() throws IOException {
302         // Wait with 1 second "safeguard" timeout
303         return Native.epollWait(epollFd, events, 1000);
304     }
305
306     @Override
307     protected void run() {
308         long prevDeadlineNanos = NONE;
309         for (;;) {
310             try {
311                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
312                 switch (strategy) {
313                     case SelectStrategy.CONTINUE:
314                         continue;
315
316                     case SelectStrategy.BUSY_WAIT:
317                         strategy = epollBusyWait();
318                         break;
319
320                     case SelectStrategy.SELECT:
321                         if (pendingWakeup) {
322                             // We are going to be immediately woken so no need to reset wakenUp
323                             // or check for timerfd adjustment.
324                             strategy = epollWaitTimeboxed();
325                             if (strategy != 0) {
326                                 break;
327                             }
328                             // We timed out so assume that we missed the write event due to an
329                             // abnormally failed syscall (the write itself or a prior epoll_wait)
330                             logger.warn("Missed eventfd write (not seen after > 1 second)");
331                             pendingWakeup = false;
332                             if (hasTasks()) {
333                                 break;
334                             }
335                             // fall-through
336                         }
337
338                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
339                         if (curDeadlineNanos == -1L) {
340                             curDeadlineNanos = NONE; // nothing on the calendar
341                         }
342                         nextWakeupNanos.set(curDeadlineNanos);
343                         try {
344                             if (!hasTasks()) {
345                                 if (curDeadlineNanos == prevDeadlineNanos) {
346                                     // No timer activity needed
347                                     strategy = epollWaitNoTimerChange();
348                                 } else {
349                                     // Timerfd needs to be re-armed or disarmed
350                                     prevDeadlineNanos = curDeadlineNanos;
351                                     strategy = epollWait(curDeadlineNanos);
352                                 }
353                             }
354                         } finally {
355                             // Try get() first to avoid much more expensive CAS in the case we
356                             // were woken via the wakeup() method (submitted task)
357                             if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
358                                 pendingWakeup = true;
359                             }
360                         }
361                         // fallthrough
362                     default:
363                 }
364
365                 final int ioRatio = this.ioRatio;
366                 if (ioRatio == 100) {
367                     try {
368                         if (strategy > 0 && processReady(events, strategy)) {
369                             prevDeadlineNanos = NONE;
370                         }
371                     } finally {
372                         // Ensure we always run tasks.
373                         runAllTasks();
374                     }
375                 } else if (strategy > 0) {
376                     final long ioStartTime = System.nanoTime();
377                     try {
378                         if (processReady(events, strategy)) {
379                             prevDeadlineNanos = NONE;
380                         }
381                     } finally {
382                         // Ensure we always run tasks.
383                         final long ioTime = System.nanoTime() - ioStartTime;
384                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
385                     }
386                 } else {
387                     runAllTasks(0); // This will run the minimum number of tasks
388                 }
389                 if (allowGrowing && strategy == events.length()) {
390                     //increase the size of the array as we needed the whole space for the events
391                     events.increase();
392                 }
393             } catch (Throwable t) {
394                 handleLoopException(t);
395             }
396             // Always handle shutdown even if the loop processing threw an exception.
397             try {
398                 if (isShuttingDown()) {
399                     closeAll();
400                     if (confirmShutdown()) {
401                         break;
402                     }
403                 }
404             } catch (Throwable t) {
405                 handleLoopException(t);
406             }
407         }
408     }
409
410     /**
411      * Visible only for testing!
412      */

413     void handleLoopException(Throwable t) {
414         logger.warn("Unexpected exception in the selector loop.", t);
415
416         // Prevent possible consecutive immediate failures that lead to
417         // excessive CPU consumption.
418         try {
419             Thread.sleep(1000);
420         } catch (InterruptedException e) {
421             // Ignore.
422         }
423     }
424
425     private void closeAll() {
426         // Using the intermediate collection to prevent ConcurrentModificationException.
427         // In the `close()` method, the channel is deleted from `channels` map.
428         AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
429
430         for (AbstractEpollChannel ch: localChannels) {
431             ch.unsafe().close(ch.unsafe().voidPromise());
432         }
433     }
434
435     // Returns true if a timerFd event was encountered
436     private boolean processReady(EpollEventArray events, int ready) {
437         boolean timerFired = false;
438         for (int i = 0; i < ready; i ++) {
439             final int fd = events.fd(i);
440             if (fd == eventFd.intValue()) {
441                 pendingWakeup = false;
442             } else if (fd == timerFd.intValue()) {
443                 timerFired = true;
444             } else {
445                 final long ev = events.events(i);
446
447                 AbstractEpollChannel ch = channels.get(fd);
448                 if (ch != null) {
449                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
450                     // sure about it!
451                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
452                     // past.
453                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
454
455                     // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
456                     // to read from the file descriptor.
457                     // See https://github.com/netty/netty/issues/3785
458                     //
459                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
460                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
461                     // the connection).
462                     // See https://github.com/netty/netty/issues/3848
463                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
464                         // Force flush of data as the epoll is writable again
465                         unsafe.epollOutReady();
466                     }
467
468                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
469                     // See https://github.com/netty/netty/issues/4317.
470                     //
471                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
472                     // try to read from the underlying file descriptor and so notify the user about the error.
473                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
474                         // The Channel is still open and there is something to read. Do it now.
475                         unsafe.epollInReady();
476                     }
477
478                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
479                     // we may close the channel directly or try to read more data depending on the state of the
480                     // Channel and als depending on the AbstractEpollChannel subtype.
481                     if ((ev & Native.EPOLLRDHUP) != 0) {
482                         unsafe.epollRdHupReady();
483                     }
484                 } else {
485                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
486                     try {
487                         Native.epollCtlDel(epollFd.intValue(), fd);
488                     } catch (IOException ignore) {
489                         // This can happen but is nothing we need to worry about as we only try to delete
490                         // the fd from the epoll set as we not found it in our mappings. So this call to
491                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
492                         // deleted before or the file descriptor was closed before.
493                     }
494                 }
495             }
496         }
497         return timerFired;
498     }
499
500     @Override
501     protected void cleanup() {
502         try {
503             // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
504             while (pendingWakeup) {
505                 try {
506                     int count = epollWaitTimeboxed();
507                     if (count == 0) {
508                         // We timed-out so assume that the write we're expecting isn't coming
509                         break;
510                     }
511                     for (int i = 0; i < count; i++) {
512                         if (events.fd(i) == eventFd.intValue()) {
513                             pendingWakeup = false;
514                             break;
515                         }
516                     }
517                 } catch (IOException ignore) {
518                     // ignore
519                 }
520             }
521             try {
522                 eventFd.close();
523             } catch (IOException e) {
524                 logger.warn("Failed to close the event fd.", e);
525             }
526             try {
527                 timerFd.close();
528             } catch (IOException e) {
529                 logger.warn("Failed to close the timer fd.", e);
530             }
531
532             try {
533                 epollFd.close();
534             } catch (IOException e) {
535                 logger.warn("Failed to close the epoll fd.", e);
536             }
537         } finally {
538             // release native memory
539             if (iovArray != null) {
540                 iovArray.release();
541                 iovArray = null;
542             }
543             if (datagramPacketArray != null) {
544                 datagramPacketArray.release();
545                 datagramPacketArray = null;
546             }
547             events.free();
548         }
549     }
550 }
551