1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
20 import io.netty.channel.socket.ChannelOutputShutdownException;
21 import io.netty.util.DefaultAttributeMap;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.UnstableApi;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.io.IOException;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.NoRouteToHostException;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.nio.channels.ClosedChannelException;
36 import java.nio.channels.NotYetConnectedException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.RejectedExecutionException;
39
40 /**
41  * A skeletal {@link Channel} implementation.
42  */

43 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44
45     private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46
47     private final Channel parent;
48     private final ChannelId id;
49     private final Unsafe unsafe;
50     private final DefaultChannelPipeline pipeline;
51     private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(thisfalse);
52     private final CloseFuture closeFuture = new CloseFuture(this);
53
54     private volatile SocketAddress localAddress;
55     private volatile SocketAddress remoteAddress;
56     private volatile EventLoop eventLoop;
57     private volatile boolean registered;
58     private boolean closeInitiated;
59     private Throwable initialCloseCause;
60
61     /** Cache for the string representation of this channel */
62     private boolean strValActive;
63     private String strVal;
64
65     /**
66      * Creates a new instance.
67      *
68      * @param parent
69      *        the parent of this channel. {@code nullif there's no parent.
70      */

71     protected AbstractChannel(Channel parent) {
72         this.parent = parent;
73         id = newId();
74         unsafe = newUnsafe();
75         pipeline = newChannelPipeline();
76     }
77
78     /**
79      * Creates a new instance.
80      *
81      * @param parent
82      *        the parent of this channel. {@code nullif there's no parent.
83      */

84     protected AbstractChannel(Channel parent, ChannelId id) {
85         this.parent = parent;
86         this.id = id;
87         unsafe = newUnsafe();
88         pipeline = newChannelPipeline();
89     }
90
91     @Override
92     public final ChannelId id() {
93         return id;
94     }
95
96     /**
97      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
98      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
99      */

100     protected ChannelId newId() {
101         return DefaultChannelId.newInstance();
102     }
103
104     /**
105      * Returns a new {@link DefaultChannelPipeline} instance.
106      */

107     protected DefaultChannelPipeline newChannelPipeline() {
108         return new DefaultChannelPipeline(this);
109     }
110
111     @Override
112     public boolean isWritable() {
113         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
114         return buf != null && buf.isWritable();
115     }
116
117     @Override
118     public long bytesBeforeUnwritable() {
119         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
120         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
121         // We should be consistent with that here.
122         return buf != null ? buf.bytesBeforeUnwritable() : 0;
123     }
124
125     @Override
126     public long bytesBeforeWritable() {
127         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
128         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
129         // We should be consistent with that here.
130         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
131     }
132
133     @Override
134     public Channel parent() {
135         return parent;
136     }
137
138     @Override
139     public ChannelPipeline pipeline() {
140         return pipeline;
141     }
142
143     @Override
144     public ByteBufAllocator alloc() {
145         return config().getAllocator();
146     }
147
148     @Override
149     public EventLoop eventLoop() {
150         EventLoop eventLoop = this.eventLoop;
151         if (eventLoop == null) {
152             throw new IllegalStateException("channel not registered to an event loop");
153         }
154         return eventLoop;
155     }
156
157     @Override
158     public SocketAddress localAddress() {
159         SocketAddress localAddress = this.localAddress;
160         if (localAddress == null) {
161             try {
162                 this.localAddress = localAddress = unsafe().localAddress();
163             } catch (Error e) {
164                 throw e;
165             } catch (Throwable t) {
166                 // Sometimes fails on a closed socket in Windows.
167                 return null;
168             }
169         }
170         return localAddress;
171     }
172
173     /**
174      * @deprecated no use-case for this.
175      */

