1
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
20 import io.netty.channel.socket.ChannelOutputShutdownException;
21 import io.netty.util.DefaultAttributeMap;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.UnstableApi;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.io.IOException;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.NoRouteToHostException;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.nio.channels.ClosedChannelException;
36 import java.nio.channels.NotYetConnectedException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.RejectedExecutionException;
39
40
43 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46
47 private final Channel parent;
48 private final ChannelId id;
49 private final Unsafe unsafe;
50 private final DefaultChannelPipeline pipeline;
51 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
52 private final CloseFuture closeFuture = new CloseFuture(this);
53
54 private volatile SocketAddress localAddress;
55 private volatile SocketAddress remoteAddress;
56 private volatile EventLoop eventLoop;
57 private volatile boolean registered;
58 private boolean closeInitiated;
59 private Throwable initialCloseCause;
60
61
62 private boolean strValActive;
63 private String strVal;
64
65
71 protected AbstractChannel(Channel parent) {
72 this.parent = parent;
73 id = newId();
74 unsafe = newUnsafe();
75 pipeline = newChannelPipeline();
76 }
77
78
84 protected AbstractChannel(Channel parent, ChannelId id) {
85 this.parent = parent;
86 this.id = id;
87 unsafe = newUnsafe();
88 pipeline = newChannelPipeline();
89 }
90
91 @Override
92 public final ChannelId id() {
93 return id;
94 }
95
96
100 protected ChannelId newId() {
101 return DefaultChannelId.newInstance();
102 }
103
104
107 protected DefaultChannelPipeline newChannelPipeline() {
108 return new DefaultChannelPipeline(this);
109 }
110
111 @Override
112 public boolean isWritable() {
113 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
114 return buf != null && buf.isWritable();
115 }
116
117 @Override
118 public long bytesBeforeUnwritable() {
119 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
120
121
122 return buf != null ? buf.bytesBeforeUnwritable() : 0;
123 }
124
125 @Override
126 public long bytesBeforeWritable() {
127 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
128
129
130 return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
131 }
132
133 @Override
134 public Channel parent() {
135 return parent;
136 }
137
138 @Override
139 public ChannelPipeline pipeline() {
140 return pipeline;
141 }
142
143 @Override
144 public ByteBufAllocator alloc() {
145 return config().getAllocator();
146 }
147
148 @Override
149 public EventLoop eventLoop() {
150 EventLoop eventLoop = this.eventLoop;
151 if (eventLoop == null) {
152 throw new IllegalStateException("channel not registered to an event loop");
153 }
154 return eventLoop;
155 }
156
157 @Override
158 public SocketAddress localAddress() {
159 SocketAddress localAddress = this.localAddress;
160 if (localAddress == null) {
161 try {
162 this.localAddress = localAddress = unsafe().localAddress();
163 } catch (Error e) {
164 throw e;
165 } catch (Throwable t) {
166
167 return null;
168 }
169 }
170 return localAddress;
171 }
172
173
176 @Deprecated
177 protected void invalidateLocalAddress() {
178 localAddress = null;
179 }
180
181 @Override
182 public SocketAddress remoteAddress() {
183 SocketAddress remoteAddress = this.remoteAddress;
184 if (remoteAddress == null) {
185 try {
186 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
187 } catch (Error e) {
188 throw e;
189 } catch (Throwable t) {
190
191 return null;
192 }
193 }
194 return remoteAddress;
195 }
196
197
200 @Deprecated
201 protected void invalidateRemoteAddress() {
202 remoteAddress = null;
203 }
204
205 @Override
206 public boolean isRegistered() {
207 return registered;
208 }
209
210 @Override
211 public ChannelFuture bind(SocketAddress localAddress) {
212 return pipeline.bind(localAddress);
213 }
214
215 @Override
216 public ChannelFuture connect(SocketAddress remoteAddress) {
217 return pipeline.connect(remoteAddress);
218 }
219
220 @Override
221 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
222 return pipeline.connect(remoteAddress, localAddress);
223 }
224
225 @Override
226 public ChannelFuture disconnect() {
227 return pipeline.disconnect();
228 }
229
230 @Override
231 public ChannelFuture close() {
232 return pipeline.close();
233 }
234
235 @Override
236 public ChannelFuture deregister() {
237 return pipeline.deregister();
238 }
239
240 @Override
241 public Channel flush() {
242 pipeline.flush();
243 return this;
244 }
245
246 @Override
247 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
248 return pipeline.bind(localAddress, promise);
249 }
250
251 @Override
252 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
253 return pipeline.connect(remoteAddress, promise);
254 }
255
256 @Override
257 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
258 return pipeline.connect(remoteAddress, localAddress, promise);
259 }
260
261 @Override
262 public ChannelFuture disconnect(ChannelPromise promise) {
263 return pipeline.disconnect(promise);
264 }
265
266 @Override
267 public ChannelFuture close(ChannelPromise promise) {
268 return pipeline.close(promise);
269 }
270
271 @Override
272 public ChannelFuture deregister(ChannelPromise promise) {
273 return pipeline.deregister(promise);
274 }
275
276 @Override
277 public Channel read() {
278 pipeline.read();
279 return this;
280 }
281
282 @Override
283 public ChannelFuture write(Object msg) {
284 return pipeline.write(msg);
285 }
286
287 @Override
288 public ChannelFuture write(Object msg, ChannelPromise promise) {
289 return pipeline.write(msg, promise);
290 }
291
292 @Override
293 public ChannelFuture writeAndFlush(Object msg) {
294 return pipeline.writeAndFlush(msg);
295 }
296
297 @Override
298 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
299 return pipeline.writeAndFlush(msg, promise);
300 }
301
302 @Override
303 public ChannelPromise newPromise() {
304 return pipeline.newPromise();
305 }
306
307 @Override
308 public ChannelProgressivePromise newProgressivePromise() {
309 return pipeline.newProgressivePromise();
310 }
311
312 @Override
313 public ChannelFuture newSucceededFuture() {
314 return pipeline.newSucceededFuture();
315 }
316
317 @Override
318 public ChannelFuture newFailedFuture(Throwable cause) {
319 return pipeline.newFailedFuture(cause);
320 }
321
322 @Override
323 public ChannelFuture closeFuture() {
324 return closeFuture;
325 }
326
327 @Override
328 public Unsafe unsafe() {
329 return unsafe;
330 }
331
332
335 protected abstract AbstractUnsafe newUnsafe();
336
337
340 @Override
341 public final int hashCode() {
342 return id.hashCode();
343 }
344
345
349 @Override
350 public final boolean equals(Object o) {
351 return this == o;
352 }
353
354 @Override
355 public final int compareTo(Channel o) {
356 if (this == o) {
357 return 0;
358 }
359
360 return id().compareTo(o.id());
361 }
362
363
369 @Override
370 public String toString() {
371 boolean active = isActive();
372 if (strValActive == active && strVal != null) {
373 return strVal;
374 }
375
376 SocketAddress remoteAddr = remoteAddress();
377 SocketAddress localAddr = localAddress();
378 if (remoteAddr != null) {
379 StringBuilder buf = new StringBuilder(96)
380 .append("[id: 0x")
381 .append(id.asShortText())
382 .append(", L:")
383 .append(localAddr)
384 .append(active? " - " : " ! ")
385 .append("R:")
386 .append(remoteAddr)
387 .append(']');
388 strVal = buf.toString();
389 } else if (localAddr != null) {
390 StringBuilder buf = new StringBuilder(64)
391 .append("[id: 0x")
392 .append(id.asShortText())
393 .append(", L:")
394 .append(localAddr)
395 .append(']');
396 strVal = buf.toString();
397 } else {
398 StringBuilder buf = new StringBuilder(16)
399 .append("[id: 0x")
400 .append(id.asShortText())
401 .append(']');
402 strVal = buf.toString();
403 }
404
405 strValActive = active;
406 return strVal;
407 }
408
409 @Override
410 public final ChannelPromise voidPromise() {
411 return pipeline.voidPromise();
412 }
413
414
417 protected abstract class AbstractUnsafe implements Unsafe {
418
419 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
420 private RecvByteBufAllocator.Handle recvHandle;
421 private boolean inFlush0;
422
423 private boolean neverRegistered = true;
424
425 private void assertEventLoop() {
426 assert !registered || eventLoop.inEventLoop();
427 }
428
429 @Override
430 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
431 if (recvHandle == null) {
432 recvHandle = config().getRecvByteBufAllocator().newHandle();
433 }
434 return recvHandle;
435 }
436
437 @Override
438 public final ChannelOutboundBuffer outboundBuffer() {
439 return outboundBuffer;
440 }
441
442 @Override
443 public final SocketAddress localAddress() {
444 return localAddress0();
445 }
446
447 @Override
448 public final SocketAddress remoteAddress() {
449 return remoteAddress0();
450 }
451
452 @Override
453 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
454 ObjectUtil.checkNotNull(eventLoop, "eventLoop");
455 if (isRegistered()) {
456 promise.setFailure(new IllegalStateException("registered to an event loop already"));
457 return;
458 }
459 if (!isCompatible(eventLoop)) {
460 promise.setFailure(
461 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
462 return;
463 }
464
465 AbstractChannel.this.eventLoop = eventLoop;
466
467 if (eventLoop.inEventLoop()) {
468 register0(promise);
469 } else {
470 try {
471 eventLoop.execute(new Runnable() {
472 @Override
473 public void run() {
474 register0(promise);
475 }
476 });
477 } catch (Throwable t) {
478 logger.warn(
479 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
480 AbstractChannel.this, t);
481 closeForcibly();
482 closeFuture.setClosed();
483 safeSetFailure(promise, t);
484 }
485 }
486 }
487
488 private void register0(ChannelPromise promise) {
489 try {
490
491
492 if (!promise.setUncancellable() || !ensureOpen(promise)) {
493 return;
494 }
495 boolean firstRegistration = neverRegistered;
496 doRegister();
497 neverRegistered = false;
498 registered = true;
499
500
501
502 pipeline.invokeHandlerAddedIfNeeded();
503
504 safeSetSuccess(promise);
505 pipeline.fireChannelRegistered();
506
507
508 if (isActive()) {
509 if (firstRegistration) {
510 pipeline.fireChannelActive();
511 } else if (config().isAutoRead()) {
512
513
514
515
516 beginRead();
517 }
518 }
519 } catch (Throwable t) {
520
521 closeForcibly();
522 closeFuture.setClosed();
523 safeSetFailure(promise, t);
524 }
525 }
526
527 @Override
528 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
529 assertEventLoop();
530
531 if (!promise.setUncancellable() || !ensureOpen(promise)) {
532 return;
533 }
534
535
536 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
537 localAddress instanceof InetSocketAddress &&
538 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
539 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
540
541
542 logger.warn(
543 "A non-root user can't receive a broadcast packet if the socket " +
544 "is not bound to a wildcard address; binding to a non-wildcard " +
545 "address (" + localAddress + ") anyway as requested.");
546 }
547
548 boolean wasActive = isActive();
549 try {
550 doBind(localAddress);
551 } catch (Throwable t) {
552 safeSetFailure(promise, t);
553 closeIfClosed();
554 return;
555 }
556
557 if (!wasActive && isActive()) {
558 invokeLater(new Runnable() {
559 @Override
560 public void run() {
561 pipeline.fireChannelActive();
562 }
563 });
564 }
565
566 safeSetSuccess(promise);
567 }
568
569 @Override
570 public final void disconnect(final ChannelPromise promise) {
571 assertEventLoop();
572
573 if (!promise.setUncancellable()) {
574 return;
575 }
576
577 boolean wasActive = isActive();
578 try {
579 doDisconnect();
580
581 remoteAddress = null;
582 localAddress = null;
583 } catch (Throwable t) {
584 safeSetFailure(promise, t);
585 closeIfClosed();
586 return;
587 }
588
589 if (wasActive && !isActive()) {
590 invokeLater(new Runnable() {
591 @Override
592 public void run() {
593 pipeline.fireChannelInactive();
594 }
595 });
596 }
597
598 safeSetSuccess(promise);
599 closeIfClosed();
600 }
601
602 @Override
603 public final void close(final ChannelPromise promise) {
604 assertEventLoop();
605
606 ClosedChannelException closedChannelException =
607 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
608 close(promise, closedChannelException, closedChannelException, false);
609 }
610
611
615 @UnstableApi
616 public final void shutdownOutput(final ChannelPromise promise) {
617 assertEventLoop();
618 shutdownOutput(promise, null);
619 }
620
621
626 private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
627 if (!promise.setUncancellable()) {
628 return;
629 }
630
631 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
632 if (outboundBuffer == null) {
633 promise.setFailure(new ClosedChannelException());
634 return;
635 }
636 this.outboundBuffer = null;
637
638 final Throwable shutdownCause = cause == null ?
639 new ChannelOutputShutdownException("Channel output shutdown") :
640 new ChannelOutputShutdownException("Channel output shutdown", cause);
641 Executor closeExecutor = prepareToClose();
642 if (closeExecutor != null) {
643 closeExecutor.execute(new Runnable() {
644 @Override
645 public void run() {
646 try {
647
648 doShutdownOutput();
649 promise.setSuccess();
650 } catch (Throwable err) {
651 promise.setFailure(err);
652 } finally {
653
654 eventLoop().execute(new Runnable() {
655 @Override
656 public void run() {
657 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
658 }
659 });
660 }
661 }
662 });
663 } else {
664 try {
665
666 doShutdownOutput();
667 promise.setSuccess();
668 } catch (Throwable err) {
669 promise.setFailure(err);
670 } finally {
671 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
672 }
673 }
674 }
675
676 private void closeOutboundBufferForShutdown(
677 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
678 buffer.failFlushed(cause, false);
679 buffer.close(cause, true);
680 pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
681 }
682
683 private void close(final ChannelPromise promise, final Throwable cause,
684 final ClosedChannelException closeCause, final boolean notify) {
685 if (!promise.setUncancellable()) {
686 return;
687 }
688
689 if (closeInitiated) {
690 if (closeFuture.isDone()) {
691
692 safeSetSuccess(promise);
693 } else if (!(promise instanceof VoidChannelPromise)) {
694
695 closeFuture.addListener(new ChannelFutureListener() {
696 @Override
697 public void operationComplete(ChannelFuture future) throws Exception {
698 promise.setSuccess();
699 }
700 });
701 }
702 return;
703 }
704
705 closeInitiated = true;
706
707 final boolean wasActive = isActive();
708 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
709 this.outboundBuffer = null;
710 Executor closeExecutor = prepareToClose();
711 if (closeExecutor != null) {
712 closeExecutor.execute(new Runnable() {
713 @Override
714 public void run() {
715 try {
716
717 doClose0(promise);
718 } finally {
719
720 invokeLater(new Runnable() {
721 @Override
722 public void run() {
723 if (outboundBuffer != null) {
724
725 outboundBuffer.failFlushed(cause, notify);
726 outboundBuffer.close(closeCause);
727 }
728 fireChannelInactiveAndDeregister(wasActive);
729 }
730 });
731 }
732 }
733 });
734 } else {
735 try {
736
737 doClose0(promise);
738 } finally {
739 if (outboundBuffer != null) {
740
741 outboundBuffer.failFlushed(cause, notify);
742 outboundBuffer.close(closeCause);
743 }
744 }
745 if (inFlush0) {
746 invokeLater(new Runnable() {
747 @Override
748 public void run() {
749 fireChannelInactiveAndDeregister(wasActive);
750 }
751 });
752 } else {
753 fireChannelInactiveAndDeregister(wasActive);
754 }
755 }
756 }
757
758 private void doClose0(ChannelPromise promise) {
759 try {
760 doClose();
761 closeFuture.setClosed();
762 safeSetSuccess(promise);
763 } catch (Throwable t) {
764 closeFuture.setClosed();
765 safeSetFailure(promise, t);
766 }
767 }
768
769 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
770 deregister(voidPromise(), wasActive && !isActive());
771 }
772
773 @Override
774 public final void closeForcibly() {
775 assertEventLoop();
776
777 try {
778 doClose();
779 } catch (Exception e) {
780 logger.warn("Failed to close a channel.", e);
781 }
782 }
783
784 @Override
785 public final void deregister(final ChannelPromise promise) {
786 assertEventLoop();
787
788 deregister(promise, false);
789 }
790
791 private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
792 if (!promise.setUncancellable()) {
793 return;
794 }
795
796 if (!registered) {
797 safeSetSuccess(promise);
798 return;
799 }
800
801
802
803
804
805
806
807
808
809
810 invokeLater(new Runnable() {
811 @Override
812 public void run() {
813 try {
814 doDeregister();
815 } catch (Throwable t) {
816 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
817 } finally {
818 if (fireChannelInactive) {
819 pipeline.fireChannelInactive();
820 }
821
822
823
824
825 if (registered) {
826 registered = false;
827 pipeline.fireChannelUnregistered();
828 }
829 safeSetSuccess(promise);
830 }
831 }
832 });
833 }
834
835 @Override
836 public final void beginRead() {
837 assertEventLoop();
838
839 if (!isActive()) {
840 return;
841 }
842
843 try {
844 doBeginRead();
845 } catch (final Exception e) {
846 invokeLater(new Runnable() {
847 @Override
848 public void run() {
849 pipeline.fireExceptionCaught(e);
850 }
851 });
852 close(voidPromise());
853 }
854 }
855
856 @Override
857 public final void write(Object msg, ChannelPromise promise) {
858 assertEventLoop();
859
860 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
861 if (outboundBuffer == null) {
862
863
864
865
866 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
867
868 ReferenceCountUtil.release(msg);
869 return;
870 }
871
872 int size;
873 try {
874 msg = filterOutboundMessage(msg);
875 size = pipeline.estimatorHandle().size(msg);
876 if (size < 0) {
877 size = 0;
878 }
879 } catch (Throwable t) {
880 safeSetFailure(promise, t);
881 ReferenceCountUtil.release(msg);
882 return;
883 }
884
885 outboundBuffer.addMessage(msg, size, promise);
886 }
887
888 @Override
889 public final void flush() {
890 assertEventLoop();
891
892 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
893 if (outboundBuffer == null) {
894 return;
895 }
896
897 outboundBuffer.addFlush();
898 flush0();
899 }
900
901 @SuppressWarnings("deprecation")
902 protected void flush0() {
903 if (inFlush0) {
904
905 return;
906 }
907
908 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
909 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
910 return;
911 }
912
913 inFlush0 = true;
914
915
916 if (!isActive()) {
917 try {
918
919 if (!outboundBuffer.isEmpty()) {
920 if (isOpen()) {
921 outboundBuffer.failFlushed(new NotYetConnectedException(), true);
922 } else {
923
924 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
925 }
926 }
927 } finally {
928 inFlush0 = false;
929 }
930 return;
931 }
932
933 try {
934 doWrite(outboundBuffer);
935 } catch (Throwable t) {
936 if (t instanceof IOException && config().isAutoClose()) {
937
945 initialCloseCause = t;
946 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
947 } else {
948 try {
949 shutdownOutput(voidPromise(), t);
950 } catch (Throwable t2) {
951 initialCloseCause = t;
952 close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
953 }
954 }
955 } finally {
956 inFlush0 = false;
957 }
958 }
959
960 private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
961 ClosedChannelException exception =
962 StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
963 if (cause != null) {
964 exception.initCause(cause);
965 }
966 return exception;
967 }
968
969 @Override
970 public final ChannelPromise voidPromise() {
971 assertEventLoop();
972
973 return unsafeVoidPromise;
974 }
975
976 protected final boolean ensureOpen(ChannelPromise promise) {
977 if (isOpen()) {
978 return true;
979 }
980
981 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
982 return false;
983 }
984
985
988 protected final void safeSetSuccess(ChannelPromise promise) {
989 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
990 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
991 }
992 }
993
994
997 protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
998 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
999 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1000 }
1001 }
1002
1003 protected final void closeIfClosed() {
1004 if (isOpen()) {
1005 return;
1006 }
1007 close(voidPromise());
1008 }
1009
1010 private void invokeLater(Runnable task) {
1011 try {
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023 eventLoop().execute(task);
1024 } catch (RejectedExecutionException e) {
1025 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1026 }
1027 }
1028
1029
1032 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1033 if (cause instanceof ConnectException) {
1034 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1035 }
1036 if (cause instanceof NoRouteToHostException) {
1037 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1038 }
1039 if (cause instanceof SocketException) {
1040 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1041 }
1042
1043 return cause;
1044 }
1045
1046
1052 protected Executor prepareToClose() {
1053 return null;
1054 }
1055 }
1056
1057
1060 protected abstract boolean isCompatible(EventLoop loop);
1061
1062
1065 protected abstract SocketAddress localAddress0();
1066
1067
1070 protected abstract SocketAddress remoteAddress0();
1071
1072
1077 protected void doRegister() throws Exception {
1078
1079 }
1080
1081
1084 protected abstract void doBind(SocketAddress localAddress) throws Exception;
1085
1086
1089 protected abstract void doDisconnect() throws Exception;
1090
1091
1094 protected abstract void doClose() throws Exception;
1095
1096
1100 @UnstableApi
1101 protected void doShutdownOutput() throws Exception {
1102 doClose();
1103 }
1104
1105
1110 protected void doDeregister() throws Exception {
1111
1112 }
1113
1114
1117 protected abstract void doBeginRead() throws Exception;
1118
1119
1122 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1123
1124
1128 protected Object filterOutboundMessage(Object msg) throws Exception {
1129 return msg;
1130 }
1131
1132 protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1133 DefaultFileRegion.validate(region, position);
1134 }
1135
1136 static final class CloseFuture extends DefaultChannelPromise {
1137
1138 CloseFuture(AbstractChannel ch) {
1139 super(ch);
1140 }
1141
1142 @Override
1143 public ChannelPromise setSuccess() {
1144 throw new IllegalStateException();
1145 }
1146
1147 @Override
1148 public ChannelPromise setFailure(Throwable cause) {
1149 throw new IllegalStateException();
1150 }
1151
1152 @Override
1153 public boolean trySuccess() {
1154 throw new IllegalStateException();
1155 }
1156
1157 @Override
1158 public boolean tryFailure(Throwable cause) {
1159 throw new IllegalStateException();
1160 }
1161
1162 boolean setClosed() {
1163 return super.trySuccess();
1164 }
1165 }
1166
1167 private static final class AnnotatedConnectException extends ConnectException {
1168
1169 private static final long serialVersionUID = 3901958112696433556L;
1170
1171 AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1172 super(exception.getMessage() + ": " + remoteAddress);
1173 initCause(exception);
1174 }
1175
1176 @Override
1177 public Throwable fillInStackTrace() {
1178 return this;
1179 }
1180 }
1181
1182 private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1183
1184 private static final long serialVersionUID = -6801433937592080623L;
1185
1186 AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1187 super(exception.getMessage() + ": " + remoteAddress);
1188 initCause(exception);
1189 }
1190
1191 @Override
1192 public Throwable fillInStackTrace() {
1193 return this;
1194 }
1195 }
1196
1197 private static final class AnnotatedSocketException extends SocketException {
1198
1199 private static final long serialVersionUID = 3896743275010454039L;
1200
1201 AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1202 super(exception.getMessage() + ": " + remoteAddress);
1203 initCause(exception);
1204 }
1205
1206 @Override
1207 public Throwable fillInStackTrace() {
1208 return this;
1209 }
1210 }
1211 }
1212