1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.conduits;
20
21 import java.io.IOException;
22 import java.io.UnsupportedEncodingException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.FileChannel;
26 import java.nio.charset.StandardCharsets;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Supplier;
29
30 import io.undertow.UndertowLogger;
31 import io.undertow.server.protocol.http.HttpAttachments;
32 import io.undertow.util.Attachable;
33 import io.undertow.util.AttachmentKey;
34 import io.undertow.util.HeaderMap;
35 import io.undertow.util.HeaderValues;
36 import io.undertow.util.Headers;
37 import io.undertow.util.ImmediatePooledByteBuffer;
38 import org.xnio.IoUtils;
39 import io.undertow.connector.ByteBufferPool;
40 import io.undertow.connector.PooledByteBuffer;
41 import org.xnio.channels.StreamSourceChannel;
42 import org.xnio.conduits.AbstractStreamSinkConduit;
43 import org.xnio.conduits.ConduitWritableByteChannel;
44 import org.xnio.conduits.Conduits;
45 import org.xnio.conduits.StreamSinkConduit;
46
47 import static org.xnio.Bits.allAreClear;
48 import static org.xnio.Bits.anyAreSet;
49
50 /**
51  * Channel that implements HTTP chunked transfer coding.
52  *
53  * @author Stuart Douglas
54  */