176     @Deprecated
177     protected void invalidateLocalAddress() {
178         localAddress = null;
179     }
180
181     @Override
182     public SocketAddress remoteAddress() {
183         SocketAddress remoteAddress = this.remoteAddress;
184         if (remoteAddress == null) {
185             try {
186                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
187             } catch (Error e) {
188                 throw e;
189             } catch (Throwable t) {
190                 // Sometimes fails on a closed socket in Windows.
191                 return null;
192             }
193         }
194         return remoteAddress;
195     }
196
197     /**
198      * @deprecated no use-case for this.
199      */

200     @Deprecated
201     protected void invalidateRemoteAddress() {
202         remoteAddress = null;
203     }
204
205     @Override
206     public boolean isRegistered() {
207         return registered;
208     }
209
210     @Override
211     public ChannelFuture bind(SocketAddress localAddress) {
212         return pipeline.bind(localAddress);
213     }
214
215     @Override
216     public ChannelFuture connect(SocketAddress remoteAddress) {
217         return pipeline.connect(remoteAddress);
218     }
219
220     @Override
221     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
222         return pipeline.connect(remoteAddress, localAddress);
223     }
224
225     @Override
226     public ChannelFuture disconnect() {
227         return pipeline.disconnect();
228     }
229
230     @Override
231     public ChannelFuture close() {
232         return pipeline.close();
233     }
234
235     @Override
236     public ChannelFuture deregister() {
237         return pipeline.deregister();
238     }
239
240     @Override
241     public Channel flush() {
242         pipeline.flush();
243         return this;
244     }
245
246     @Override
247     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
248         return pipeline.bind(localAddress, promise);
249     }
250
251     @Override
252     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
253         return pipeline.connect(remoteAddress, promise);
254     }
255
256     @Override
257     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
258         return pipeline.connect(remoteAddress, localAddress, promise);
259     }
260
261     @Override
262     public ChannelFuture disconnect(ChannelPromise promise) {
263         return pipeline.disconnect(promise);
264     }
265
266     @Override
267     public ChannelFuture close(ChannelPromise promise) {
268         return pipeline.close(promise);
269     }
270
271     @Override
272     public ChannelFuture deregister(ChannelPromise promise) {
273         return pipeline.deregister(promise);
274     }
275
276     @Override
277     public Channel read() {
278         pipeline.read();
279         return this;
280     }
281
282     @Override
283     public ChannelFuture write(Object msg) {
284         return pipeline.write(msg);
285     }
286
287     @Override
288     public ChannelFuture write(Object msg, ChannelPromise promise) {
289         return pipeline.write(msg, promise);
290     }
291
292     @Override
293     public ChannelFuture writeAndFlush(Object msg) {
294         return pipeline.writeAndFlush(msg);
295     }
296
297     @Override
298     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
299         return pipeline.writeAndFlush(msg, promise);
300     }
301
302     @Override
303     public ChannelPromise newPromise() {
304         return pipeline.newPromise();
305     }
306
307     @Override
308     public ChannelProgressivePromise newProgressivePromise() {
309         return pipeline.newProgressivePromise();
310     }
311
312     @Override
313     public ChannelFuture newSucceededFuture() {
314         return pipeline.newSucceededFuture();
315     }
316
317     @Override
318     public ChannelFuture newFailedFuture(Throwable cause) {
319         return pipeline.newFailedFuture(cause);
320     }
321
322     @Override
323     public ChannelFuture closeFuture() {
324         return closeFuture;
325     }
326
327     @Override
328     public Unsafe unsafe() {
329         return unsafe;
330     }
331
    /**
     * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}.
     * Both constructors of this class call this method once to initialise the channel's {@link Unsafe}.
     */
    protected abstract AbstractUnsafe newUnsafe();
336
337     /**
338      * Returns the ID of this channel.
339      */

340     @Override
341     public final int hashCode() {
342         return id.hashCode();
343     }
344
345     /**
346      * Returns {@code trueif and only if the specified object is identical
347      * with this channel (i.e: {@code this == o}).
348      */

