1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ResourceLeakHint;
23 import io.netty.util.concurrent.AbstractEventExecutor;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.OrderedEventExecutor;
26 import io.netty.util.internal.ObjectPool;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectPool.ObjectCreator;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.ThrowableUtil;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.net.SocketAddress;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39
40 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59 import static io.netty.channel.ChannelHandlerMask.mask;
60
61 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62
63     private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
64     volatile AbstractChannelHandlerContext next;
65     volatile AbstractChannelHandlerContext prev;
66
67     private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
68             AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class"handlerState");
69
70     /**
71      * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
72      */

73     private static final int ADD_PENDING = 1;
74     /**
75      * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
76      */

77     private static final int ADD_COMPLETE = 2;
78     /**
79      * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
80      */

81     private static final int REMOVE_COMPLETE = 3;
82     /**
83      * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
84      * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
85      */

86     private static final int INIT = 0;
87
88     private final DefaultChannelPipeline pipeline;
89     private final String name;
90     private final boolean ordered;
91     private final int executionMask;
92
93     // Will be set to null if no child executor should be used, otherwise it will be set to the
94     // child executor.
95     final EventExecutor executor;
96     private ChannelFuture succeededFuture;
97
98     // Lazily instantiated tasks used to trigger events to a handler with different executor.
99     // There is no need to make this volatile as at worse it will just create a few more instances then needed.
100     private Tasks invokeTasks;
101
102     private volatile int handlerState = INIT;
103
104     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
105                                   String name, Class<? extends ChannelHandler> handlerClass) {
106         this.name = ObjectUtil.checkNotNull(name, "name");
107         this.pipeline = pipeline;
108         this.executor = executor;
109         this.executionMask = mask(handlerClass);
110         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
111         ordered = executor == null || executor instanceof OrderedEventExecutor;
112     }
113
114     @Override
115     public Channel channel() {
116         return pipeline.channel();
117     }
118
119     @Override
120     public ChannelPipeline pipeline() {
121         return pipeline;
122     }
123
124     @Override
125     public ByteBufAllocator alloc() {
126         return channel().config().getAllocator();
127     }
128
129     @Override
130     public EventExecutor executor() {
131         if (executor == null) {
132             return channel().eventLoop();
133         } else {
134             return executor;
135         }
136     }
137
138     @Override
139     public String name() {
140         return name;
141     }
142
143     @Override
144     public ChannelHandlerContext fireChannelRegistered() {
145         invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
146         return this;
147     }
148
149     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
150         EventExecutor executor = next.executor();
151         if (executor.inEventLoop()) {
152             next.invokeChannelRegistered();
153         } else {
154             executor.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     next.invokeChannelRegistered();
158                 }
159             });
160         }
161     }
162
163     private void invokeChannelRegistered() {
164         if (invokeHandler()) {
165             try {
166                 ((ChannelInboundHandler) handler()).channelRegistered(this);
167             } catch (Throwable t) {
168                 invokeExceptionCaught(t);
169             }
170         } else {
171             fireChannelRegistered();
172         }
173     }
174
175     @Override
176     public ChannelHandlerContext fireChannelUnregistered() {
177         invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
178         return this;
179     }
180
181     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
182         EventExecutor executor = next.executor();
183         if (executor.inEventLoop()) {
184             next.invokeChannelUnregistered();
185         } else {
186             executor.execute(new Runnable() {
187                 @Override
188                 public void run() {
189                     next.invokeChannelUnregistered();
190                 }
191             });
192         }
193     }
194
195     private void invokeChannelUnregistered() {
196         if (invokeHandler()) {
197             try {
198                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
199             } catch (Throwable t) {
200                 invokeExceptionCaught(t);
201             }
202         } else {
203             fireChannelUnregistered();
204         }
205     }
206
207     @Override
208     public ChannelHandlerContext fireChannelActive() {
209         invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
210         return this;
211     }
212
213     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
214         EventExecutor executor = next.executor();
215         if (executor.inEventLoop()) {
216             next.invokeChannelActive();
217         } else {
218             executor.execute(new Runnable() {
219                 @Override
220                 public void run() {
221                     next.invokeChannelActive();
222                 }
223             });
224         }
225     }
226
227     private void invokeChannelActive() {
228         if (invokeHandler()) {
229             try {
230                 ((ChannelInboundHandler) handler()).channelActive(this);
231             } catch (Throwable t) {
232                 invokeExceptionCaught(t);
233             }
234         } else {
235             fireChannelActive();
236         }
237     }
238
239     @Override
240     public ChannelHandlerContext fireChannelInactive() {
241         invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
242         return this;
243     }
244
245     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
246         EventExecutor executor = next.executor();
247         if (executor.inEventLoop()) {
248             next.invokeChannelInactive();
249         } else {
250             executor.execute(new Runnable() {
251                 @Override
252                 public void run() {
253                     next.invokeChannelInactive();
254                 }
255             });
256         }
257     }
258
259     private void invokeChannelInactive() {
260         if (invokeHandler()) {
261             try {
262                 ((ChannelInboundHandler) handler()).channelInactive(this);
263             } catch (Throwable t) {
264                 invokeExceptionCaught(t);
265             }
266         } else {
267             fireChannelInactive();
268         }
269     }
270
271     @Override
272     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
273         invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
274         return this;
275     }
276
277     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
278         ObjectUtil.checkNotNull(cause, "cause");
279         EventExecutor executor = next.executor();
280         if (executor.inEventLoop()) {
281             next.invokeExceptionCaught(cause);
282         } else {
283             try {
284                 executor.execute(new Runnable() {
285                     @Override
286                     public void run() {
287                         next.invokeExceptionCaught(cause);
288                     }
289                 });
290             } catch (Throwable t) {
291                 if (logger.isWarnEnabled()) {
292                     logger.warn("Failed to submit an exceptionCaught() event.", t);
293                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
294                 }
295             }
296         }
297     }
298
299     private void invokeExceptionCaught(final Throwable cause) {
300         if (invokeHandler()) {
301             try {
302                 handler().exceptionCaught(this, cause);
303             } catch (Throwable error) {
304                 if (logger.isDebugEnabled()) {
305                     logger.debug(
306                         "An exception {}" +
307                         "was thrown by a user handler's exceptionCaught() " +
308                         "method while handling the following exception:",
309                         ThrowableUtil.stackTraceToString(error), cause);
310                 } else if (logger.isWarnEnabled()) {
311                     logger.warn(
312                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
313                         "was thrown by a user handler's exceptionCaught() " +
314                         "method while handling the following exception:", error, cause);
315                 }
316             }
317         } else {
318             fireExceptionCaught(cause);
319         }
320     }
321
322     @Override
323     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
324         invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
325         return this;
326     }
327
328     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
329         ObjectUtil.checkNotNull(event, "event");
330         EventExecutor executor = next.executor();
331         if (executor.inEventLoop()) {
332             next.invokeUserEventTriggered(event);
333         } else {
334             executor.execute(new Runnable() {
335                 @Override
336                 public void run() {
337                     next.invokeUserEventTriggered(event);
338                 }
339             });
340         }
341     }
342
343     private void invokeUserEventTriggered(Object event) {
344         if (invokeHandler()) {
345             try {
346                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
347             } catch (Throwable t) {
348                 invokeExceptionCaught(t);
349             }
350         } else {
351             fireUserEventTriggered(event);
352         }
353     }
354
355     @Override
356     public ChannelHandlerContext fireChannelRead(final Object msg) {
357         invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
358         return this;
359     }
360
361     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
362         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
363         EventExecutor executor = next.executor();
364         if (executor.inEventLoop()) {
365             next.invokeChannelRead(m);
366         } else {
367             executor.execute(new Runnable() {
368                 @Override
369                 public void run() {
370                     next.invokeChannelRead(m);
371                 }
372             });
373         }
374     }
375
376     private void invokeChannelRead(Object msg) {
377         if (invokeHandler()) {
378             try {
379                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
380             } catch (Throwable t) {
381                 invokeExceptionCaught(t);
382             }
383         } else {
384             fireChannelRead(msg);
385         }
386     }
387
388     @Override
389     public ChannelHandlerContext fireChannelReadComplete() {
390         invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
391         return this;
392     }
393
394     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
395         EventExecutor executor = next.executor();
396         if (executor.inEventLoop()) {
397             next.invokeChannelReadComplete();
398         } else {
399             Tasks tasks = next.invokeTasks;
400             if (tasks == null) {
401                 next.invokeTasks = tasks = new Tasks(next);
402             }
403             executor.execute(tasks.invokeChannelReadCompleteTask);
404         }
405     }
406
407     private void invokeChannelReadComplete() {
408         if (invokeHandler()) {
409             try {
410                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
411             } catch (Throwable t) {
412                 invokeExceptionCaught(t);
413             }
414         } else {
415             fireChannelReadComplete();
416         }
417     }
418
419     @Override
420     public ChannelHandlerContext fireChannelWritabilityChanged() {
421         invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
422         return this;
423     }
424
425     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
426         EventExecutor executor = next.executor();
427         if (executor.inEventLoop()) {
428             next.invokeChannelWritabilityChanged();
429         } else {
430             Tasks tasks = next.invokeTasks;
431             if (tasks == null) {
432                 next.invokeTasks = tasks = new Tasks(next);
433             }
434             executor.execute(tasks.invokeChannelWritableStateChangedTask);
435         }
436     }
437
438     private void invokeChannelWritabilityChanged() {
439         if (invokeHandler()) {
440             try {
441                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
442             } catch (Throwable t) {
443                 invokeExceptionCaught(t);
444             }
445         } else {
446             fireChannelWritabilityChanged();
447         }
448     }
449
450     @Override
451     public ChannelFuture bind(SocketAddress localAddress) {
452         return bind(localAddress, newPromise());
453     }
454
455     @Override
456     public ChannelFuture connect(SocketAddress remoteAddress) {
457         return connect(remoteAddress, newPromise());
458     }
459
460     @Override
461     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
462         return connect(remoteAddress, localAddress, newPromise());
463     }
464
465     @Override
466     public ChannelFuture disconnect() {
467         return disconnect(newPromise());
468     }
469
470     @Override
471     public ChannelFuture close() {
472         return close(newPromise());
473     }
474
475     @Override
476     public ChannelFuture deregister() {
477         return deregister(newPromise());
478     }
479
480     @Override
481     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
482         ObjectUtil.checkNotNull(localAddress, "localAddress");
483         if (isNotValidPromise(promise, false)) {
484             // cancelled
485             return promise;
486         }
487
488         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
489         EventExecutor executor = next.executor();
490         if (executor.inEventLoop()) {
491             next.invokeBind(localAddress, promise);
492         } else {
493             safeExecute(executor, new Runnable() {
494                 @Override
495                 public void run() {
496                     next.invokeBind(localAddress, promise);
497                 }
498             }, promise, nullfalse);
499         }
500         return promise;
501     }
502
503     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
504         if (invokeHandler()) {
505             try {
506                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
507             } catch (Throwable t) {
508                 notifyOutboundHandlerException(t, promise);
509             }
510         } else {
511             bind(localAddress, promise);
512         }
513     }
514
515     @Override
516     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
517         return connect(remoteAddress, null, promise);
518     }
519
520     @Override
521     public ChannelFuture connect(
522             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
523         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
524
525         if (isNotValidPromise(promise, false)) {
526             // cancelled
527             return promise;
528         }
529
530         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
531         EventExecutor executor = next.executor();
532         if (executor.inEventLoop()) {
533             next.invokeConnect(remoteAddress, localAddress, promise);
534         } else {
535             safeExecute(executor, new Runnable() {
536                 @Override
537                 public void run() {
538                     next.invokeConnect(remoteAddress, localAddress, promise);
539                 }
540             }, promise, nullfalse);
541         }
542         return promise;
543     }
544
545     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
546         if (invokeHandler()) {
547             try {
548                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
549             } catch (Throwable t) {
550                 notifyOutboundHandlerException(t, promise);
551             }
552         } else {
553             connect(remoteAddress, localAddress, promise);
554         }
555     }
556
557     @Override
558     public ChannelFuture disconnect(final ChannelPromise promise) {
559         if (!channel().metadata().hasDisconnect()) {
560             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
561             // So far, UDP/IP is the only transport that has such behavior.
562             return close(promise);
563         }
564         if (isNotValidPromise(promise, false)) {
565             // cancelled
566             return promise;
567         }
568
569         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
570         EventExecutor executor = next.executor();
571         if (executor.inEventLoop()) {
572             next.invokeDisconnect(promise);
573         } else {
574             safeExecute(executor, new Runnable() {
575                 @Override
576                 public void run() {
577                     next.invokeDisconnect(promise);
578                 }
579             }, promise, nullfalse);
580         }
581         return promise;
582     }
583
584     private void invokeDisconnect(ChannelPromise promise) {
585         if (invokeHandler()) {
586             try {
587                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
588             } catch (Throwable t) {
589                 notifyOutboundHandlerException(t, promise);
590             }
591         } else {
592             disconnect(promise);
593         }
594     }
595
596     @Override
597     public ChannelFuture close(final ChannelPromise promise) {
598         if (isNotValidPromise(promise, false)) {
599             // cancelled
600             return promise;
601         }
602
603         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
604         EventExecutor executor = next.executor();
605         if (executor.inEventLoop()) {
606             next.invokeClose(promise);
607         } else {
608             safeExecute(executor, new Runnable() {
609                 @Override
610                 public void run() {
611                     next.invokeClose(promise);
612                 }
613             }, promise, nullfalse);
614         }
615
616         return promise;
617     }
618
619     private void invokeClose(ChannelPromise promise) {
620         if (invokeHandler()) {
621             try {
622                 ((ChannelOutboundHandler) handler()).close(this, promise);
623             } catch (Throwable t) {
624                 notifyOutboundHandlerException(t, promise);
625             }
626         } else {
627             close(promise);
628         }
629     }
630
631     @Override
632     public ChannelFuture deregister(final ChannelPromise promise) {
633         if (isNotValidPromise(promise, false)) {
634             // cancelled
635             return promise;
636         }
637
638         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
639         EventExecutor executor = next.executor();
640         if (executor.inEventLoop()) {
641             next.invokeDeregister(promise);
642         } else {
643             safeExecute(executor, new Runnable() {
644                 @Override
645                 public void run() {
646                     next.invokeDeregister(promise);
647                 }
648             }, promise, nullfalse);
649         }
650
651         return promise;
652     }
653
654     private void invokeDeregister(ChannelPromise promise) {
655         if (invokeHandler()) {
656             try {
657                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
658             } catch (Throwable t) {
659                 notifyOutboundHandlerException(t, promise);
660             }
661         } else {
662             deregister(promise);
663         }
664     }
665
666     @Override
667     public ChannelHandlerContext read() {
668         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
669         EventExecutor executor = next.executor();
670         if (executor.inEventLoop()) {
671             next.invokeRead();
672         } else {
673             Tasks tasks = next.invokeTasks;
674             if (tasks == null) {
675                 next.invokeTasks = tasks = new Tasks(next);
676             }
677             executor.execute(tasks.invokeReadTask);
678         }
679
680         return this;
681     }
682
683     private void invokeRead() {
684         if (invokeHandler()) {
685             try {
686                 ((ChannelOutboundHandler) handler()).read(this);
687             } catch (Throwable t) {
688                 invokeExceptionCaught(t);
689             }
690         } else {
691             read();
692         }
693     }
694
695     @Override
696     public ChannelFuture write(Object msg) {
697         return write(msg, newPromise());
698     }
699
700     @Override
701     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
702         write(msg, false, promise);
703
704         return promise;
705     }
706
707     void invokeWrite(Object msg, ChannelPromise promise) {
708         if (invokeHandler()) {
709             invokeWrite0(msg, promise);
710         } else {
711             write(msg, promise);
712         }
713     }
714
715     private void invokeWrite0(Object msg, ChannelPromise promise) {
716         try {
717             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
718         } catch (Throwable t) {
719             notifyOutboundHandlerException(t, promise);
720         }
721     }
722
723     @Override
724     public ChannelHandlerContext flush() {
725         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
726         EventExecutor executor = next.executor();
727         if (executor.inEventLoop()) {
728             next.invokeFlush();
729         } else {
730             Tasks tasks = next.invokeTasks;
731             if (tasks == null) {
732                 next.invokeTasks = tasks = new Tasks(next);
733             }
734             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), nullfalse);
735         }
736
737         return this;
738     }
739
740     private void invokeFlush() {
741         if (invokeHandler()) {
742             invokeFlush0();
743         } else {
744             flush();
745         }
746     }
747
748     private void invokeFlush0() {
749         try {
750             ((ChannelOutboundHandler) handler()).flush(this);
751         } catch (Throwable t) {
752             invokeExceptionCaught(t);
753         }
754     }
755
756     @Override
757     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
758         write(msg, true, promise);
759         return promise;
760     }
761
762     void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
763         if (invokeHandler()) {
764             invokeWrite0(msg, promise);
765             invokeFlush0();
766         } else {
767             writeAndFlush(msg, promise);
768         }
769     }
770
771     private void write(Object msg, boolean flush, ChannelPromise promise) {
772         ObjectUtil.checkNotNull(msg, "msg");
773         try {
774             if (isNotValidPromise(promise, true)) {
775                 ReferenceCountUtil.release(msg);
776                 // cancelled
777                 return;
778             }
779         } catch (RuntimeException e) {
780             ReferenceCountUtil.release(msg);
781             throw e;
782         }
783
784         final AbstractChannelHandlerContext next = findContextOutbound(flush ?
785                 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
786         final Object m = pipeline.touch(msg, next);
787         EventExecutor executor = next.executor();
788         if (executor.inEventLoop()) {
789             if (flush) {
790                 next.invokeWriteAndFlush(m, promise);
791             } else {
792                 next.invokeWrite(m, promise);
793             }
794         } else {
795             final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
796             if (!safeExecute(executor, task, promise, m, !flush)) {
797                 // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
798                 // and put it back in the Recycler for re-use later.
799                 //
800                 // See https://github.com/netty/netty/issues/8343.
801                 task.cancel();
802             }
803         }
804     }
805
806     @Override
807     public ChannelFuture writeAndFlush(Object msg) {
808         return writeAndFlush(msg, newPromise());
809     }
810
811     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
812         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
813         // false.
814         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
815     }
816
817     @Override
818     public ChannelPromise newPromise() {
819         return new DefaultChannelPromise(channel(), executor());
820     }
821
822     @Override
823     public ChannelProgressivePromise newProgressivePromise() {
824         return new DefaultChannelProgressivePromise(channel(), executor());
825     }
826
827     @Override
828     public ChannelFuture newSucceededFuture() {
829         ChannelFuture succeededFuture = this.succeededFuture;
830         if (succeededFuture == null) {
831             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
832         }
833         return succeededFuture;
834     }
835
836     @Override
837     public ChannelFuture newFailedFuture(Throwable cause) {
838         return new FailedChannelFuture(channel(), executor(), cause);
839     }
840
841     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
842         ObjectUtil.checkNotNull(promise, "promise");
843
844         if (promise.isDone()) {
845             // Check if the promise was cancelled and if so signal that the processing of the operation
846             // should not be performed.
847             //
848             // See https://github.com/netty/netty/issues/2349
849             if (promise.isCancelled()) {
850                 return true;
851             }
852             throw new IllegalArgumentException("promise already done: " + promise);
853         }
854
855         if (promise.channel() != channel()) {
856             throw new IllegalArgumentException(String.format(
857                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
858         }
859
860         if (promise.getClass() == DefaultChannelPromise.class) {
861             return false;
862         }
863
864         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
865             throw new IllegalArgumentException(
866                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
867         }
868
869         if (promise instanceof AbstractChannel.CloseFuture) {
870             throw new IllegalArgumentException(
871                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
872         }
873         return false;
874     }
875
876     private AbstractChannelHandlerContext findContextInbound(int mask) {
877         AbstractChannelHandlerContext ctx = this;
878         EventExecutor currentExecutor = executor();
879         do {
880             ctx = ctx.next;
881         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
882         return ctx;
883     }
884
885     private AbstractChannelHandlerContext findContextOutbound(int mask) {
886         AbstractChannelHandlerContext ctx = this;
887         EventExecutor currentExecutor = executor();
888         do {
889             ctx = ctx.prev;
890         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
891         return ctx;
892     }
893
894     private static boolean skipContext(
895             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
896         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
897         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
898                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
899                 // everything to preserve ordering.
900                 //
901                 // See https://github.com/netty/netty/issues/10067
902                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
903     }
904
905     @Override
906     public ChannelPromise voidPromise() {
907         return channel().voidPromise();
908     }
909
910     final void setRemoved() {
911         handlerState = REMOVE_COMPLETE;
912     }
913
914     final boolean setAddComplete() {
915         for (;;) {
916             int oldState = handlerState;
917             if (oldState == REMOVE_COMPLETE) {
918                 return false;
919             }
920             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
921             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
922             // exposing ordering guarantees.
923             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
924                 return true;
925             }
926         }
927     }
928
929     final void setAddPending() {
930         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
931         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
932     }
933
934     final void callHandlerAdded() throws Exception {
935         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
936         // any pipeline events ctx.handler() will miss them because the state will not allow it.
937         if (setAddComplete()) {
938             handler().handlerAdded(this);
939         }
940     }
941
942     final void callHandlerRemoved() throws Exception {
943         try {
944             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
945             if (handlerState == ADD_COMPLETE) {
946                 handler().handlerRemoved(this);
947             }
948         } finally {
949             // Mark the handler as removed in any case.
950             setRemoved();
951         }
952     }
953
954     /**
955      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
956      * yet. If not return {@code false} and if called or could not detect return {@code true}.
957      *
958      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
959      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
960      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
961      */

962     private boolean invokeHandler() {
963         // Store in local variable to reduce volatile reads.
964         int handlerState = this.handlerState;
965         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
966     }
967
968     @Override
969     public boolean isRemoved() {
970         return handlerState == REMOVE_COMPLETE;
971     }
972
973     @Override
974     public <T> Attribute<T> attr(AttributeKey<T> key) {
975         return channel().attr(key);
976     }
977
978     @Override
979     public <T> boolean hasAttr(AttributeKey<T> key) {
980         return channel().hasAttr(key);
981     }
982
983     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
984             ChannelPromise promise, Object msg, boolean lazy) {
985         try {
986             if (lazy && executor instanceof AbstractEventExecutor) {
987                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
988             } else {
989                 executor.execute(runnable);
990             }
991             return true;
992         } catch (Throwable cause) {
993             try {
994                 promise.setFailure(cause);
995             } finally {
996                 if (msg != null) {
997                     ReferenceCountUtil.release(msg);
998                 }
999             }
1000             return false;
1001         }
1002     }
1003
1004     @Override
1005     public String toHintString() {
1006         return '\'' + name + "' will handle the message from this point.";
1007     }
1008
1009     @Override
1010     public String toString() {
1011         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1012     }
1013
1014     static final class WriteTask implements Runnable {
1015         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1016             @Override
1017             public WriteTask newObject(Handle<WriteTask> handle) {
1018                 return new WriteTask(handle);
1019             }
1020         });
1021
1022         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1023                 Object msg, ChannelPromise promise, boolean flush) {
1024             WriteTask task = RECYCLER.get();
1025             init(task, ctx, msg, promise, flush);
1026             return task;
1027         }
1028
1029         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1030                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit"true);
1031
1032         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1033         private static final int WRITE_TASK_OVERHEAD =
1034                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1035
1036         private final Handle<WriteTask> handle;
1037         private AbstractChannelHandlerContext ctx;
1038         private Object msg;
1039         private ChannelPromise promise;
1040         private int size; // sign bit controls flush
1041
1042         @SuppressWarnings("unchecked")
1043         private WriteTask(Handle<? extends WriteTask> handle) {
1044             this.handle = (Handle<WriteTask>) handle;
1045         }
1046
1047         protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1048                                    Object msg, ChannelPromise promise, boolean flush) {
1049             task.ctx = ctx;
1050             task.msg = msg;
1051             task.promise = promise;
1052
1053             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1054                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1055                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1056             } else {
1057                 task.size = 0;
1058             }
1059             if (flush) {
1060                 task.size |= Integer.MIN_VALUE;
1061             }
1062         }
1063
1064         @Override
1065         public void run() {
1066             try {
1067                 decrementPendingOutboundBytes();
1068                 if (size >= 0) {
1069                     ctx.invokeWrite(msg, promise);
1070                 } else {
1071                     ctx.invokeWriteAndFlush(msg, promise);
1072                 }
1073             } finally {
1074                 recycle();
1075             }
1076         }
1077
1078         void cancel() {
1079             try {
1080                 decrementPendingOutboundBytes();
1081             } finally {
1082                 recycle();
1083             }
1084         }
1085
1086         private void decrementPendingOutboundBytes() {
1087             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1088                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1089             }
1090         }
1091
1092         private void recycle() {
1093             // Set to null so the GC can collect them directly
1094             ctx = null;
1095             msg = null;
1096             promise = null;
1097             handle.recycle(this);
1098         }
1099     }
1100
1101     private static final class Tasks {
1102         private final AbstractChannelHandlerContext next;
1103         private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1104             @Override
1105             public void run() {
1106                 next.invokeChannelReadComplete();
1107             }
1108         };
1109         private final Runnable invokeReadTask = new Runnable() {
1110             @Override
1111             public void run() {
1112                 next.invokeRead();
1113             }
1114         };
1115         private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1116             @Override
1117             public void run() {
1118                 next.invokeChannelWritabilityChanged();
1119             }
1120         };
1121         private final Runnable invokeFlushTask = new Runnable() {
1122             @Override
1123             public void run() {
1124                 next.invokeFlush();
1125             }
1126         };
1127
1128         Tasks(AbstractChannelHandlerContext next) {
1129             this.next = next;
1130         }
1131     }
1132 }
1133