1
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ResourceLeakHint;
23 import io.netty.util.concurrent.AbstractEventExecutor;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.OrderedEventExecutor;
26 import io.netty.util.internal.ObjectPool;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectPool.ObjectCreator;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.ThrowableUtil;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.net.SocketAddress;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39
40 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59 import static io.netty.channel.ChannelHandlerMask.mask;
60
61 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62
63 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
64 volatile AbstractChannelHandlerContext next;
65 volatile AbstractChannelHandlerContext prev;
66
67 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
68 AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
69
70
73 private static final int ADD_PENDING = 1;
74
77 private static final int ADD_COMPLETE = 2;
78
81 private static final int REMOVE_COMPLETE = 3;
82
86 private static final int INIT = 0;
87
88 private final DefaultChannelPipeline pipeline;
89 private final String name;
90 private final boolean ordered;
91 private final int executionMask;
92
93
94
95 final EventExecutor executor;
96 private ChannelFuture succeededFuture;
97
98
99
100 private Tasks invokeTasks;
101
102 private volatile int handlerState = INIT;
103
/**
 * Creates a context that belongs to the given pipeline.
 *
 * @param pipeline     the owning pipeline
 * @param executor     executor for handler callbacks, or {@code null} to use the channel's event loop
 * @param name         the context name; must not be {@code null}
 * @param handlerClass handler type from which the execution mask is computed
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);

    // Without a dedicated executor the context is treated as ordered; otherwise it depends on the executor type.
    if (executor == null) {
        this.ordered = true;
    } else {
        this.ordered = executor instanceof OrderedEventExecutor;
    }
}
113
114 @Override
115 public Channel channel() {
116 return pipeline.channel();
117 }
118
119 @Override
120 public ChannelPipeline pipeline() {
121 return pipeline;
122 }
123
124 @Override
125 public ByteBufAllocator alloc() {
126 return channel().config().getAllocator();
127 }
128
129 @Override
130 public EventExecutor executor() {
131 if (executor == null) {
132 return channel().eventLoop();
133 } else {
134 return executor;
135 }
136 }
137
138 @Override
139 public String name() {
140 return name;
141 }
142
143 @Override
144 public ChannelHandlerContext fireChannelRegistered() {
145 invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
146 return this;
147 }
148
149 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
150 EventExecutor executor = next.executor();
151 if (executor.inEventLoop()) {
152 next.invokeChannelRegistered();
153 } else {
154 executor.execute(new Runnable() {
155 @Override
156 public void run() {
157 next.invokeChannelRegistered();
158 }
159 });
160 }
161 }
162
163 private void invokeChannelRegistered() {
164 if (invokeHandler()) {
165 try {
166 ((ChannelInboundHandler) handler()).channelRegistered(this);
167 } catch (Throwable t) {
168 invokeExceptionCaught(t);
169 }
170 } else {
171 fireChannelRegistered();
172 }
173 }
174
175 @Override
176 public ChannelHandlerContext fireChannelUnregistered() {
177 invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
178 return this;
179 }
180
181 static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
182 EventExecutor executor = next.executor();
183 if (executor.inEventLoop()) {
184 next.invokeChannelUnregistered();
185 } else {
186 executor.execute(new Runnable() {
187 @Override
188 public void run() {
189 next.invokeChannelUnregistered();
190 }
191 });
192 }
193 }
194
195 private void invokeChannelUnregistered() {
196 if (invokeHandler()) {
197 try {
198 ((ChannelInboundHandler) handler()).channelUnregistered(this);
199 } catch (Throwable t) {
200 invokeExceptionCaught(t);
201 }
202 } else {
203 fireChannelUnregistered();
204 }
205 }
206
207 @Override
208 public ChannelHandlerContext fireChannelActive() {
209 invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
210 return this;
211 }
212
213 static void invokeChannelActive(final AbstractChannelHandlerContext next) {
214 EventExecutor executor = next.executor();
215 if (executor.inEventLoop()) {
216 next.invokeChannelActive();
217 } else {
218 executor.execute(new Runnable() {
219 @Override
220 public void run() {
221 next.invokeChannelActive();
222 }
223 });
224 }
225 }
226
227 private void invokeChannelActive() {
228 if (invokeHandler()) {
229 try {
230 ((ChannelInboundHandler) handler()).channelActive(this);
231 } catch (Throwable t) {
232 invokeExceptionCaught(t);
233 }
234 } else {
235 fireChannelActive();
236 }
237 }
238
239 @Override
240 public ChannelHandlerContext fireChannelInactive() {
241 invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
242 return this;
243 }
244
245 static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
246 EventExecutor executor = next.executor();
247 if (executor.inEventLoop()) {
248 next.invokeChannelInactive();
249 } else {
250 executor.execute(new Runnable() {
251 @Override
252 public void run() {
253 next.invokeChannelInactive();
254 }
255 });
256 }
257 }
258
259 private void invokeChannelInactive() {
260 if (invokeHandler()) {
261 try {
262 ((ChannelInboundHandler) handler()).channelInactive(this);
263 } catch (Throwable t) {
264 invokeExceptionCaught(t);
265 }
266 } else {
267 fireChannelInactive();
268 }
269 }
270
271 @Override
272 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
273 invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
274 return this;
275 }
276
277 static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
278 ObjectUtil.checkNotNull(cause, "cause");
279 EventExecutor executor = next.executor();
280 if (executor.inEventLoop()) {
281 next.invokeExceptionCaught(cause);
282 } else {
283 try {
284 executor.execute(new Runnable() {
285 @Override
286 public void run() {
287 next.invokeExceptionCaught(cause);
288 }
289 });
290 } catch (Throwable t) {
291 if (logger.isWarnEnabled()) {
292 logger.warn("Failed to submit an exceptionCaught() event.", t);
293 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
294 }
295 }
296 }
297 }
298
299 private void invokeExceptionCaught(final Throwable cause) {
300 if (invokeHandler()) {
301 try {
302 handler().exceptionCaught(this, cause);
303 } catch (Throwable error) {
304 if (logger.isDebugEnabled()) {
305 logger.debug(
306 "An exception {}" +
307 "was thrown by a user handler's exceptionCaught() " +
308 "method while handling the following exception:",
309 ThrowableUtil.stackTraceToString(error), cause);
310 } else if (logger.isWarnEnabled()) {
311 logger.warn(
312 "An exception '{}' [enable DEBUG level for full stacktrace] " +
313 "was thrown by a user handler's exceptionCaught() " +
314 "method while handling the following exception:", error, cause);
315 }
316 }
317 } else {
318 fireExceptionCaught(cause);
319 }
320 }
321
322 @Override
323 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
324 invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
325 return this;
326 }
327
328 static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
329 ObjectUtil.checkNotNull(event, "event");
330 EventExecutor executor = next.executor();
331 if (executor.inEventLoop()) {
332 next.invokeUserEventTriggered(event);
333 } else {
334 executor.execute(new Runnable() {
335 @Override
336 public void run() {
337 next.invokeUserEventTriggered(event);
338 }
339 });
340 }
341 }
342
343 private void invokeUserEventTriggered(Object event) {
344 if (invokeHandler()) {
345 try {
346 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
347 } catch (Throwable t) {
348 invokeExceptionCaught(t);
349 }
350 } else {
351 fireUserEventTriggered(event);
352 }
353 }
354
355 @Override
356 public ChannelHandlerContext fireChannelRead(final Object msg) {
357 invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
358 return this;
359 }
360
361 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
362 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
363 EventExecutor executor = next.executor();
364 if (executor.inEventLoop()) {
365 next.invokeChannelRead(m);
366 } else {
367 executor.execute(new Runnable() {
368 @Override
369 public void run() {
370 next.invokeChannelRead(m);
371 }
372 });
373 }
374 }
375
376 private void invokeChannelRead(Object msg) {
377 if (invokeHandler()) {
378 try {
379 ((ChannelInboundHandler) handler()).channelRead(this, msg);
380 } catch (Throwable t) {
381 invokeExceptionCaught(t);
382 }
383 } else {
384 fireChannelRead(msg);
385 }
386 }
387
388 @Override
389 public ChannelHandlerContext fireChannelReadComplete() {
390 invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
391 return this;
392 }
393
394 static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
395 EventExecutor executor = next.executor();
396 if (executor.inEventLoop()) {
397 next.invokeChannelReadComplete();
398 } else {
399 Tasks tasks = next.invokeTasks;
400 if (tasks == null) {
401 next.invokeTasks = tasks = new Tasks(next);
402 }
403 executor.execute(tasks.invokeChannelReadCompleteTask);
404 }
405 }
406
407 private void invokeChannelReadComplete() {
408 if (invokeHandler()) {
409 try {
410 ((ChannelInboundHandler) handler()).channelReadComplete(this);
411 } catch (Throwable t) {
412 invokeExceptionCaught(t);
413 }
414 } else {
415 fireChannelReadComplete();
416 }
417 }
418
419 @Override
420 public ChannelHandlerContext fireChannelWritabilityChanged() {
421 invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
422 return this;
423 }
424
425 static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
426 EventExecutor executor = next.executor();
427 if (executor.inEventLoop()) {
428 next.invokeChannelWritabilityChanged();
429 } else {
430 Tasks tasks = next.invokeTasks;
431 if (tasks == null) {
432 next.invokeTasks = tasks = new Tasks(next);
433 }
434 executor.execute(tasks.invokeChannelWritableStateChangedTask);
435 }
436 }
437
438 private void invokeChannelWritabilityChanged() {
439 if (invokeHandler()) {
440 try {
441 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
442 } catch (Throwable t) {
443 invokeExceptionCaught(t);
444 }
445 } else {
446 fireChannelWritabilityChanged();
447 }
448 }
449
450 @Override
451 public ChannelFuture bind(SocketAddress localAddress) {
452 return bind(localAddress, newPromise());
453 }
454
455 @Override
456 public ChannelFuture connect(SocketAddress remoteAddress) {
457 return connect(remoteAddress, newPromise());
458 }
459
460 @Override
461 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
462 return connect(remoteAddress, localAddress, newPromise());
463 }
464
465 @Override
466 public ChannelFuture disconnect() {
467 return disconnect(newPromise());
468 }
469
470 @Override
471 public ChannelFuture close() {
472 return close(newPromise());
473 }
474
475 @Override
476 public ChannelFuture deregister() {
477 return deregister(newPromise());
478 }
479
480 @Override
481 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
482 ObjectUtil.checkNotNull(localAddress, "localAddress");
483 if (isNotValidPromise(promise, false)) {
484
485 return promise;
486 }
487
488 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
489 EventExecutor executor = next.executor();
490 if (executor.inEventLoop()) {
491 next.invokeBind(localAddress, promise);
492 } else {
493 safeExecute(executor, new Runnable() {
494 @Override
495 public void run() {
496 next.invokeBind(localAddress, promise);
497 }
498 }, promise, null, false);
499 }
500 return promise;
501 }
502
503 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
504 if (invokeHandler()) {
505 try {
506 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
507 } catch (Throwable t) {
508 notifyOutboundHandlerException(t, promise);
509 }
510 } else {
511 bind(localAddress, promise);
512 }
513 }
514
515 @Override
516 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
517 return connect(remoteAddress, null, promise);
518 }
519
520 @Override
521 public ChannelFuture connect(
522 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
523 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
524
525 if (isNotValidPromise(promise, false)) {
526
527 return promise;
528 }
529
530 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
531 EventExecutor executor = next.executor();
532 if (executor.inEventLoop()) {
533 next.invokeConnect(remoteAddress, localAddress, promise);
534 } else {
535 safeExecute(executor, new Runnable() {
536 @Override
537 public void run() {
538 next.invokeConnect(remoteAddress, localAddress, promise);
539 }
540 }, promise, null, false);
541 }
542 return promise;
543 }
544
545 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
546 if (invokeHandler()) {
547 try {
548 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
549 } catch (Throwable t) {
550 notifyOutboundHandlerException(t, promise);
551 }
552 } else {
553 connect(remoteAddress, localAddress, promise);
554 }
555 }
556
557 @Override
558 public ChannelFuture disconnect(final ChannelPromise promise) {
559 if (!channel().metadata().hasDisconnect()) {
560
561
562 return close(promise);
563 }
564 if (isNotValidPromise(promise, false)) {
565
566 return promise;
567 }
568
569 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
570 EventExecutor executor = next.executor();
571 if (executor.inEventLoop()) {
572 next.invokeDisconnect(promise);
573 } else {
574 safeExecute(executor, new Runnable() {
575 @Override
576 public void run() {
577 next.invokeDisconnect(promise);
578 }
579 }, promise, null, false);
580 }
581 return promise;
582 }
583
584 private void invokeDisconnect(ChannelPromise promise) {
585 if (invokeHandler()) {
586 try {
587 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
588 } catch (Throwable t) {
589 notifyOutboundHandlerException(t, promise);
590 }
591 } else {
592 disconnect(promise);
593 }
594 }
595
596 @Override
597 public ChannelFuture close(final ChannelPromise promise) {
598 if (isNotValidPromise(promise, false)) {
599
600 return promise;
601 }
602
603 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
604 EventExecutor executor = next.executor();
605 if (executor.inEventLoop()) {
606 next.invokeClose(promise);
607 } else {
608 safeExecute(executor, new Runnable() {
609 @Override
610 public void run() {
611 next.invokeClose(promise);
612 }
613 }, promise, null, false);
614 }
615
616 return promise;
617 }
618
619 private void invokeClose(ChannelPromise promise) {
620 if (invokeHandler()) {
621 try {
622 ((ChannelOutboundHandler) handler()).close(this, promise);
623 } catch (Throwable t) {
624 notifyOutboundHandlerException(t, promise);
625 }
626 } else {
627 close(promise);
628 }
629 }
630
631 @Override
632 public ChannelFuture deregister(final ChannelPromise promise) {
633 if (isNotValidPromise(promise, false)) {
634
635 return promise;
636 }
637
638 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
639 EventExecutor executor = next.executor();
640 if (executor.inEventLoop()) {
641 next.invokeDeregister(promise);
642 } else {
643 safeExecute(executor, new Runnable() {
644 @Override
645 public void run() {
646 next.invokeDeregister(promise);
647 }
648 }, promise, null, false);
649 }
650
651 return promise;
652 }
653
654 private void invokeDeregister(ChannelPromise promise) {
655 if (invokeHandler()) {
656 try {
657 ((ChannelOutboundHandler) handler()).deregister(this, promise);
658 } catch (Throwable t) {
659 notifyOutboundHandlerException(t, promise);
660 }
661 } else {
662 deregister(promise);
663 }
664 }
665
666 @Override
667 public ChannelHandlerContext read() {
668 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
669 EventExecutor executor = next.executor();
670 if (executor.inEventLoop()) {
671 next.invokeRead();
672 } else {
673 Tasks tasks = next.invokeTasks;
674 if (tasks == null) {
675 next.invokeTasks = tasks = new Tasks(next);
676 }
677 executor.execute(tasks.invokeReadTask);
678 }
679
680 return this;
681 }
682
683 private void invokeRead() {
684 if (invokeHandler()) {
685 try {
686 ((ChannelOutboundHandler) handler()).read(this);
687 } catch (Throwable t) {
688 invokeExceptionCaught(t);
689 }
690 } else {
691 read();
692 }
693 }
694
695 @Override
696 public ChannelFuture write(Object msg) {
697 return write(msg, newPromise());
698 }
699
700 @Override
701 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
702 write(msg, false, promise);
703
704 return promise;
705 }
706
707 void invokeWrite(Object msg, ChannelPromise promise) {
708 if (invokeHandler()) {
709 invokeWrite0(msg, promise);
710 } else {
711 write(msg, promise);
712 }
713 }
714
715 private void invokeWrite0(Object msg, ChannelPromise promise) {
716 try {
717 ((ChannelOutboundHandler) handler()).write(this, msg, promise);
718 } catch (Throwable t) {
719 notifyOutboundHandlerException(t, promise);
720 }
721 }
722
723 @Override
724 public ChannelHandlerContext flush() {
725 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
726 EventExecutor executor = next.executor();
727 if (executor.inEventLoop()) {
728 next.invokeFlush();
729 } else {
730 Tasks tasks = next.invokeTasks;
731 if (tasks == null) {
732 next.invokeTasks = tasks = new Tasks(next);
733 }
734 safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
735 }
736
737 return this;
738 }
739
740 private void invokeFlush() {
741 if (invokeHandler()) {
742 invokeFlush0();
743 } else {
744 flush();
745 }
746 }
747
748 private void invokeFlush0() {
749 try {
750 ((ChannelOutboundHandler) handler()).flush(this);
751 } catch (Throwable t) {
752 invokeExceptionCaught(t);
753 }
754 }
755
756 @Override
757 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
758 write(msg, true, promise);
759 return promise;
760 }
761
762 void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
763 if (invokeHandler()) {
764 invokeWrite0(msg, promise);
765 invokeFlush0();
766 } else {
767 writeAndFlush(msg, promise);
768 }
769 }
770
771 private void write(Object msg, boolean flush, ChannelPromise promise) {
772 ObjectUtil.checkNotNull(msg, "msg");
773 try {
774 if (isNotValidPromise(promise, true)) {
775 ReferenceCountUtil.release(msg);
776
777 return;
778 }
779 } catch (RuntimeException e) {
780 ReferenceCountUtil.release(msg);
781 throw e;
782 }
783
784 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
785 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
786 final Object m = pipeline.touch(msg, next);
787 EventExecutor executor = next.executor();
788 if (executor.inEventLoop()) {
789 if (flush) {
790 next.invokeWriteAndFlush(m, promise);
791 } else {
792 next.invokeWrite(m, promise);
793 }
794 } else {
795 final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
796 if (!safeExecute(executor, task, promise, m, !flush)) {
797
798
799
800
801 task.cancel();
802 }
803 }
804 }
805
806 @Override
807 public ChannelFuture writeAndFlush(Object msg) {
808 return writeAndFlush(msg, newPromise());
809 }
810
811 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
812
813
814 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
815 }
816
817 @Override
818 public ChannelPromise newPromise() {
819 return new DefaultChannelPromise(channel(), executor());
820 }
821
822 @Override
823 public ChannelProgressivePromise newProgressivePromise() {
824 return new DefaultChannelProgressivePromise(channel(), executor());
825 }
826
827 @Override
828 public ChannelFuture newSucceededFuture() {
829 ChannelFuture succeededFuture = this.succeededFuture;
830 if (succeededFuture == null) {
831 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
832 }
833 return succeededFuture;
834 }
835
836 @Override
837 public ChannelFuture newFailedFuture(Throwable cause) {
838 return new FailedChannelFuture(channel(), executor(), cause);
839 }
840
841 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
842 ObjectUtil.checkNotNull(promise, "promise");
843
844 if (promise.isDone()) {
845
846
847
848
849 if (promise.isCancelled()) {
850 return true;
851 }
852 throw new IllegalArgumentException("promise already done: " + promise);
853 }
854
855 if (promise.channel() != channel()) {
856 throw new IllegalArgumentException(String.format(
857 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
858 }
859
860 if (promise.getClass() == DefaultChannelPromise.class) {
861 return false;
862 }
863
864 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
865 throw new IllegalArgumentException(
866 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
867 }
868
869 if (promise instanceof AbstractChannel.CloseFuture) {
870 throw new IllegalArgumentException(
871 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
872 }
873 return false;
874 }
875
876 private AbstractChannelHandlerContext findContextInbound(int mask) {
877 AbstractChannelHandlerContext ctx = this;
878 EventExecutor currentExecutor = executor();
879 do {
880 ctx = ctx.next;
881 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
882 return ctx;
883 }
884
885 private AbstractChannelHandlerContext findContextOutbound(int mask) {
886 AbstractChannelHandlerContext ctx = this;
887 EventExecutor currentExecutor = executor();
888 do {
889 ctx = ctx.prev;
890 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
891 return ctx;
892 }
893
894 private static boolean skipContext(
895 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
896
897 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
898
899
900
901
902 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
903 }
904
905 @Override
906 public ChannelPromise voidPromise() {
907 return channel().voidPromise();
908 }
909
910 final void setRemoved() {
911 handlerState = REMOVE_COMPLETE;
912 }
913
914 final boolean setAddComplete() {
915 for (;;) {
916 int oldState = handlerState;
917 if (oldState == REMOVE_COMPLETE) {
918 return false;
919 }
920
921
922
923 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
924 return true;
925 }
926 }
927 }
928
929 final void setAddPending() {
930 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
931 assert updated;
932 }
933
934 final void callHandlerAdded() throws Exception {
935
936
937 if (setAddComplete()) {
938 handler().handlerAdded(this);
939 }
940 }
941
942 final void callHandlerRemoved() throws Exception {
943 try {
944
945 if (handlerState == ADD_COMPLETE) {
946 handler().handlerRemoved(this);
947 }
948 } finally {
949
950 setRemoved();
951 }
952 }
953
954
962 private boolean invokeHandler() {
963
964 int handlerState = this.handlerState;
965 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
966 }
967
968 @Override
969 public boolean isRemoved() {
970 return handlerState == REMOVE_COMPLETE;
971 }
972
973 @Override
974 public <T> Attribute<T> attr(AttributeKey<T> key) {
975 return channel().attr(key);
976 }
977
978 @Override
979 public <T> boolean hasAttr(AttributeKey<T> key) {
980 return channel().hasAttr(key);
981 }
982
983 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
984 ChannelPromise promise, Object msg, boolean lazy) {
985 try {
986 if (lazy && executor instanceof AbstractEventExecutor) {
987 ((AbstractEventExecutor) executor).lazyExecute(runnable);
988 } else {
989 executor.execute(runnable);
990 }
991 return true;
992 } catch (Throwable cause) {
993 try {
994 promise.setFailure(cause);
995 } finally {
996 if (msg != null) {
997 ReferenceCountUtil.release(msg);
998 }
999 }
1000 return false;
1001 }
1002 }
1003
1004 @Override
1005 public String toHintString() {
1006 return '\'' + name + "' will handle the message from this point.";
1007 }
1008
1009 @Override
1010 public String toString() {
1011 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1012 }
1013
1014 static final class WriteTask implements Runnable {
1015 private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1016 @Override
1017 public WriteTask newObject(Handle<WriteTask> handle) {
1018 return new WriteTask(handle);
1019 }
1020 });
1021
1022 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1023 Object msg, ChannelPromise promise, boolean flush) {
1024 WriteTask task = RECYCLER.get();
1025 init(task, ctx, msg, promise, flush);
1026 return task;
1027 }
1028
1029 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1030 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1031
1032
1033 private static final int WRITE_TASK_OVERHEAD =
1034 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1035
1036 private final Handle<WriteTask> handle;
1037 private AbstractChannelHandlerContext ctx;
1038 private Object msg;
1039 private ChannelPromise promise;
1040 private int size;
1041
1042 @SuppressWarnings("unchecked")
1043 private WriteTask(Handle<? extends WriteTask> handle) {
1044 this.handle = (Handle<WriteTask>) handle;
1045 }
1046
1047 protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1048 Object msg, ChannelPromise promise, boolean flush) {
1049 task.ctx = ctx;
1050 task.msg = msg;
1051 task.promise = promise;
1052
1053 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1054 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1055 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1056 } else {
1057 task.size = 0;
1058 }
1059 if (flush) {
1060 task.size |= Integer.MIN_VALUE;
1061 }
1062 }
1063
1064 @Override
1065 public void run() {
1066 try {
1067 decrementPendingOutboundBytes();
1068 if (size >= 0) {
1069 ctx.invokeWrite(msg, promise);
1070 } else {
1071 ctx.invokeWriteAndFlush(msg, promise);
1072 }
1073 } finally {
1074 recycle();
1075 }
1076 }
1077
1078 void cancel() {
1079 try {
1080 decrementPendingOutboundBytes();
1081 } finally {
1082 recycle();
1083 }
1084 }
1085
1086 private void decrementPendingOutboundBytes() {
1087 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1088 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1089 }
1090 }
1091
1092 private void recycle() {
1093
1094 ctx = null;
1095 msg = null;
1096 promise = null;
1097 handle.recycle(this);
1098 }
1099 }
1100
1101 private static final class Tasks {
1102 private final AbstractChannelHandlerContext next;
1103 private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1104 @Override
1105 public void run() {
1106 next.invokeChannelReadComplete();
1107 }
1108 };
1109 private final Runnable invokeReadTask = new Runnable() {
1110 @Override
1111 public void run() {
1112 next.invokeRead();
1113 }
1114 };
1115 private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1116 @Override
1117 public void run() {
1118 next.invokeChannelWritabilityChanged();
1119 }
1120 };
1121 private final Runnable invokeFlushTask = new Runnable() {
1122 @Override
1123 public void run() {
1124 next.invokeFlush();
1125 }
1126 };
1127
1128 Tasks(AbstractChannelHandlerContext next) {
1129 this.next = next;
1130 }
1131 }
1132 }
1133