349     @Override
350     public final boolean equals(Object o) {
351         return this == o;
352     }
353
354     @Override
355     public final int compareTo(Channel o) {
356         if (this == o) {
357             return 0;
358         }
359
360         return id().compareTo(o.id());
361     }
362
363     /**
364      * Returns the {@link String} representation of this channel.  The returned
365      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
366      * and {@linkplain #remoteAddress() remote address} of this channel for
367      * easier identification.
368      */

369     @Override
370     public String toString() {
371         boolean active = isActive();
372         if (strValActive == active && strVal != null) {
373             return strVal;
374         }
375
376         SocketAddress remoteAddr = remoteAddress();
377         SocketAddress localAddr = localAddress();
378         if (remoteAddr != null) {
379             StringBuilder buf = new StringBuilder(96)
380                 .append("[id: 0x")
381                 .append(id.asShortText())
382                 .append(", L:")
383                 .append(localAddr)
384                 .append(active? " - " : " ! ")
385                 .append("R:")
386                 .append(remoteAddr)
387                 .append(']');
388             strVal = buf.toString();
389         } else if (localAddr != null) {
390             StringBuilder buf = new StringBuilder(64)
391                 .append("[id: 0x")
392                 .append(id.asShortText())
393                 .append(", L:")
394                 .append(localAddr)
395                 .append(']');
396             strVal = buf.toString();
397         } else {
398             StringBuilder buf = new StringBuilder(16)
399                 .append("[id: 0x")
400                 .append(id.asShortText())
401                 .append(']');
402             strVal = buf.toString();
403         }
404
405         strValActive = active;
406         return strVal;
407     }
408
409     @Override
410     public final ChannelPromise voidPromise() {
411         return pipeline.voidPromise();
412     }
413
414     /**
415      * {@link Unsafe} implementation which sub-classes must extend and use.
416      */

