1 /*
2  * Copyright 2015 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.UnstableApi;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60     private static final String EXPECTED_TYPES =
61             " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62                     StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63     private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64
65     private final Runnable flushTask = new Runnable() {
66         @Override
67         public void run() {
68             // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
69             // meantime.
70             ((AbstractEpollUnsafe) unsafe()).flush0();
71         }
72     };
73
74     // Lazy init these if we need to splice(...)
75     private volatile Queue<SpliceInTask> spliceQueue;
76     private FileDescriptor pipeIn;
77     private FileDescriptor pipeOut;
78
79     private WritableByteChannel byteChannel;
80
81     protected AbstractEpollStreamChannel(Channel parent, int fd) {
82         this(parent, new LinuxSocket(fd));
83     }
84
85     protected AbstractEpollStreamChannel(int fd) {
86         this(new LinuxSocket(fd));
87     }
88
89     AbstractEpollStreamChannel(LinuxSocket fd) {
90         this(fd, isSoErrorZero(fd));
91     }
92
93     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
94         super(parent, fd, true);
95         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
96         flags |= Native.EPOLLRDHUP;
97     }
98
99     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
100         super(parent, fd, remote);
101         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
102         flags |= Native.EPOLLRDHUP;
103     }
104
105     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
106         super(null, fd, active);
107         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
108         flags |= Native.EPOLLRDHUP;
109     }
110
111     @Override
112     protected AbstractEpollUnsafe newUnsafe() {
113         return new EpollStreamUnsafe();
114     }
115
116     @Override
117     public ChannelMetadata metadata() {
118         return METADATA;
119     }
120
121     /**
122      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
123      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
124      * splice until the {@link ChannelFuture} was canceled or it was failed.
125      *
126      * Please note:
127      * <ul>
128      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
129      *   {@link IllegalArgumentException} is thrown. </li>
130      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
131      *   target {@link AbstractEpollStreamChannel}</li>
132      * </ul>
133      *
134      */

135     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136         return spliceTo(ch, len, newPromise());
137     }
138
139     /**
140      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
141      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
142      * splice until the {@link ChannelFuture} was canceled or it was failed.
143      *
144      * Please note:
145      * <ul>
146      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
147      *   {@link IllegalArgumentException} is thrown. </li>
148      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
149      *   target {@link AbstractEpollStreamChannel}</li>
150      * </ul>
151      *
152      */

153     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154                                         final ChannelPromise promise) {
155         if (ch.eventLoop() != eventLoop()) {
156             throw new IllegalArgumentException("EventLoops are not the same.");
157         }
158         checkPositiveOrZero(len, "len");
159         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162         }
163         checkNotNull(promise, "promise");
164         if (!isOpen()) {
165             promise.tryFailure(new ClosedChannelException());
166         } else {
167             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168             failSpliceIfClosed(promise);
169         }
170         return promise;
171     }
172
173     /**
174      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
175      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
176      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
177      * {@link ChannelFuture} was canceled or it was failed.
178      *
179      * Please note:
180      * <ul>
181      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
182      *   {@link AbstractEpollStreamChannel}</li>
183      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
184      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
185      * </ul>
186      */

187     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188         return spliceTo(ch, offset, len, newPromise());
189     }
190
191     /**
192      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
193      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
194      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
195      * {@link ChannelFuture} was canceled or it was failed.
196      *
197      * Please note:
198      * <ul>
199      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
200      *   {@link AbstractEpollStreamChannel}</li>
201      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
202      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
203      * </ul>
204      */

205     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206                                         final ChannelPromise promise) {
207         checkPositiveOrZero(len, "len");
208         checkPositiveOrZero(offset, "offset");
209         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211         }
212         checkNotNull(promise, "promise");
213         if (!isOpen()) {
214             promise.tryFailure(new ClosedChannelException());
215         } else {
216             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217             failSpliceIfClosed(promise);
218         }
219         return promise;
220     }
221
222     private void failSpliceIfClosed(ChannelPromise promise) {
223         if (!isOpen()) {
224             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
225             // cases where a future may not be notified otherwise.
226             if (promise.tryFailure(new ClosedChannelException())) {
227                 eventLoop().execute(new Runnable() {
228                     @Override
229                     public void run() {
230                         // Call this via the EventLoop as it is a MPSC queue.
231                         clearSpliceQueue();
232                     }
233                 });
234             }
235         }
236     }
237
238     /**
239      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
240      * @param in the collection which contains objects to write.
241      * @param buf the {@link ByteBuf} from which the bytes should be written
242      * @return The value that should be decremented from the write quantum which starts at
243      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
244      * <ul>
245      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
246      *     is encountered</li>
247      *     <li>1 - if a single call to write data was made to the OS</li>
248      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
249      *     no data was accepted</li>
250      * </ul>
251      */

