1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.UnstableApi;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60 private static final String EXPECTED_TYPES =
61 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64
65 private final Runnable flushTask = new Runnable() {
66 @Override
67 public void run() {
68 // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
69 // meantime.
70 ((AbstractEpollUnsafe) unsafe()).flush0();
71 }
72 };
73
74 // Lazy init these if we need to splice(...)
75 private volatile Queue<SpliceInTask> spliceQueue;
76 private FileDescriptor pipeIn;
77 private FileDescriptor pipeOut;
78
79 private WritableByteChannel byteChannel;
80
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

AbstractEpollStreamChannel(LinuxSocket fd) {
    // Whether the channel starts out active is derived from isSoErrorZero(fd).
    this(fd, isSoErrorZero(fd));
}

AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, true);
    addEpollRdHupFlag();
}

AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
    addEpollRdHupFlag();
}

protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active);
    addEpollRdHupFlag();
}

/** Registers interest in EPOLLRDHUP so we are notified once the remote peer closes the connection. */
private void addEpollRdHupFlag() {
    flags |= Native.EPOLLRDHUP;
}
110
111 @Override
112 protected AbstractEpollUnsafe newUnsafe() {
113 return new EpollStreamUnsafe();
114 }
115
116 @Override
117 public ChannelMetadata metadata() {
118 return METADATA;
119 }
120
121 /**
122 * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
123 * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
124 * splice until the {@link ChannelFuture} was canceled or it was failed.
125 *
126 * Please note:
127 * <ul>
128 * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
129 * {@link IllegalArgumentException} is thrown. </li>
130 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
131 * target {@link AbstractEpollStreamChannel}</li>
132 * </ul>
133 *
134 */
135 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136 return spliceTo(ch, len, newPromise());
137 }
138
139 /**
140 * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
141 * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
142 * splice until the {@link ChannelFuture} was canceled or it was failed.
143 *
144 * Please note:
145 * <ul>
146 * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
147 * {@link IllegalArgumentException} is thrown. </li>
148 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
149 * target {@link AbstractEpollStreamChannel}</li>
150 * </ul>
151 *
152 */
153 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154 final ChannelPromise promise) {
155 if (ch.eventLoop() != eventLoop()) {
156 throw new IllegalArgumentException("EventLoops are not the same.");
157 }
158 checkPositiveOrZero(len, "len");
159 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162 }
163 checkNotNull(promise, "promise");
164 if (!isOpen()) {
165 promise.tryFailure(new ClosedChannelException());
166 } else {
167 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168 failSpliceIfClosed(promise);
169 }
170 return promise;
171 }
172
173 /**
174 * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
175 * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
176 * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
177 * {@link ChannelFuture} was canceled or it was failed.
178 *
179 * Please note:
180 * <ul>
181 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
182 * {@link AbstractEpollStreamChannel}</li>
183 * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
184 * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
185 * </ul>
186 */
187 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188 return spliceTo(ch, offset, len, newPromise());
189 }
190
191 /**
192 * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
193 * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
194 * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
195 * {@link ChannelFuture} was canceled or it was failed.
196 *
197 * Please note:
198 * <ul>
199 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
200 * {@link AbstractEpollStreamChannel}</li>
201 * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
202 * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
203 * </ul>
204 */
205 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206 final ChannelPromise promise) {
207 checkPositiveOrZero(len, "len");
208 checkPositiveOrZero(offset, "offset");
209 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211 }
212 checkNotNull(promise, "promise");
213 if (!isOpen()) {
214 promise.tryFailure(new ClosedChannelException());
215 } else {
216 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217 failSpliceIfClosed(promise);
218 }
219 return promise;
220 }
221
222 private void failSpliceIfClosed(ChannelPromise promise) {
223 if (!isOpen()) {
224 // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
225 // cases where a future may not be notified otherwise.
226 if (promise.tryFailure(new ClosedChannelException())) {
227 eventLoop().execute(new Runnable() {
228 @Override
229 public void run() {
230 // Call this via the EventLoop as it is a MPSC queue.
231 clearSpliceQueue();
232 }
233 });
234 }
235 }
236 }
237
238 /**
239 * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
240 * @param in the collection which contains objects to write.
241 * @param buf the {@link ByteBuf} from which the bytes should be written
242 * @return The value that should be decremented from the write quantum which starts at
243 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
244 * <ul>
245 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
246 * is encountered</li>
247 * <li>1 - if a single call to write data was made to the OS</li>
248 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
249 * no data was accepted</li>
250 * </ul>
251 */
252 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
253 int readableBytes = buf.readableBytes();
254 if (readableBytes == 0) {
255 in.remove();
256 return 0;
257 }
258
259 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
260 return doWriteBytes(in, buf);
261 } else {
262 ByteBuffer[] nioBuffers = buf.nioBuffers();
263 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
264 config().getMaxBytesPerGatheringWrite());
265 }
266 }
267
268 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
269 // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
270 // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
271 // make a best effort to adjust as OS behavior changes.
272 if (attempted == written) {
273 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
274 config().setMaxBytesPerGatheringWrite(attempted << 1);
275 }
276 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
277 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
278 }
279 }
280
281 /**
282 * Write multiple bytes via {@link IovArray}.
283 * @param in the collection which contains objects to write.
284 * @param array The array which contains the content to write.
285 * @return The value that should be decremented from the write quantum which starts at
286 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
287 * <ul>
288 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
289 * is encountered</li>
290 * <li>1 - if a single call to write data was made to the OS</li>
291 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
292 * no data was accepted</li>
293 * </ul>
294 * @throws IOException If an I/O exception occurs during write.
295 */
296 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
297 final long expectedWrittenBytes = array.size();
298 assert expectedWrittenBytes != 0;
299 final int cnt = array.count();
300 assert cnt != 0;
301
302 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
303 if (localWrittenBytes > 0) {
304 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
305 in.removeBytes(localWrittenBytes);
306 return 1;
307 }
308 return WRITE_STATUS_SNDBUF_FULL;
309 }
310
311 /**
312 * Write multiple bytes via {@link ByteBuffer} array.
313 * @param in the collection which contains objects to write.
314 * @param nioBuffers The buffers to write.
315 * @param nioBufferCnt The number of buffers to write.
316 * @param expectedWrittenBytes The number of bytes we expect to write.
317 * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
318 * @return The value that should be decremented from the write quantum which starts at
319 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
320 * <ul>
321 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
322 * is encountered</li>
323 * <li>1 - if a single call to write data was made to the OS</li>
324 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
325 * no data was accepted</li>
326 * </ul>
327 * @throws IOException If an I/O exception occurs during write.
328 */
329 private int writeBytesMultiple(
330 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
331 long maxBytesPerGatheringWrite) throws IOException {
332 assert expectedWrittenBytes != 0;
333 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
334 expectedWrittenBytes = maxBytesPerGatheringWrite;
335 }
336
337 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
338 if (localWrittenBytes > 0) {
339 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
340 in.removeBytes(localWrittenBytes);
341 return 1;
342 }
343 return WRITE_STATUS_SNDBUF_FULL;
344 }
345
346 /**
347 * Write a {@link DefaultFileRegion}
348 * @param in the collection which contains objects to write.
349 * @param region the {@link DefaultFileRegion} from which the bytes should be written
350 * @return The value that should be decremented from the write quantum which starts at
351 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
352 * <ul>
353 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
354 * is encountered</li>
355 * <li>1 - if a single call to write data was made to the OS</li>
356 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
357 * no data was accepted</li>
358 * </ul>
359 */
360 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
361 final long offset = region.transferred();
362 final long regionCount = region.count();
363 if (offset >= regionCount) {
364 in.remove();
365 return 0;
366 }
367
368 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
369 if (flushedAmount > 0) {
370 in.progress(flushedAmount);
371 if (region.transferred() >= regionCount) {
372 in.remove();
373 }
374 return 1;
375 } else if (flushedAmount == 0) {
376 validateFileRegion(region, offset);
377 }
378 return WRITE_STATUS_SNDBUF_FULL;
379 }
380
381 /**
382 * Write a {@link FileRegion}
383 * @param in the collection which contains objects to write.
384 * @param region the {@link FileRegion} from which the bytes should be written
385 * @return The value that should be decremented from the write quantum which starts at
386 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
387 * <ul>
388 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
389 * is encountered</li>
390 * <li>1 - if a single call to write data was made to the OS</li>
391 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
392 * no data was accepted</li>
393 * </ul>
394 */
395 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
396 if (region.transferred() >= region.count()) {
397 in.remove();
398 return 0;
399 }
400
401 if (byteChannel == null) {
402 byteChannel = new EpollSocketWritableByteChannel();
403 }
404 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
405 if (flushedAmount > 0) {
406 in.progress(flushedAmount);
407 if (region.transferred() >= region.count()) {
408 in.remove();
409 }
410 return 1;
411 }
412 return WRITE_STATUS_SNDBUF_FULL;
413 }
414
415 @Override
416 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
417 int writeSpinCount = config().getWriteSpinCount();
418 do {
419 final int msgCount = in.size();
420 // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
421 if (msgCount > 1 && in.current() instanceof ByteBuf) {
422 writeSpinCount -= doWriteMultiple(in);
423 } else if (msgCount == 0) {
424 // Wrote all messages.
425 clearFlag(Native.EPOLLOUT);
426 // Return here so we not set the EPOLLOUT flag.
427 return;
428 } else { // msgCount == 1
429 writeSpinCount -= doWriteSingle(in);
430 }
431
432 // We do not break the loop here even if the outbound buffer was flushed completely,
433 // because a user might have triggered another write and flush when we notify his or her
434 // listeners.
435 } while (writeSpinCount > 0);
436
437 if (writeSpinCount == 0) {
438 // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
439 // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
440 // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
441 // and set the EPOLLOUT if necessary.
442 clearFlag(Native.EPOLLOUT);
443
444 // We used our writeSpin quantum, and should try to write again later.
445 eventLoop().execute(flushTask);
446 } else {
447 // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
448 // when it can accept more data.
449 setFlag(Native.EPOLLOUT);
450 }
451 }
452
453 /**
454 * Attempt to write a single object.
455 * @param in the collection which contains objects to write.
456 * @return The value that should be decremented from the write quantum which starts at
457 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
458 * <ul>
459 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
460 * is encountered</li>
461 * <li>1 - if a single call to write data was made to the OS</li>
462 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
463 * no data was accepted</li>
464 * </ul>
465 * @throws Exception If an I/O error occurs.
466 */
467 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
468 // The outbound buffer contains only one message or it contains a file region.
469 Object msg = in.current();
470 if (msg instanceof ByteBuf) {
471 return writeBytes(in, (ByteBuf) msg);
472 } else if (msg instanceof DefaultFileRegion) {
473 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
474 } else if (msg instanceof FileRegion) {
475 return writeFileRegion(in, (FileRegion) msg);
476 } else if (msg instanceof SpliceOutTask) {
477 if (!((SpliceOutTask) msg).spliceOut()) {
478 return WRITE_STATUS_SNDBUF_FULL;
479 }
480 in.remove();
481 return 1;
482 } else {
483 // Should never reach here.
484 throw new Error();
485 }
486 }
487
488 /**
489 * Attempt to write multiple {@link ByteBuf} objects.
490 * @param in the collection which contains objects to write.
491 * @return The value that should be decremented from the write quantum which starts at
492 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
493 * <ul>
494 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
495 * is encountered</li>
496 * <li>1 - if a single call to write data was made to the OS</li>
497 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
498 * no data was accepted</li>
499 * </ul>
500 * @throws Exception If an I/O error occurs.
501 */
502 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
503 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
504 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
505 array.maxBytes(maxBytesPerGatheringWrite);
506 in.forEachFlushedMessage(array);
507
508 if (array.count() >= 1) {
509 // TODO: Handle the case where cnt == 1 specially.
510 return writeBytesMultiple(in, array);
511 }
512 // cnt == 0, which means the outbound buffer contained empty buffers only.
513 in.removeBytes(0);
514 return 0;
515 }
516
517 @Override
518 protected Object filterOutboundMessage(Object msg) {
519 if (msg instanceof ByteBuf) {
520 ByteBuf buf = (ByteBuf) msg;
521 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
522 }
523
524 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
525 return msg;
526 }
527
528 throw new UnsupportedOperationException(
529 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
530 }
531
532 @UnstableApi
533 @Override
534 protected final void doShutdownOutput() throws Exception {
535 socket.shutdown(false, true);
536 }
537
538 private void shutdownInput0(final ChannelPromise promise) {
539 try {
540 socket.shutdown(true, false);
541 promise.setSuccess();
542 } catch (Throwable cause) {
543 promise.setFailure(cause);
544 }
545 }
546
547 @Override
548 public boolean isOutputShutdown() {
549 return socket.isOutputShutdown();
550 }
551
552 @Override
553 public boolean isInputShutdown() {
554 return socket.isInputShutdown();
555 }
556
557 @Override
558 public boolean isShutdown() {
559 return socket.isShutdown();
560 }
561
562 @Override
563 public ChannelFuture shutdownOutput() {
564 return shutdownOutput(newPromise());
565 }
566
567 @Override
568 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
569 EventLoop loop = eventLoop();
570 if (loop.inEventLoop()) {
571 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
572 } else {
573 loop.execute(new Runnable() {
574 @Override
575 public void run() {
576 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
577 }
578 });
579 }
580
581 return promise;
582 }
583
584 @Override
585 public ChannelFuture shutdownInput() {
586 return shutdownInput(newPromise());
587 }
588
589 @Override
590 public ChannelFuture shutdownInput(final ChannelPromise promise) {
591 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
592 if (closeExecutor != null) {
593 closeExecutor.execute(new Runnable() {
594 @Override
595 public void run() {
596 shutdownInput0(promise);
597 }
598 });
599 } else {
600 EventLoop loop = eventLoop();
601 if (loop.inEventLoop()) {
602 shutdownInput0(promise);
603 } else {
604 loop.execute(new Runnable() {
605 @Override
606 public void run() {
607 shutdownInput0(promise);
608 }
609 });
610 }
611 }
612 return promise;
613 }
614
615 @Override
616 public ChannelFuture shutdown() {
617 return shutdown(newPromise());
618 }
619
620 @Override
621 public ChannelFuture shutdown(final ChannelPromise promise) {
622 ChannelFuture shutdownOutputFuture = shutdownOutput();
623 if (shutdownOutputFuture.isDone()) {
624 shutdownOutputDone(shutdownOutputFuture, promise);
625 } else {
626 shutdownOutputFuture.addListener(new ChannelFutureListener() {
627 @Override
628 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
629 shutdownOutputDone(shutdownOutputFuture, promise);
630 }
631 });
632 }
633 return promise;
634 }
635
636 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
637 ChannelFuture shutdownInputFuture = shutdownInput();
638 if (shutdownInputFuture.isDone()) {
639 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
640 } else {
641 shutdownInputFuture.addListener(new ChannelFutureListener() {
642 @Override
643 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
644 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
645 }
646 });
647 }
648 }
649
650 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
651 ChannelFuture shutdownInputFuture,
652 ChannelPromise promise) {
653 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
654 Throwable shutdownInputCause = shutdownInputFuture.cause();
655 if (shutdownOutputCause != null) {
656 if (shutdownInputCause != null) {
657 logger.debug("Exception suppressed because a previous exception occurred.",
658 shutdownInputCause);
659 }
660 promise.setFailure(shutdownOutputCause);
661 } else if (shutdownInputCause != null) {
662 promise.setFailure(shutdownInputCause);
663 } else {
664 promise.setSuccess();
665 }
666 }
667
668 @Override
669 protected void doClose() throws Exception {
670 try {
671 // Calling super.doClose() first so spliceTo(...) will fail on next call.
672 super.doClose();
673 } finally {
674 safeClosePipe(pipeIn);
675 safeClosePipe(pipeOut);
676 clearSpliceQueue();
677 }
678 }
679
680 private void clearSpliceQueue() {
681 Queue<SpliceInTask> sQueue = spliceQueue;
682 if (sQueue == null) {
683 return;
684 }
685 ClosedChannelException exception = null;
686
687 for (;;) {
688 SpliceInTask task = sQueue.poll();
689 if (task == null) {
690 break;
691 }
692 if (exception == null) {
693 exception = new ClosedChannelException();
694 }
695 task.promise.tryFailure(exception);
696 }
697 }
698
699 private static void safeClosePipe(FileDescriptor fd) {
700 if (fd != null) {
701 try {
702 fd.close();
703 } catch (IOException e) {
704 logger.warn("Error while closing a pipe", e);
705 }
706 }
707 }
708
/**
 * {@link AbstractEpollUnsafe} for stream channels. When the socket becomes readable, it first services
 * queued {@link SpliceInTask}s. Otherwise it reads bytes into buffers from the configured allocator and
 * fires them through the pipeline.
 */