417     protected abstract class AbstractUnsafe implements Unsafe {
418
        // Set to null by close(...) and shutdownOutput(...); write(...) fails the promise once it is null.
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        // Created lazily on the first recvBufAllocHandle() call.
        private RecvByteBufAllocator.Handle recvHandle;
        // Read by close(...) to decide whether channelInactive/deregistration is deferred via invokeLater.
        // NOTE(review): presumably set while a flush is in progress - the writer is outside this view.
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
424
425         private void assertEventLoop() {
426             assert !registered || eventLoop.inEventLoop();
427         }
428
429         @Override
430         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
431             if (recvHandle == null) {
432                 recvHandle = config().getRecvByteBufAllocator().newHandle();
433             }
434             return recvHandle;
435         }
436
437         @Override
438         public final ChannelOutboundBuffer outboundBuffer() {
439             return outboundBuffer;
440         }
441
442         @Override
443         public final SocketAddress localAddress() {
444             return localAddress0();
445         }
446
447         @Override
448         public final SocketAddress remoteAddress() {
449             return remoteAddress0();
450         }
451
452         @Override
453         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
454             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
455             if (isRegistered()) {
456                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
457                 return;
458             }
459             if (!isCompatible(eventLoop)) {
460                 promise.setFailure(
461                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
462                 return;
463             }
464
465             AbstractChannel.this.eventLoop = eventLoop;
466
467             if (eventLoop.inEventLoop()) {
468                 register0(promise);
469             } else {
470                 try {
471                     eventLoop.execute(new Runnable() {
472                         @Override
473                         public void run() {
474                             register0(promise);
475                         }
476                     });
477                 } catch (Throwable t) {
478                     logger.warn(
479                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
480                             AbstractChannel.this, t);
481                     closeForcibly();
482                     closeFuture.setClosed();
483                     safeSetFailure(promise, t);
484                 }
485             }
486         }
487
488         private void register0(ChannelPromise promise) {
489             try {
490                 // check if the channel is still open as it could be closed in the mean time when the register
491                 // call was outside of the eventLoop
492                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
493                     return;
494                 }
495                 boolean firstRegistration = neverRegistered;
496                 doRegister();
497                 neverRegistered = false;
498                 registered = true;
499
500                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
501                 // user may already fire events through the pipeline in the ChannelFutureListener.
502                 pipeline.invokeHandlerAddedIfNeeded();
503
504                 safeSetSuccess(promise);
505                 pipeline.fireChannelRegistered();
506                 // Only fire a channelActive if the channel has never been registered. This prevents firing
507                 // multiple channel actives if the channel is deregistered and re-registered.
508                 if (isActive()) {
509                     if (firstRegistration) {
510                         pipeline.fireChannelActive();
511                     } else if (config().isAutoRead()) {
512                         // This channel was registered before and autoRead() is set. This means we need to begin read
513                         // again so that we process inbound data.
514                         //
515                         // See https://github.com/netty/netty/issues/4805
516                         beginRead();
517                     }
518                 }
519             } catch (Throwable t) {
520                 // Close the channel directly to avoid FD leak.
521                 closeForcibly();
522                 closeFuture.setClosed();
523                 safeSetFailure(promise, t);
524             }
525         }
526
527         @Override
528         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
529             assertEventLoop();
530
531             if (!promise.setUncancellable() || !ensureOpen(promise)) {
532                 return;
533             }
534
535             // See: https://github.com/netty/netty/issues/576
536             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
537                 localAddress instanceof InetSocketAddress &&
538                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
539                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
540                 // Warn a user about the fact that a non-root user can't receive a
541                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
542                 logger.warn(
543                         "A non-root user can't receive a broadcast packet if the socket " +
544                         "is not bound to a wildcard address; binding to a non-wildcard " +
545                         "address (" + localAddress + ") anyway as requested.");
546             }
547
548             boolean wasActive = isActive();
549             try {
550                 doBind(localAddress);
551             } catch (Throwable t) {
552                 safeSetFailure(promise, t);
553                 closeIfClosed();
554                 return;
555             }
556
557             if (!wasActive && isActive()) {
558                 invokeLater(new Runnable() {
559                     @Override
560                     public void run() {
561                         pipeline.fireChannelActive();
562                     }
563                 });
564             }
565
566             safeSetSuccess(promise);
567         }
568
569         @Override
570         public final void disconnect(final ChannelPromise promise) {
571             assertEventLoop();
572
573             if (!promise.setUncancellable()) {
574                 return;
575             }
576
577             boolean wasActive = isActive();
578             try {
579                 doDisconnect();
580                 // Reset remoteAddress and localAddress
581                 remoteAddress = null;
582                 localAddress = null;
583             } catch (Throwable t) {
584                 safeSetFailure(promise, t);
585                 closeIfClosed();
586                 return;
587             }
588
589             if (wasActive && !isActive()) {
590                 invokeLater(new Runnable() {
591                     @Override
592                     public void run() {
593                         pipeline.fireChannelInactive();
594                     }
595                 });
596             }
597
598             safeSetSuccess(promise);
599             closeIfClosed(); // doDisconnect() might have closed the channel
600         }
601
602         @Override
603         public final void close(final ChannelPromise promise) {
604             assertEventLoop();
605
606             ClosedChannelException closedChannelException =
607                     StacklessClosedChannelException.newInstance(AbstractChannel.class"close(ChannelPromise)");
608             close(promise, closedChannelException, closedChannelException, false);
609         }
610
611         /**
612          * Shutdown the output portion of the corresponding {@link Channel}.
613          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
614          */

615         @UnstableApi
616         public final void shutdownOutput(final ChannelPromise promise) {
617             assertEventLoop();
618             shutdownOutput(promise, null);
619         }
620
621         /**
622          * Shutdown the output portion of the corresponding {@link Channel}.
623          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
624          * @param cause The cause which may provide rational for the shutdown.
625          */