252     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
253         int readableBytes = buf.readableBytes();
254         if (readableBytes == 0) {
255             in.remove();
256             return 0;
257         }
258
259         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
260             return doWriteBytes(in, buf);
261         } else {
262             ByteBuffer[] nioBuffers = buf.nioBuffers();
263             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
264                     config().getMaxBytesPerGatheringWrite());
265         }
266     }
267
268     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
269         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
270         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
271         // make a best effort to adjust as OS behavior changes.
272         if (attempted == written) {
273             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
274                 config().setMaxBytesPerGatheringWrite(attempted << 1);
275             }
276         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
277             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
278         }
279     }
280
281     /**
282      * Write multiple bytes via {@link IovArray}.
283      * @param in the collection which contains objects to write.
284      * @param array The array which contains the content to write.
285      * @return The value that should be decremented from the write quantum which starts at
286      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
287      * <ul>
288      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
289      *     is encountered</li>
290      *     <li>1 - if a single call to write data was made to the OS</li>
291      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
292      *     no data was accepted</li>
293      * </ul>
294      * @throws IOException If an I/O exception occurs during write.
295      */

296     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
297         final long expectedWrittenBytes = array.size();
298         assert expectedWrittenBytes != 0;
299         final int cnt = array.count();
300         assert cnt != 0;
301
302         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
303         if (localWrittenBytes > 0) {
304             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
305             in.removeBytes(localWrittenBytes);
306             return 1;
307         }
308         return WRITE_STATUS_SNDBUF_FULL;
309     }
310
311     /**
312      * Write multiple bytes via {@link ByteBuffer} array.
313      * @param in the collection which contains objects to write.
314      * @param nioBuffers The buffers to write.
315      * @param nioBufferCnt The number of buffers to write.
316      * @param expectedWrittenBytes The number of bytes we expect to write.
317      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
318      * @return The value that should be decremented from the write quantum which starts at
319      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
320      * <ul>
321      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
322      *     is encountered</li>
323      *     <li>1 - if a single call to write data was made to the OS</li>
324      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
325      *     no data was accepted</li>
326      * </ul>
327      * @throws IOException If an I/O exception occurs during write.
328      */

329     private int writeBytesMultiple(
330             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
331             long maxBytesPerGatheringWrite) throws IOException {
332         assert expectedWrittenBytes != 0;
333         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
334             expectedWrittenBytes = maxBytesPerGatheringWrite;
335         }
336
337         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
338         if (localWrittenBytes > 0) {
339             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
340             in.removeBytes(localWrittenBytes);
341             return 1;
342         }
343         return WRITE_STATUS_SNDBUF_FULL;
344     }
345
346     /**
347      * Write a {@link DefaultFileRegion}
348      * @param in the collection which contains objects to write.
349      * @param region the {@link DefaultFileRegion} from which the bytes should be written
350      * @return The value that should be decremented from the write quantum which starts at
351      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
352      * <ul>
353      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
354      *     is encountered</li>
355      *     <li>1 - if a single call to write data was made to the OS</li>
356      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
357      *     no data was accepted</li>
358      * </ul>
359      */

360     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
361         final long offset = region.transferred();
362         final long regionCount = region.count();
363         if (offset >= regionCount) {
364             in.remove();
365             return 0;
366         }
367
368         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
369         if (flushedAmount > 0) {
370             in.progress(flushedAmount);
371             if (region.transferred() >= regionCount) {
372                 in.remove();
373             }
374             return 1;
375         } else if (flushedAmount == 0) {
376             validateFileRegion(region, offset);
377         }
378         return WRITE_STATUS_SNDBUF_FULL;
379     }
380
381     /**
382      * Write a {@link FileRegion}
383      * @param in the collection which contains objects to write.
384      * @param region the {@link FileRegion} from which the bytes should be written
385      * @return The value that should be decremented from the write quantum which starts at
386      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
387      * <ul>
388      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
389      *     is encountered</li>
390      *     <li>1 - if a single call to write data was made to the OS</li>
391      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
392      *     no data was accepted</li>
393      * </ul>
394      */

