1 /*
2  * Copyright 2013 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.InternalThreadLocalMap;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectPool.Handle;
27 import io.netty.util.internal.ObjectPool.ObjectCreator;
28 import io.netty.util.internal.ObjectUtil;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.nio.ByteBuffer;
35 import java.nio.channels.ClosedChannelException;
36 import java.util.Arrays;
37 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
39
40 import static java.lang.Math.min;
41
42 /**
43  * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
44  * outbound write requests.
45  * <p>
46  * All methods must be called by a transport implementation from an I/O thread, except the following ones:
47  * <ul>
48  * <li>{@link #size()} and {@link #isEmpty()}</li>
49  * <li>{@link #isWritable()}</li>
50  * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(intboolean)}</li>
51  * </ul>
52  * </p>
53  */

54 public final class ChannelOutboundBuffer {
55     // Assuming a 64-bit JVM:
56     //  - 16 bytes object header
57     //  - 6 reference fields
58     //  - 2 long fields
59     //  - 2 int fields
60     //  - 1 boolean field
61     //  - padding
62     static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
63             SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
64
65     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
66
67     private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
68         @Override
69         protected ByteBuffer[] initialValue() throws Exception {
70             return new ByteBuffer[1024];
71         }
72     };
73
74     private final Channel channel;
75
76     // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
77     //
78     // The Entry that is the first in the linked-list structure that was flushed
79     private Entry flushedEntry;
80     // The Entry which is the first unflushed in the linked-list structure
81     private Entry unflushedEntry;
82     // The Entry which represents the tail of the buffer
83     private Entry tailEntry;
84     // The number of flushed entries that are not written yet
85     private int flushed;
86
87     private int nioBufferCount;
88     private long nioBufferSize;
89
90     private boolean inFail;
91
92     private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
93             AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class"totalPendingSize");
94
95     @SuppressWarnings("UnusedDeclaration")
96     private volatile long totalPendingSize;
97
98     private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
99             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class"unwritable");
100
101     @SuppressWarnings("UnusedDeclaration")
102     private volatile int unwritable;
103
104     private volatile Runnable fireChannelWritabilityChangedTask;
105
106     ChannelOutboundBuffer(AbstractChannel channel) {
107         this.channel = channel;
108     }
109
110     /**
111      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
112      * the message was written.
113      */

114     public void addMessage(Object msg, int size, ChannelPromise promise) {
115         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
116         if (tailEntry == null) {
117             flushedEntry = null;
118         } else {
119             Entry tail = tailEntry;
120             tail.next = entry;
121         }
122         tailEntry = entry;
123         if (unflushedEntry == null) {
124             unflushedEntry = entry;
125         }
126
127         // increment pending bytes after adding message to the unflushed arrays.
128         // See https://github.com/netty/netty/issues/1619
129         incrementPendingOutboundBytes(entry.pendingSize, false);
130     }
131
132     /**
133      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
134      * and so you will be able to handle them.
135      */

136     public void addFlush() {
137         // There is no need to process all entries if there was already a flush before and no new messages
138         // where added in the meantime.
139         //
140         // See https://github.com/netty/netty/issues/2577
141         Entry entry = unflushedEntry;
142         if (entry != null) {
143             if (flushedEntry == null) {
144                 // there is no flushedEntry yet, so start with the entry
145                 flushedEntry = entry;
146             }
147             do {
148                 flushed ++;
149                 if (!entry.promise.setUncancellable()) {
150                     // Was cancelled so make sure we free up memory and notify about the freed bytes
151                     int pending = entry.cancel();
152                     decrementPendingOutboundBytes(pending, falsetrue);
153                 }
154                 entry = entry.next;
155             } while (entry != null);
156
157             // All flushed so reset unflushedEntry
158             unflushedEntry = null;
159         }
160     }
161
162     /**
163      * Increment the pending bytes which will be written at some point.
164      * This method is thread-safe!
165      */

166     void incrementPendingOutboundBytes(long size) {
167         incrementPendingOutboundBytes(size, true);
168     }
169
170     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
171         if (size == 0) {
172             return;
173         }
174
175         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
176         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
177             setUnwritable(invokeLater);
178         }
179     }
180
181     /**
182      * Decrement the pending bytes which will be written at some point.
183      * This method is thread-safe!
184      */