626         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
627             if (!promise.setUncancellable()) {
628                 return;
629             }
630
631             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
632             if (outboundBuffer == null) {
633                 promise.setFailure(new ClosedChannelException());
634                 return;
635             }
636             this.outboundBuffer = null// Disallow adding any messages and flushes to outboundBuffer.
637
638             final Throwable shutdownCause = cause == null ?
639                     new ChannelOutputShutdownException("Channel output shutdown") :
640                     new ChannelOutputShutdownException("Channel output shutdown", cause);
641             Executor closeExecutor = prepareToClose();
642             if (closeExecutor != null) {
643                 closeExecutor.execute(new Runnable() {
644                     @Override
645                     public void run() {
646                         try {
647                             // Execute the shutdown.
648                             doShutdownOutput();
649                             promise.setSuccess();
650                         } catch (Throwable err) {
651                             promise.setFailure(err);
652                         } finally {
653                             // Dispatch to the EventLoop
654                             eventLoop().execute(new Runnable() {
655                                 @Override
656                                 public void run() {
657                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
658                                 }
659                             });
660                         }
661                     }
662                 });
663             } else {
664                 try {
665                     // Execute the shutdown.
666                     doShutdownOutput();
667                     promise.setSuccess();
668                 } catch (Throwable err) {
669                     promise.setFailure(err);
670                 } finally {
671                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
672                 }
673             }
674         }
675
676         private void closeOutboundBufferForShutdown(
677                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
678             buffer.failFlushed(cause, false);
679             buffer.close(cause, true);
680             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
681         }
682
683         private void close(final ChannelPromise promise, final Throwable cause,
684                            final ClosedChannelException closeCause, final boolean notify) {
685             if (!promise.setUncancellable()) {
686                 return;
687             }
688
689             if (closeInitiated) {
690                 if (closeFuture.isDone()) {
691                     // Closed already.
692                     safeSetSuccess(promise);
693                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
694                     // This means close() was called before so we just register a listener and return
695                     closeFuture.addListener(new ChannelFutureListener() {
696                         @Override
697                         public void operationComplete(ChannelFuture future) throws Exception {
698                             promise.setSuccess();
699                         }
700                     });
701                 }
702                 return;
703             }
704
705             closeInitiated = true;
706
707             final boolean wasActive = isActive();
708             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
709             this.outboundBuffer = null// Disallow adding any messages and flushes to outboundBuffer.
710             Executor closeExecutor = prepareToClose();
711             if (closeExecutor != null) {
712                 closeExecutor.execute(new Runnable() {
713                     @Override
714                     public void run() {
715                         try {
716                             // Execute the close.
717                             doClose0(promise);
718                         } finally {
719                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
720                             invokeLater(new Runnable() {
721                                 @Override
722                                 public void run() {
723                                     if (outboundBuffer != null) {
724                                         // Fail all the queued messages
725                                         outboundBuffer.failFlushed(cause, notify);
726                                         outboundBuffer.close(closeCause);
727                                     }
728                                     fireChannelInactiveAndDeregister(wasActive);
729                                 }
730                             });
731                         }
732                     }
733                 });
734             } else {
735                 try {
736                     // Close the channel and fail the queued messages in all cases.
737                     doClose0(promise);
738                 } finally {
739                     if (outboundBuffer != null) {
740                         // Fail all the queued messages.
741                         outboundBuffer.failFlushed(cause, notify);
742                         outboundBuffer.close(closeCause);
743                     }
744                 }
745                 if (inFlush0) {
746                     invokeLater(new Runnable() {
747                         @Override
748                         public void run() {
749                             fireChannelInactiveAndDeregister(wasActive);
750                         }
751                     });
752                 } else {
753                     fireChannelInactiveAndDeregister(wasActive);
754                 }
755             }
756         }
757
758         private void doClose0(ChannelPromise promise) {
759             try {
760                 doClose();
761                 closeFuture.setClosed();
762                 safeSetSuccess(promise);
763             } catch (Throwable t) {
764                 closeFuture.setClosed();
765                 safeSetFailure(promise, t);
766             }
767         }
768
769         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
770             deregister(voidPromise(), wasActive && !isActive());
771         }
772
773         @Override
774         public final void closeForcibly() {
775             assertEventLoop();
776
777             try {
778                 doClose();
779             } catch (Exception e) {
780                 logger.warn("Failed to close a channel.", e);
781             }
782         }
783
784         @Override
785         public final void deregister(final ChannelPromise promise) {
786             assertEventLoop();
787
788             deregister(promise, false);
789         }
790
791         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
792             if (!promise.setUncancellable()) {
793                 return;
794             }
795
796             if (!registered) {
797                 safeSetSuccess(promise);
798                 return;
799             }
800
801             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
802             // we need to ensure we do the actual deregister operation later. This is needed as for example,
803             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
804             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
805             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
806             // threads.
807             //
808             // See:
809             // https://github.com/netty/netty/issues/4435
810             invokeLater(new Runnable() {
811                 @Override
812                 public void run() {
813                     try {
814                         doDeregister();
815                     } catch (Throwable t) {
816                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
817                     } finally {
818                         if (fireChannelInactive) {
819                             pipeline.fireChannelInactive();
820                         }
821                         // Some transports like local and AIO does not allow the deregistration of
822                         // an open channel.  Their doDeregister() calls close(). Consequently,
823                         // close() calls deregister() again - no need to fire channelUnregistered, so check
824                         // if it was registered.
825                         if (registered) {
826                             registered = false;
827                             pipeline.fireChannelUnregistered();
828                         }
829                         safeSetSuccess(promise);
830                     }
831                 }
832             });
833         }
834
835         @Override
836         public final void beginRead() {
837             assertEventLoop();
838
839             if (!isActive()) {
840                 return;
841             }
842
843             try {
844                 doBeginRead();
845             } catch (final Exception e) {
846                 invokeLater(new Runnable() {
847                     @Override
848                     public void run() {
849                         pipeline.fireExceptionCaught(e);
850                     }
851                 });
852                 close(voidPromise());
853             }
854         }
855
856         @Override
857         public final void write(Object msg, ChannelPromise promise) {
858             assertEventLoop();
859
860             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
861             if (outboundBuffer == null) {
862                 // If the outboundBuffer is null we know the channel was closed and so
863                 // need to fail the future right away. If it is not null the handling of the rest
864                 // will be done in flush0()
865                 // See https://github.com/netty/netty/issues/2362
866                 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
867                 // release message now to prevent resource-leak
868                 ReferenceCountUtil.release(msg);
869                 return;
870             }
871
872             int size;
873             try {
874                 msg = filterOutboundMessage(msg);
875                 size = pipeline.estimatorHandle().size(msg);
876                 if (size < 0) {
877                     size = 0;
878                 }
879             } catch (Throwable t) {
880                 safeSetFailure(promise, t);
881                 ReferenceCountUtil.release(msg);
882                 return;
883             }
884
885             outboundBuffer.addMessage(msg, size, promise);
886         }
887
888         @Override
889         public final void flush() {
890             assertEventLoop();
891
892             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
893             if (outboundBuffer == null) {
894                 return;
895             }
896
897             outboundBuffer.addFlush();
898             flush0();
899         }
900
        /**
         * Hands the outbound buffer to {@code doWrite(ChannelOutboundBuffer)}.
         * <p>
         * The call returns immediately when a flush is already in progress ({@code inFlush0}) or when the outbound
         * buffer is {@code null} or empty. If the channel is not active, the flushed entries are failed instead of
         * written: with a {@link NotYetConnectedException} while the channel is still open, otherwise with a
         * {@link ClosedChannelException}. A {@link Throwable} thrown by {@code doWrite(...)} either closes the
         * channel (an {@link IOException} with auto-close enabled) or shuts down the output; if shutting down the
         * output throws as well, the channel is closed.
         */
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // From here on every exit path has to reset the flag again, hence the finally blocks below.
            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            // Open but not active: report the writes as attempted before the connection exists.
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */

                    initialCloseCause = t;
                    close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        // The write failure t stays the recorded close cause; t2 is what is passed to close(...).
                        initialCloseCause = t;
                        close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }
