1 /*
2  * Copyright 2017 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5  * "License"); you may not use this file except in compliance with the License. You may obtain a
6  * copy of the License at:
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  */

15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.ArrayDeque;
25
26 import static io.netty.util.ReferenceCountUtil.safeRelease;
27 import static io.netty.util.internal.ObjectUtil.checkNotNull;
28 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29 import static io.netty.util.internal.PlatformDependent.throwException;
30
31 @UnstableApi
32 public abstract class AbstractCoalescingBufferQueue {
33     private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34     private final ArrayDeque<Object> bufAndListenerPairs;
35     private final PendingBytesTracker tracker;
36     private int readableBytes;
37
38     /**
39      * Create a new instance.
40      *
41      * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
42      *                buffers or {@code nullif there is no writability state updated.
43      * @param initSize the initial size of the underlying queue.
44      */

45     protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46         bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47         tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48     }
49
50     /**
51      * Add a buffer to the front of the queue and associate a promise with it that should be completed when
52      * all the buffer's bytes have been consumed from the queue and written.
53      * @param buf to add to the head of the queue
54      * @param promise to complete when all the bytes have been consumed and written, can be void.
55      */

56     public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57         addFirst(buf, toChannelFutureListener(promise));
58     }
59
60     private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61         if (listener != null) {
62             bufAndListenerPairs.addFirst(listener);
63         }
64         bufAndListenerPairs.addFirst(buf);
65         incrementReadableBytes(buf.readableBytes());
66     }
67
68     /**
69      * Add a buffer to the end of the queue.
70      */

71     public final void add(ByteBuf buf) {
72         add(buf, (ChannelFutureListener) null);
73     }
74
75     /**
76      * Add a buffer to the end of the queue and associate a promise with it that should be completed when
77      * all the buffer's bytes have been consumed from the queue and written.
78      * @param buf to add to the tail of the queue
79      * @param promise to complete when all the bytes have been consumed and written, can be void.
80      */

81     public final void add(ByteBuf buf, ChannelPromise promise) {
82         // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
83         // before we complete it's promise.
84         add(buf, toChannelFutureListener(promise));
85     }
86
87     /**
88      * Add a buffer to the end of the queue and associate a listener with it that should be completed when
89      * all the buffers  bytes have been consumed from the queue and written.
90      * @param buf to add to the tail of the queue
91      * @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
92      */

93     public final void add(ByteBuf buf, ChannelFutureListener listener) {
94         // buffers are added before promises so that we naturally 'consume' the entire buffer during removal
95         // before we complete it's promise.
96         bufAndListenerPairs.add(buf);
97         if (listener != null) {
98             bufAndListenerPairs.add(listener);
99         }
100         incrementReadableBytes(buf.readableBytes());
101     }
102
103     /**
104      * Remove the first {@link ByteBuf} from the queue.
105      * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
106      * @return the first {@link ByteBuf} from the queue.
107      */

108     public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
109         Object entry = bufAndListenerPairs.poll();
110         if (entry == null) {
111             return null;
112         }
113         assert entry instanceof ByteBuf;
114         ByteBuf result = (ByteBuf) entry;
115
116         decrementReadableBytes(result.readableBytes());
117
118         entry = bufAndListenerPairs.peek();
119         if (entry instanceof ChannelFutureListener) {
120             aggregatePromise.addListener((ChannelFutureListener) entry);
121             bufAndListenerPairs.poll();
122         }
123         return result;
124     }
125
126     /**
127      * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
128      * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
129      * completes.
130      *
131      * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
132      * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
133      *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
134      * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
135      * @return a {@link ByteBuf} composed of the enqueued buffers.
136      */

137     public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
138         checkPositiveOrZero(bytes, "bytes");
139         checkNotNull(aggregatePromise, "aggregatePromise");
140
141         // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
142         if (bufAndListenerPairs.isEmpty()) {
143             assert readableBytes == 0;
144             return removeEmptyValue();
145         }
146         bytes = Math.min(bytes, readableBytes);
147
148         ByteBuf toReturn = null;
149         ByteBuf entryBuffer = null;
150         int originalBytes = bytes;
151         try {
152             for (;;) {
153                 Object entry = bufAndListenerPairs.poll();
154                 if (entry == null) {
155                     break;
156                 }
157                 if (entry instanceof ChannelFutureListener) {
158                     aggregatePromise.addListener((ChannelFutureListener) entry);
159                     continue;
160                 }
161                 entryBuffer = (ByteBuf) entry;
162                 if (entryBuffer.readableBytes() > bytes) {
163                     // Add the buffer back to the queue as we can't consume all of it.
164                     bufAndListenerPairs.addFirst(entryBuffer);
165                     if (bytes > 0) {
166                         // Take a slice of what we can consume and retain it.
167                         entryBuffer = entryBuffer.readRetainedSlice(bytes);
168                         toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
169                                                     : compose(alloc, toReturn, entryBuffer);
170                         bytes = 0;
171                     }
172                     break;
173                 } else {
174                     bytes -= entryBuffer.readableBytes();
175                     toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
176                                                 : compose(alloc, toReturn, entryBuffer);
177                 }
178                 entryBuffer = null;
179             }
180         } catch (Throwable cause) {
181             safeRelease(entryBuffer);
182             safeRelease(toReturn);
183             aggregatePromise.setFailure(cause);
184             throwException(cause);
185         }
186         decrementReadableBytes(originalBytes - bytes);
187         return toReturn;
188     }
189
190     /**
191      * The number of readable bytes.
192      */

