1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.StringUtil;
28
29 import java.util.List;
30
31 import static io.netty.util.internal.ObjectUtil.checkPositive;
32 import static java.lang.Integer.MAX_VALUE;
33
34 /**
35  * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
36  * other Message type.
37  *
38  * For example here is an implementation which reads all readable bytes from
39  * the input {@link ByteBuf} and create a new {@link ByteBuf}.
40  *
41  * <pre>
42  *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
43  *         {@code @Override}
44  *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
45  *                 throws {@link Exception} {
46  *             out.add(in.readBytes(in.readableBytes()));
47  *         }
48  *     }
49  * </pre>
50  *
51  * <h3>Frame detection</h3>
52  * <p>
53  * Generally frame detection should be handled earlier in the pipeline by adding a
54  * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
55  * or {@link LineBasedFrameDecoder}.
56  * <p>
57  * If a custom frame decoder is required, then one needs to be careful when implementing
58  * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
59  * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
60  * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
61  * <p>
62  * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
63  * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
64  * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
65  * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
66  * <h3>Pitfalls</h3>
67  * <p>
68  * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
69  * annotated with {@link @Sharable}.
70  * <p>
71  * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
72  * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
73  * to avoid leaking memory.
74  */

75 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
76
77     /**
78      * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
79      */

80     public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
81         @Override
82         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
83             if (!cumulation.isReadable() && in.isContiguous()) {
84                 // If cumulation is empty and input buffer is contiguous, use it directly
85                 cumulation.release();
86                 return in;
87             }
88             try {
89                 final int required = in.readableBytes();
90                 if (required > cumulation.maxWritableBytes() ||
91                         (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
92                         cumulation.isReadOnly()) {
93                     // Expand cumulation (by replacing it) under the following conditions:
94                     // - cumulation cannot be resized to accommodate the additional data
95                     // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
96                     //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
97                     return expandCumulation(alloc, cumulation, in);
98                 }
99                 cumulation.writeBytes(in, in.readerIndex(), required);
100                 in.readerIndex(in.writerIndex());
101                 return cumulation;
102             } finally {
103                 // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
104                 // for whatever release (for example because of OutOfMemoryError)
105                 in.release();
106             }
107         }
108     };
109
110     /**
111      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
112      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
113      * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
114      */

115     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
116         @Override
117         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
118             if (!cumulation.isReadable()) {
119                 cumulation.release();
120                 return in;
121             }
122             CompositeByteBuf composite = null;
123             try {
124                 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
125                     composite = (CompositeByteBuf) cumulation;
126                     // Writer index must equal capacity if we are going to "write"
127                     // new components to the end
128                     if (composite.writerIndex() != composite.capacity()) {
129                         composite.capacity(composite.writerIndex());
130                     }
131                 } else {
132                     composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
133                 }
134                 composite.addFlattenedComponents(true, in);
135                 in = null;
136                 return composite;
137             } finally {
138                 if (in != null) {
139                     // We must release if the ownership was not transferred as otherwise it may produce a leak
140                     in.release();
141                     // Also release any new buffer allocated if we're not returning it
142                     if (composite != null && composite != cumulation) {
143                         composite.release();
144                     }
145                 }
146             }
147         }
148     };
149
150     private static final byte STATE_INIT = 0;
151     private static final byte STATE_CALLING_CHILD_DECODE = 1;
152     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
153
154     ByteBuf cumulation;
155     private Cumulator cumulator = MERGE_CUMULATOR;
156     private boolean singleDecode;
157     private boolean first;
158
159     /**
160      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
161      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
162      */

163     private boolean firedChannelRead;
164
165     /**
166      * A bitmask where the bits are defined as
167      * <ul>
168      *     <li>{@link #STATE_INIT}</li>
169      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
170      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
171      * </ul>
172      */

173     private byte decodeState = STATE_INIT;
174     private int discardAfterReads = 16;
175     private int numReads;
176
177     protected ByteToMessageDecoder() {
178         ensureNotSharable();
179     }
180
181     /**
182      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
183      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
184      *
185      * Default is {@code false} as this has performance impacts.
186      */

187     public void setSingleDecode(boolean singleDecode) {
188         this.singleDecode = singleDecode;
189     }
190
191     /**
192      * If {@code true} then only one message is decoded on each
193      * {@link #channelRead(ChannelHandlerContext, Object)} call.
194      *
195      * Default is {@code false} as this has performance impacts.
196      */

197     public boolean isSingleDecode() {
198         return singleDecode;
199     }
200
201     /**
202      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
203      */

204     public void setCumulator(Cumulator cumulator) {
205         this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
206     }
207
208     /**
209      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
210      * The default is {@code 16}.
211      */

