1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.StringUtil;
28
29 import java.util.List;
30
31 import static io.netty.util.internal.ObjectUtil.checkPositive;
32 import static java.lang.Integer.MAX_VALUE;
33
34 /**
 * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to
 * another message type.
37 *
 * For example, here is an implementation which reads all readable bytes from
 * the input {@link ByteBuf} and creates a new {@link ByteBuf}.
40 *
41 * <pre>
42 * public class SquareDecoder extends {@link ByteToMessageDecoder} {
43 * {@code @Override}
44 * public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
45 * throws {@link Exception} {
46 * out.add(in.readBytes(in.readableBytes()));
47 * }
48 * }
49 * </pre>
50 *
51 * <h3>Frame detection</h3>
52 * <p>
53 * Generally frame detection should be handled earlier in the pipeline by adding a
54 * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
55 * or {@link LineBasedFrameDecoder}.
56 * <p>
57 * If a custom frame decoder is required, then one needs to be careful when implementing
58 * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
59 * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
60 * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
61 * <p>
62 * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
63 * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
64 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
65 * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
66 * <h3>Pitfalls</h3>
67 * <p>
 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
 * be annotated with {@code @Sharable}.
70 * <p>
71 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
72 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
73 * to avoid leaking memory.
74 */
75 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
76
77 /**
78 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
79 */
80 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
81 @Override
82 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
83 if (!cumulation.isReadable() && in.isContiguous()) {
84 // If cumulation is empty and input buffer is contiguous, use it directly
85 cumulation.release();
86 return in;
87 }
88 try {
89 final int required = in.readableBytes();
90 if (required > cumulation.maxWritableBytes() ||
91 (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
92 cumulation.isReadOnly()) {
93 // Expand cumulation (by replacing it) under the following conditions:
94 // - cumulation cannot be resized to accommodate the additional data
95 // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
96 // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
97 return expandCumulation(alloc, cumulation, in);
98 }
99 cumulation.writeBytes(in, in.readerIndex(), required);
100 in.readerIndex(in.writerIndex());
101 return cumulation;
102 } finally {
103 // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
104 // for whatever release (for example because of OutOfMemoryError)
105 in.release();
106 }
107 }
108 };
109
110 /**
111 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
112 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
113 * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
114 */
115 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
116 @Override
117 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
118 if (!cumulation.isReadable()) {
119 cumulation.release();
120 return in;
121 }
122 CompositeByteBuf composite = null;
123 try {
124 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
125 composite = (CompositeByteBuf) cumulation;
126 // Writer index must equal capacity if we are going to "write"
127 // new components to the end
128 if (composite.writerIndex() != composite.capacity()) {
129 composite.capacity(composite.writerIndex());
130 }
131 } else {
132 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
133 }
134 composite.addFlattenedComponents(true, in);
135 in = null;
136 return composite;
137 } finally {
138 if (in != null) {
139 // We must release if the ownership was not transferred as otherwise it may produce a leak
140 in.release();
141 // Also release any new buffer allocated if we're not returning it
142 if (composite != null && composite != cumulation) {
143 composite.release();
144 }
145 }
146 }
147 }
148 };
149
// Possible values of decodeState. They are mutually exclusive and are compared by equality.
private static final byte STATE_INIT = 0;
private static final byte STATE_CALLING_CHILD_DECODE = 1;
private static final byte STATE_HANDLER_REMOVED_PENDING = 2;

// Bytes that were received but not yet consumed by decode(...); null while nothing is buffered.
ByteBuf cumulation;
// Strategy used by channelRead(...) to combine the cumulation with newly received bytes.
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
// Set by channelRead(...): true if there was no cumulation when the current read started.
private boolean first;

/**
 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
 * when {@link ChannelConfig#isAutoRead()} is {@code false}.
 */
private boolean firedChannelRead;

/**
 * The current state of this decoder with respect to re-entrant handler removal. It holds exactly one of
 * <ul>
 *     <li>{@link #STATE_INIT}</li>
 *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
 *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
 * </ul>
 */