185     void decrementPendingOutboundBytes(long size) {
186         decrementPendingOutboundBytes(size, truetrue);
187     }
188
189     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
190         if (size == 0) {
191             return;
192         }
193
194         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
195         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
196             setWritable(invokeLater);
197         }
198     }
199
200     private static long total(Object msg) {
201         if (msg instanceof ByteBuf) {
202             return ((ByteBuf) msg).readableBytes();
203         }
204         if (msg instanceof FileRegion) {
205             return ((FileRegion) msg).count();
206         }
207         if (msg instanceof ByteBufHolder) {
208             return ((ByteBufHolder) msg).content().readableBytes();
209         }
210         return -1;
211     }
212
213     /**
214      * Return the current message to write or {@code nullif nothing was flushed before and so is ready to be written.
215      */

216     public Object current() {
217         Entry entry = flushedEntry;
218         if (entry == null) {
219             return null;
220         }
221
222         return entry.msg;
223     }
224
225     /**
226      * Return the current message flush progress.
227      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
228      */

229     public long currentProgress() {
230         Entry entry = flushedEntry;
231         if (entry == null) {
232             return 0;
233         }
234         return entry.progress;
235     }
236
237     /**
238      * Notify the {@link ChannelPromise} of the current message about writing progress.
239      */

240     public void progress(long amount) {
241         Entry e = flushedEntry;
242         assert e != null;
243         ChannelPromise p = e.promise;
244         long progress = e.progress + amount;
245         e.progress = progress;
246         if (p instanceof ChannelProgressivePromise) {
247             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
248         }
249     }
250
251     /**
252      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
253      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
254      * messages are ready to be handled.
255      */

256     public boolean remove() {
257         Entry e = flushedEntry;
258         if (e == null) {
259             clearNioBuffers();
260             return false;
261         }
262         Object msg = e.msg;
263
264         ChannelPromise promise = e.promise;
265         int size = e.pendingSize;
266
267         removeEntry(e);
268
269         if (!e.cancelled) {
270             // only release message, notify and decrement if it was not canceled before.
271             ReferenceCountUtil.safeRelease(msg);
272             safeSuccess(promise);
273             decrementPendingOutboundBytes(size, falsetrue);
274         }
275
276         // recycle the entry
277         e.recycle();
278
279         return true;
280     }
281
282     /**
283      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
284      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
285      * {@code false} to signal that no more messages are ready to be handled.
286      */

287     public boolean remove(Throwable cause) {
288         return remove0(cause, true);
289     }
290
291     private boolean remove0(Throwable cause, boolean notifyWritability) {
292         Entry e = flushedEntry;
293         if (e == null) {
294             clearNioBuffers();
295             return false;
296         }
297         Object msg = e.msg;
298
299         ChannelPromise promise = e.promise;
300         int size = e.pendingSize;
301
302         removeEntry(e);
303
304         if (!e.cancelled) {
305             // only release message, fail and decrement if it was not canceled before.
306             ReferenceCountUtil.safeRelease(msg);
307
308             safeFail(promise, cause);
309             decrementPendingOutboundBytes(size, false, notifyWritability);
310         }
311
312         // recycle the entry
313         e.recycle();
314
315         return true;
316     }
317
318     private void removeEntry(Entry e) {
319         if (-- flushed == 0) {
320             // processed everything
321             flushedEntry = null;
322             if (e == tailEntry) {
323                 tailEntry = null;
324                 unflushedEntry = null;
325             }
326         } else {
327             flushedEntry = e.next;
328         }
329     }
330
331     /**
332      * Removes the fully written entries and update the reader index of the partially written entry.
333      * This operation assumes all messages in this buffer is {@link ByteBuf}.
334      */