395     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
396         if (region.transferred() >= region.count()) {
397             in.remove();
398             return 0;
399         }
400
401         if (byteChannel == null) {
402             byteChannel = new EpollSocketWritableByteChannel();
403         }
404         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
405         if (flushedAmount > 0) {
406             in.progress(flushedAmount);
407             if (region.transferred() >= region.count()) {
408                 in.remove();
409             }
410             return 1;
411         }
412         return WRITE_STATUS_SNDBUF_FULL;
413     }
414
415     @Override
416     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
417         int writeSpinCount = config().getWriteSpinCount();
418         do {
419             final int msgCount = in.size();
420             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
421             if (msgCount > 1 && in.current() instanceof ByteBuf) {
422                 writeSpinCount -= doWriteMultiple(in);
423             } else if (msgCount == 0) {
424                 // Wrote all messages.
425                 clearFlag(Native.EPOLLOUT);
426                 // Return here so we not set the EPOLLOUT flag.
427                 return;
428             } else {  // msgCount == 1
429                 writeSpinCount -= doWriteSingle(in);
430             }
431
432             // We do not break the loop here even if the outbound buffer was flushed completely,
433             // because a user might have triggered another write and flush when we notify his or her
434             // listeners.
435         } while (writeSpinCount > 0);
436
437         if (writeSpinCount == 0) {
438             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
439             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
440             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
441             // and set the EPOLLOUT if necessary.
442             clearFlag(Native.EPOLLOUT);
443
444             // We used our writeSpin quantum, and should try to write again later.
445             eventLoop().execute(flushTask);
446         } else {
447             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
448             // when it can accept more data.
449             setFlag(Native.EPOLLOUT);
450         }
451     }
452
453     /**
454      * Attempt to write a single object.
455      * @param in the collection which contains objects to write.
456      * @return The value that should be decremented from the write quantum which starts at
457      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
458      * <ul>
459      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
460      *     is encountered</li>
461      *     <li>1 - if a single call to write data was made to the OS</li>
462      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
463      *     no data was accepted</li>
464      * </ul>
465      * @throws Exception If an I/O error occurs.
466      */

467     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
468         // The outbound buffer contains only one message or it contains a file region.
469         Object msg = in.current();
470         if (msg instanceof ByteBuf) {
471             return writeBytes(in, (ByteBuf) msg);
472         } else if (msg instanceof DefaultFileRegion) {
473             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
474         } else if (msg instanceof FileRegion) {
475             return writeFileRegion(in, (FileRegion) msg);
476         } else if (msg instanceof SpliceOutTask) {
477             if (!((SpliceOutTask) msg).spliceOut()) {
478                 return WRITE_STATUS_SNDBUF_FULL;
479             }
480             in.remove();
481             return 1;
482         } else {
483             // Should never reach here.
484             throw new Error();
485         }
486     }
487
488     /**
489      * Attempt to write multiple {@link ByteBuf} objects.
490      * @param in the collection which contains objects to write.
491      * @return The value that should be decremented from the write quantum which starts at
492      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
493      * <ul>
494      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
495      *     is encountered</li>
496      *     <li>1 - if a single call to write data was made to the OS</li>
497      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
498      *     no data was accepted</li>
499      * </ul>
500      * @throws Exception If an I/O error occurs.
501      */