class EpollStreamUnsafe extends AbstractEpollUnsafe {
    // Overridden here just to be able to access this method from AbstractEpollStreamChannel
    // (it is called from shutdownInput(ChannelPromise)).
    @Override
    protected Executor prepareToClose() {
        return super.prepareToClose();
    }

    /**
     * Handles a {@link Throwable} raised while reading. A partially filled buffer is delivered, or released
     * if it holds no readable bytes. The read cycle is then completed and {@code exceptionCaught} is fired.
     * Finally the input is shut down when EOF was seen ({@code close}) or the cause is an
     * {@link OutOfMemoryError} or {@link IOException}.
     */
    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            EpollRecvByteAllocatorHandle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                // Do not lose the data that was read before the failure.
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);

        // Shut down the input on EOF, OOM or I/O errors so the read event is dropped and the connection
        // is released. See https://github.com/netty/netty/issues/10434
        if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
            shutdownInput(false);
        }
    }

    /** Wraps the given handle in an {@link EpollRecvByteAllocatorStreamingHandle}. */
    @Override
    EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
        return new EpollRecvByteAllocatorStreamingHandle(handle);
    }

    /**
     * Called when the socket is readable. Loops while the allocator handle allows it. On each iteration it
     * either services the head splice-in task or reads into a new buffer and fires it. It stops on EOF, on
     * an empty read, or when shouldBreakEpollInReady(...) says reading must stop.
     */
    @Override
    void epollInReady() {
        final ChannelConfig config = config();
        if (shouldBreakEpollInReady(config)) {
            // Reading must not happen now (see the reasons explained inside the loop below);
            // presumably clearEpollIn0() drops the EPOLLIN interest.
            clearEpollIn0();
            return;
        }
        final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
        allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        allocHandle.reset(config);
        epollInBefore();

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            // Cached after the first non-null read of the volatile spliceQueue field.
            Queue<SpliceInTask> sQueue = null;
            do {
                if (sQueue != null || (sQueue = spliceQueue) != null) {
                    SpliceInTask spliceTask = sQueue.peek();
                    if (spliceTask != null) {
                        // A pending splice task takes precedence over a regular read in this iteration.
                        if (spliceTask.spliceIn(allocHandle)) {
                            // We need to check if it is still active as if not we removed all SpliceTasks in
                            // doClose(...)
                            if (isActive()) {
                                sQueue.remove();
                            }
                            // Jumps to the continueReading() check of the do/while loop.
                            continue;
                        } else {
                            break;
                        }
                    }
                }

                // we use a direct buffer here as the native implementations only be able
                // to handle direct buffers.
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read, release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    // A negative result signals EOF.
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                // Ownership passed to the pipeline; do not release it in handleReadException(...).
                byteBuf = null;

                if (shouldBreakEpollInReady(config)) {
                    // We need to do this for two reasons:
                    //
                    // - If the input was shut down in between (which may be the case when the user did it in the
                    // fireChannelRead(...) method) we should not try to read again, so that no
                    // misleading exceptions are produced.
                    //
                    // - If the user closes the channel we need to ensure we do not try to read from it again, as
                    // the file descriptor may already have been re-used by the OS if the system is handling a lot
                    // of concurrent connections and so needs a lot of file descriptors. If we did not do this we
                    // would risk reading data from a file descriptor that belongs to a different socket than the
                    // one that was "wrapped" by this Channel implementation.
                    break;
                }
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                shutdownInput(false);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            epollInFinally(config);
        }
    }
}
827
828 private void addToSpliceQueue(final SpliceInTask task) {
829 Queue<SpliceInTask> sQueue = spliceQueue;
830 if (sQueue == null) {
831 synchronized (this) {
832 sQueue = spliceQueue;
833 if (sQueue == null) {
834 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
835 }
836 }
837 }
838 sQueue.add(task);
839 }
840
841 protected abstract class SpliceInTask {
842 final ChannelPromise promise;
843 int len;
844
845 protected SpliceInTask(int len, ChannelPromise promise) {
846 this.promise = promise;
847 this.len = len;
848 }
849
850 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
851
852 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
853 // calculate the maximum amount of data we are allowed to splice
854 int length = Math.min(handle.guess(), len);
855 int splicedIn = 0;
856 for (;;) {
857 // Splicing until there is nothing left to splice.
858 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
859 if (localSplicedIn == 0) {
860 break;
861 }
862 splicedIn += localSplicedIn;
863 length -= localSplicedIn;
864 }
865
866 return splicedIn;
867 }
868 }
869
870 // Let it directly implement channelFutureListener as well to reduce object creation.
871 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
872 private final AbstractEpollStreamChannel ch;
873
874 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
875 super(len, promise);
876 this.ch = ch;
877 }
878
879 @Override
880 public void operationComplete(ChannelFuture future) throws Exception {
881 if (!future.isSuccess()) {
882 promise.setFailure(future.cause());
883 }
884 }
885
886 @Override
887 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
888 assert ch.eventLoop().inEventLoop();
889 if (len == 0) {
890 promise.setSuccess();
891 return true;
892 }
893 try {
894 // We create the pipe on the target channel as this will allow us to just handle pending writes
895 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
896 // on multiple Channels pointing to one target Channel.
897 FileDescriptor pipeOut = ch.pipeOut;
898 if (pipeOut == null) {
899 // Create a new pipe as non was created before.
900 FileDescriptor[] pipe = pipe();
901 ch.pipeIn = pipe[0];
902 pipeOut = ch.pipeOut = pipe[1];
903 }
904
905 int splicedIn = spliceIn(pipeOut, handle);
906 if (splicedIn > 0) {
907 // Integer.MAX_VALUE is a special value which will result in splice forever.
908 if (len != Integer.MAX_VALUE) {
909 len -= splicedIn;
910 }
911
912 // Depending on if we are done with splicing inbound data we set the right promise for the
913 // outbound splicing.
914 final ChannelPromise splicePromise;
915 if (len == 0) {
916 splicePromise = promise;
917 } else {
918 splicePromise = ch.newPromise().addListener(this);
919 }
920
921 boolean autoRead = config().isAutoRead();
922
923 // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
924 // case.
925 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
926 ch.unsafe().flush();
927 if (autoRead && !splicePromise.isDone()) {
928 // Write was not done which means the target channel was not writable. In this case we need to
929 // disable reading until we are done with splicing to the target channel because:
930 //
931 // - The user may want to to trigger another splice operation once the splicing was complete.
932 config().setAutoRead(false);
933 }
934 }
935
936 return len == 0;
937 } catch (Throwable cause) {
938 promise.setFailure(cause);
939 return true;
940 }
941 }
942 }
943
944 private final class SpliceOutTask {
945 private final AbstractEpollStreamChannel ch;
946 private final boolean autoRead;
947 private int len;
948
949 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
950 this.ch = ch;
951 this.len = len;
952 this.autoRead = autoRead;
953 }
954
955 public boolean spliceOut() throws Exception {
956 assert ch.eventLoop().inEventLoop();
957 try {
958 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
959 len -= splicedOut;
960 if (len == 0) {
961 if (autoRead) {
962 // AutoRead was used and we spliced everything so start reading again
963 config().setAutoRead(true);
964 }
965 return true;
966 }
967 return false;
968 } catch (IOException e) {
969 if (autoRead) {
970 // AutoRead was used and we spliced everything so start reading again
971 config().setAutoRead(true);
972 }
973 throw e;
974 }
975 }
976 }
977
978 private final class SpliceFdTask extends SpliceInTask {
979 private final FileDescriptor fd;
980 private final ChannelPromise promise;
981 private int offset;
982
983 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
984 super(len, promise);
985 this.fd = fd;
986 this.promise = promise;
987 this.offset = offset;
988 }
989
990 @Override
991 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
992 assert eventLoop().inEventLoop();
993 if (len == 0) {
994 promise.setSuccess();
995 return true;
996 }
997
998 try {
999 FileDescriptor[] pipe = pipe();
1000 FileDescriptor pipeIn = pipe[0];
1001 FileDescriptor pipeOut = pipe[1];
1002 try {
1003 int splicedIn = spliceIn(pipeOut, handle);
1004 if (splicedIn > 0) {
1005 // Integer.MAX_VALUE is a special value which will result in splice forever.
1006 if (len != Integer.MAX_VALUE) {
1007 len -= splicedIn;
1008 }
1009 do {
1010 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1011 offset += splicedOut;
1012 splicedIn -= splicedOut;
1013 } while (splicedIn > 0);
1014 if (len == 0) {
1015 promise.setSuccess();
1016 return true;
1017 }
1018 }
1019 return false;
1020 } finally {
1021 safeClosePipe(pipeIn);
1022 safeClosePipe(pipeOut);
1023 }
1024 } catch (Throwable cause) {
1025 promise.setFailure(cause);
1026 return true;
1027 }
1028 }
1029 }
1030
1031 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1032 EpollSocketWritableByteChannel() {
1033 super(socket);
1034 }
1035
1036 @Override
1037 protected ByteBufAllocator alloc() {
1038 return AbstractEpollStreamChannel.this.alloc();
1039 }
1040 }
1041 }
1042