335     public void removeBytes(long writtenBytes) {
336         for (;;) {
337             Object msg = current();
338             if (!(msg instanceof ByteBuf)) {
339                 assert writtenBytes == 0;
340                 break;
341             }
342
343             final ByteBuf buf = (ByteBuf) msg;
344             final int readerIndex = buf.readerIndex();
345             final int readableBytes = buf.writerIndex() - readerIndex;
346
347             if (readableBytes <= writtenBytes) {
348                 if (writtenBytes != 0) {
349                     progress(readableBytes);
350                     writtenBytes -= readableBytes;
351                 }
352                 remove();
353             } else { // readableBytes > writtenBytes
354                 if (writtenBytes != 0) {
355                     buf.readerIndex(readerIndex + (int) writtenBytes);
356                     progress(writtenBytes);
357                 }
358                 break;
359             }
360         }
361         clearNioBuffers();
362     }
363
364     // Clear all ByteBuffer from the array so these can be GC'ed.
365     // See https://github.com/netty/netty/issues/3837
366     private void clearNioBuffers() {
367         int count = nioBufferCount;
368         if (count > 0) {
369             nioBufferCount = 0;
370             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
371         }
372     }
373
374     /**
375      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
376      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
377      * array and the total number of readable bytes of the NIO buffers respectively.
378      * <p>
379      * Note that the returned array is reused and thus should not escape
380      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
381      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
382      * </p>
383      */

384     public ByteBuffer[] nioBuffers() {
385         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
386     }
387
388     /**
389      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
390      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
391      * array and the total number of readable bytes of the NIO buffers respectively.
392      * <p>
393      * Note that the returned array is reused and thus should not escape
394      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
395      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
396      * </p>
397      * @param maxCount The maximum amount of buffers that will be added to the return value.
398      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
399      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
400      *                 in the return value to ensure write progress is made.
401      */

402     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
403         assert maxCount > 0;
404         assert maxBytes > 0;
405         long nioBufferSize = 0;
406         int nioBufferCount = 0;
407         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
408         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
409         Entry entry = flushedEntry;
410         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
411             if (!entry.cancelled) {
412                 ByteBuf buf = (ByteBuf) entry.msg;
413                 final int readerIndex = buf.readerIndex();
414                 final int readableBytes = buf.writerIndex() - readerIndex;
415
416                 if (readableBytes > 0) {
417                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
418                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
419                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
420                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
421                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
422                         // on the architecture and kernel but to be safe we also enforce the limit here.
423                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
424                         // OS.
425                         //
426                         // See also:
427                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
428                         // - http://linux.die.net/man/2/writev
429                         break;
430                     }
431                     nioBufferSize += readableBytes;
432                     int count = entry.count;
433                     if (count == -1) {
434                         //noinspection ConstantValueVariableUse
435                         entry.count = count = buf.nioBufferCount();
436                     }
437                     int neededSpace = min(maxCount, nioBufferCount + count);
438                     if (neededSpace > nioBuffers.length) {
439                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
440                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
441                     }
442                     if (count == 1) {
443                         ByteBuffer nioBuf = entry.buf;
444                         if (nioBuf == null) {
445                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
446                             // derived buffer
447                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
448                         }
449                         nioBuffers[nioBufferCount++] = nioBuf;
450                     } else {
451                         // The code exists in an extra method to ensure the method is not too big to inline as this
452                         // branch is not very likely to get hit very frequently.
453                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
454                     }
455                     if (nioBufferCount >= maxCount) {
456                         break;
457                     }
458                 }
459             }
460             entry = entry.next;
461         }
462         this.nioBufferCount = nioBufferCount;
463         this.nioBufferSize = nioBufferSize;
464
465         return nioBuffers;
466     }
467
468     private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
469         ByteBuffer[] nioBufs = entry.bufs;
470         if (nioBufs == null) {
471             // cached ByteBuffers as they may be expensive to create in terms
472             // of Object allocation
473             entry.bufs = nioBufs = buf.nioBuffers();
474         }
475         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
476             ByteBuffer nioBuf = nioBufs[i];
477             if (nioBuf == null) {
478                 break;
479             } else if (!nioBuf.hasRemaining()) {
480                 continue;
481             }
482             nioBuffers[nioBufferCount++] = nioBuf;
483         }
484         return nioBufferCount;
485     }
486
487     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
488         int newCapacity = array.length;
489         do {
490             // double capacity until it is big enough
491             // See https://github.com/netty/netty/issues/1890
492             newCapacity <<= 1;
493
494             if (newCapacity < 0) {
495                 throw new IllegalStateException();
496             }
497
498         } while (neededSpace > newCapacity);
499
500         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
501         System.arraycopy(array, 0, newArray, 0, size);
502
503         return newArray;
504     }
505
506     /**
507      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
508      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
509      * was called.
510      */

