1
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.RecvByteBufAllocator;
34 import io.netty.channel.nio.AbstractNioChannel;
35 import io.netty.channel.socket.ChannelInputShutdownEvent;
36 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
37 import io.netty.channel.socket.SocketChannelConfig;
38 import io.netty.channel.unix.FileDescriptor;
39 import io.netty.channel.unix.Socket;
40 import io.netty.channel.unix.UnixChannel;
41 import io.netty.util.ReferenceCountUtil;
42
43 import java.io.IOException;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.AlreadyConnectedException;
48 import java.nio.channels.ClosedChannelException;
49 import java.nio.channels.ConnectionPendingException;
50 import java.nio.channels.NotYetConnectedException;
51 import java.nio.channels.UnresolvedAddressException;
52 import java.util.concurrent.ScheduledFuture;
53 import java.util.concurrent.TimeUnit;
54
55 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
56 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
57 import static io.netty.util.internal.ObjectUtil.checkNotNull;
58
59 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
60 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
61 final LinuxSocket socket;
62
66 private ChannelPromise connectPromise;
67 private ScheduledFuture<?> connectTimeoutFuture;
68 private SocketAddress requestedRemoteAddress;
69
70 private volatile SocketAddress local;
71 private volatile SocketAddress remote;
72
73 protected int flags = Native.EPOLLET;
74 boolean inputClosedSeenErrorOnRead;
75 boolean epollInReadyRunnablePending;
76
77 protected volatile boolean active;
78
79 AbstractEpollChannel(LinuxSocket fd) {
80 this(null, fd, false);
81 }
82
83 AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active) {
84 super(parent);
85 this.socket = checkNotNull(fd, "fd");
86 this.active = active;
87 if (active) {
88
89
90 this.local = fd.localAddress();
91 this.remote = fd.remoteAddress();
92 }
93 }
94
95 AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
96 super(parent);
97 this.socket = checkNotNull(fd, "fd");
98 this.active = true;
99
100
101 this.remote = remote;
102 this.local = fd.localAddress();
103 }
104
105 static boolean isSoErrorZero(Socket fd) {
106 try {
107 return fd.getSoError() == 0;
108 } catch (IOException e) {
109 throw new ChannelException(e);
110 }
111 }
112
113 void setFlag(int flag) throws IOException {
114 if (!isFlagSet(flag)) {
115 flags |= flag;
116 modifyEvents();
117 }
118 }
119
120 void clearFlag(int flag) throws IOException {
121 if (isFlagSet(flag)) {
122 flags &= ~flag;
123 modifyEvents();
124 }
125 }
126
127 boolean isFlagSet(int flag) {
128 return (flags & flag) != 0;
129 }
130
131 @Override
132 public final FileDescriptor fd() {
133 return socket;
134 }
135
136 @Override
137 public abstract EpollChannelConfig config();
138
139 @Override
140 public boolean isActive() {
141 return active;
142 }
143
144 @Override
145 public ChannelMetadata metadata() {
146 return METADATA;
147 }
148
149 @Override
150 protected void doClose() throws Exception {
151 active = false;
152
153
154 inputClosedSeenErrorOnRead = true;
155 try {
156 ChannelPromise promise = connectPromise;
157 if (promise != null) {
158
159 promise.tryFailure(new ClosedChannelException());
160 connectPromise = null;
161 }
162
163 ScheduledFuture<?> future = connectTimeoutFuture;
164 if (future != null) {
165 future.cancel(false);
166 connectTimeoutFuture = null;
167 }
168
169 if (isRegistered()) {
170
171
172
173
174 EventLoop loop = eventLoop();
175 if (loop.inEventLoop()) {
176 doDeregister();
177 } else {
178 loop.execute(new Runnable() {
179 @Override
180 public void run() {
181 try {
182 doDeregister();
183 } catch (Throwable cause) {
184 pipeline().fireExceptionCaught(cause);
185 }
186 }
187 });
188 }
189 }
190 } finally {
191 socket.close();
192 }
193 }
194
195 void resetCachedAddresses() {
196 local = socket.localAddress();
197 remote = socket.remoteAddress();
198 }
199
200 @Override
201 protected void doDisconnect() throws Exception {
202 doClose();
203 }
204
205 @Override
206 protected boolean isCompatible(EventLoop loop) {
207 return loop instanceof EpollEventLoop;
208 }
209
210 @Override
211 public boolean isOpen() {
212 return socket.isOpen();
213 }
214
215 @Override
216 protected void doDeregister() throws Exception {
217 ((EpollEventLoop) eventLoop()).remove(this);
218 }
219
220 @Override
221 protected final void doBeginRead() throws Exception {
222
223 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
224 unsafe.readPending = true;
225
226
227
228
229 setFlag(Native.EPOLLIN);
230
231
232
233 if (unsafe.maybeMoreDataToRead) {
234 unsafe.executeEpollInReadyRunnable(config());
235 }
236 }
237
238 final boolean shouldBreakEpollInReady(ChannelConfig config) {
239 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
240 }
241
242 private static boolean isAllowHalfClosure(ChannelConfig config) {
243 if (config instanceof EpollDomainSocketChannelConfig) {
244 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
245 }
246 return config instanceof SocketChannelConfig &&
247 ((SocketChannelConfig) config).isAllowHalfClosure();
248 }
249
250 final void clearEpollIn() {
251
252 if (isRegistered()) {
253 final EventLoop loop = eventLoop();
254 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
255 if (loop.inEventLoop()) {
256 unsafe.clearEpollIn0();
257 } else {
258
259 loop.execute(new Runnable() {
260 @Override
261 public void run() {
262 if (!unsafe.readPending && !config().isAutoRead()) {
263
264 unsafe.clearEpollIn0();
265 }
266 }
267 });
268 }
269 } else {
270
271
272 flags &= ~Native.EPOLLIN;
273 }
274 }
275
276 private void modifyEvents() throws IOException {
277 if (isOpen() && isRegistered()) {
278 ((EpollEventLoop) eventLoop()).modify(this);
279 }
280 }
281
282 @Override
283 protected void doRegister() throws Exception {
284
285
286
287 epollInReadyRunnablePending = false;
288 ((EpollEventLoop) eventLoop()).add(this);
289 }
290
291 @Override
292 protected abstract AbstractEpollUnsafe newUnsafe();
293
294
297 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
298 return newDirectBuffer(buf, buf);
299 }
300
301
306 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
307 final int readableBytes = buf.readableBytes();
308 if (readableBytes == 0) {
309 ReferenceCountUtil.release(holder);
310 return Unpooled.EMPTY_BUFFER;
311 }
312
313 final ByteBufAllocator alloc = alloc();
314 if (alloc.isDirectBufferPooled()) {
315 return newDirectBuffer0(holder, buf, alloc, readableBytes);
316 }
317
318 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
319 if (directBuf == null) {
320 return newDirectBuffer0(holder, buf, alloc, readableBytes);
321 }
322
323 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
324 ReferenceCountUtil.safeRelease(holder);
325 return directBuf;
326 }
327
328 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
329 final ByteBuf directBuf = alloc.directBuffer(capacity);
330 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
331 ReferenceCountUtil.safeRelease(holder);
332 return directBuf;
333 }
334
335 protected static void checkResolvable(InetSocketAddress addr) {
336 if (addr.isUnresolved()) {
337 throw new UnresolvedAddressException();
338 }
339 }
340
341
344 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
345 int writerIndex = byteBuf.writerIndex();
346 int localReadAmount;
347 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
348 if (byteBuf.hasMemoryAddress()) {
349 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
350 } else {
351 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
352 localReadAmount = socket.read(buf, buf.position(), buf.limit());
353 }
354 if (localReadAmount > 0) {
355 byteBuf.writerIndex(writerIndex + localReadAmount);
356 }
357 return localReadAmount;
358 }
359
360 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
361 if (buf.hasMemoryAddress()) {
362 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
363 if (localFlushedAmount > 0) {
364 in.removeBytes(localFlushedAmount);
365 return 1;
366 }
367 } else {
368 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
369 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
370 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
371 if (localFlushedAmount > 0) {
372 nioBuf.position(nioBuf.position() + localFlushedAmount);
373 in.removeBytes(localFlushedAmount);
374 return 1;
375 }
376 }
377 return WRITE_STATUS_SNDBUF_FULL;
378 }
379
380 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
381 boolean readPending;
382 boolean maybeMoreDataToRead;
383 private EpollRecvByteAllocatorHandle allocHandle;
384 private final Runnable epollInReadyRunnable = new Runnable() {
385 @Override
386 public void run() {
387 epollInReadyRunnablePending = false;
388 epollInReady();
389 }
390 };
391
392
395 abstract void epollInReady();
396
397 final void epollInBefore() {
398 maybeMoreDataToRead = false;
399 }
400
401 final void epollInFinally(ChannelConfig config) {
402 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
403
404 if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
405
406
407
408
409
410
411
412 executeEpollInReadyRunnable(config);
413 } else if (!readPending && !config.isAutoRead()) {
414
415
416
417
418
419
420 clearEpollIn();
421 }
422 }
423
424 final void executeEpollInReadyRunnable(ChannelConfig config) {
425 if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
426 return;
427 }
428 epollInReadyRunnablePending = true;
429 eventLoop().execute(epollInReadyRunnable);
430 }
431
432
435 final void epollRdHupReady() {
436
437 recvBufAllocHandle().receivedRdHup();
438
439 if (isActive()) {
440
441
442
443 epollInReady();
444 } else {
445
446 shutdownInput(true);
447 }
448
449
450 clearEpollRdHup();
451 }
452
453
456 private void clearEpollRdHup() {
457 try {
458 clearFlag(Native.EPOLLRDHUP);
459 } catch (IOException e) {
460 pipeline().fireExceptionCaught(e);
461 close(voidPromise());
462 }
463 }
464
465
468 void shutdownInput(boolean rdHup) {
469 if (!socket.isInputShutdown()) {
470 if (isAllowHalfClosure(config())) {
471 try {
472 socket.shutdown(true, false);
473 } catch (IOException ignored) {
474
475
476 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
477 return;
478 } catch (NotYetConnectedException ignore) {
479
480
481 }
482 clearEpollIn();
483 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
484 } else {
485 close(voidPromise());
486 }
487 } else if (!rdHup) {
488 inputClosedSeenErrorOnRead = true;
489 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
490 }
491 }
492
493 private void fireEventAndClose(Object evt) {
494 pipeline().fireUserEventTriggered(evt);
495 close(voidPromise());
496 }
497
498 @Override
499 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
500 if (allocHandle == null) {
501 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
502 }
503 return allocHandle;
504 }
505
506
510 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
511 return new EpollRecvByteAllocatorHandle(handle);
512 }
513
514 @Override
515 protected final void flush0() {
516
517
518
519 if (!isFlagSet(Native.EPOLLOUT)) {
520 super.flush0();
521 }
522 }
523
524
527 final void epollOutReady() {
528 if (connectPromise != null) {
529
530 finishConnect();
531 } else if (!socket.isOutputShutdown()) {
532
533 super.flush0();
534 }
535 }
536
537 protected final void clearEpollIn0() {
538 assert eventLoop().inEventLoop();
539 try {
540 readPending = false;
541 clearFlag(Native.EPOLLIN);
542 } catch (IOException e) {
543
544
545 pipeline().fireExceptionCaught(e);
546 unsafe().close(unsafe().voidPromise());
547 }
548 }
549
550 @Override
551 public void connect(
552 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
553 if (!promise.setUncancellable() || !ensureOpen(promise)) {
554 return;
555 }
556
557 try {
558 if (connectPromise != null) {
559 throw new ConnectionPendingException();
560 }
561
562 boolean wasActive = isActive();
563 if (doConnect(remoteAddress, localAddress)) {
564 fulfillConnectPromise(promise, wasActive);
565 } else {
566 connectPromise = promise;
567 requestedRemoteAddress = remoteAddress;
568
569
570 int connectTimeoutMillis = config().getConnectTimeoutMillis();
571 if (connectTimeoutMillis > 0) {
572 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
573 @Override
574 public void run() {
575 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
576 if (connectPromise != null && !connectPromise.isDone()
577 && connectPromise.tryFailure(new ConnectTimeoutException(
578 "connection timed out: " + remoteAddress))) {
579 close(voidPromise());
580 }
581 }
582 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
583 }
584
585 promise.addListener(new ChannelFutureListener() {
586 @Override
587 public void operationComplete(ChannelFuture future) throws Exception {
588 if (future.isCancelled()) {
589 if (connectTimeoutFuture != null) {
590 connectTimeoutFuture.cancel(false);
591 }
592 connectPromise = null;
593 close(voidPromise());
594 }
595 }
596 });
597 }
598 } catch (Throwable t) {
599 closeIfClosed();
600 promise.tryFailure(annotateConnectException(t, remoteAddress));
601 }
602 }
603
604 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
605 if (promise == null) {
606
607 return;
608 }
609 active = true;
610
611
612
613 boolean active = isActive();
614
615
616 boolean promiseSet = promise.trySuccess();
617
618
619
620 if (!wasActive && active) {
621 pipeline().fireChannelActive();
622 }
623
624
625 if (!promiseSet) {
626 close(voidPromise());
627 }
628 }
629
630 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
631 if (promise == null) {
632
633 return;
634 }
635
636
637 promise.tryFailure(cause);
638 closeIfClosed();
639 }
640
641 private void finishConnect() {
642
643
644
645 assert eventLoop().inEventLoop();
646
647 boolean connectStillInProgress = false;
648 try {
649 boolean wasActive = isActive();
650 if (!doFinishConnect()) {
651 connectStillInProgress = true;
652 return;
653 }
654 fulfillConnectPromise(connectPromise, wasActive);
655 } catch (Throwable t) {
656 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
657 } finally {
658 if (!connectStillInProgress) {
659
660
661 if (connectTimeoutFuture != null) {
662 connectTimeoutFuture.cancel(false);
663 }
664 connectPromise = null;
665 }
666 }
667 }
668
669
672 private boolean doFinishConnect() throws Exception {
673 if (socket.finishConnect()) {
674 clearFlag(Native.EPOLLOUT);
675 if (requestedRemoteAddress instanceof InetSocketAddress) {
676 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
677 }
678 requestedRemoteAddress = null;
679
680 return true;
681 }
682 setFlag(Native.EPOLLOUT);
683 return false;
684 }
685 }
686
687 @Override
688 protected void doBind(SocketAddress local) throws Exception {
689 if (local instanceof InetSocketAddress) {
690 checkResolvable((InetSocketAddress) local);
691 }
692 socket.bind(local);
693 this.local = socket.localAddress();
694 }
695
696
699 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
700 if (localAddress instanceof InetSocketAddress) {
701 checkResolvable((InetSocketAddress) localAddress);
702 }
703
704 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
705 ? (InetSocketAddress) remoteAddress : null;
706 if (remoteSocketAddr != null) {
707 checkResolvable(remoteSocketAddr);
708 }
709
710 if (remote != null) {
711
712
713
714 throw new AlreadyConnectedException();
715 }
716
717 if (localAddress != null) {
718 socket.bind(localAddress);
719 }
720
721 boolean connected = doConnect0(remoteAddress);
722 if (connected) {
723 remote = remoteSocketAddr == null ?
724 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
725 }
726
727
728
729 local = socket.localAddress();
730 return connected;
731 }
732
733 private boolean doConnect0(SocketAddress remote) throws Exception {
734 boolean success = false;
735 try {
736 boolean connected = socket.connect(remote);
737 if (!connected) {
738 setFlag(Native.EPOLLOUT);
739 }
740 success = true;
741 return connected;
742 } finally {
743 if (!success) {
744 doClose();
745 }
746 }
747 }
748
749 @Override
750 protected SocketAddress localAddress0() {
751 return local;
752 }
753
754 @Override
755 protected SocketAddress remoteAddress0() {
756 return remote;
757 }
758 }
759