1
15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.ArrayDeque;
25
26 import static io.netty.util.ReferenceCountUtil.safeRelease;
27 import static io.netty.util.internal.ObjectUtil.checkNotNull;
28 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29 import static io.netty.util.internal.PlatformDependent.throwException;
30
31 @UnstableApi
32 public abstract class AbstractCoalescingBufferQueue {
33 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34 private final ArrayDeque<Object> bufAndListenerPairs;
35 private final PendingBytesTracker tracker;
36 private int readableBytes;
37
38
45 protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
46 bufAndListenerPairs = new ArrayDeque<Object>(initSize);
47 tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
48 }
49
50
56 public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57 addFirst(buf, toChannelFutureListener(promise));
58 }
59
60 private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61 if (listener != null) {
62 bufAndListenerPairs.addFirst(listener);
63 }
64 bufAndListenerPairs.addFirst(buf);
65 incrementReadableBytes(buf.readableBytes());
66 }
67
68
71 public final void add(ByteBuf buf) {
72 add(buf, (ChannelFutureListener) null);
73 }
74
75
81 public final void add(ByteBuf buf, ChannelPromise promise) {
82
83
84 add(buf, toChannelFutureListener(promise));
85 }
86
87
93 public final void add(ByteBuf buf, ChannelFutureListener listener) {
94
95
96 bufAndListenerPairs.add(buf);
97 if (listener != null) {
98 bufAndListenerPairs.add(listener);
99 }
100 incrementReadableBytes(buf.readableBytes());
101 }
102
103
108 public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
109 Object entry = bufAndListenerPairs.poll();
110 if (entry == null) {
111 return null;
112 }
113 assert entry instanceof ByteBuf;
114 ByteBuf result = (ByteBuf) entry;
115
116 decrementReadableBytes(result.readableBytes());
117
118 entry = bufAndListenerPairs.peek();
119 if (entry instanceof ChannelFutureListener) {
120 aggregatePromise.addListener((ChannelFutureListener) entry);
121 bufAndListenerPairs.poll();
122 }
123 return result;
124 }
125
126
137 public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
138 checkPositiveOrZero(bytes, "bytes");
139 checkNotNull(aggregatePromise, "aggregatePromise");
140
141
142 if (bufAndListenerPairs.isEmpty()) {
143 assert readableBytes == 0;
144 return removeEmptyValue();
145 }
146 bytes = Math.min(bytes, readableBytes);
147
148 ByteBuf toReturn = null;
149 ByteBuf entryBuffer = null;
150 int originalBytes = bytes;
151 try {
152 for (;;) {
153 Object entry = bufAndListenerPairs.poll();
154 if (entry == null) {
155 break;
156 }
157 if (entry instanceof ChannelFutureListener) {
158 aggregatePromise.addListener((ChannelFutureListener) entry);
159 continue;
160 }
161 entryBuffer = (ByteBuf) entry;
162 if (entryBuffer.readableBytes() > bytes) {
163
164 bufAndListenerPairs.addFirst(entryBuffer);
165 if (bytes > 0) {
166
167 entryBuffer = entryBuffer.readRetainedSlice(bytes);
168 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
169 : compose(alloc, toReturn, entryBuffer);
170 bytes = 0;
171 }
172 break;
173 } else {
174 bytes -= entryBuffer.readableBytes();
175 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
176 : compose(alloc, toReturn, entryBuffer);
177 }
178 entryBuffer = null;
179 }
180 } catch (Throwable cause) {
181 safeRelease(entryBuffer);
182 safeRelease(toReturn);
183 aggregatePromise.setFailure(cause);
184 throwException(cause);
185 }
186 decrementReadableBytes(originalBytes - bytes);
187 return toReturn;
188 }
189
190
193 public final int readableBytes() {
194 return readableBytes;
195 }
196
197
200 public final boolean isEmpty() {
201 return bufAndListenerPairs.isEmpty();
202 }
203
204
207 public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208 releaseAndCompleteAll(invoker.newFailedFuture(cause));
209 }
210
211
215 public final void copyTo(AbstractCoalescingBufferQueue dest) {
216 dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217 dest.incrementReadableBytes(readableBytes);
218 }
219
220
224 public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225 Throwable pending = null;
226 ByteBuf previousBuf = null;
227 for (;;) {
228 Object entry = bufAndListenerPairs.poll();
229 try {
230 if (entry == null) {
231 if (previousBuf != null) {
232 decrementReadableBytes(previousBuf.readableBytes());
233 ctx.write(previousBuf, ctx.voidPromise());
234 }
235 break;
236 }
237
238 if (entry instanceof ByteBuf) {
239 if (previousBuf != null) {
240 decrementReadableBytes(previousBuf.readableBytes());
241 ctx.write(previousBuf, ctx.voidPromise());
242 }
243 previousBuf = (ByteBuf) entry;
244 } else if (entry instanceof ChannelPromise) {
245 decrementReadableBytes(previousBuf.readableBytes());
246 ctx.write(previousBuf, (ChannelPromise) entry);
247 previousBuf = null;
248 } else {
249 decrementReadableBytes(previousBuf.readableBytes());
250 ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
251 previousBuf = null;
252 }
253 } catch (Throwable t) {
254 if (pending == null) {
255 pending = t;
256 } else {
257 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
258 }
259 }
260 }
261 if (pending != null) {
262 throw new IllegalStateException(pending);
263 }
264 }
265
266
269 protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
270
271
274 protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
275
276
277 CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
278 try {
279 composite.addComponent(true, cumulation);
280 composite.addComponent(true, next);
281 } catch (Throwable cause) {
282 composite.release();
283 safeRelease(next);
284 throwException(cause);
285 }
286 return composite;
287 }
288
289
296 protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
297 ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
298 try {
299 newCumulation.writeBytes(cumulation).writeBytes(next);
300 } catch (Throwable cause) {
301 newCumulation.release();
302 safeRelease(next);
303 throwException(cause);
304 }
305 cumulation.release();
306 next.release();
307 return newCumulation;
308 }
309
310
314 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
315 return first;
316 }
317
318
322 protected abstract ByteBuf removeEmptyValue();
323
324
328 protected final int size() {
329 return bufAndListenerPairs.size();
330 }
331
332 private void releaseAndCompleteAll(ChannelFuture future) {
333 Throwable pending = null;
334 for (;;) {
335 Object entry = bufAndListenerPairs.poll();
336 if (entry == null) {
337 break;
338 }
339 try {
340 if (entry instanceof ByteBuf) {
341 ByteBuf buffer = (ByteBuf) entry;
342 decrementReadableBytes(buffer.readableBytes());
343 safeRelease(buffer);
344 } else {
345 ((ChannelFutureListener) entry).operationComplete(future);
346 }
347 } catch (Throwable t) {
348 if (pending == null) {
349 pending = t;
350 } else {
351 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
352 }
353 }
354 }
355 if (pending != null) {
356 throw new IllegalStateException(pending);
357 }
358 }
359
360 private void incrementReadableBytes(int increment) {
361 int nextReadableBytes = readableBytes + increment;
362 if (nextReadableBytes < readableBytes) {
363 throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
364 }
365 readableBytes = nextReadableBytes;
366 if (tracker != null) {
367 tracker.incrementPendingOutboundBytes(increment);
368 }
369 }
370
371 private void decrementReadableBytes(int decrement) {
372 readableBytes -= decrement;
373 assert readableBytes >= 0;
374 if (tracker != null) {
375 tracker.decrementPendingOutboundBytes(decrement);
376 }
377 }
378
379 private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
380 return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
381 }
382 }
383