1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.server.protocol.http;
20
21 import static org.xnio.Bits.allAreClear;
22 import static org.xnio.Bits.allAreSet;
23
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.FileChannel;
28
29 import org.xnio.Buffers;
30 import org.xnio.IoUtils;
31 import org.xnio.XnioWorker;
32 import org.xnio.channels.StreamSourceChannel;
33 import org.xnio.conduits.AbstractStreamSinkConduit;
34 import org.xnio.conduits.ConduitWritableByteChannel;
35 import org.xnio.conduits.Conduits;
36 import org.xnio.conduits.StreamSinkConduit;
37
38 import io.undertow.UndertowMessages;
39 import io.undertow.connector.ByteBufferPool;
40 import io.undertow.connector.PooledByteBuffer;
41 import io.undertow.server.Connectors;
42 import io.undertow.server.HttpServerExchange;
43 import io.undertow.util.HeaderMap;
44 import io.undertow.util.HeaderValues;
45 import io.undertow.util.HttpString;
46 import io.undertow.util.Protocols;
47 import io.undertow.util.StatusCodes;
48
49 /**
50  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
51  */

52 final class HttpResponseConduit extends AbstractStreamSinkConduit<StreamSinkConduit> {
53
    // Pool the header buffer is allocated from when header writing starts.
    private final ByteBufferPool pool;
    // Connection that is closed (IoUtils.safeClose) when a write operation fails.
    private final HttpServerConnection connection;

    // Low four bits (MASK_STATE) hold one of the STATE_* values; FLAG_SHUTDOWN may be OR-ed in.
    private int state = STATE_START;

    // The fields below form the header-iteration cursor that is saved whenever header
    // writing has to be suspended because the next conduit accepted no more bytes.
    private long fiCookie = -1L;          // fast-iteration cookie into the response HeaderMap, -1 when unused
    private String string;                // header value (or reason phrase) currently being written
    private HeaderValues headerValues;    // values of the header currently being written
    private int valueIdx;                 // index of the next value within headerValues
    private int charIndex;                // index of the next character within the current name/value
    // Buffer the status line and headers are rendered into.
    private PooledByteBuffer pooledBuffer;
    // File data read by transferFrom(FileChannel, ...) that has not been fully written yet.
    private PooledByteBuffer pooledFileTransferBuffer;
    // Exchange whose response is being written; null once the headers are done or before reset().
    private HttpServerExchange exchange;

    // Scratch array reused for gathering writes of header buffer + user data.
    private ByteBuffer[] writevBuffer;
    // When true processWrite() refuses to write; it is not assigned in the visible part of this file.
    private boolean done = false;

    private static final int STATE_BODY = 0; // Message body, normal pass-through operation
    private static final int STATE_START = 1; // No headers written yet
    private static final int STATE_HDR_NAME = 2; // Header name indexed by charIndex
    private static final int STATE_HDR_D = 3; // Header delimiter ':'
    private static final int STATE_HDR_DS = 4; // Header delimiter ': '
    private static final int STATE_HDR_VAL = 5; // Header value
    private static final int STATE_HDR_EOL_CR = 6; // Header line CR
    private static final int STATE_HDR_EOL_LF = 7; // Header line LF
    private static final int STATE_HDR_FINAL_CR = 8; // Final CR
    private static final int STATE_HDR_FINAL_LF = 9; // Final LF
    private static final int STATE_BUF_FLUSH = 10; // flush the buffer and go to writing body

    // Mask selecting the STATE_* value out of the state field.
    private static final int MASK_STATE = 0x0000000F;
    // Bit checked by the write methods: once headers are complete they terminate writes and fail.
    private static final int FLAG_SHUTDOWN = 0x00000010;
85
86     HttpResponseConduit(final StreamSinkConduit next, final ByteBufferPool pool, HttpServerConnection connection) {
87         super(next);
88         this.pool = pool;
89         this.connection = connection;
90     }
91
92     HttpResponseConduit(final StreamSinkConduit next, final ByteBufferPool pool, HttpServerConnection connection, HttpServerExchange exchange) {
93         super(next);
94         this.pool = pool;
95         this.connection = connection;
96         this.exchange = exchange;
97     }
98     void reset(HttpServerExchange exchange) {
99
100         this.exchange = exchange;
101         state = STATE_START;
102         fiCookie = -1L;
103         string = null;
104         headerValues = null;
105         valueIdx = 0;
106         charIndex = 0;
107     }
108
109     /**
110      * Handles writing out the header data. It can also take a byte buffer of user
111      * data, to enable both user data and headers to be written out in a single operation,
112      * which has a noticeable performance impact.
113      * <p>
114      * It is up to the caller to note the current position of this buffer before and after they
115      * call this method, and use this to figure out how many bytes (if any) have been written.
116      *
117      * @param state
118      * @param userData
119      * @return
120      * @throws IOException
121      */

122     private int processWrite(int state, final Object userData, int pos, int length) throws IOException {
123         if (done || exchange == null) {
124             throw new ClosedChannelException();
125         }
126         try {
127             assert state != STATE_BODY;
128             if (state == STATE_BUF_FLUSH) {
129                 final ByteBuffer byteBuffer = pooledBuffer.getBuffer();
130                 do {
131                     long res = 0;
132                     ByteBuffer[] data;
133                     if (userData == null || length == 0) {
134                         res = next.write(byteBuffer);
135                     } else if (userData instanceof ByteBuffer) {
136                         data = writevBuffer;
137                         if (data == null) {
138                             data = writevBuffer = new ByteBuffer[2];
139                         }
140                         data[0] = byteBuffer;
141                         data[1] = (ByteBuffer) userData;
142                         res = next.write(data, 0, 2);
143                     } else {
144                         data = writevBuffer;
145                         if (data == null || data.length < length + 1) {
146                             data = writevBuffer = new ByteBuffer[length + 1];
147                         }
148                         data[0] = byteBuffer;
149                         System.arraycopy(userData, pos, data, 1, length);
150                         res = next.write(data, 0, length + 1);
151                     }
152                     if (res == 0) {
153                         return STATE_BUF_FLUSH;
154                     }
155                 } while (byteBuffer.hasRemaining());
156                 bufferDone();
157                 return STATE_BODY;
158             } else if (state != STATE_START) {
159                 return processStatefulWrite(state, userData, pos, length);
160             }
161
162             //merge the cookies into the header map
163             Connectors.flattenCookies(exchange);
164
165             if (pooledBuffer == null) {
166                 pooledBuffer = pool.allocate();
167             }
168             ByteBuffer buffer = pooledBuffer.getBuffer();
169
170
171             assert buffer.remaining() >= 50;
172             Protocols.HTTP_1_1.appendTo(buffer);
173             buffer.put((byte) ' ');
174             int code = exchange.getStatusCode();
175             assert 999 >= code && code >= 100;
176             buffer.put((byte) (code / 100 + '0'));
177             buffer.put((byte) (code / 10 % 10 + '0'));
178             buffer.put((byte) (code % 10 + '0'));
179             buffer.put((byte) ' ');
180
181             String string = exchange.getReasonPhrase();
182             if(string == null) {
183                 string = StatusCodes.getReason(code);
184             }
185             if(string.length() > buffer.remaining()) {
186                 pooledBuffer.close();
187                 pooledBuffer = null;
188                 truncateWrites();
189                 throw UndertowMessages.MESSAGES.reasonPhraseToLargeForBuffer(string);
190             }
191             writeString(buffer, string);
192             buffer.put((byte) '\r').put((byte) '\n');
193
194             int remaining = buffer.remaining();
195
196
197             HeaderMap headers = exchange.getResponseHeaders();
198             long fiCookie = headers.fastIterateNonEmpty();
199             while (fiCookie != -1) {
200                 HeaderValues headerValues = headers.fiCurrent(fiCookie);
201
202                 HttpString header = headerValues.getHeaderName();
203                 int headerSize = header.length();
204                 int valueIdx = 0;
205                 while (valueIdx < headerValues.size()) {
206                     remaining -= (headerSize + 2);
207
208                     if (remaining < 0) {
209                         this.fiCookie = fiCookie;
210                         this.string = string;
211                         this.headerValues = headerValues;
212                         this.valueIdx = valueIdx;
213                         this.charIndex = 0;
214                         this.state = STATE_HDR_NAME;
215                         buffer.flip();
216                         return processStatefulWrite(STATE_HDR_NAME, userData, pos, length);
217                     }
218                     header.appendTo(buffer);
219                     buffer.put((byte) ':').put((byte) ' ');
220                     string = headerValues.get(valueIdx++);
221
222                     remaining -= (string.length() + 2);
223                     if (remaining < 2) {//we use 2 here, to make sure we always have room for the final \r\n
224                         this.fiCookie = fiCookie;
225                         this.string = string;
226                         this.headerValues = headerValues;
227                         this.valueIdx = valueIdx;
228                         this.charIndex = 0;
229                         this.state = STATE_HDR_VAL;
230                         buffer.flip();
231                         return processStatefulWrite(STATE_HDR_VAL, userData, pos, length);
232                     }
233                     writeString(buffer, string);
234                     buffer.put((byte) '\r').put((byte) '\n');
235                 }
236                 fiCookie = headers.fiNextNonEmpty(fiCookie);
237             }
238             buffer.put((byte) '\r').put((byte) '\n');
239             buffer.flip();
240             do {
241                 long res = 0;
242                 ByteBuffer[] data;
243                 if (userData == null) {
244                     res = next.write(buffer);
245                 } else if (userData instanceof ByteBuffer) {
246                     data = writevBuffer;
247                     if (data == null) {
248                         data = writevBuffer = new ByteBuffer[2];
249                     }
250                     data[0] = buffer;
251                     data[1] = (ByteBuffer) userData;
252                     res = next.write(data, 0, 2);
253                 } else {
254                     data = writevBuffer;
255                     if (data == null || data.length < length + 1) {
256                         data = writevBuffer = new ByteBuffer[length + 1];
257                     }
258                     data[0] = buffer;
259                     System.arraycopy(userData, pos, data, 1, length);
260                     res = next.write(data, 0, length + 1);
261                 }
262                 if (res == 0) {
263                     return STATE_BUF_FLUSH;
264                 }
265             } while (buffer.hasRemaining());
266             bufferDone();
267             return STATE_BODY;
268         } catch (IOException | RuntimeException | Error e) {
269             //WFLY-4696, just to be safe
270             if (pooledBuffer != null) {
271                 pooledBuffer.close();
272                 pooledBuffer = null;
273             }
274             throw e;
275         }
276     }
277
278     private void bufferDone() {
279         if(exchange == null) {
280             return;
281         }
282         HttpServerConnection connection = (HttpServerConnection)exchange.getConnection();
283         if(connection.getExtraBytes() != null && connection.isOpen() && exchange.isRequestComplete()) {
284             //if we are pipelining we hold onto the buffer
285             pooledBuffer.getBuffer().clear();
286         } else {
287
288             pooledBuffer.close();
289             pooledBuffer = null;
290             this.exchange = null;
291         }
292     }
293
294     public void freeContinueResponse() {
295         if (pooledBuffer != null) {
296             pooledBuffer.close();
297             pooledBuffer = null;
298         }
299     }
300
301     private static void writeString(ByteBuffer buffer, String string) {
302         int length = string.length();
303         for (int charIndex = 0; charIndex < length; charIndex++) {
304             char c = string.charAt(charIndex);
305             byte b = (byte) c;
306             if(b != '\r' && b != '\n') {
307                 buffer.put(b);
308             } else {
309                 buffer.put((byte) ' ');
310             }
311         }
312     }
313
314
315     /**
316      * Handles writing out the header data in the case where is is too big to fit into a buffer. This is a much slower code path.
317      */

318     private int processStatefulWrite(int state, final Object userData, int pos, int len) throws IOException {
319         ByteBuffer buffer = pooledBuffer.getBuffer();
320         long fiCookie = this.fiCookie;
321         int valueIdx = this.valueIdx;
322         int charIndex = this.charIndex;
323         int length;
324         String string = this.string;
325         HeaderValues headerValues = this.headerValues;
326         int res;
327         // BUFFER IS FLIPPED COMING IN
328         if (buffer.hasRemaining()) {
329             do {
330                 res = next.write(buffer);
331                 if (res == 0) {
332                     return state;
333                 }
334             } while (buffer.hasRemaining());
335         }
336         buffer.clear();
337         HeaderMap headers = exchange.getResponseHeaders();
338         // BUFFER IS NOW EMPTY FOR FILLING
339         for (; ; ) {
340             switch (state) {
341                 case STATE_HDR_NAME: {
342                     final HttpString headerName = headerValues.getHeaderName();
343                     length = headerName.length();
344                     while (charIndex < length) {
345                         if (buffer.hasRemaining()) {
346                             buffer.put(headerName.byteAt(charIndex++));
347                         } else {
348                             buffer.flip();
349                             do {
350                                 res = next.write(buffer);
351                                 if (res == 0) {
352                                     this.string = string;
353                                     this.headerValues = headerValues;
354                                     this.charIndex = charIndex;
355                                     this.fiCookie = fiCookie;
356                                     this.valueIdx = valueIdx;
357                                     return STATE_HDR_NAME;
358                                 }
359                             } while (buffer.hasRemaining());
360                             buffer.clear();
361                         }
362                     }
363                     // fall thru
364                 }
365                 case STATE_HDR_D: {
366                     if (!buffer.hasRemaining()) {
367                         buffer.flip();
368                         do {
369                             res = next.write(buffer);
370                             if (res == 0) {
371                                 this.string = string;
372                                 this.headerValues = headerValues;
373                                 this.charIndex = charIndex;
374                                 this.fiCookie = fiCookie;
375                                 this.valueIdx = valueIdx;
376                                 return STATE_HDR_D;
377                             }
378                         } while (buffer.hasRemaining());
379                         buffer.clear();
380                     }
381                     buffer.put((byte) ':');
382                     // fall thru
383                 }
384                 case STATE_HDR_DS: {
385                     if (!buffer.hasRemaining()) {
386                         buffer.flip();
387                         do {
388                             res = next.write(buffer);
389                             if (res == 0) {
390                                 this.string = string;
391                                 this.headerValues = headerValues;
392                                 this.charIndex = charIndex;
393                                 this.fiCookie = fiCookie;
394                                 this.valueIdx = valueIdx;
395                                 return STATE_HDR_DS;
396                             }
397                         } while (buffer.hasRemaining());
398                         buffer.clear();
399                     }
400                     buffer.put((byte) ' ');
401                     //if (valueIterator == null) {
402                     //    valueIterator = exchange.getResponseHeaders().get(headerName).iterator();
403                     //}
404                     string = headerValues.get(valueIdx++);
405                     charIndex = 0;
406                     // fall thru
407                 }
408                 case STATE_HDR_VAL: {
409                     length = string.length();
410                     while (charIndex < length) {
411                         if (buffer.hasRemaining()) {
412                             buffer.put((byte) string.charAt(charIndex++));
413                         } else {
414                             buffer.flip();
415                             do {
416                                 res = next.write(buffer);
417                                 if (res == 0) {
418                                     this.string = string;
419                                     this.headerValues = headerValues;
420                                     this.charIndex = charIndex;
421                                     this.fiCookie = fiCookie;
422                                     this.valueIdx = valueIdx;
423                                     return STATE_HDR_VAL;
424                                 }
425                             } while (buffer.hasRemaining());
426                             buffer.clear();
427                         }
428                     }
429                     charIndex = 0;
430                     if (valueIdx == headerValues.size()) {
431                         if (!buffer.hasRemaining()) {
432                             if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
433                                 return STATE_HDR_EOL_CR;
434                         }
435                         buffer.put((byte) 13); // CR
436                         if (!buffer.hasRemaining()) {
437                             if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
438                                 return STATE_HDR_EOL_LF;
439                         }
440                         buffer.put((byte) 10); // LF
441                         if ((fiCookie = headers.fiNextNonEmpty(fiCookie)) != -1L) {
442                             headerValues = headers.fiCurrent(fiCookie);
443                             valueIdx = 0;
444                             state = STATE_HDR_NAME;
445                             break;
446                         } else {
447                             if (!buffer.hasRemaining()) {
448                                 if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
449                                     return STATE_HDR_FINAL_CR;
450                             }
451                             buffer.put((byte) 13); // CR
452                             if (!buffer.hasRemaining()) {
453                                 if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
454                                     return STATE_HDR_FINAL_LF;
455                             }
456                             buffer.put((byte) 10); // LF
457                             this.fiCookie = -1;
458                             this.valueIdx = 0;
459                             this.string = null;
460                             buffer.flip();
461                             //for performance reasons we use a gather write if there is user data
462                             if (userData == null) {
463                                 do {
464                                     res = next.write(buffer);
465                                     if (res == 0) {
466                                         return STATE_BUF_FLUSH;
467                                     }
468                                 } while (buffer.hasRemaining());
469                             } else if(userData instanceof ByteBuffer) {
470                                 ByteBuffer[] b = {buffer, (ByteBuffer) userData};
471                                 do {
472                                     long r = next.write(b, 0, b.length);
473                                     if (r == 0 && buffer.hasRemaining()) {
474                                         return STATE_BUF_FLUSH;
475                                     }
476                                 } while (buffer.hasRemaining());
477                             } else {
478                                 ByteBuffer[] b = new ByteBuffer[1 + len];
479                                 b[0] = buffer;
480                                 System.arraycopy(userData, pos, b, 1, len);
481                                 do {
482                                     long r = next.write(b, 0, b.length);
483                                     if (r == 0 && buffer.hasRemaining()) {
484                                         return STATE_BUF_FLUSH;
485                                     }
486                                 } while (buffer.hasRemaining());
487                             }
488                             bufferDone();
489                             return STATE_BODY;
490                         }
491                         // not reached
492                     }
493                     // fall thru
494                 }
495                 // Clean-up states
496                 case STATE_HDR_EOL_CR: {
497                     if (!buffer.hasRemaining()) {
498                         if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
499                             return STATE_HDR_EOL_CR;
500                     }
501                     buffer.put((byte) 13); // CR
502                 }
503                 case STATE_HDR_EOL_LF: {
504                     if (!buffer.hasRemaining()) {
505                         if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
506                             return STATE_HDR_EOL_LF;
507                     }
508                     buffer.put((byte) 10); // LF
509                     if (valueIdx < headerValues.size()) {
510                         state = STATE_HDR_NAME;
511                         break;
512                     } else if ((fiCookie = headers.fiNextNonEmpty(fiCookie)) != -1L) {
513                         headerValues = headers.fiCurrent(fiCookie);
514                         valueIdx = 0;
515                         state = STATE_HDR_NAME;
516                         break;
517                     }
518                     // fall thru
519                 }
520                 case STATE_HDR_FINAL_CR: {
521                     if (!buffer.hasRemaining()) {
522                         if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
523                             return STATE_HDR_FINAL_CR;
524                     }
525                     buffer.put((byte) 13); // CR
526                     // fall thru
527                 }
528                 case STATE_HDR_FINAL_LF: {
529                     if (!buffer.hasRemaining()) {
530                         if (flushHeaderBuffer(buffer, string, headerValues, charIndex, fiCookie, valueIdx))
531                             return STATE_HDR_FINAL_LF;
532                     }
533                     buffer.put((byte) 10); // LF
534                     this.fiCookie = -1L;
535                     this.valueIdx = 0;
536                     this.string = null;
537                     buffer.flip();
538                     //for performance reasons we use a gather write if there is user data
539                     if (userData == null) {
540                         do {
541                             res = next.write(buffer);
542                             if (res == 0) {
543                                 return STATE_BUF_FLUSH;
544                             }
545                         } while (buffer.hasRemaining());
546                     } else if(userData instanceof ByteBuffer) {
547                         ByteBuffer[] b = {buffer, (ByteBuffer) userData};
548                         do {
549                             long r = next.write(b, 0, b.length);
550                             if (r == 0 && buffer.hasRemaining()) {
551                                 return STATE_BUF_FLUSH;
552                             }
553                         } while (buffer.hasRemaining());
554                     } else {
555                         ByteBuffer[] b = new ByteBuffer[1 + len];
556                         b[0] = buffer;
557                         System.arraycopy(userData, pos, b, 1, len);
558                         do {
559                             long r = next.write(b, 0, b.length);
560                             if (r == 0 && buffer.hasRemaining()) {
561                                 return STATE_BUF_FLUSH;
562                             }
563                         } while (buffer.hasRemaining());
564                     }
565                     // fall thru
566                 }
567                 case STATE_BUF_FLUSH: {
568                     // buffer was successfully flushed above
569                     bufferDone();
570                     return STATE_BODY;
571                 }
572                 default: {
573                     throw new IllegalStateException();
574                 }
575             }
576         }
577     }
578
579     private boolean flushHeaderBuffer(ByteBuffer buffer, String string, HeaderValues headerValues, int charIndex, long fiCookie, int valueIdx) throws IOException {
580         int res;
581         buffer.flip();
582         do {
583             res = next.write(buffer);
584             if (res == 0) {
585                 this.string = string;
586                 this.headerValues = headerValues;
587                 this.charIndex = charIndex;
588                 this.fiCookie = fiCookie;
589                 this.valueIdx = valueIdx;
590                 return true;
591             }
592         } while (buffer.hasRemaining());
593         buffer.clear();
594         return false;
595     }
596
597     public int write(final ByteBuffer src) throws IOException {
598         try {
599             int oldState = this.state;
600             int state = oldState & MASK_STATE;
601             int alreadyWritten = 0;
602             int originalRemaining = -1;
603             try {
604                 if (state != 0) {
605                     originalRemaining = src.remaining();
606                     state = processWrite(state, src, -1, -1);
607                     if (state != 0) {
608                         return 0;
609                     }
610                     alreadyWritten = originalRemaining - src.remaining();
611                     if (allAreSet(oldState, FLAG_SHUTDOWN)) {
612                         next.terminateWrites();
613                         throw new ClosedChannelException();
614                     }
615                 }
616                 if (alreadyWritten != originalRemaining) {
617                     return next.write(src) + alreadyWritten;
618                 }
619                 return alreadyWritten;
620             } finally {
621                 this.state = oldState & ~MASK_STATE | state;
622             }
623         } catch(IOException|RuntimeException|Error e) {
624             IoUtils.safeClose(connection);
625             throw e;
626         }
627     }
628
629     public long write(final ByteBuffer[] srcs) throws IOException {
630         return write(srcs, 0, srcs.length);
631     }
632
633     public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
634         if (length == 0) {
635             return 0L;
636         }
637         int oldVal = state;
638         int state = oldVal & MASK_STATE;
639         try {
640             if (state != 0) {
641                 long rem = Buffers.remaining(srcs, offset, length);
642                 state = processWrite(state, srcs, offset, length);
643
644                 long ret  = rem - Buffers.remaining(srcs, offset, length);
645                 if (state != 0) {
646                     return ret;
647                 }
648                 if (allAreSet(oldVal, FLAG_SHUTDOWN)) {
649                     next.terminateWrites();
650                     throw new ClosedChannelException();
651                 }
652                 //we don't attempt to write again
653                 return ret;
654             }
655             return length == 1 ? next.write(srcs[offset]) : next.write(srcs, offset, length);
656         } catch (IOException | RuntimeException | Error e) {
657             IoUtils.safeClose(connection);
658             throw e;
659         } finally {
660             this.state = oldVal & ~MASK_STATE | state;
661         }
662     }
663
664     public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
665         try {
666             if (pooledFileTransferBuffer != null) {
667                 try {
668                     return write(pooledFileTransferBuffer.getBuffer());
669                 } catch (IOException | RuntimeException | Error e) {
670                     if (pooledFileTransferBuffer != null) {
671                         pooledFileTransferBuffer.close();
672                         pooledFileTransferBuffer = null;
673                     }
674                     throw e;
675                 } finally {
676                     if (pooledFileTransferBuffer != null) {
677                         if (!pooledFileTransferBuffer.getBuffer().hasRemaining()) {
678                             pooledFileTransferBuffer.close();
679                             pooledFileTransferBuffer = null;
680                         }
681                     }
682                 }
683             } else if (state != 0) {
684                 final PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().allocate();
685
686                 ByteBuffer buffer = pooled.getBuffer();
687                 try {
688                     int res = src.read(buffer);
689                     buffer.flip();
690                     if (res <= 0) {
691                         return res;
692                     }
693                     return write(buffer);
694                 } finally {
695                     if (buffer.hasRemaining()) {
696                         pooledFileTransferBuffer = pooled;
697                     } else {
698                         pooled.close();
699                     }
700                 }
701
702             } else {
703                 return next.transferFrom(src, position, count);
704             }
705         } catch (IOException | RuntimeException | Error e) {
706             IoUtils.safeClose(connection);
707             throw e;
708         }
709     }
710
711     public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
712         try {
713             if (state != 0) {
714                 return IoUtils.transfer(source, count, throughBuffer, new ConduitWritableByteChannel(this));
715             } else {
716                 return next.transferFrom(source, count, throughBuffer);
717             }
718         } catch (IOException| RuntimeException | Error e) {
719             IoUtils.safeClose(connection);
720             throw e;
721         }
722     }
723
724     @Override
725     public int writeFinal(ByteBuffer src) throws IOException {
726         try {
727             return Conduits.writeFinalBasic(this, src);
728         } catch (IOException | RuntimeException | Error e) {
729             IoUtils.safeClose(connection);
730             throw e;
731         }
732     }
733
734     @Override
735     public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
736         try {
737             return Conduits.writeFinalBasic(this, srcs, offset, length);
738         } catch (IOException | RuntimeException | Error e) {
739             IoUtils.safeClose(connection);
740             throw e;
741         }
742     }
743
744     public boolean flush() throws IOException {
745         int oldVal = state;
746         int state = oldVal & MASK_STATE;
747         try {
748             if (state != 0) {
749                 state = processWrite(state, null, -1, -1);
750                 if (state != 0) {
751                     return false;
752                 }
753                 if (allAreSet(oldVal, FLAG_SHUTDOWN)) {
754                     next.terminateWrites();
755                     // fall out to the flush
756                 }
757             }
758             return next.flush();
759         } catch (IOException | RuntimeException | Error e) {
760             IoUtils.safeClose(connection);
761             throw e;
762         } finally {
763             this.state = oldVal & ~MASK_STATE | state;
764         }
765     }
766
767
768     public void terminateWrites() throws IOException {
769         try {
770             int oldVal = this.state;
771             if (allAreClear(oldVal, MASK_STATE)) {
772                 next.terminateWrites();
773                 return;
774             }
775             this.state = oldVal | FLAG_SHUTDOWN;
776         } catch (IOException | RuntimeException | Error e) {
777             IoUtils.safeClose(connection);
778             throw e;
779         }
780     }
781
782     public void truncateWrites() throws IOException {
783         try {
784             next.truncateWrites();
785         } catch (IOException | RuntimeException | Error e) {
786             IoUtils.safeClose(connection);
787             throw e;
788         } finally {
789             if (pooledBuffer != null) {
790                 bufferDone();
791             }
792             if(pooledFileTransferBuffer != null) {
793                 pooledFileTransferBuffer.close();
794                 pooledFileTransferBuffer = null;
795             }
796         }
797     }
798
799     public XnioWorker getWorker() {
800         return next.getWorker();
801     }
802
803     void freeBuffers() {
804         done = true;
805         if(pooledBuffer != null) {
806             bufferDone();
807         }
808         if(pooledFileTransferBuffer != null) {
809             pooledFileTransferBuffer.close();
810             pooledFileTransferBuffer = null;
811         }
812     }
813 }
814