1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.ThrowableUtil;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.net.SocketAddress;
28
29 /**
30  *  Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
31  */

32 public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
33         extends ChannelDuplexHandler {
34
35     private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
36
37     private DelegatingChannelHandlerContext inboundCtx;
38     private DelegatingChannelHandlerContext outboundCtx;
39     private volatile boolean handlerAdded;
40
41     private I inboundHandler;
42     private O outboundHandler;
43
44     /**
45      * Creates a new uninitialized instance. A class that extends this handler must invoke
46      * {@link #init(ChannelInboundHandler, ChannelOutboundHandler)} before adding this handler into a
47      * {@link ChannelPipeline}.
48      */

49     protected CombinedChannelDuplexHandler() {
50         ensureNotSharable();
51     }
52
53     /**
54      * Creates a new instance that combines the specified two handlers into one.
55      */

56     public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
57         ensureNotSharable();
58         init(inboundHandler, outboundHandler);
59     }
60
61     /**
62      * Initialized this handler with the specified handlers.
63      *
64      * @throws IllegalStateException if this handler was not constructed via the default constructor or
65      *                               if this handler does not implement all required handler interfaces
66      * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
67      *                                  in the type hierarchy
68      */

69     protected final void init(I inboundHandler, O outboundHandler) {
70         validate(inboundHandler, outboundHandler);
71         this.inboundHandler = inboundHandler;
72         this.outboundHandler = outboundHandler;
73     }
74
75     private void validate(I inboundHandler, O outboundHandler) {
76         if (this.inboundHandler != null) {
77             throw new IllegalStateException(
78                     "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
79                             " was constructed with non-default constructor.");
80         }
81
82         ObjectUtil.checkNotNull(inboundHandler, "inboundHandler");
83         ObjectUtil.checkNotNull(outboundHandler, "outboundHandler");
84
85         if (inboundHandler instanceof ChannelOutboundHandler) {
86             throw new IllegalArgumentException(
87                     "inboundHandler must not implement " +
88                     ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
89         }
90         if (outboundHandler instanceof ChannelInboundHandler) {
91             throw new IllegalArgumentException(
92                     "outboundHandler must not implement " +
93                     ChannelInboundHandler.class.getSimpleName() + " to get combined.");
94         }
95     }
96
97     protected final I inboundHandler() {
98         return inboundHandler;
99     }
100
101     protected final O outboundHandler() {
102         return outboundHandler;
103     }
104
105     private void checkAdded() {
106         if (!handlerAdded) {
107             throw new IllegalStateException("handler not added to pipeline yet");
108         }
109     }
110
111     /**
112      * Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
113      */

114     public final void removeInboundHandler() {
115         checkAdded();
116         inboundCtx.remove();
117     }
118
119     /**
120      * Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
121      */

122     public final void removeOutboundHandler() {
123         checkAdded();
124         outboundCtx.remove();
125     }
126
127     @Override
128     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
129         if (inboundHandler == null) {
130             throw new IllegalStateException(
131                     "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
132                             if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
133                             " was constructed with the default constructor.");
134         }
135
136         outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
137         inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
138             @SuppressWarnings("deprecation")
139             @Override
140             public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
141                 if (!outboundCtx.removed) {
142                     try {
143                         // We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
144                         // as well
145                         outboundHandler.exceptionCaught(outboundCtx, cause);
146                     } catch (Throwable error) {
147                         if (logger.isDebugEnabled()) {
148                             logger.debug(
149                                     "An exception {}" +
150                                     "was thrown by a user handler's exceptionCaught() " +
151                                     "method while handling the following exception:",
152                                     ThrowableUtil.stackTraceToString(error), cause);
153                         } else if (logger.isWarnEnabled()) {
154                             logger.warn(
155                                     "An exception '{}' [enable DEBUG level for full stacktrace] " +
156                                     "was thrown by a user handler's exceptionCaught() " +
157                                     "method while handling the following exception:", error, cause);
158                         }
159                     }
160                 } else {
161                     super.fireExceptionCaught(cause);
162                 }
163                 return this;
164             }
165         };
166
167         // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
168         // removeOutboundHandler().
169         handlerAdded = true;
170
171         try {
172             inboundHandler.handlerAdded(inboundCtx);
173         } finally {
174             outboundHandler.handlerAdded(outboundCtx);
175         }
176     }
177
178     @Override
179     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
180         try {
181             inboundCtx.remove();
182         } finally {
183             outboundCtx.remove();
184         }
185     }
186
187     @Override
188     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
189         assert ctx == inboundCtx.ctx;
190         if (!inboundCtx.removed) {
191             inboundHandler.channelRegistered(inboundCtx);
192         } else {
193             inboundCtx.fireChannelRegistered();
194         }
195     }
196
197     @Override
198     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
199         assert ctx == inboundCtx.ctx;
200         if (!inboundCtx.removed) {
201             inboundHandler.channelUnregistered(inboundCtx);
202         } else {
203             inboundCtx.fireChannelUnregistered();
204         }
205     }
206
207     @Override
208     public void channelActive(ChannelHandlerContext ctx) throws Exception {
209         assert ctx == inboundCtx.ctx;
210         if (!inboundCtx.removed) {
211             inboundHandler.channelActive(inboundCtx);
212         } else {
213             inboundCtx.fireChannelActive();
214         }
215     }
216
217     @Override
218     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
219         assert ctx == inboundCtx.ctx;
220         if (!inboundCtx.removed) {
221             inboundHandler.channelInactive(inboundCtx);
222         } else {
223             inboundCtx.fireChannelInactive();
224         }
225     }
226
227     @Override
228     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
229         assert ctx == inboundCtx.ctx;
230         if (!inboundCtx.removed) {
231             inboundHandler.exceptionCaught(inboundCtx, cause);
232         } else {
233             inboundCtx.fireExceptionCaught(cause);
234         }
235     }
236
237     @Override
238     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
239         assert ctx == inboundCtx.ctx;
240         if (!inboundCtx.removed) {
241             inboundHandler.userEventTriggered(inboundCtx, evt);
242         } else {
243             inboundCtx.fireUserEventTriggered(evt);
244         }
245     }
246
247     @Override
248     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
249         assert ctx == inboundCtx.ctx;
250         if (!inboundCtx.removed) {
251             inboundHandler.channelRead(inboundCtx, msg);
252         } else {
253             inboundCtx.fireChannelRead(msg);
254         }
255     }
256
257     @Override
258     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
259         assert ctx == inboundCtx.ctx;
260         if (!inboundCtx.removed) {
261             inboundHandler.channelReadComplete(inboundCtx);
262         } else {
263             inboundCtx.fireChannelReadComplete();
264         }
265     }
266
267     @Override
268     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
269         assert ctx == inboundCtx.ctx;
270         if (!inboundCtx.removed) {
271             inboundHandler.channelWritabilityChanged(inboundCtx);
272         } else {
273             inboundCtx.fireChannelWritabilityChanged();
274         }
275     }
276
277     @Override
278     public void bind(
279             ChannelHandlerContext ctx,
280             SocketAddress localAddress, ChannelPromise promise) throws Exception {
281         assert ctx == outboundCtx.ctx;
282         if (!outboundCtx.removed) {
283             outboundHandler.bind(outboundCtx, localAddress, promise);
284         } else {
285             outboundCtx.bind(localAddress, promise);
286         }
287     }
288
289     @Override
290     public void connect(
291             ChannelHandlerContext ctx,
292             SocketAddress remoteAddress, SocketAddress localAddress,
293             ChannelPromise promise) throws Exception {
294         assert ctx == outboundCtx.ctx;
295         if (!outboundCtx.removed) {
296             outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
297         } else {
298             outboundCtx.connect(localAddress, promise);
299         }
300     }
301
302     @Override
303     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
304         assert ctx == outboundCtx.ctx;
305         if (!outboundCtx.removed) {
306             outboundHandler.disconnect(outboundCtx, promise);
307         } else {
308             outboundCtx.disconnect(promise);
309         }
310     }
311
312     @Override
313     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
314         assert ctx == outboundCtx.ctx;
315         if (!outboundCtx.removed) {
316             outboundHandler.close(outboundCtx, promise);
317         } else {
318             outboundCtx.close(promise);
319         }
320     }
321
322     @Override
323     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
324         assert ctx == outboundCtx.ctx;
325         if (!outboundCtx.removed) {
326             outboundHandler.deregister(outboundCtx, promise);
327         } else {
328             outboundCtx.deregister(promise);
329         }
330     }
331
332     @Override
333     public void read(ChannelHandlerContext ctx) throws Exception {
334         assert ctx == outboundCtx.ctx;
335         if (!outboundCtx.removed) {
336             outboundHandler.read(outboundCtx);
337         } else {
338             outboundCtx.read();
339         }
340     }
341
342     @Override
343     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
344         assert ctx == outboundCtx.ctx;
345         if (!outboundCtx.removed) {
346             outboundHandler.write(outboundCtx, msg, promise);
347         } else {
348             outboundCtx.write(msg, promise);
349         }
350     }
351
352     @Override
353     public void flush(ChannelHandlerContext ctx) throws Exception {
354         assert ctx == outboundCtx.ctx;
355         if (!outboundCtx.removed) {
356             outboundHandler.flush(outboundCtx);
357         } else {
358             outboundCtx.flush();
359         }
360     }
361
362     private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
363
364         private final ChannelHandlerContext ctx;
365         private final ChannelHandler handler;
366         boolean removed;
367
368         DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
369             this.ctx = ctx;
370             this.handler = handler;
371         }
372
373         @Override
374         public Channel channel() {
375             return ctx.channel();
376         }
377
378         @Override
379         public EventExecutor executor() {
380             return ctx.executor();
381         }
382
383         @Override
384         public String name() {
385             return ctx.name();
386         }
387
388         @Override
389         public ChannelHandler handler() {
390             return ctx.handler();
391         }
392
393         @Override
394         public boolean isRemoved() {
395             return removed || ctx.isRemoved();
396         }
397
398         @Override
399         public ChannelHandlerContext fireChannelRegistered() {
400             ctx.fireChannelRegistered();
401             return this;
402         }
403
404         @Override
405         public ChannelHandlerContext fireChannelUnregistered() {
406             ctx.fireChannelUnregistered();
407             return this;
408         }
409
410         @Override
411         public ChannelHandlerContext fireChannelActive() {
412             ctx.fireChannelActive();
413             return this;
414         }
415
416         @Override
417         public ChannelHandlerContext fireChannelInactive() {
418             ctx.fireChannelInactive();
419             return this;
420         }
421
422         @Override
423         public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
424             ctx.fireExceptionCaught(cause);
425             return this;
426         }
427
428         @Override
429         public ChannelHandlerContext fireUserEventTriggered(Object event) {
430             ctx.fireUserEventTriggered(event);
431             return this;
432         }
433
434         @Override
435         public ChannelHandlerContext fireChannelRead(Object msg) {
436             ctx.fireChannelRead(msg);
437             return this;
438         }
439
440         @Override
441         public ChannelHandlerContext fireChannelReadComplete() {
442             ctx.fireChannelReadComplete();
443             return this;
444         }
445
446         @Override
447         public ChannelHandlerContext fireChannelWritabilityChanged() {
448             ctx.fireChannelWritabilityChanged();
449             return this;
450         }
451
452         @Override
453         public ChannelFuture bind(SocketAddress localAddress) {
454             return ctx.bind(localAddress);
455         }
456
457         @Override
458         public ChannelFuture connect(SocketAddress remoteAddress) {
459             return ctx.connect(remoteAddress);
460         }
461
462         @Override
463         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
464             return ctx.connect(remoteAddress, localAddress);
465         }
466
467         @Override
468         public ChannelFuture disconnect() {
469             return ctx.disconnect();
470         }
471
472         @Override
473         public ChannelFuture close() {
474             return ctx.close();
475         }
476
477         @Override
478         public ChannelFuture deregister() {
479             return ctx.deregister();
480         }
481
482         @Override
483         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
484             return ctx.bind(localAddress, promise);
485         }
486
487         @Override
488         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
489             return ctx.connect(remoteAddress, promise);
490         }
491
492         @Override
493         public ChannelFuture connect(
494                 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
495             return ctx.connect(remoteAddress, localAddress, promise);
496         }
497
498         @Override
499         public ChannelFuture disconnect(ChannelPromise promise) {
500             return ctx.disconnect(promise);
501         }
502
503         @Override
504         public ChannelFuture close(ChannelPromise promise) {
505             return ctx.close(promise);
506         }
507
508         @Override
509         public ChannelFuture deregister(ChannelPromise promise) {
510             return ctx.deregister(promise);
511         }
512
513         @Override
514         public ChannelHandlerContext read() {
515             ctx.read();
516             return this;
517         }
518
519         @Override
520         public ChannelFuture write(Object msg) {
521             return ctx.write(msg);
522         }
523
524         @Override
525         public ChannelFuture write(Object msg, ChannelPromise promise) {
526             return ctx.write(msg, promise);
527         }
528
529         @Override
530         public ChannelHandlerContext flush() {
531             ctx.flush();
532             return this;
533         }
534
535         @Override
536         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
537             return ctx.writeAndFlush(msg, promise);
538         }
539
540         @Override
541         public ChannelFuture writeAndFlush(Object msg) {
542             return ctx.writeAndFlush(msg);
543         }
544
545         @Override
546         public ChannelPipeline pipeline() {
547             return ctx.pipeline();
548         }
549
550         @Override
551         public ByteBufAllocator alloc() {
552             return ctx.alloc();
553         }
554
555         @Override
556         public ChannelPromise newPromise() {
557             return ctx.newPromise();
558         }
559
560         @Override
561         public ChannelProgressivePromise newProgressivePromise() {
562             return ctx.newProgressivePromise();
563         }
564
565         @Override
566         public ChannelFuture newSucceededFuture() {
567             return ctx.newSucceededFuture();
568         }
569
570         @Override
571         public ChannelFuture newFailedFuture(Throwable cause) {
572             return ctx.newFailedFuture(cause);
573         }
574
575         @Override
576         public ChannelPromise voidPromise() {
577             return ctx.voidPromise();
578         }
579
580         @Override
581         public <T> Attribute<T> attr(AttributeKey<T> key) {
582             return ctx.channel().attr(key);
583         }
584
585         @Override
586         public <T> boolean hasAttr(AttributeKey<T> key) {
587             return ctx.channel().hasAttr(key);
588         }
589
590         final void remove() {
591             EventExecutor executor = executor();
592             if (executor.inEventLoop()) {
593                 remove0();
594             } else {
595                 executor.execute(new Runnable() {
596                     @Override
597                     public void run() {
598                         remove0();
599                     }
600                 });
601             }
602         }
603
604         private void remove0() {
605             if (!removed) {
606                 removed = true;
607                 try {
608                     handler.handlerRemoved(this);
609                 } catch (Throwable cause) {
610                     fireExceptionCaught(new ChannelPipelineException(
611                             handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
612                 }
613             }
614         }
615     }
616 }
617