502     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
503         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
504         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
505         array.maxBytes(maxBytesPerGatheringWrite);
506         in.forEachFlushedMessage(array);
507
508         if (array.count() >= 1) {
509             // TODO: Handle the case where cnt == 1 specially.
510             return writeBytesMultiple(in, array);
511         }
512         // cnt == 0, which means the outbound buffer contained empty buffers only.
513         in.removeBytes(0);
514         return 0;
515     }
516
517     @Override
518     protected Object filterOutboundMessage(Object msg) {
519         if (msg instanceof ByteBuf) {
520             ByteBuf buf = (ByteBuf) msg;
521             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
522         }
523
524         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
525             return msg;
526         }
527
528         throw new UnsupportedOperationException(
529                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
530     }
531
532     @UnstableApi
533     @Override
534     protected final void doShutdownOutput() throws Exception {
535         socket.shutdown(falsetrue);
536     }
537
538     private void shutdownInput0(final ChannelPromise promise) {
539         try {
540             socket.shutdown(truefalse);
541             promise.setSuccess();
542         } catch (Throwable cause) {
543             promise.setFailure(cause);
544         }
545     }
546
547     @Override
548     public boolean isOutputShutdown() {
549         return socket.isOutputShutdown();
550     }
551
552     @Override
553     public boolean isInputShutdown() {
554         return socket.isInputShutdown();
555     }
556
557     @Override
558     public boolean isShutdown() {
559         return socket.isShutdown();
560     }
561
562     @Override
563     public ChannelFuture shutdownOutput() {
564         return shutdownOutput(newPromise());
565     }
566
567     @Override
568     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
569         EventLoop loop = eventLoop();
570         if (loop.inEventLoop()) {
571             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
572         } else {
573             loop.execute(new Runnable() {
574                 @Override
575                 public void run() {
576                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
577                 }
578             });
579         }
580
581         return promise;
582     }
583
584     @Override
585     public ChannelFuture shutdownInput() {
586         return shutdownInput(newPromise());
587     }
588
589     @Override
590     public ChannelFuture shutdownInput(final ChannelPromise promise) {
591         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
592         if (closeExecutor != null) {
593             closeExecutor.execute(new Runnable() {
594                 @Override
595                 public void run() {
596                     shutdownInput0(promise);
597                 }
598             });
599         } else {
600             EventLoop loop = eventLoop();
601             if (loop.inEventLoop()) {
602                 shutdownInput0(promise);
603             } else {
604                 loop.execute(new Runnable() {
605                     @Override
606                     public void run() {
607                         shutdownInput0(promise);
608                     }
609                 });
610             }
611         }
612         return promise;
613     }
614
615     @Override
616     public ChannelFuture shutdown() {
617         return shutdown(newPromise());
618     }
619
620     @Override
621     public ChannelFuture shutdown(final ChannelPromise promise) {
622         ChannelFuture shutdownOutputFuture = shutdownOutput();
623         if (shutdownOutputFuture.isDone()) {
624             shutdownOutputDone(shutdownOutputFuture, promise);
625         } else {
626             shutdownOutputFuture.addListener(new ChannelFutureListener() {
627                 @Override
628                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
629                     shutdownOutputDone(shutdownOutputFuture, promise);
630                 }
631             });
632         }
633         return promise;
634     }
635
636     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
637         ChannelFuture shutdownInputFuture = shutdownInput();
638         if (shutdownInputFuture.isDone()) {
639             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
640         } else {
641             shutdownInputFuture.addListener(new ChannelFutureListener() {
642                 @Override
643                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
644                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
645                 }
646             });
647         }
648     }
649
650     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
651                               ChannelFuture shutdownInputFuture,
652                               ChannelPromise promise) {
653         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
654         Throwable shutdownInputCause = shutdownInputFuture.cause();
655         if (shutdownOutputCause != null) {
656             if (shutdownInputCause != null) {
657                 logger.debug("Exception suppressed because a previous exception occurred.",
658                         shutdownInputCause);
659             }
660             promise.setFailure(shutdownOutputCause);
661         } else if (shutdownInputCause != null) {
662             promise.setFailure(shutdownInputCause);
663         } else {
664             promise.setSuccess();
665         }
666     }
667
668     @Override
669     protected void doClose() throws Exception {
670         try {
671             // Calling super.doClose() first so spliceTo(...) will fail on next call.
672             super.doClose();
673         } finally {
674             safeClosePipe(pipeIn);
675             safeClosePipe(pipeOut);
676             clearSpliceQueue();
677         }
678     }
679
680     private void clearSpliceQueue() {
681         Queue<SpliceInTask> sQueue = spliceQueue;
682         if (sQueue == null) {
683             return;
684         }
685         ClosedChannelException exception = null;
686
687         for (;;) {
688             SpliceInTask task = sQueue.poll();
689             if (task == null) {
690                 break;
691             }
692             if (exception == null) {
693                 exception = new ClosedChannelException();
694             }
695             task.promise.tryFailure(exception);
696         }
697     }
698
699     private static void safeClosePipe(FileDescriptor fd) {
700         if (fd != null) {
701             try {
702                 fd.close();
703             } catch (IOException e) {
704                 logger.warn("Error while closing a pipe", e);
705             }
706         }
707     }
708
709     class EpollStreamUnsafe extends AbstractEpollUnsafe {
710         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
711         @Override
712         protected Executor prepareToClose() {
713             return super.prepareToClose();
714         }
715
716         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
717                 EpollRecvByteAllocatorHandle allocHandle) {
718             if (byteBuf != null) {
719                 if (byteBuf.isReadable()) {
720                     readPending = false;
721                     pipeline.fireChannelRead(byteBuf);
722                 } else {
723                     byteBuf.release();
724                 }
725             }
726             allocHandle.readComplete();
727             pipeline.fireChannelReadComplete();
728             pipeline.fireExceptionCaught(cause);
729
730             // If oom will close the read event, release connection.
731             // See https://github.com/netty/netty/issues/10434
732             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
733                 shutdownInput(false);
734             }
735         }
736
737         @Override
738         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
739             return new EpollRecvByteAllocatorStreamingHandle(handle);
740         }
741
742         @Override
743         void epollInReady() {
744             final ChannelConfig config = config();
745             if (shouldBreakEpollInReady(config)) {
746                 clearEpollIn0();
747                 return;
748             }
749             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
750             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
751
752             final ChannelPipeline pipeline = pipeline();
753             final ByteBufAllocator allocator = config.getAllocator();
754             allocHandle.reset(config);
755             epollInBefore();
756
757             ByteBuf byteBuf = null;
758             boolean close = false;
759             try {
760                 Queue<SpliceInTask> sQueue = null;
761                 do {
762                     if (sQueue != null || (sQueue = spliceQueue) != null) {
763                         SpliceInTask spliceTask = sQueue.peek();
764                         if (spliceTask != null) {
765                             if (spliceTask.spliceIn(allocHandle)) {
766                                 // We need to check if it is still active as if not we removed all SpliceTasks in
767                                 // doClose(...)
768                                 if (isActive()) {
769                                     sQueue.remove();
770                                 }
771                                 continue;
772                             } else {
773                                 break;
774                             }
775                         }
776                     }
777
778                     // we use a direct buffer here as the native implementations only be able
779                     // to handle direct buffers.
780                     byteBuf = allocHandle.allocate(allocator);
781                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
782                     if (allocHandle.lastBytesRead() <= 0) {
783                         // nothing was read, release the buffer.
784                         byteBuf.release();
785                         byteBuf = null;
786                         close = allocHandle.lastBytesRead() < 0;
787                         if (close) {
788                             // There is nothing left to read as we received an EOF.
789                             readPending = false;
790                         }
791                         break;
792                     }
793                     allocHandle.incMessagesRead(1);
794                     readPending = false;
795                     pipeline.fireChannelRead(byteBuf);
796                     byteBuf = null;
797
798                     if (shouldBreakEpollInReady(config)) {
799                         // We need to do this for two reasons:
800                         //
801                         // - If the input was shutdown in between (which may be the case when the user did it in the
802                         //   fireChannelRead(...) method we should not try to read again to not produce any
803                         //   miss-leading exceptions.
804                         //
805                         // - If the user closes the channel we need to ensure we not try to read from it again as
806                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
807                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
808                         //   reading data from a filedescriptor that belongs to another socket then the socket that
809                         //   was "wrapped" by this Channel implementation.
810                         break;
811                     }
812                 } while (allocHandle.continueReading());
813
814                 allocHandle.readComplete();
815                 pipeline.fireChannelReadComplete();
816
817                 if (close) {
818                     shutdownInput(false);
819                 }
820             } catch (Throwable t) {
821                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
822             } finally {
823                 epollInFinally(config);
824             }
825         }
826     }
827
828     private void addToSpliceQueue(final SpliceInTask task) {
829         Queue<SpliceInTask> sQueue = spliceQueue;
830         if (sQueue == null) {
831             synchronized (this) {
832                 sQueue = spliceQueue;
833                 if (sQueue == null) {
834                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
835                 }
836             }
837         }
838         sQueue.add(task);
839     }
840
841     protected abstract class SpliceInTask {
842         final ChannelPromise promise;
843         int len;
844
845         protected SpliceInTask(int len, ChannelPromise promise) {
846             this.promise = promise;
847             this.len = len;
848         }
849
850         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
851
852         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
853             // calculate the maximum amount of data we are allowed to splice
854             int length = Math.min(handle.guess(), len);
855             int splicedIn = 0;
856             for (;;) {
857                 // Splicing until there is nothing left to splice.
858                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
859                 if (localSplicedIn == 0) {
860                     break;
861                 }
862                 splicedIn += localSplicedIn;
863                 length -= localSplicedIn;
864             }
865
866             return splicedIn;
867         }
868     }
869
870     // Let it directly implement channelFutureListener as well to reduce object creation.
871     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
872         private final AbstractEpollStreamChannel ch;
873
874         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
875             super(len, promise);
876             this.ch = ch;
877         }
878
879         @Override
880         public void operationComplete(ChannelFuture future) throws Exception {
881             if (!future.isSuccess()) {
882                 promise.setFailure(future.cause());
883             }
884         }
885
886         @Override
887         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
888             assert ch.eventLoop().inEventLoop();
889             if (len == 0) {
890                 promise.setSuccess();
891                 return true;
892             }
893             try {
894                 // We create the pipe on the target channel as this will allow us to just handle pending writes
895                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
896                 // on multiple Channels pointing to one target Channel.
897                 FileDescriptor pipeOut = ch.pipeOut;
898                 if (pipeOut == null) {
899                     // Create a new pipe as non was created before.
900                     FileDescriptor[] pipe = pipe();
901                     ch.pipeIn = pipe[0];
902                     pipeOut = ch.pipeOut = pipe[1];
903                 }
904
905                 int splicedIn = spliceIn(pipeOut, handle);
906                 if (splicedIn > 0) {
907                     // Integer.MAX_VALUE is a special value which will result in splice forever.
908                     if (len != Integer.MAX_VALUE) {
909                         len -= splicedIn;
910                     }
911
912                     // Depending on if we are done with splicing inbound data we set the right promise for the
913                     // outbound splicing.
914                     final ChannelPromise splicePromise;
915                     if (len == 0) {
916                         splicePromise = promise;
917                     } else {
918                         splicePromise = ch.newPromise().addListener(this);
919                     }
920
921                     boolean autoRead = config().isAutoRead();
922
923                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
924                     // case.
925                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
926                     ch.unsafe().flush();
927                     if (autoRead && !splicePromise.isDone()) {
928                         // Write was not done which means the target channel was not writable. In this case we need to
929                         // disable reading until we are done with splicing to the target channel because:
930                         //
931                         // - The user may want to to trigger another splice operation once the splicing was complete.
932                         config().setAutoRead(false);
933                     }
934                 }
935
936                 return len == 0;
937             } catch (Throwable cause) {
938                 promise.setFailure(cause);
939                 return true;
940             }
941         }
942     }
943
944     private final class SpliceOutTask {
945         private final AbstractEpollStreamChannel ch;
946         private final boolean autoRead;
947         private int len;
948
949         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
950             this.ch = ch;
951             this.len = len;
952             this.autoRead = autoRead;
953         }
954
955         public boolean spliceOut() throws Exception {
956             assert ch.eventLoop().inEventLoop();
957             try {
958                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
959                 len -= splicedOut;
960                 if (len == 0) {
961                     if (autoRead) {
962                         // AutoRead was used and we spliced everything so start reading again
963                         config().setAutoRead(true);
964                     }
965                     return true;
966                 }
967                 return false;
968             } catch (IOException e) {
969                 if (autoRead) {
970                     // AutoRead was used and we spliced everything so start reading again
971                     config().setAutoRead(true);
972                 }
973                 throw e;
974             }
975         }
976     }
977
978     private final class SpliceFdTask extends SpliceInTask {
979         private final FileDescriptor fd;
980         private final ChannelPromise promise;
981         private int offset;
982
983         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
984             super(len, promise);
985             this.fd = fd;
986             this.promise = promise;
987             this.offset = offset;
988         }
989
990         @Override
991         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
992             assert eventLoop().inEventLoop();
993             if (len == 0) {
994                 promise.setSuccess();
995                 return true;
996             }
997
998             try {
999                 FileDescriptor[] pipe = pipe();
1000                 FileDescriptor pipeIn = pipe[0];
1001                 FileDescriptor pipeOut = pipe[1];
1002                 try {
1003                     int splicedIn = spliceIn(pipeOut, handle);
1004                     if (splicedIn > 0) {
1005                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1006                         if (len != Integer.MAX_VALUE) {
1007                             len -= splicedIn;
1008                         }
1009                         do {
1010                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1011                             offset += splicedOut;
1012                             splicedIn -= splicedOut;
1013                         } while (splicedIn > 0);
1014                         if (len == 0) {
1015                             promise.setSuccess();
1016                             return true;
1017                         }
1018                     }
1019                     return false;
1020                 } finally {
1021                     safeClosePipe(pipeIn);
1022                     safeClosePipe(pipeOut);
1023                 }
1024             } catch (Throwable cause) {
1025                 promise.setFailure(cause);
1026                 return true;
1027             }
1028         }
1029     }
1030
1031     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1032         EpollSocketWritableByteChannel() {
1033             super(socket);
1034         }
1035
1036         @Override
1037         protected ByteBufAllocator alloc() {
1038             return AbstractEpollStreamChannel.this.alloc();
1039         }
1040     }
1041 }
1042