212     public void setDiscardAfterReads(int discardAfterReads) {
213         checkPositive(discardAfterReads, "discardAfterReads");
214         this.discardAfterReads = discardAfterReads;
215     }
216
217     /**
218      * Returns the actual number of readable bytes in the internal cumulative
219      * buffer of this decoder. You usually do not need to rely on this value
220      * to write a decoder. Use it only when you must use it at your own risk.
221      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
222      */

223     protected int actualReadableBytes() {
224         return internalBuffer().readableBytes();
225     }
226
227     /**
228      * Returns the internal cumulative buffer of this decoder. You usually
229      * do not need to access the internal buffer directly to write a decoder.
230      * Use it only when you must use it at your own risk.
231      */

232     protected ByteBuf internalBuffer() {
233         if (cumulation != null) {
234             return cumulation;
235         } else {
236             return Unpooled.EMPTY_BUFFER;
237         }
238     }
239
240     @Override
241     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
242         if (decodeState == STATE_CALLING_CHILD_DECODE) {
243             decodeState = STATE_HANDLER_REMOVED_PENDING;
244             return;
245         }
246         ByteBuf buf = cumulation;
247         if (buf != null) {
248             // Directly set this to null so we are sure we not access it in any other method here anymore.
249             cumulation = null;
250             numReads = 0;
251             int readable = buf.readableBytes();
252             if (readable > 0) {
253                 ctx.fireChannelRead(buf);
254                 ctx.fireChannelReadComplete();
255             } else {
256                 buf.release();
257             }
258         }
259         handlerRemoved0(ctx);
260     }
261
262     /**
263      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
264      * events anymore.
265      */

266     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
267
268     @Override
269     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
270         if (msg instanceof ByteBuf) {
271             CodecOutputList out = CodecOutputList.newInstance();
272             try {
273                 first = cumulation == null;
274                 cumulation = cumulator.cumulate(ctx.alloc(),
275                         first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
276                 callDecode(ctx, cumulation, out);
277             } catch (DecoderException e) {
278                 throw e;
279             } catch (Exception e) {
280                 throw new DecoderException(e);
281             } finally {
282                 try {
283                     if (cumulation != null && !cumulation.isReadable()) {
284                         numReads = 0;
285                         cumulation.release();
286                         cumulation = null;
287                     } else if (++numReads >= discardAfterReads) {
288                         // We did enough reads already try to discard some bytes so we not risk to see a OOME.
289                         // See https://github.com/netty/netty/issues/4275
290                         numReads = 0;
291                         discardSomeReadBytes();
292                     }
293
294                     int size = out.size();
295                     firedChannelRead |= out.insertSinceRecycled();
296                     fireChannelRead(ctx, out, size);
297                 } finally {
298                     out.recycle();
299                 }
300             }
301         } else {
302             ctx.fireChannelRead(msg);
303         }
304     }
305
306     /**
307      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
308      */

309     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
310         if (msgs instanceof CodecOutputList) {
311             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
312         } else {
313             for (int i = 0; i < numElements; i++) {
314                 ctx.fireChannelRead(msgs.get(i));
315             }
316         }
317     }
318
319     /**
320      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
321      */

322     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
323         for (int i = 0; i < numElements; i ++) {
324             ctx.fireChannelRead(msgs.getUnsafe(i));
325         }
326     }
327
328     @Override
329     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
330         numReads = 0;
331         discardSomeReadBytes();
332         if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
333             ctx.read();
334         }
335         firedChannelRead = false;
336         ctx.fireChannelReadComplete();
337     }
338
339     protected final void discardSomeReadBytes() {
340         if (cumulation != null && !first && cumulation.refCnt() == 1) {
341             // discard some bytes if possible to make more room in the
342             // buffer but only if the refCnt == 1  as otherwise the user may have
343             // used slice().retain() or duplicate().retain().
344             //
345             // See:
346             // - https://github.com/netty/netty/issues/2327
347             // - https://github.com/netty/netty/issues/1764
348             cumulation.discardSomeReadBytes();
349         }
350     }
351
352     @Override
353     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
354         channelInputClosed(ctx, true);
355     }
356
357     @Override
358     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
359         if (evt instanceof ChannelInputShutdownEvent) {
360             // The decodeLast method is invoked when a channelInactive event is encountered.
361             // This method is responsible for ending requests in some situations and must be called
362             // when the input has been shutdown.
363             channelInputClosed(ctx, false);
364         }
365         super.userEventTriggered(ctx, evt);
366     }
367
368     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
369         CodecOutputList out = CodecOutputList.newInstance();
370         try {
371             channelInputClosed(ctx, out);
372         } catch (DecoderException e) {
373             throw e;
374         } catch (Exception e) {
375             throw new DecoderException(e);
376         } finally {
377             try {
378                 if (cumulation != null) {
379                     cumulation.release();
380                     cumulation = null;
381                 }
382                 int size = out.size();
383                 fireChannelRead(ctx, out, size);
384                 if (size > 0) {
385                     // Something was read, call fireChannelReadComplete()
386                     ctx.fireChannelReadComplete();
387                 }
388                 if (callChannelInactive) {
389                     ctx.fireChannelInactive();
390                 }
391             } finally {
392                 // Recycle in all cases
393                 out.recycle();
394             }
395         }
396     }
397
398     /**
399      * Called when the input of the channel was closed which may be because it changed to inactive or because of
400      * {@link ChannelInputShutdownEvent}.
401      */

