1
18
19 package io.undertow.conduits;
20
21 import java.io.IOException;
22 import java.io.UnsupportedEncodingException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.FileChannel;
26 import java.nio.charset.StandardCharsets;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Supplier;
29
30 import io.undertow.UndertowLogger;
31 import io.undertow.server.protocol.http.HttpAttachments;
32 import io.undertow.util.Attachable;
33 import io.undertow.util.AttachmentKey;
34 import io.undertow.util.HeaderMap;
35 import io.undertow.util.HeaderValues;
36 import io.undertow.util.Headers;
37 import io.undertow.util.ImmediatePooledByteBuffer;
38 import org.xnio.IoUtils;
39 import io.undertow.connector.ByteBufferPool;
40 import io.undertow.connector.PooledByteBuffer;
41 import org.xnio.channels.StreamSourceChannel;
42 import org.xnio.conduits.AbstractStreamSinkConduit;
43 import org.xnio.conduits.ConduitWritableByteChannel;
44 import org.xnio.conduits.Conduits;
45 import org.xnio.conduits.StreamSinkConduit;
46
47 import static org.xnio.Bits.allAreClear;
48 import static org.xnio.Bits.anyAreSet;
49
50
55 public class ChunkedStreamSinkConduit extends AbstractStreamSinkConduit<StreamSinkConduit> {
56
57
64 @Deprecated
65 public static final AttachmentKey<HeaderMap> TRAILERS = HttpAttachments.RESPONSE_TRAILERS;
66
67 private final HeaderMap responseHeaders;
68
69 private final ConduitListener<? super ChunkedStreamSinkConduit> finishListener;
70 private final int config;
71
72 private final ByteBufferPool bufferPool;
73
74
77 private static final byte[] LAST_CHUNK = new byte[] {(byte) 48, (byte) 13, (byte) 10};
78
79
82 private static final byte[] CRLF = new byte[] {(byte) 13, (byte) 10};
83
84 private final Attachable attachable;
85 private int state;
86 private int chunkleft = 0;
87
88 private final ByteBuffer chunkingBuffer = ByteBuffer.allocate(12);
89 private final ByteBuffer chunkingSepBuffer;
90 private PooledByteBuffer lastChunkBuffer;
91
92
93 private static final int CONF_FLAG_CONFIGURABLE = 1 << 0;
94 private static final int CONF_FLAG_PASS_CLOSE = 1 << 1;
95
96
99 private static final int FLAG_WRITES_SHUTDOWN = 1;
100 private static final int FLAG_NEXT_SHUTDOWN = 1 << 2;
101 private static final int FLAG_WRITTEN_FIRST_CHUNK = 1 << 3;
102 private static final int FLAG_FIRST_DATA_WRITTEN = 1 << 4;
103 private static final int FLAG_FINISHED = 1 << 5;
104
105
115 public ChunkedStreamSinkConduit(final StreamSinkConduit next, final ByteBufferPool bufferPool, final boolean configurable, final boolean passClose, HeaderMap responseHeaders, final ConduitListener<? super ChunkedStreamSinkConduit> finishListener, final Attachable attachable) {
116 super(next);
117 this.bufferPool = bufferPool;
118 this.responseHeaders = responseHeaders;
119 this.finishListener = finishListener;
120 this.attachable = attachable;
121 config = (configurable ? CONF_FLAG_CONFIGURABLE : 0) | (passClose ? CONF_FLAG_PASS_CLOSE : 0);
122 chunkingSepBuffer = ByteBuffer.allocate(2);
123 chunkingSepBuffer.flip();
124 }
125
126 @Override
127 public int write(final ByteBuffer src) throws IOException {
128 return doWrite(src);
129 }
130
131
132 int doWrite(final ByteBuffer src) throws IOException {
133 if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
134 throw new ClosedChannelException();
135 }
136 if(src.remaining() == 0) {
137 return 0;
138 }
139 this.state |= FLAG_FIRST_DATA_WRITTEN;
140 int oldLimit = src.limit();
141 boolean dataRemaining = false;
142 if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) {
143 chunkingBuffer.clear();
144 putIntAsHexString(chunkingBuffer, src.remaining());
145 chunkingBuffer.put(CRLF);
146 chunkingBuffer.flip();
147 chunkingSepBuffer.clear();
148 chunkingSepBuffer.put(CRLF);
149 chunkingSepBuffer.flip();
150 state |= FLAG_WRITTEN_FIRST_CHUNK;
151 chunkleft = src.remaining();
152 } else {
153 if (src.remaining() > chunkleft) {
154 dataRemaining = true;
155 src.limit(chunkleft + src.position());
156 }
157 }
158 try {
159 int chunkingSize = chunkingBuffer.remaining();
160 int chunkingSepSize = chunkingSepBuffer.remaining();
161 if (chunkingSize > 0 || chunkingSepSize > 0 || lastChunkBuffer != null) {
162 int originalRemaining = src.remaining();
163 long result;
164 if (lastChunkBuffer == null || dataRemaining) {
165 final ByteBuffer[] buf = new ByteBuffer[]{chunkingBuffer, src, chunkingSepBuffer};
166 result = next.write(buf, 0, buf.length);
167 } else {
168 final ByteBuffer[] buf = new ByteBuffer[]{chunkingBuffer, src, lastChunkBuffer.getBuffer()};
169 if (anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
170 result = next.writeFinal(buf, 0, buf.length);
171 } else {
172 result = next.write(buf, 0, buf.length);
173 }
174 if (!src.hasRemaining()) {
175 state |= FLAG_WRITES_SHUTDOWN;
176 }
177 if (!lastChunkBuffer.getBuffer().hasRemaining()) {
178 state |= FLAG_NEXT_SHUTDOWN;
179 lastChunkBuffer.close();
180 }
181 }
182 int srcWritten = originalRemaining - src.remaining();
183 chunkleft -= srcWritten;
184 if (result < chunkingSize) {
185 return 0;
186 } else {
187 return srcWritten;
188 }
189 } else {
190 int result = next.write(src);
191 chunkleft -= result;
192 return result;
193
194 }
195 } finally {
196 src.limit(oldLimit);
197 }
198
199 }
200
201 @Override
202 public void truncateWrites() throws IOException {
203 try {
204 if (lastChunkBuffer != null) {
205 lastChunkBuffer.close();
206 }
207 if (allAreClear(state, FLAG_FINISHED)) {
208 invokeFinishListener();
209 }
210 } finally {
211 super.truncateWrites();
212 }
213 }
214
215 @Override
216 public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
217 for (int i = offset; i < length; ++i) {
218 if (srcs[i].hasRemaining()) {
219 return write(srcs[i]);
220 }
221 }
222 return 0;
223 }
224
225 @Override
226 public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
227 return Conduits.writeFinalBasic(this, srcs, offset, length);
228 }
229
230 @Override
231 public int writeFinal(ByteBuffer src) throws IOException {
232
233 if(!src.hasRemaining()) {
234 terminateWrites();
235 return 0;
236 }
237 if (lastChunkBuffer == null) {
238 createLastChunk(true);
239 }
240 return doWrite(src);
241 }
242
243 @Override
244 public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
245 if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
246 throw new ClosedChannelException();
247 }
248 return src.transferTo(position, count, new ConduitWritableByteChannel(this));
249 }
250
251 @Override
252 public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
253 if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
254 throw new ClosedChannelException();
255 }
256 return IoUtils.transfer(source, count, throughBuffer, new ConduitWritableByteChannel(this));
257 }
258
259 @Override
260 public boolean flush() throws IOException {
261 this.state |= FLAG_FIRST_DATA_WRITTEN;
262 if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
263 if (anyAreSet(state, FLAG_NEXT_SHUTDOWN)) {
264 boolean val = next.flush();
265 if (val && allAreClear(state, FLAG_FINISHED)) {
266 invokeFinishListener();
267 }
268 return val;
269 } else {
270 next.write(lastChunkBuffer.getBuffer());
271 if (!lastChunkBuffer.getBuffer().hasRemaining()) {
272 lastChunkBuffer.close();
273 if (anyAreSet(config, CONF_FLAG_PASS_CLOSE)) {
274 next.terminateWrites();
275 }
276 state |= FLAG_NEXT_SHUTDOWN;
277 boolean val = next.flush();
278 if (val && allAreClear(state, FLAG_FINISHED)) {
279 invokeFinishListener();
280 }
281 return val;
282 } else {
283 return false;
284 }
285 }
286 } else {
287 return next.flush();
288 }
289 }
290
291 private void invokeFinishListener() {
292 state |= FLAG_FINISHED;
293 if (finishListener != null) {
294 finishListener.handleEvent(this);
295 }
296 }
297
298 @Override
299 public void terminateWrites() throws IOException {
300 if(anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
301 return;
302 }
303 if (this.chunkleft != 0) {
304 UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
305 next.truncateWrites();
306 }
307 if (!anyAreSet(state, FLAG_FIRST_DATA_WRITTEN)) {
308
309
310
311 responseHeaders.put(Headers.CONTENT_LENGTH, "0");
312 responseHeaders.remove(Headers.TRANSFER_ENCODING);
313 state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN;
314 if(anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
315 next.terminateWrites();
316 }
317 } else {
318 createLastChunk(false);
319 state |= FLAG_WRITES_SHUTDOWN;
320 }
321 }
322
323 private void createLastChunk(final boolean writeFinal) throws UnsupportedEncodingException {
324 PooledByteBuffer lastChunkBufferPooled = bufferPool.allocate();
325 ByteBuffer lastChunkBuffer = lastChunkBufferPooled.getBuffer();
326 if (writeFinal) {
327 lastChunkBuffer.put(CRLF);
328 } else if(chunkingSepBuffer.hasRemaining()) {
329
330
331 lastChunkBuffer.put(chunkingSepBuffer);
332 }
333 lastChunkBuffer.put(LAST_CHUNK);
334
335 HeaderMap attachment = attachable.getAttachment(HttpAttachments.RESPONSE_TRAILERS);
336 final HeaderMap trailers;
337 Supplier<HeaderMap> supplier = attachable.getAttachment(HttpAttachments.RESPONSE_TRAILER_SUPPLIER);
338 if(attachment != null && supplier == null) {
339 trailers = attachment;
340 } else if(attachment == null && supplier != null) {
341 trailers = supplier.get();
342 } else if(attachment != null) {
343 HeaderMap supplied = supplier.get();
344 for(HeaderValues k : supplied) {
345 attachment.putAll(k.getHeaderName(), k);
346 }
347 trailers = attachment;
348 } else {
349 trailers = null;
350 }
351 if (trailers != null && trailers.size() != 0) {
352 for (HeaderValues trailer : trailers) {
353 for (String val : trailer) {
354 trailer.getHeaderName().appendTo(lastChunkBuffer);
355 lastChunkBuffer.put((byte) ':');
356 lastChunkBuffer.put((byte) ' ');
357 lastChunkBuffer.put(val.getBytes(StandardCharsets.US_ASCII));
358 lastChunkBuffer.put(CRLF);
359 }
360 }
361 lastChunkBuffer.put(CRLF);
362 } else {
363 lastChunkBuffer.put(CRLF);
364 }
365
366
367
368 lastChunkBuffer.flip();
369 ByteBuffer data = ByteBuffer.allocate(lastChunkBuffer.remaining());
370 data.put(lastChunkBuffer);
371 data.flip();
372 this.lastChunkBuffer = new ImmediatePooledByteBuffer(data);
373
374 lastChunkBufferPooled.close();
375 }
376
377 @Override
378 public void awaitWritable() throws IOException {
379 next.awaitWritable();
380 }
381
382 @Override
383 public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
384 next.awaitWritable(time, timeUnit);
385 }
386
387 private static void putIntAsHexString(final ByteBuffer buf, final int v) {
388 byte int3 = (byte) (v >> 24);
389 byte int2 = (byte) (v >> 16);
390 byte int1 = (byte) (v >> 8);
391 byte int0 = (byte) (v );
392 boolean nonZeroFound = false;
393 if (int3 != 0) {
394 buf.put(DIGITS[(0xF0 & int3) >>> 4])
395 .put(DIGITS[0x0F & int3]);
396 nonZeroFound = true;
397 }
398 if (nonZeroFound || int2 != 0) {
399 buf.put(DIGITS[(0xF0 & int2) >>> 4])
400 .put(DIGITS[0x0F & int2]);
401 nonZeroFound = true;
402 }
403 if (nonZeroFound || int1 != 0) {
404 buf.put(DIGITS[(0xF0 & int1) >>> 4])
405 .put(DIGITS[0x0F & int1]);
406 }
407 buf.put(DIGITS[(0xF0 & int0) >>> 4])
408 .put(DIGITS[0x0F & int0]);
409 }
410
411
414 private static final byte[] DIGITS = new byte[] {
415 (byte) 48, (byte) 49, (byte) 50, (byte) 51, (byte) 52, (byte) 53,
416 (byte) 54, (byte) 55, (byte) 56, (byte) 57, (byte) 97, (byte) 98,
417 (byte) 99, (byte) 100, (byte) 101, (byte) 102};
418
419 }
420