511     public int nioBufferCount() {
512         return nioBufferCount;
513     }
514
515     /**
516      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
517      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
518      * was called.
519      */

520     public long nioBufferSize() {
521         return nioBufferSize;
522     }
523
524     /**
525      * Returns {@code trueif and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
526      * not exceed the write watermark of the {@link Channel} and
527      * no {@linkplain #setUserDefinedWritability(intboolean) user-defined writability flag} has been set to
528      * {@code false}.
529      */

530     public boolean isWritable() {
531         return unwritable == 0;
532     }
533
534     /**
535      * Returns {@code trueif and only if the user-defined writability flag at the specified index is set to
536      * {@code true}.
537      */

538     public boolean getUserDefinedWritability(int index) {
539         return (unwritable & writabilityMask(index)) == 0;
540     }
541
542     /**
543      * Sets a user-defined writability flag at the specified index.
544      */

545     public void setUserDefinedWritability(int index, boolean writable) {
546         if (writable) {
547             setUserDefinedWritability(index);
548         } else {
549             clearUserDefinedWritability(index);
550         }
551     }
552
553     private void setUserDefinedWritability(int index) {
554         final int mask = ~writabilityMask(index);
555         for (;;) {
556             final int oldValue = unwritable;
557             final int newValue = oldValue & mask;
558             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
559                 if (oldValue != 0 && newValue == 0) {
560                     fireChannelWritabilityChanged(true);
561                 }
562                 break;
563             }
564         }
565     }
566
567     private void clearUserDefinedWritability(int index) {
568         final int mask = writabilityMask(index);
569         for (;;) {
570             final int oldValue = unwritable;
571             final int newValue = oldValue | mask;
572             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
573                 if (oldValue == 0 && newValue != 0) {
574                     fireChannelWritabilityChanged(true);
575                 }
576                 break;
577             }
578         }
579     }
580
581     private static int writabilityMask(int index) {
582         if (index < 1 || index > 31) {
583             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
584         }
585         return 1 << index;
586     }
587
588     private void setWritable(boolean invokeLater) {
589         for (;;) {
590             final int oldValue = unwritable;
591             final int newValue = oldValue & ~1;
592             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
593                 if (oldValue != 0 && newValue == 0) {
594                     fireChannelWritabilityChanged(invokeLater);
595                 }
596                 break;
597             }
598         }
599     }
600
601     private void setUnwritable(boolean invokeLater) {
602         for (;;) {
603             final int oldValue = unwritable;
604             final int newValue = oldValue | 1;
605             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
606                 if (oldValue == 0 && newValue != 0) {
607                     fireChannelWritabilityChanged(invokeLater);
608                 }
609                 break;
610             }
611         }
612     }
613
614     private void fireChannelWritabilityChanged(boolean invokeLater) {
615         final ChannelPipeline pipeline = channel.pipeline();
616         if (invokeLater) {
617             Runnable task = fireChannelWritabilityChangedTask;
618             if (task == null) {
619                 fireChannelWritabilityChangedTask = task = new Runnable() {
620                     @Override
621                     public void run() {
622                         pipeline.fireChannelWritabilityChanged();
623                     }
624                 };
625             }
626             channel.eventLoop().execute(task);
627         } else {
628             pipeline.fireChannelWritabilityChanged();
629         }
630     }
631
632     /**
633      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
634      */

635     public int size() {
636         return flushed;
637     }
638
639     /**
640      * Returns {@code trueif there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
641      * otherwise.
642      */