402     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
403         if (cumulation != null) {
404             callDecode(ctx, cumulation, out);
405             decodeLast(ctx, cumulation, out);
406         } else {
407             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
408         }
409     }
410
411     /**
412      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
413      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
414      *
415      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
416      * @param in            the {@link ByteBuf} from which to read data
417      * @param out           the {@link List} to which decoded messages should be added
418      */

419     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
420         try {
421             while (in.isReadable()) {
422                 int outSize = out.size();
423
424                 if (outSize > 0) {
425                     fireChannelRead(ctx, out, outSize);
426                     out.clear();
427
428                     // Check if this handler was removed before continuing with decoding.
429                     // If it was removed, it is not safe to continue to operate on the buffer.
430                     //
431                     // See:
432                     // - https://github.com/netty/netty/issues/4635
433                     if (ctx.isRemoved()) {
434                         break;
435                     }
436                     outSize = 0;
437                 }
438
439                 int oldInputLength = in.readableBytes();
440                 decodeRemovalReentryProtection(ctx, in, out);
441
442                 // Check if this handler was removed before continuing the loop.
443                 // If it was removed, it is not safe to continue to operate on the buffer.
444                 //
445                 // See https://github.com/netty/netty/issues/1664
446                 if (ctx.isRemoved()) {
447                     break;
448                 }
449
450                 if (outSize == out.size()) {
451                     if (oldInputLength == in.readableBytes()) {
452                         break;
453                     } else {
454                         continue;
455                     }
456                 }
457
458                 if (oldInputLength == in.readableBytes()) {
459                     throw new DecoderException(
460                             StringUtil.simpleClassName(getClass()) +
461                                     ".decode() did not read anything but decoded a message.");
462                 }
463
464                 if (isSingleDecode()) {
465                     break;
466                 }
467             }
468         } catch (DecoderException e) {
469             throw e;
470         } catch (Exception cause) {
471             throw new DecoderException(cause);
472         }
473     }
474
475     /**
476      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
477      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
478      * {@link ByteBuf}.
479      *
480      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
481      * @param in            the {@link ByteBuf} from which to read data
482      * @param out           the {@link List} to which decoded messages should be added
483      * @throws Exception    is thrown if an error occurs
484      */

485     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
486
487     /**
488      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
489      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
490      * {@link ByteBuf}.
491      *
492      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
493      * @param in            the {@link ByteBuf} from which to read data
494      * @param out           the {@link List} to which decoded messages should be added
495      * @throws Exception    is thrown if an error occurs
496      */

497     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
498             throws Exception {
499         decodeState = STATE_CALLING_CHILD_DECODE;
500         try {
501             decode(ctx, in, out);
502         } finally {
503             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
504             decodeState = STATE_INIT;
505             if (removePending) {
506                 fireChannelRead(ctx, out, out.size());
507                 out.clear();
508                 handlerRemoved(ctx);
509             }
510         }
511     }
512
513     /**
514      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
515      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
516      *
517      * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
518      * override this for some special cleanup operation.
519      */

520     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
521         if (in.isReadable()) {
522             // Only call decode() if there is something left in the buffer to decode.
523             // See https://github.com/netty/netty/issues/4386
524             decodeRemovalReentryProtection(ctx, in, out);
525         }
526     }
527
528     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
529         int oldBytes = oldCumulation.readableBytes();
530         int newBytes = in.readableBytes();
531         int totalBytes = oldBytes + newBytes;
532         ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
533         ByteBuf toRelease = newCumulation;
534         try {
535             // This avoids redundant checks and stack depth compared to calling writeBytes(...)
536             newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
537                 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
538                 .writerIndex(totalBytes);
539             in.readerIndex(in.writerIndex());
540             toRelease = oldCumulation;
541             return newCumulation;
542         } finally {
543             toRelease.release();
544         }
545     }
546
547     /**
548      * Cumulate {@link ByteBuf}s.
549      */

550     public interface Cumulator {
551         /**
552          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
553          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
554          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
555          */

556         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
557     }
558 }
559