private byte decodeState = STATE_INIT;
// Number of channelRead(...) calls after which read bytes of the cumulation are discarded.
private int discardAfterReads = 16;
// Number of channelRead(...) calls since the counter was last reset.
private int numReads;
176
/**
 * Creates a new instance. {@code ensureNotSharable()} is invoked first; presumably it rejects sub-classes
 * that are annotated as sharable, in line with the class-level note that this decoder must not be shared.
 */
protected ByteToMessageDecoder() {
    ensureNotSharable();
}
180
/**
 * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
 * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
 * <p>
 * Default is {@code false} as this has performance impacts.
 *
 * @param singleDecode {@code true} to stop the decode loop after the first decoded message of each read
 */
public void setSingleDecode(boolean singleDecode) {
    this.singleDecode = singleDecode;
}
190
/**
 * If {@code true} then only one message is decoded on each
 * {@link #channelRead(ChannelHandlerContext, Object)} call.
 * <p>
 * Default is {@code false} as this has performance impacts.
 *
 * @return the value last passed to {@link #setSingleDecode(boolean)}, or {@code false} if it was never called
 */
public boolean isSingleDecode() {
    return singleDecode;
}
200
201 /**
202 * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
203 */
204 public void setCumulator(Cumulator cumulator) {
205 this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
206 }
207
208 /**
209 * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
210 * The default is {@code 16}.
211 */
212 public void setDiscardAfterReads(int discardAfterReads) {
213 checkPositive(discardAfterReads, "discardAfterReads");
214 this.discardAfterReads = discardAfterReads;
215 }
216
217 /**
218 * Returns the actual number of readable bytes in the internal cumulative
219 * buffer of this decoder. You usually do not need to rely on this value
220 * to write a decoder. Use it only when you must use it at your own risk.
221 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
222 */
223 protected int actualReadableBytes() {
224 return internalBuffer().readableBytes();
225 }
226
227 /**
228 * Returns the internal cumulative buffer of this decoder. You usually
229 * do not need to access the internal buffer directly to write a decoder.
230 * Use it only when you must use it at your own risk.
231 */
232 protected ByteBuf internalBuffer() {
233 if (cumulation != null) {
234 return cumulation;
235 } else {
236 return Unpooled.EMPTY_BUFFER;
237 }
238 }
239
240 @Override
241 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
242 if (decodeState == STATE_CALLING_CHILD_DECODE) {
243 decodeState = STATE_HANDLER_REMOVED_PENDING;
244 return;
245 }
246 ByteBuf buf = cumulation;
247 if (buf != null) {
248 // Directly set this to null so we are sure we not access it in any other method here anymore.
249 cumulation = null;
250 numReads = 0;
251 int readable = buf.readableBytes();
252 if (readable > 0) {
253 ctx.fireChannelRead(buf);
254 ctx.fireChannelReadComplete();
255 } else {
256 buf.release();
257 }
258 }
259 handlerRemoved0(ctx);
260 }
261
/**
 * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
 * events anymore. It is invoked at the end of {@link #handlerRemoved(ChannelHandlerContext)}, after any
 * remaining cumulation has been forwarded or released. The default implementation does nothing.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belonged to
 * @throws Exception is thrown if an error occurs
 */
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
267
268 @Override
269 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
270 if (msg instanceof ByteBuf) {
271 CodecOutputList out = CodecOutputList.newInstance();
272 try {
273 first = cumulation == null;
274 cumulation = cumulator.cumulate(ctx.alloc(),
275 first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
276 callDecode(ctx, cumulation, out);
277 } catch (DecoderException e) {
278 throw e;
279 } catch (Exception e) {
280 throw new DecoderException(e);
281 } finally {
282 try {
283 if (cumulation != null && !cumulation.isReadable()) {
284 numReads = 0;
285 cumulation.release();
286 cumulation = null;
287 } else if (++numReads >= discardAfterReads) {
288 // We did enough reads already try to discard some bytes so we not risk to see a OOME.
289 // See https://github.com/netty/netty/issues/4275
290 numReads = 0;
291 discardSomeReadBytes();
292 }
293
294 int size = out.size();
295 firedChannelRead |= out.insertSinceRecycled();
296 fireChannelRead(ctx, out, size);
297 } finally {
298 out.recycle();
299 }
300 }
301 } else {
302 ctx.fireChannelRead(msg);
303 }
304 }
305
306 /**
307 * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
308 */
309 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
310 if (msgs instanceof CodecOutputList) {
311 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
312 } else {
313 for (int i = 0; i < numElements; i++) {
314 ctx.fireChannelRead(msgs.get(i));
315 }
316 }
317 }
318
319 /**
320 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
321 */
322 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
323 for (int i = 0; i < numElements; i ++) {
324 ctx.fireChannelRead(msgs.getUnsafe(i));
325 }
326 }
327
328 @Override
329 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
330 numReads = 0;
331 discardSomeReadBytes();
332 if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
333 ctx.read();
334 }
335 firedChannelRead = false;
336 ctx.fireChannelReadComplete();
337 }
338
339 protected final void discardSomeReadBytes() {
340 if (cumulation != null && !first && cumulation.refCnt() == 1) {
341 // discard some bytes if possible to make more room in the
342 // buffer but only if the refCnt == 1 as otherwise the user may have
343 // used slice().retain() or duplicate().retain().
344 //
345 // See:
346 // - https://github.com/netty/netty/issues/2327
347 // - https://github.com/netty/netty/issues/1764
348 cumulation.discardSomeReadBytes();
349 }
350 }
351
/**
 * Treats the channel becoming inactive as the end of the input: delegates to the private
 * {@code channelInputClosed(ctx, true)}, which decodes what is left, releases the cumulation and, because of
 * the {@code true} flag, fires the channel-inactive event afterwards.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    channelInputClosed(ctx, true);
}
356
357 @Override
358 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
359 if (evt instanceof ChannelInputShutdownEvent) {
360 // The decodeLast method is invoked when a channelInactive event is encountered.
361 // This method is responsible for ending requests in some situations and must be called
362 // when the input has been shutdown.
363 channelInputClosed(ctx, false);
364 }
365 super.userEventTriggered(ctx, evt);
366 }
367
368 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
369 CodecOutputList out = CodecOutputList.newInstance();
370 try {
371 channelInputClosed(ctx, out);
372 } catch (DecoderException e) {
373 throw e;
374 } catch (Exception e) {
375 throw new DecoderException(e);
376 } finally {
377 try {
378 if (cumulation != null) {
379 cumulation.release();
380 cumulation = null;
381 }
382 int size = out.size();
383 fireChannelRead(ctx, out, size);
384 if (size > 0) {
385 // Something was read, call fireChannelReadComplete()
386 ctx.fireChannelReadComplete();
387 }
388 if (callChannelInactive) {
389 ctx.fireChannelInactive();
390 }
391 } finally {
392 // Recycle in all cases
393 out.recycle();
394 }
395 }
396 }
397
398 /**
399 * Called when the input of the channel was closed which may be because it changed to inactive or because of
400 * {@link ChannelInputShutdownEvent}.
401 */
402 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
403 if (cumulation != null) {
404 callDecode(ctx, cumulation, out);
405 decodeLast(ctx, cumulation, out);
406 } else {
407 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
408 }
409 }
410
411 /**
412 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
413 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
414 *
415 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
416 * @param in the {@link ByteBuf} from which to read data
417 * @param out the {@link List} to which decoded messages should be added
418 */
419 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
420 try {
421 while (in.isReadable()) {
422 int outSize = out.size();
423
424 if (outSize > 0) {
425 fireChannelRead(ctx, out, outSize);
426 out.clear();
427
428 // Check if this handler was removed before continuing with decoding.
429 // If it was removed, it is not safe to continue to operate on the buffer.
430 //
431 // See:
432 // - https://github.com/netty/netty/issues/4635
433 if (ctx.isRemoved()) {
434 break;
435 }
436 outSize = 0;
437 }
438
439 int oldInputLength = in.readableBytes();
440 decodeRemovalReentryProtection(ctx, in, out);
441
442 // Check if this handler was removed before continuing the loop.
443 // If it was removed, it is not safe to continue to operate on the buffer.
444 //
445 // See https://github.com/netty/netty/issues/1664
446 if (ctx.isRemoved()) {
447 break;
448 }
449
450 if (outSize == out.size()) {
451 if (oldInputLength == in.readableBytes()) {
452 break;
453 } else {
454 continue;
455 }
456 }
457
458 if (oldInputLength == in.readableBytes()) {
459 throw new DecoderException(
460 StringUtil.simpleClassName(getClass()) +
461 ".decode() did not read anything but decoded a message.");
462 }
463
464 if (isSingleDecode()) {
465 break;
466 }
467 }
468 } catch (DecoderException e) {
469 throw e;
470 } catch (Exception cause) {
471 throw new DecoderException(cause);
472 }
473 }
474
/**
 * Decodes from one {@link ByteBuf} to another message type. This method is called repeatedly until either
 * the input {@link ByteBuf} has nothing left to read when this method returns, or a call neither read
 * anything from the input {@link ByteBuf} nor added a message to {@code out}.
 * <p>
 * An implementation that adds a message to {@code out} must also read from {@code in}; otherwise the caller
 * throws a {@link DecoderException}.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in  the {@link ByteBuf} from which to read data
 * @param out the {@link List} to which decoded messages should be added
 * @throws Exception is thrown if an error occurs
 */
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
486
487 /**
488 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
489 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
490 * {@link ByteBuf}.
491 *
492 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
493 * @param in the {@link ByteBuf} from which to read data
494 * @param out the {@link List} to which decoded messages should be added
495 * @throws Exception is thrown if an error occurs
496 */
497 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
498 throws Exception {
499 decodeState = STATE_CALLING_CHILD_DECODE;
500 try {
501 decode(ctx, in, out);
502 } finally {
503 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
504 decodeState = STATE_INIT;
505 if (removePending) {
506 fireChannelRead(ctx, out, out.size());
507 out.clear();
508 handlerRemoved(ctx);
509 }
510 }
511 }
512
513 /**
514 * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
515 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
516 *
517 * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
518 * override this for some special cleanup operation.
519 */
520 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
521 if (in.isReadable()) {
522 // Only call decode() if there is something left in the buffer to decode.
523 // See https://github.com/netty/netty/issues/4386
524 decodeRemovalReentryProtection(ctx, in, out);
525 }
526 }
527
528 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
529 int oldBytes = oldCumulation.readableBytes();
530 int newBytes = in.readableBytes();
531 int totalBytes = oldBytes + newBytes;
532 ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
533 ByteBuf toRelease = newCumulation;
534 try {
535 // This avoids redundant checks and stack depth compared to calling writeBytes(...)
536 newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
537 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
538 .writerIndex(totalBytes);
539 in.readerIndex(in.writerIndex());
540 toRelease = oldCumulation;
541 return newCumulation;
542 } finally {
543 toRelease.release();
544 }
545 }
546
/**
 * Strategy that combines the bytes buffered so far with newly received bytes.
 */
public interface Cumulator {
    /**
     * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
     * The implementation is responsible for correctly handling the life-cycle of the given {@link ByteBuf}s
     * and so must call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
     *
     * @param alloc      the {@link ByteBufAllocator} to use if a new buffer has to be allocated
     * @param cumulation the bytes buffered so far; {@link ByteToMessageDecoder} passes an empty buffer when
     *                   nothing has been buffered yet
     * @param in         the newly received bytes
     * @return the {@link ByteBuf} that holds the cumulated bytes; {@link ByteToMessageDecoder} stores it as
     *         its new cumulation
     */
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
558 }
559