1 /*
2 * Copyright 2013 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.InternalThreadLocalMap;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectPool.Handle;
27 import io.netty.util.internal.ObjectPool.ObjectCreator;
28 import io.netty.util.internal.ObjectUtil;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.nio.ByteBuffer;
35 import java.nio.channels.ClosedChannelException;
36 import java.util.Arrays;
37 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
39
40 import static java.lang.Math.min;
41
42 /**
43 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
44 * outbound write requests.
45 * <p>
46 * All methods must be called by a transport implementation from an I/O thread, except the following ones:
47 * <ul>
48 * <li>{@link #size()} and {@link #isEmpty()}</li>
49 * <li>{@link #isWritable()}</li>
50 * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
51 * </ul>
52 * </p>
53 */
54 public final class ChannelOutboundBuffer {
55 // Assuming a 64-bit JVM:
56 // - 16 bytes object header
57 // - 6 reference fields
58 // - 2 long fields
59 // - 2 int fields
60 // - 1 boolean field
61 // - padding
62 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
63 SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
64
65 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
66
67 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
68 @Override
69 protected ByteBuffer[] initialValue() throws Exception {
70 return new ByteBuffer[1024];
71 }
72 };
73
74 private final Channel channel;
75
76 // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
77 //
78 // The Entry that is the first in the linked-list structure that was flushed
79 private Entry flushedEntry;
80 // The Entry which is the first unflushed in the linked-list structure
81 private Entry unflushedEntry;
82 // The Entry which represents the tail of the buffer
83 private Entry tailEntry;
84 // The number of flushed entries that are not written yet
85 private int flushed;
86
87 private int nioBufferCount;
88 private long nioBufferSize;
89
90 private boolean inFail;
91
92 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
93 AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
94
95 @SuppressWarnings("UnusedDeclaration")
96 private volatile long totalPendingSize;
97
98 private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
99 AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
100
101 @SuppressWarnings("UnusedDeclaration")
102 private volatile int unwritable;
103
104 private volatile Runnable fireChannelWritabilityChangedTask;
105
106 ChannelOutboundBuffer(AbstractChannel channel) {
107 this.channel = channel;
108 }
109
110 /**
111 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
112 * the message was written.
113 */
114 public void addMessage(Object msg, int size, ChannelPromise promise) {
115 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
116 if (tailEntry == null) {
117 flushedEntry = null;
118 } else {
119 Entry tail = tailEntry;
120 tail.next = entry;
121 }
122 tailEntry = entry;
123 if (unflushedEntry == null) {
124 unflushedEntry = entry;
125 }
126
127 // increment pending bytes after adding message to the unflushed arrays.
128 // See https://github.com/netty/netty/issues/1619
129 incrementPendingOutboundBytes(entry.pendingSize, false);
130 }
131
132 /**
133 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
134 * and so you will be able to handle them.
135 */
136 public void addFlush() {
137 // There is no need to process all entries if there was already a flush before and no new messages
138 // where added in the meantime.
139 //
140 // See https://github.com/netty/netty/issues/2577
141 Entry entry = unflushedEntry;
142 if (entry != null) {
143 if (flushedEntry == null) {
144 // there is no flushedEntry yet, so start with the entry
145 flushedEntry = entry;
146 }
147 do {
148 flushed ++;
149 if (!entry.promise.setUncancellable()) {
150 // Was cancelled so make sure we free up memory and notify about the freed bytes
151 int pending = entry.cancel();
152 decrementPendingOutboundBytes(pending, false, true);
153 }
154 entry = entry.next;
155 } while (entry != null);
156
157 // All flushed so reset unflushedEntry
158 unflushedEntry = null;
159 }
160 }
161
162 /**
163 * Increment the pending bytes which will be written at some point.
164 * This method is thread-safe!
165 */
166 void incrementPendingOutboundBytes(long size) {
167 incrementPendingOutboundBytes(size, true);
168 }
169
170 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
171 if (size == 0) {
172 return;
173 }
174
175 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
176 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
177 setUnwritable(invokeLater);
178 }
179 }
180
181 /**
182 * Decrement the pending bytes which will be written at some point.
183 * This method is thread-safe!
184 */
185 void decrementPendingOutboundBytes(long size) {
186 decrementPendingOutboundBytes(size, true, true);
187 }
188
189 private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
190 if (size == 0) {
191 return;
192 }
193
194 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
195 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
196 setWritable(invokeLater);
197 }
198 }
199
200 private static long total(Object msg) {
201 if (msg instanceof ByteBuf) {
202 return ((ByteBuf) msg).readableBytes();
203 }
204 if (msg instanceof FileRegion) {
205 return ((FileRegion) msg).count();
206 }
207 if (msg instanceof ByteBufHolder) {
208 return ((ByteBufHolder) msg).content().readableBytes();
209 }
210 return -1;
211 }
212
213 /**
214 * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
215 */
216 public Object current() {
217 Entry entry = flushedEntry;
218 if (entry == null) {
219 return null;
220 }
221
222 return entry.msg;
223 }
224
225 /**
226 * Return the current message flush progress.
227 * @return {@code 0} if nothing was flushed before for the current message or there is no current message
228 */
229 public long currentProgress() {
230 Entry entry = flushedEntry;
231 if (entry == null) {
232 return 0;
233 }
234 return entry.progress;
235 }
236
237 /**
238 * Notify the {@link ChannelPromise} of the current message about writing progress.
239 */
240 public void progress(long amount) {
241 Entry e = flushedEntry;
242 assert e != null;
243 ChannelPromise p = e.promise;
244 long progress = e.progress + amount;
245 e.progress = progress;
246 if (p instanceof ChannelProgressivePromise) {
247 ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
248 }
249 }
250
251 /**
252 * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
253 * flushed message exists at the time this method is called it will return {@code false} to signal that no more
254 * messages are ready to be handled.
255 */
256 public boolean remove() {
257 Entry e = flushedEntry;
258 if (e == null) {
259 clearNioBuffers();
260 return false;
261 }
262 Object msg = e.msg;
263
264 ChannelPromise promise = e.promise;
265 int size = e.pendingSize;
266
267 removeEntry(e);
268
269 if (!e.cancelled) {
270 // only release message, notify and decrement if it was not canceled before.
271 ReferenceCountUtil.safeRelease(msg);
272 safeSuccess(promise);
273 decrementPendingOutboundBytes(size, false, true);
274 }
275
276 // recycle the entry
277 e.recycle();
278
279 return true;
280 }
281
282 /**
283 * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
284 * and return {@code true}. If no flushed message exists at the time this method is called it will return
285 * {@code false} to signal that no more messages are ready to be handled.
286 */
287 public boolean remove(Throwable cause) {
288 return remove0(cause, true);
289 }
290
291 private boolean remove0(Throwable cause, boolean notifyWritability) {
292 Entry e = flushedEntry;
293 if (e == null) {
294 clearNioBuffers();
295 return false;
296 }
297 Object msg = e.msg;
298
299 ChannelPromise promise = e.promise;
300 int size = e.pendingSize;
301
302 removeEntry(e);
303
304 if (!e.cancelled) {
305 // only release message, fail and decrement if it was not canceled before.
306 ReferenceCountUtil.safeRelease(msg);
307
308 safeFail(promise, cause);
309 decrementPendingOutboundBytes(size, false, notifyWritability);
310 }
311
312 // recycle the entry
313 e.recycle();
314
315 return true;
316 }
317
318 private void removeEntry(Entry e) {
319 if (-- flushed == 0) {
320 // processed everything
321 flushedEntry = null;
322 if (e == tailEntry) {
323 tailEntry = null;
324 unflushedEntry = null;
325 }
326 } else {
327 flushedEntry = e.next;
328 }
329 }
330
331 /**
332 * Removes the fully written entries and update the reader index of the partially written entry.
333 * This operation assumes all messages in this buffer is {@link ByteBuf}.
334 */
335 public void removeBytes(long writtenBytes) {
336 for (;;) {
337 Object msg = current();
338 if (!(msg instanceof ByteBuf)) {
339 assert writtenBytes == 0;
340 break;
341 }
342
343 final ByteBuf buf = (ByteBuf) msg;
344 final int readerIndex = buf.readerIndex();
345 final int readableBytes = buf.writerIndex() - readerIndex;
346
347 if (readableBytes <= writtenBytes) {
348 if (writtenBytes != 0) {
349 progress(readableBytes);
350 writtenBytes -= readableBytes;
351 }
352 remove();
353 } else { // readableBytes > writtenBytes
354 if (writtenBytes != 0) {
355 buf.readerIndex(readerIndex + (int) writtenBytes);
356 progress(writtenBytes);
357 }
358 break;
359 }
360 }
361 clearNioBuffers();
362 }
363
364 // Clear all ByteBuffer from the array so these can be GC'ed.
365 // See https://github.com/netty/netty/issues/3837
366 private void clearNioBuffers() {
367 int count = nioBufferCount;
368 if (count > 0) {
369 nioBufferCount = 0;
370 Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
371 }
372 }
373
374 /**
375 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
376 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
377 * array and the total number of readable bytes of the NIO buffers respectively.
378 * <p>
379 * Note that the returned array is reused and thus should not escape
380 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
381 * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
382 * </p>
383 */
384 public ByteBuffer[] nioBuffers() {
385 return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
386 }
387
388 /**
389 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
390 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
391 * array and the total number of readable bytes of the NIO buffers respectively.
392 * <p>
393 * Note that the returned array is reused and thus should not escape
394 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
395 * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
396 * </p>
397 * @param maxCount The maximum amount of buffers that will be added to the return value.
398 * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
399 * value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
400 * in the return value to ensure write progress is made.
401 */
402 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
403 assert maxCount > 0;
404 assert maxBytes > 0;
405 long nioBufferSize = 0;
406 int nioBufferCount = 0;
407 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
408 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
409 Entry entry = flushedEntry;
410 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
411 if (!entry.cancelled) {
412 ByteBuf buf = (ByteBuf) entry.msg;
413 final int readerIndex = buf.readerIndex();
414 final int readableBytes = buf.writerIndex() - readerIndex;
415
416 if (readableBytes > 0) {
417 if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
418 // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
419 // we stop populate the ByteBuffer array. This is done for 2 reasons:
420 // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
421 // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
422 // on the architecture and kernel but to be safe we also enforce the limit here.
423 // 2. There is no sense in putting more data in the array than is likely to be accepted by the
424 // OS.
425 //
426 // See also:
427 // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
428 // - http://linux.die.net/man/2/writev
429 break;
430 }
431 nioBufferSize += readableBytes;
432 int count = entry.count;
433 if (count == -1) {
434 //noinspection ConstantValueVariableUse
435 entry.count = count = buf.nioBufferCount();
436 }
437 int neededSpace = min(maxCount, nioBufferCount + count);
438 if (neededSpace > nioBuffers.length) {
439 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
440 NIO_BUFFERS.set(threadLocalMap, nioBuffers);
441 }
442 if (count == 1) {
443 ByteBuffer nioBuf = entry.buf;
444 if (nioBuf == null) {
445 // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
446 // derived buffer
447 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
448 }
449 nioBuffers[nioBufferCount++] = nioBuf;
450 } else {
451 // The code exists in an extra method to ensure the method is not too big to inline as this
452 // branch is not very likely to get hit very frequently.
453 nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
454 }
455 if (nioBufferCount >= maxCount) {
456 break;
457 }
458 }
459 }
460 entry = entry.next;
461 }
462 this.nioBufferCount = nioBufferCount;
463 this.nioBufferSize = nioBufferSize;
464
465 return nioBuffers;
466 }
467
468 private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
469 ByteBuffer[] nioBufs = entry.bufs;
470 if (nioBufs == null) {
471 // cached ByteBuffers as they may be expensive to create in terms
472 // of Object allocation
473 entry.bufs = nioBufs = buf.nioBuffers();
474 }
475 for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
476 ByteBuffer nioBuf = nioBufs[i];
477 if (nioBuf == null) {
478 break;
479 } else if (!nioBuf.hasRemaining()) {
480 continue;
481 }
482 nioBuffers[nioBufferCount++] = nioBuf;
483 }
484 return nioBufferCount;
485 }
486
487 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
488 int newCapacity = array.length;
489 do {
490 // double capacity until it is big enough
491 // See https://github.com/netty/netty/issues/1890
492 newCapacity <<= 1;
493
494 if (newCapacity < 0) {
495 throw new IllegalStateException();
496 }
497
498 } while (neededSpace > newCapacity);
499
500 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
501 System.arraycopy(array, 0, newArray, 0, size);
502
503 return newArray;
504 }
505
506 /**
507 * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
508 * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
509 * was called.
510 */
511 public int nioBufferCount() {
512 return nioBufferCount;
513 }
514
515 /**
516 * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
517 * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
518 * was called.
519 */
520 public long nioBufferSize() {
521 return nioBufferSize;
522 }
523
524 /**
525 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
526 * not exceed the write watermark of the {@link Channel} and
527 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
528 * {@code false}.
529 */
530 public boolean isWritable() {
531 return unwritable == 0;
532 }
533
534 /**
535 * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
536 * {@code true}.
537 */
538 public boolean getUserDefinedWritability(int index) {
539 return (unwritable & writabilityMask(index)) == 0;
540 }
541
542 /**
543 * Sets a user-defined writability flag at the specified index.
544 */
545 public void setUserDefinedWritability(int index, boolean writable) {
546 if (writable) {
547 setUserDefinedWritability(index);
548 } else {
549 clearUserDefinedWritability(index);
550 }
551 }
552
553 private void setUserDefinedWritability(int index) {
554 final int mask = ~writabilityMask(index);
555 for (;;) {
556 final int oldValue = unwritable;
557 final int newValue = oldValue & mask;
558 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
559 if (oldValue != 0 && newValue == 0) {
560 fireChannelWritabilityChanged(true);
561 }
562 break;
563 }
564 }
565 }
566
567 private void clearUserDefinedWritability(int index) {
568 final int mask = writabilityMask(index);
569 for (;;) {
570 final int oldValue = unwritable;
571 final int newValue = oldValue | mask;
572 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
573 if (oldValue == 0 && newValue != 0) {
574 fireChannelWritabilityChanged(true);
575 }
576 break;
577 }
578 }
579 }
580
581 private static int writabilityMask(int index) {
582 if (index < 1 || index > 31) {
583 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
584 }
585 return 1 << index;
586 }
587
588 private void setWritable(boolean invokeLater) {
589 for (;;) {
590 final int oldValue = unwritable;
591 final int newValue = oldValue & ~1;
592 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
593 if (oldValue != 0 && newValue == 0) {
594 fireChannelWritabilityChanged(invokeLater);
595 }
596 break;
597 }
598 }
599 }
600
601 private void setUnwritable(boolean invokeLater) {
602 for (;;) {
603 final int oldValue = unwritable;
604 final int newValue = oldValue | 1;
605 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
606 if (oldValue == 0 && newValue != 0) {
607 fireChannelWritabilityChanged(invokeLater);
608 }
609 break;
610 }
611 }
612 }
613
614 private void fireChannelWritabilityChanged(boolean invokeLater) {
615 final ChannelPipeline pipeline = channel.pipeline();
616 if (invokeLater) {
617 Runnable task = fireChannelWritabilityChangedTask;
618 if (task == null) {
619 fireChannelWritabilityChangedTask = task = new Runnable() {
620 @Override
621 public void run() {
622 pipeline.fireChannelWritabilityChanged();
623 }
624 };
625 }
626 channel.eventLoop().execute(task);
627 } else {
628 pipeline.fireChannelWritabilityChanged();
629 }
630 }
631
632 /**
633 * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
634 */
635 public int size() {
636 return flushed;
637 }
638
639 /**
640 * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
641 * otherwise.
642 */
643 public boolean isEmpty() {
644 return flushed == 0;
645 }
646
647 void failFlushed(Throwable cause, boolean notify) {
648 // Make sure that this method does not reenter. A listener added to the current promise can be notified by the
649 // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
650 // indirectly (usually by closing the channel.)
651 //
652 // See https://github.com/netty/netty/issues/1501
653 if (inFail) {
654 return;
655 }
656
657 try {
658 inFail = true;
659 for (;;) {
660 if (!remove0(cause, notify)) {
661 break;
662 }
663 }
664 } finally {
665 inFail = false;
666 }
667 }
668
669 void close(final Throwable cause, final boolean allowChannelOpen) {
670 if (inFail) {
671 channel.eventLoop().execute(new Runnable() {
672 @Override
673 public void run() {
674 close(cause, allowChannelOpen);
675 }
676 });
677 return;
678 }
679
680 inFail = true;
681
682 if (!allowChannelOpen && channel.isOpen()) {
683 throw new IllegalStateException("close() must be invoked after the channel is closed.");
684 }
685
686 if (!isEmpty()) {
687 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
688 }
689
690 // Release all unflushed messages.
691 try {
692 Entry e = unflushedEntry;
693 while (e != null) {
694 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
695 int size = e.pendingSize;
696 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
697
698 if (!e.cancelled) {
699 ReferenceCountUtil.safeRelease(e.msg);
700 safeFail(e.promise, cause);
701 }
702 e = e.recycleAndGetNext();
703 }
704 } finally {
705 inFail = false;
706 }
707 clearNioBuffers();
708 }
709
710 void close(ClosedChannelException cause) {
711 close(cause, false);
712 }
713
714 private static void safeSuccess(ChannelPromise promise) {
715 // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
716 // false.
717 PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
718 }
719
720 private static void safeFail(ChannelPromise promise, Throwable cause) {
721 // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
722 // false.
723 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
724 }
725
726 @Deprecated
727 public void recycle() {
728 // NOOP
729 }
730
731 public long totalPendingWriteBytes() {
732 return totalPendingSize;
733 }
734
735 /**
736 * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
737 * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
738 */
739 public long bytesBeforeUnwritable() {
740 long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
741 // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
742 // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
743 // together. totalPendingSize will be updated before isWritable().
744 if (bytes > 0) {
745 return isWritable() ? bytes : 0;
746 }
747 return 0;
748 }
749
750 /**
751 * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
752 * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
753 */
754 public long bytesBeforeWritable() {
755 long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
756 // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
757 // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
758 // together. totalPendingSize will be updated before isWritable().
759 if (bytes > 0) {
760 return isWritable() ? 0 : bytes;
761 }
762 return 0;
763 }
764
765 /**
766 * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
767 * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
768 * returns {@code false} or there are no more flushed messages to process.
769 */
770 public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
771 ObjectUtil.checkNotNull(processor, "processor");
772
773 Entry entry = flushedEntry;
774 if (entry == null) {
775 return;
776 }
777
778 do {
779 if (!entry.cancelled) {
780 if (!processor.processMessage(entry.msg)) {
781 return;
782 }
783 }
784 entry = entry.next;
785 } while (isFlushedEntry(entry));
786 }
787
788 private boolean isFlushedEntry(Entry e) {
789 return e != null && e != unflushedEntry;
790 }
791
792 public interface MessageProcessor {
793 /**
794 * Will be called for each flushed message until it either there are no more flushed messages or this
795 * method returns {@code false}.
796 */
797 boolean processMessage(Object msg) throws Exception;
798 }
799
800 static final class Entry {
801 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
802 @Override
803 public Entry newObject(Handle<Entry> handle) {
804 return new Entry(handle);
805 }
806 });
807
808 private final Handle<Entry> handle;
809 Entry next;
810 Object msg;
811 ByteBuffer[] bufs;
812 ByteBuffer buf;
813 ChannelPromise promise;
814 long progress;
815 long total;
816 int pendingSize;
817 int count = -1;
818 boolean cancelled;
819
820 private Entry(Handle<Entry> handle) {
821 this.handle = handle;
822 }
823
824 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
825 Entry entry = RECYCLER.get();
826 entry.msg = msg;
827 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
828 entry.total = total;
829 entry.promise = promise;
830 return entry;
831 }
832
833 int cancel() {
834 if (!cancelled) {
835 cancelled = true;
836 int pSize = pendingSize;
837
838 // release message and replace with an empty buffer
839 ReferenceCountUtil.safeRelease(msg);
840 msg = Unpooled.EMPTY_BUFFER;
841
842 pendingSize = 0;
843 total = 0;
844 progress = 0;
845 bufs = null;
846 buf = null;
847 return pSize;
848 }
849 return 0;
850 }
851
852 void recycle() {
853 next = null;
854 bufs = null;
855 buf = null;
856 msg = null;
857 promise = null;
858 progress = 0;
859 total = 0;
860 pendingSize = 0;
861 count = -1;
862 cancelled = false;
863 handle.recycle(this);
864 }
865
866 Entry recycleAndGetNext() {
867 Entry next = this.next;
868 recycle();
869 return next;
870 }
871 }
872 }
873