959
960         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
961             ClosedChannelException exception =
962                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
963             if (cause != null) {
964                 exception.initCause(cause);
965             }
966             return exception;
967         }
968
        /**
         * Returns the {@code unsafeVoidPromise} instance after calling {@code assertEventLoop()}.
         * The same instance is returned on every call.
         */
        @Override
        public final ChannelPromise voidPromise() {
            assertEventLoop();

            return unsafeVoidPromise;
        }
975
976         protected final boolean ensureOpen(ChannelPromise promise) {
977             if (isOpen()) {
978                 return true;
979             }
980
981             safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
982             return false;
983         }
984
985         /**
986          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
987          */

988         protected final void safeSetSuccess(ChannelPromise promise) {
989             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
990                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
991             }
992         }
993
994         /**
995          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
996          */

997         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
998             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
999                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1000             }
1001         }
1002
1003         protected final void closeIfClosed() {
1004             if (isOpen()) {
1005                 return;
1006             }
1007             close(voidPromise());
1008         }
1009
1010         private void invokeLater(Runnable task) {
1011             try {
1012                 // This method is used by outbound operation implementations to trigger an inbound event later.
1013                 // They do not trigger an inbound event immediately because an outbound operation might have been
1014                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1015                 // will look like this for example:
1016                 //
1017                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1018                 //   -> handlerA.ctx.close()
1019                 //      -> channel.unsafe.close()
1020                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1021                 //
1022                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1023                 eventLoop().execute(task);
1024             } catch (RejectedExecutionException e) {
1025                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1026             }
1027         }
1028
1029         /**
1030          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1031          */