643     public boolean isEmpty() {
644         return flushed == 0;
645     }
646
647     void failFlushed(Throwable cause, boolean notify) {
648         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
649         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
650         // indirectly (usually by closing the channel.)
651         //
652         // See https://github.com/netty/netty/issues/1501
653         if (inFail) {
654             return;
655         }
656
657         try {
658             inFail = true;
659             for (;;) {
660                 if (!remove0(cause, notify)) {
661                     break;
662                 }
663             }
664         } finally {
665             inFail = false;
666         }
667     }
668
669     void close(final Throwable cause, final boolean allowChannelOpen) {
670         if (inFail) {
671             channel.eventLoop().execute(new Runnable() {
672                 @Override
673                 public void run() {
674                     close(cause, allowChannelOpen);
675                 }
676             });
677             return;
678         }
679
680         inFail = true;
681
682         if (!allowChannelOpen && channel.isOpen()) {
683             throw new IllegalStateException("close() must be invoked after the channel is closed.");
684         }
685
686         if (!isEmpty()) {
687             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
688         }
689
690         // Release all unflushed messages.
691         try {
692             Entry e = unflushedEntry;
693             while (e != null) {
694                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
695                 int size = e.pendingSize;
696                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
697
698                 if (!e.cancelled) {
699                     ReferenceCountUtil.safeRelease(e.msg);
700                     safeFail(e.promise, cause);
701                 }
702                 e = e.recycleAndGetNext();
703             }
704         } finally {
705             inFail = false;
706         }
707         clearNioBuffers();
708     }
709
710     void close(ClosedChannelException cause) {
711         close(cause, false);
712     }
713
714     private static void safeSuccess(ChannelPromise promise) {
715         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
716         // false.
717         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
718     }
719
720     private static void safeFail(ChannelPromise promise, Throwable cause) {
721         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
722         // false.
723         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
724     }
725
726     @Deprecated
727     public void recycle() {
728         // NOOP
729     }
730
731     public long totalPendingWriteBytes() {
732         return totalPendingSize;
733     }
734
735     /**
736      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
737      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
738      */

739     public long bytesBeforeUnwritable() {
740         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
741         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
742         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
743         // together. totalPendingSize will be updated before isWritable().
744         if (bytes > 0) {
745             return isWritable() ? bytes : 0;
746         }
747         return 0;
748     }
749
750     /**
751      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
752      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
753      */

754     public long bytesBeforeWritable() {
755         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
756         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
757         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
758         // together. totalPendingSize will be updated before isWritable().
759         if (bytes > 0) {
760             return isWritable() ? 0 : bytes;
761         }
762         return 0;
763     }
764
765     /**
766      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
767      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
768      * returns {@code false} or there are no more flushed messages to process.
769      */

770     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
771         ObjectUtil.checkNotNull(processor, "processor");
772
773         Entry entry = flushedEntry;
774         if (entry == null) {
775             return;
776         }
777
778         do {
779             if (!entry.cancelled) {
780                 if (!processor.processMessage(entry.msg)) {
781                     return;
782                 }
783             }
784             entry = entry.next;
785         } while (isFlushedEntry(entry));
786     }
787
788     private boolean isFlushedEntry(Entry e) {
789         return e != null && e != unflushedEntry;
790     }
791
792     public interface MessageProcessor {
793         /**
794          * Will be called for each flushed message until it either there are no more flushed messages or this
795          * method returns {@code false}.
796          */

797         boolean processMessage(Object msg) throws Exception;
798     }
799
800     static final class Entry {
801         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
802             @Override
803             public Entry newObject(Handle<Entry> handle) {
804                 return new Entry(handle);
805             }
806         });
807
808         private final Handle<Entry> handle;
809         Entry next;
810         Object msg;
811         ByteBuffer[] bufs;
812         ByteBuffer buf;
813         ChannelPromise promise;
814         long progress;
815         long total;
816         int pendingSize;
817         int count = -1;
818         boolean cancelled;
819
820         private Entry(Handle<Entry> handle) {
821             this.handle = handle;
822         }
823
824         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
825             Entry entry = RECYCLER.get();
826             entry.msg = msg;
827             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
828             entry.total = total;
829             entry.promise = promise;
830             return entry;
831         }
832
833         int cancel() {
834             if (!cancelled) {
835                 cancelled = true;
836                 int pSize = pendingSize;
837
838                 // release message and replace with an empty buffer
839                 ReferenceCountUtil.safeRelease(msg);
840                 msg = Unpooled.EMPTY_BUFFER;
841
842                 pendingSize = 0;
843                 total = 0;
844                 progress = 0;
845                 bufs = null;
846                 buf = null;
847                 return pSize;
848             }
849             return 0;
850         }
851
852         void recycle() {
853             next = null;
854             bufs = null;
855             buf = null;
856             msg = null;
857             promise = null;
858             progress = 0;
859             total = 0;
860             pendingSize = 0;
861             count = -1;
862             cancelled = false;
863             handle.recycle(this);
864         }
865
866         Entry recycleAndGetNext() {
867             Entry next = this.next;
868             recycle();
869             return next;
870         }
871     }
872 }
873