193     public final int readableBytes() {
194         return readableBytes;
195     }
196
197     /**
198      * Are there pending buffers in the queue.
199      */

200     public final boolean isEmpty() {
201         return bufAndListenerPairs.isEmpty();
202     }
203
204     /**
205      *  Release all buffers in the queue and complete all listeners and promises.
206      */

207     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208         releaseAndCompleteAll(invoker.newFailedFuture(cause));
209     }
210
211     /**
212      * Copy all pending entries in this queue into the destination queue.
213      * @param dest to copy pending buffers to.
214      */

215     public final void copyTo(AbstractCoalescingBufferQueue dest) {
216         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217         dest.incrementReadableBytes(readableBytes);
218     }
219
220     /**
221      * Writes all remaining elements in this queue.
222      * @param ctx The context to write all elements to.
223      */

224     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225         Throwable pending = null;
226         ByteBuf previousBuf = null;
227         for (;;) {
228             Object entry = bufAndListenerPairs.poll();
229             try {
230                 if (entry == null) {
231                     if (previousBuf != null) {
232                         decrementReadableBytes(previousBuf.readableBytes());
233                         ctx.write(previousBuf, ctx.voidPromise());
234                     }
235                     break;
236                 }
237
238                 if (entry instanceof ByteBuf) {
239                     if (previousBuf != null) {
240                         decrementReadableBytes(previousBuf.readableBytes());
241                         ctx.write(previousBuf, ctx.voidPromise());
242                     }
243                     previousBuf = (ByteBuf) entry;
244                 } else if (entry instanceof ChannelPromise) {
245                     decrementReadableBytes(previousBuf.readableBytes());
246                     ctx.write(previousBuf, (ChannelPromise) entry);
247                     previousBuf = null;
248                 } else {
249                     decrementReadableBytes(previousBuf.readableBytes());
250                     ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
251                     previousBuf = null;
252                 }
253             } catch (Throwable t) {
254                 if (pending == null) {
255                     pending = t;
256                 } else {
257                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
258                 }
259             }
260         }
261         if (pending != null) {
262             throw new IllegalStateException(pending);
263         }
264     }
265
266     /**
267      * Calculate the result of {@code current + next}.
268      */

269     protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
270
271     /**
272      * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}.
273      */

274     protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
275         // Create a composite buffer to accumulate this pair and potentially all the buffers
276         // in the queue. Using +2 as we have already dequeued current and next.
277         CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
278         try {
279             composite.addComponent(true, cumulation);
280             composite.addComponent(true, next);
281         } catch (Throwable cause) {
282             composite.release();
283             safeRelease(next);
284             throwException(cause);
285         }
286         return composite;
287     }
288
289     /**
290      * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
291      * @param alloc The allocator to use to allocate the new buffer.
292      * @param cumulation The current cumulation.
293      * @param next The next buffer.
294      * @return The result of {@code cumulation + next}.
295      */

296     protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
297         ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
298         try {
299             newCumulation.writeBytes(cumulation).writeBytes(next);
300         } catch (Throwable cause) {
301             newCumulation.release();
302             safeRelease(next);
303             throwException(cause);
304         }
305         cumulation.release();
306         next.release();
307         return newCumulation;
308     }
309
310     /**
311      * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
312      * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
313      */

314     protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
315         return first;
316     }
317
318     /**
319      * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
320      * @return the {@link ByteBuf} which represents an empty queue.
321      */

322     protected abstract ByteBuf removeEmptyValue();
323
324     /**
325      * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
326      * @return the number of elements in this queue.
327      */

328     protected final int size() {
329         return bufAndListenerPairs.size();
330     }
331
332     private void releaseAndCompleteAll(ChannelFuture future) {
333         Throwable pending = null;
334         for (;;) {
335             Object entry = bufAndListenerPairs.poll();
336             if (entry == null) {
337                 break;
338             }
339             try {
340                 if (entry instanceof ByteBuf) {
341                     ByteBuf buffer = (ByteBuf) entry;
342                     decrementReadableBytes(buffer.readableBytes());
343                     safeRelease(buffer);
344                 } else {
345                     ((ChannelFutureListener) entry).operationComplete(future);
346                 }
347             } catch (Throwable t) {
348                 if (pending == null) {
349                     pending = t;
350                 } else {
351                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
352                 }
353             }
354         }
355         if (pending != null) {
356             throw new IllegalStateException(pending);
357         }
358     }
359
360     private void incrementReadableBytes(int increment) {
361         int nextReadableBytes = readableBytes + increment;
362         if (nextReadableBytes < readableBytes) {
363             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
364         }
365         readableBytes = nextReadableBytes;
366         if (tracker != null) {
367             tracker.incrementPendingOutboundBytes(increment);
368         }
369     }
370
371     private void decrementReadableBytes(int decrement) {
372         readableBytes -= decrement;
373         assert readableBytes >= 0;
374         if (tracker != null) {
375             tracker.decrementPendingOutboundBytes(decrement);
376         }
377     }
378
379     private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
380         return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
381     }
382 }
383