1032         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1033             if (cause instanceof ConnectException) {
1034                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1035             }
1036             if (cause instanceof NoRouteToHostException) {
1037                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1038             }
1039             if (cause instanceof SocketException) {
1040                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1041             }
1042
1043             return cause;
1044         }
1045
        /**
         * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
         * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
         * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
         * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
         *
         * @return the {@link Executor} on which {@link #doClose()} has to run, or {@code null};
         *         this default implementation always returns {@code null}
         */
        protected Executor prepareToClose() {
            return null;
        }
1055     }
1056
    /**
     * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
     *
     * @param loop the {@link EventLoop} to check
     */
    protected abstract boolean isCompatible(EventLoop loop);
1061
    /**
     * Returns the {@link SocketAddress} which is bound locally.
     *
     * @return the local {@link SocketAddress}, as provided by the sub-class
     */
    protected abstract SocketAddress localAddress0();
1066
    /**
     * Return the {@link SocketAddress} which the {@link Channel} is connected to.
     *
     * @return the remote {@link SocketAddress}, as provided by the sub-class
     */
    protected abstract SocketAddress remoteAddress0();
1071
    /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method. This default implementation does nothing.
     *
     * @throws Exception if an overriding implementation fails
     */
    protected void doRegister() throws Exception {
        // NOOP
    }
1080
    /**
     * Bind the {@link Channel} to the {@link SocketAddress}
     *
     * @param localAddress the {@link SocketAddress} to bind to
     * @throws Exception if the implementation fails to bind
     */
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
1085
    /**
     * Disconnect this {@link Channel} from its remote peer
     *
     * @throws Exception if the implementation fails to disconnect
     */
    protected abstract void doDisconnect() throws Exception;
1090
    /**
     * Close the {@link Channel}
     *
     * @throws Exception if the implementation fails to close
     */
    protected abstract void doClose() throws Exception;