55 public class ChunkedStreamSinkConduit extends AbstractStreamSinkConduit<StreamSinkConduit> {
56
57     /**
58      * Trailers that are to be attached to the end of the HTTP response. Note that it is the callers responsibility
59      * to make sure the client understands trailers (i.e. they have provided a TE header), and to set the 'Trailers:'
60      * header appropriately.
61      * <p>
62      * This attachment must be set before the {@link #terminateWrites()} method is called.
63      */

64     @Deprecated
65     public static final AttachmentKey<HeaderMap> TRAILERS = HttpAttachments.RESPONSE_TRAILERS;
66
67     private final HeaderMap responseHeaders;
68
69     private final ConduitListener<? super ChunkedStreamSinkConduit> finishListener;
70     private final int config;
71
72     private final ByteBufferPool bufferPool;
73
74     /**
75      * "0\r\n" as bytes in US ASCII encoding.
76      */

77     private static final byte[] LAST_CHUNK = new byte[] {(byte) 48, (byte) 13, (byte) 10};
78
79     /**
80      * "\r\n" as bytes in US ASCII encoding.
81      */

82     private static final byte[] CRLF = new byte[] {(byte) 13, (byte) 10};
83
84     private final Attachable attachable;
85     private int state;
86     private int chunkleft = 0;
87
88     private final ByteBuffer chunkingBuffer = ByteBuffer.allocate(12); //12 is the most
89     private final ByteBuffer chunkingSepBuffer;
90     private PooledByteBuffer lastChunkBuffer;
91
92
93     private static final int CONF_FLAG_CONFIGURABLE = 1 << 0;
94     private static final int CONF_FLAG_PASS_CLOSE = 1 << 1;
95
96     /**
97      * Flag that is set when {@link #terminateWrites()} or @{link #close()} is called
98      */

99     private static final int FLAG_WRITES_SHUTDOWN = 1;
100     private static final int FLAG_NEXT_SHUTDOWN = 1 << 2;
101     private static final int FLAG_WRITTEN_FIRST_CHUNK = 1 << 3;
102     private static final int FLAG_FIRST_DATA_WRITTEN = 1 << 4; //set on first flush or write call
103     private static final int FLAG_FINISHED = 1 << 5;
104
105     /**
106      * Construct a new instance.
107      *
108      * @param next            the channel to wrap
109      * @param configurable    {@code true} to allow configuration of the next channel, {@code false} otherwise
110      * @param passClose       {@code true} to close the underlying channel when this channel is closed, {@code false} otherwise
111      * @param responseHeaders The response headers
112      * @param finishListener  The finish listener
113      * @param attachable      The attachable
114      */

115     public ChunkedStreamSinkConduit(final StreamSinkConduit next, final ByteBufferPool bufferPool, final boolean configurable, final boolean passClose, HeaderMap responseHeaders, final ConduitListener<? super ChunkedStreamSinkConduit> finishListener, final Attachable attachable) {
116         super(next);
117         this.bufferPool = bufferPool;
118         this.responseHeaders = responseHeaders;
119         this.finishListener = finishListener;
120         this.attachable = attachable;
121         config = (configurable ? CONF_FLAG_CONFIGURABLE : 0) | (passClose ? CONF_FLAG_PASS_CLOSE : 0);
122         chunkingSepBuffer = ByteBuffer.allocate(2);
123         chunkingSepBuffer.flip();
124     }
125
126     @Override
127     public int write(final ByteBuffer src) throws IOException {
128         return doWrite(src);
129     }
130
131
132     int doWrite(final ByteBuffer src) throws IOException {
133         if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
134             throw new ClosedChannelException();
135         }
136         if(src.remaining() == 0) {
137             return 0;
138         }
139         this.state |= FLAG_FIRST_DATA_WRITTEN;
140         int oldLimit = src.limit();
141         boolean dataRemaining = false//set to true if there is data in src that still needs to be written out
142         if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) {
143             chunkingBuffer.clear();
144             putIntAsHexString(chunkingBuffer, src.remaining());
145             chunkingBuffer.put(CRLF);
146             chunkingBuffer.flip();
147             chunkingSepBuffer.clear();
148             chunkingSepBuffer.put(CRLF);
149             chunkingSepBuffer.flip();
150             state |= FLAG_WRITTEN_FIRST_CHUNK;
151             chunkleft = src.remaining();
152         } else {
153             if (src.remaining() > chunkleft) {
154                 dataRemaining = true;
155                 src.limit(chunkleft + src.position());
156             }
157         }
158         try {
159             int chunkingSize = chunkingBuffer.remaining();
160             int chunkingSepSize = chunkingSepBuffer.remaining();
161             if (chunkingSize > 0 || chunkingSepSize > 0 || lastChunkBuffer != null) {
162                 int originalRemaining = src.remaining();
163                 long result;
164                 if (lastChunkBuffer == null || dataRemaining) {
165                     final ByteBuffer[] buf = new ByteBuffer[]{chunkingBuffer, src, chunkingSepBuffer};
166                     result = next.write(buf, 0, buf.length);
167                 } else {
168                     final ByteBuffer[] buf = new ByteBuffer[]{chunkingBuffer, src, lastChunkBuffer.getBuffer()};
169                     if (anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
170                         result = next.writeFinal(buf, 0, buf.length);
171                     } else {
172                         result = next.write(buf, 0, buf.length);
173                     }
174                     if (!src.hasRemaining()) {
175                         state |= FLAG_WRITES_SHUTDOWN;
176                     }
177                     if (!lastChunkBuffer.getBuffer().hasRemaining()) {
178                         state |= FLAG_NEXT_SHUTDOWN;
179                         lastChunkBuffer.close();
180                     }
181                 }
182                 int srcWritten = originalRemaining - src.remaining();
183                 chunkleft -= srcWritten;
184                 if (result < chunkingSize) {
185                     return 0;
186                 } else {
187                     return srcWritten;
188                 }
189             } else {
190                 int result = next.write(src);
191                 chunkleft -= result;
192                 return result;
193
194             }
195         } finally {
196             src.limit(oldLimit);
197         }
198
199     }
200
201     @Override
202     public void truncateWrites() throws IOException {
203         try {
204             if (lastChunkBuffer != null) {
205                 lastChunkBuffer.close();
206             }
207             if (allAreClear(state, FLAG_FINISHED)) {
208                 invokeFinishListener();
209             }
210         } finally {
211             super.truncateWrites();
212         }
213     }
214
215     @Override
216     public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
217         for (int i = offset; i < length; ++i) {
218             if (srcs[i].hasRemaining()) {
219                 return write(srcs[i]);
220             }
221         }
222         return 0;
223     }
224
225     @Override
226     public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
227         return Conduits.writeFinalBasic(this, srcs, offset, length);
228     }
229
230     @Override
231     public int writeFinal(ByteBuffer src) throws IOException {
232         //todo: we could optimise this to just set a content length if no data has been written
233         if(!src.hasRemaining()) {
234             terminateWrites();
235             return 0;
236         }
237         if (lastChunkBuffer == null) {
238             createLastChunk(true);
239         }
240         return doWrite(src);
241     }
242
243     @Override
244     public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
245         if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
246             throw new ClosedChannelException();
247         }
248         return src.transferTo(position, count, new ConduitWritableByteChannel(this));
249     }
250
251     @Override
252     public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
253         if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
254             throw new ClosedChannelException();
255         }
256         return IoUtils.transfer(source, count, throughBuffer, new ConduitWritableByteChannel(this));
257     }
258
259     @Override
260     public boolean flush() throws IOException {
261         this.state |= FLAG_FIRST_DATA_WRITTEN;
262         if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
263             if (anyAreSet(state, FLAG_NEXT_SHUTDOWN)) {
264                 boolean val = next.flush();
265                 if (val && allAreClear(state, FLAG_FINISHED)) {
266                     invokeFinishListener();
267                 }
268                 return val;
269             } else {
270                 next.write(lastChunkBuffer.getBuffer());
271                 if (!lastChunkBuffer.getBuffer().hasRemaining()) {
272                     lastChunkBuffer.close();
273                     if (anyAreSet(config, CONF_FLAG_PASS_CLOSE)) {
274                         next.terminateWrites();
275                     }
276                     state |= FLAG_NEXT_SHUTDOWN;
277                     boolean val = next.flush();
278                     if (val && allAreClear(state, FLAG_FINISHED)) {
279                         invokeFinishListener();
280                     }
281                     return val;
282                 } else {
283                     return false;
284                 }
285             }
286         } else {
287             return next.flush();
288         }
289     }
290
291     private void invokeFinishListener() {
292         state |= FLAG_FINISHED;
293         if (finishListener != null) {
294             finishListener.handleEvent(this);
295         }
296     }
297
298     @Override
299     public void terminateWrites() throws IOException {
300         if(anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
301             return;
302         }
303         if (this.chunkleft != 0) {
304             UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
305             next.truncateWrites();
306         }
307         if (!anyAreSet(state, FLAG_FIRST_DATA_WRITTEN)) {
308             //if no data was actually sent we just remove the transfer encoding header, and set content length 0
309             //TODO: is this the best way to do it?
310             //todo: should we make this behaviour configurable?
311             responseHeaders.put(Headers.CONTENT_LENGTH, "0"); //according to the spec we don't actually need this, but better to be safe
312             responseHeaders.remove(Headers.TRANSFER_ENCODING);
313             state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN;
314             if(anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
315                 next.terminateWrites();
316             }
317         } else {
318             createLastChunk(false);
319             state |= FLAG_WRITES_SHUTDOWN;
320         }
321     }
322
323     private void createLastChunk(final boolean writeFinal) throws UnsupportedEncodingException {
324         PooledByteBuffer lastChunkBufferPooled =  bufferPool.allocate();
325         ByteBuffer lastChunkBuffer = lastChunkBufferPooled.getBuffer();
326         if (writeFinal) {
327             lastChunkBuffer.put(CRLF);
328         } else if(chunkingSepBuffer.hasRemaining()) {
329             //the end of chunk /r/n has not been written yet
330             //just add it to this buffer to make managing state easier
331             lastChunkBuffer.put(chunkingSepBuffer);
332         }
333         lastChunkBuffer.put(LAST_CHUNK);
334         //we just assume it will fit
335         HeaderMap attachment = attachable.getAttachment(HttpAttachments.RESPONSE_TRAILERS);
336         final HeaderMap trailers;
337         Supplier<HeaderMap> supplier = attachable.getAttachment(HttpAttachments.RESPONSE_TRAILER_SUPPLIER);
338         if(attachment != null && supplier == null) {
339             trailers = attachment;
340         } else if(attachment == null && supplier != null) {
341             trailers = supplier.get();
342         } else if(attachment != null) {
343             HeaderMap supplied = supplier.get();
344             for(HeaderValues k : supplied) {
345                 attachment.putAll(k.getHeaderName(), k);
346             }
347             trailers = attachment;
348         } else {
349             trailers = null;
350         }
351         if (trailers != null && trailers.size() != 0) {
352             for (HeaderValues trailer : trailers) {
353                 for (String val : trailer) {
354                     trailer.getHeaderName().appendTo(lastChunkBuffer);
355                     lastChunkBuffer.put((byte) ':');
356                     lastChunkBuffer.put((byte) ' ');
357                     lastChunkBuffer.put(val.getBytes(StandardCharsets.US_ASCII));
358                     lastChunkBuffer.put(CRLF);
359                 }
360             }
361             lastChunkBuffer.put(CRLF);
362         } else {
363             lastChunkBuffer.put(CRLF);
364         }
365         //horrible hack
366         //there is a situation where we can get a buffer leak here if the connection is terminated abnormaly
367         //this should be fixed once this channel has its lifecycle tied to the connection, same as fixed length
368         lastChunkBuffer.flip();
369         ByteBuffer data = ByteBuffer.allocate(lastChunkBuffer.remaining());
370         data.put(lastChunkBuffer);
371         data.flip();
372         this.lastChunkBuffer = new ImmediatePooledByteBuffer(data);
373
374         lastChunkBufferPooled.close();
375     }
376
377     @Override
378     public void awaitWritable() throws IOException {
379         next.awaitWritable();
380     }
381
382     @Override
383     public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
384         next.awaitWritable(time, timeUnit);
385     }
386
387     private static void putIntAsHexString(final ByteBuffer buf, final int v) {
388         byte int3 = (byte) (v >> 24);
389         byte int2 = (byte) (v >> 16);
390         byte int1 = (byte) (v >>  8);
391         byte int0 = (byte) (v      );
392         boolean nonZeroFound = false;
393         if (int3 != 0) {
394             buf.put(DIGITS[(0xF0 & int3) >>> 4])
395                .put(DIGITS[0x0F & int3]);
396             nonZeroFound = true;
397         }
398         if (nonZeroFound || int2 != 0) {
399             buf.put(DIGITS[(0xF0 & int2) >>> 4])
400                .put(DIGITS[0x0F & int2]);
401             nonZeroFound = true;
402         }
403         if (nonZeroFound || int1 != 0) {
404             buf.put(DIGITS[(0xF0 & int1) >>> 4])
405                .put(DIGITS[0x0F & int1]);
406         }
407         buf.put(DIGITS[(0xF0 & int0) >>> 4])
408            .put(DIGITS[0x0F & int0]);
409     }
410
411     /**
412      * hexadecimal digits "0123456789abcdef" as bytes in US ASCII encoding.
413      */

414     private static final byte[] DIGITS = new byte[] {
415         (byte) 48, (byte) 49, (byte) 50, (byte) 51, (byte) 52, (byte) 53,
416         (byte) 54, (byte) 55, (byte) 56, (byte) 57, (byte) 97, (byte) 98,
417         (byte) 99, (byte) 100, (byte) 101, (byte) 102};
418
419 }
420