1095
    /**
     * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
     * operation throws an exception.
     *
     * This default implementation closes the whole channel by calling {@link #doClose()}.
     *
     * @throws Exception if {@link #doClose()} (or an overriding implementation) fails
     */
    @UnstableApi
    protected void doShutdownOutput() throws Exception {
        doClose();
    }
1104
    /**
     * Deregister the {@link Channel} from its {@link EventLoop}.
     *
     * Sub-classes may override this method. This default implementation does nothing.
     *
     * @throws Exception if an overriding implementation fails
     */
    protected void doDeregister() throws Exception {
        // NOOP
    }
1113
    /**
     * Schedule a read operation.
     *
     * Called from {@code AbstractUnsafe.beginRead()} while the channel is active. An {@link Exception} thrown
     * here is passed to the pipeline as an exceptionCaught event and the channel is closed.
     *
     * @throws Exception if the read cannot be scheduled
     */
    protected abstract void doBeginRead() throws Exception;
1118
    /**
     * Flush the content of the given buffer to the remote peer.
     *
     * Called from {@code AbstractUnsafe.flush0()} with a non-empty buffer while the channel is active.
     * When this method throws, {@code flush0()} closes the channel for an {@link IOException} with auto-close
     * enabled and otherwise shuts down the output.
     *
     * @param in the {@link ChannelOutboundBuffer} holding the messages to write
     * @throws Exception if writing fails
     */
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1123
    /**
     * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
     * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
     *
     * This default implementation returns {@code msg} unchanged. When an implementation throws,
     * {@code AbstractUnsafe.write(Object, ChannelPromise)} fails the write promise with that exception and
     * releases the message.
     *
     * @param msg the message that is about to be queued
     * @return the message to queue in place of {@code msg}
     * @throws Exception if the message cannot be converted
     */
    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg;
    }
1131
    /**
     * Validates the given {@link DefaultFileRegion} by delegating to
     * {@code DefaultFileRegion.validate(region, position)}.
     *
     * @param region   the region to validate
     * @param position the position passed through to the validation
     * @throws IOException if the validation reports a failure
     */
    protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
        DefaultFileRegion.validate(region, position);
    }
1135
1136     static final class CloseFuture extends DefaultChannelPromise {
1137
1138         CloseFuture(AbstractChannel ch) {
1139             super(ch);
1140         }
1141
1142         @Override
1143         public ChannelPromise setSuccess() {
1144             throw new IllegalStateException();
1145         }
1146
1147         @Override
1148         public ChannelPromise setFailure(Throwable cause) {
1149             throw new IllegalStateException();
1150         }
1151
1152         @Override
1153         public boolean trySuccess() {
1154             throw new IllegalStateException();
1155         }
1156
1157         @Override
1158         public boolean tryFailure(Throwable cause) {
1159             throw new IllegalStateException();
1160         }
1161
1162         boolean setClosed() {
1163             return super.trySuccess();
1164         }
1165     }
1166
1167     private static final class AnnotatedConnectException extends ConnectException {
1168
1169         private static final long serialVersionUID = 3901958112696433556L;
1170
1171         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1172             super(exception.getMessage() + ": " + remoteAddress);
1173             initCause(exception);
1174         }
1175
1176         @Override
1177         public Throwable fillInStackTrace() {
1178             return this;
1179         }
1180     }
1181
1182     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1183
1184         private static final long serialVersionUID = -6801433937592080623L;
1185
1186         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1187             super(exception.getMessage() + ": " + remoteAddress);
1188             initCause(exception);
1189         }
1190
1191         @Override
1192         public Throwable fillInStackTrace() {
1193             return this;
1194         }
1195     }
1196
1197     private static final class AnnotatedSocketException extends SocketException {
1198
1199         private static final long serialVersionUID = 3896743275010454039L;
1200
1201         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1202             super(exception.getMessage() + ": " + remoteAddress);
1203             initCause(exception);
1204         }
1205
1206         @Override
1207         public Throwable fillInStackTrace() {
1208             return this;
1209         }
1210     }
1211 }
1212