1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.server;
20
21 import io.undertow.UndertowLogger;
22 import io.undertow.UndertowMessages;
23 import io.undertow.UndertowOptions;
24 import io.undertow.channels.DetachableStreamSinkChannel;
25 import io.undertow.channels.DetachableStreamSourceChannel;
26 import io.undertow.conduits.EmptyStreamSourceConduit;
27 import io.undertow.connector.PooledByteBuffer;
28 import io.undertow.io.AsyncReceiverImpl;
29 import io.undertow.io.AsyncSenderImpl;
30 import io.undertow.io.BlockingReceiverImpl;
31 import io.undertow.io.BlockingSenderImpl;
32 import io.undertow.io.Receiver;
33 import io.undertow.io.Sender;
34 import io.undertow.io.UndertowInputStream;
35 import io.undertow.io.UndertowOutputStream;
36 import io.undertow.security.api.SecurityContext;
37 import io.undertow.server.handlers.Cookie;
38 import io.undertow.util.AbstractAttachable;
39 import io.undertow.util.AttachmentKey;
40 import io.undertow.util.ConduitFactory;
41 import io.undertow.util.Cookies;
42 import io.undertow.util.HeaderMap;
43 import io.undertow.util.Headers;
44 import io.undertow.util.HttpString;
45 import io.undertow.util.Methods;
46 import io.undertow.util.NetworkUtils;
47 import io.undertow.util.Protocols;
48 import io.undertow.util.Rfc6265CookieSupport;
49 import io.undertow.util.StatusCodes;
50 import org.jboss.logging.Logger;
51 import org.xnio.Buffers;
52 import org.xnio.ChannelExceptionHandler;
53 import org.xnio.ChannelListener;
54 import org.xnio.ChannelListeners;
55 import org.xnio.IoUtils;
56 import org.xnio.XnioIoThread;
57 import org.xnio.channels.Channels;
58 import org.xnio.channels.Configurable;
59 import org.xnio.channels.StreamSinkChannel;
60 import org.xnio.channels.StreamSourceChannel;
61 import org.xnio.conduits.Conduit;
62 import org.xnio.conduits.ConduitStreamSinkChannel;
63 import org.xnio.conduits.ConduitStreamSourceChannel;
64 import org.xnio.conduits.StreamSinkConduit;
65 import org.xnio.conduits.StreamSourceConduit;
66
67 import java.io.IOException;
68 import java.io.InputStream;
69 import java.io.OutputStream;
70 import java.net.InetSocketAddress;
71 import java.nio.ByteBuffer;
72 import java.nio.channels.Channel;
73 import java.nio.channels.FileChannel;
74 import java.util.ArrayDeque;
75 import java.util.Deque;
76 import java.util.Map;
77 import java.util.TreeMap;
78 import java.util.concurrent.Executor;
79 import java.util.concurrent.TimeUnit;
80
81 import static org.xnio.Bits.allAreSet;
82 import static org.xnio.Bits.anyAreClear;
83 import static org.xnio.Bits.anyAreSet;
84 import static org.xnio.Bits.intBitMask;
85
86 /**
87  * An HTTP server request/response exchange.  An instance of this class is constructed as soon as the request headers are
88  * fully parsed.
89  *
90  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
91  */

92 public final class HttpServerExchange extends AbstractAttachable {
93
94     // immutable state
95
96     private static final Logger log = Logger.getLogger(HttpServerExchange.class);
97
98     private static final RuntimePermission SET_SECURITY_CONTEXT = new RuntimePermission("io.undertow.SET_SECURITY_CONTEXT");
99     private static final String ISO_8859_1 = "ISO-8859-1";
100     private static final String HTTPS = "https";
101
102     /**
103      * The HTTP reason phrase to send. This is an attachment rather than a field as it is rarely used. If this is not set
104      * a generic description from the RFC is used instead.
105      */

106     private static final AttachmentKey<String> REASON_PHRASE = AttachmentKey.create(String.class);
107
108     /**
109      * The attachment key that buffered request data is attached under.
110      */

111     static final AttachmentKey<PooledByteBuffer[]> BUFFERED_REQUEST_DATA = AttachmentKey.create(PooledByteBuffer[].class);
112
113     /**
114      * Attachment key that can be used to hold additional request attributes
115      */

116     public static final AttachmentKey<Map<String, String>> REQUEST_ATTRIBUTES = AttachmentKey.create(Map.class);
117
118     /**
119      * Attachment key that can be used to hold a remotely authenticated user
120      */

121     public static final AttachmentKey<String> REMOTE_USER = AttachmentKey.create(String.class);
122
123
124     /**
125      * Attachment key that can be used as a flag of secure attribute
126      */

127     public static final AttachmentKey<Boolean> SECURE_REQUEST = AttachmentKey.create(Boolean.class);
128
129     private final ServerConnection connection;
130     private final HeaderMap requestHeaders;
131     private final HeaderMap responseHeaders;
132
133     private int exchangeCompletionListenersCount = 0;
134     private ExchangeCompletionListener[] exchangeCompleteListeners;
135     private DefaultResponseListener[] defaultResponseListeners;
136
137     private Map<String, Deque<String>> queryParameters;
138     private Map<String, Deque<String>> pathParameters;
139
140     private Map<String, Cookie> requestCookies;
141     private Map<String, Cookie> responseCookies;
142
143     /**
144      * The actual response channel. May be null if it has not been created yet.
145      */

146     private WriteDispatchChannel responseChannel;
147     /**
148      * The actual request channel. May be null if it has not been created yet.
149      */

150     protected ReadDispatchChannel requestChannel;
151
152     private BlockingHttpExchange blockingHttpExchange;
153
154     private HttpString protocol;
155
156     /**
157      * The security context
158      */

159     private SecurityContext securityContext;
160
161     // mutable state
162
163     private int state = 200;
164     private HttpString requestMethod = HttpString.EMPTY;
165     private String requestScheme;
166
167     /**
168      * The original request URI. This will include the host name if it was specified by the client.
169      * <p>
170      * This is not decoded in any way, and does not include the query string.
171      * <p>
172      * Examples:
173      * GET http://localhost:8080/myFile.jsf?foo=bar HTTP/1.1 -> 'http://localhost:8080/myFile.jsf'
174      * POST /my+File.jsf?foo=bar HTTP/1.1 -> '/my+File.jsf'
175      */

176     private String requestURI;
177
178     /**
179      * The request path. This will be decoded by the server, and does not include the query string.
180      * <p>
181      * This path is not canonicalised, so care must be taken to ensure that escape attacks are not possible.
182      * <p>
183      * Examples:
184      * GET http://localhost:8080/b/../my+File.jsf?foo=bar HTTP/1.1 -> '/b/../my+File.jsf'
185      * POST /my+File.jsf?foo=bar HTTP/1.1 -> '/my File.jsf'
186      */

187     private String requestPath;
188
189     /**
190      * The remaining unresolved portion of request path. If a {@link io.undertow.server.handlers.CanonicalPathHandler} is
191      * installed this will be canonicalised.
192      * <p>
193      * Initially this will be equal to {@link #requestPath}, however it will be modified as handlers resolve the path.
194      */

195     private String relativePath;
196
197     /**
198      * The resolved part of the canonical path.
199      */

200     private String resolvedPath = "";
201
202     /**
203      * the query string
204      */

205     private String queryString = "";
206
207     private int requestWrapperCount = 0;
208     private ConduitWrapper<StreamSourceConduit>[] requestWrappers; //we don't allocate these by default, as for get requests they are not used
209
210     private int responseWrapperCount = 0;
211     private ConduitWrapper<StreamSinkConduit>[] responseWrappers;
212
213     private Sender sender;
214     private Receiver receiver;
215
216     private long requestStartTime = -1;
217
218
219     /**
220      * The maximum entity size. This can be modified before the request stream is obtained, however once the request
221      * stream is obtained this cannot be modified further.
222      * <p>
223      * The default value for this is determined by the {@link io.undertow.UndertowOptions#MAX_ENTITY_SIZE} option. A value
224      * of 0 indicates that this is unbounded.
225      * <p>
226      * If this entity size is exceeded the request channel will be forcibly closed.
227      * <p>
228      * TODO: integrate this with HTTP 100-continue responses, to make it possible to send a 417 rather than just forcibly
229      * closing the channel.
230      *
231      * @see io.undertow.UndertowOptions#MAX_ENTITY_SIZE
232      */

233     private long maxEntitySize;
234
235     /**
236      * When the call stack return this task will be executed by the executor specified in {@link #dispatchExecutor}.
237      * If the executor is null then it will be executed by the XNIO worker.
238      */

239     private Runnable dispatchTask;
240
241     /**
242      * The executor that is to be used to dispatch the {@link #dispatchTask}. Note that this is not cleared
243      * between dispatches, so once a request has been dispatched once then all subsequent dispatches will use
244      * the same executor.
245      */

246     private Executor dispatchExecutor;
247
248     /**
249      * The number of bytes that have been sent to the remote client. This does not include headers,
250      * only the entity body, and does not take any transfer or content encoding into account.
251      */

252     private long responseBytesSent = 0;
253
254
255     private static final int MASK_RESPONSE_CODE = intBitMask(0, 9);
256
257     /**
258      * Flag that is set when the response sending begins
259      */

260     private static final int FLAG_RESPONSE_SENT = 1 << 10;
261
262     /**
263      * Flag that is sent when the response has been fully written and flushed.
264      */

265     private static final int FLAG_RESPONSE_TERMINATED = 1 << 11;
266
267     /**
268      * Flag that is set once the request has been fully read. For zero
269      * length requests this is set immediately.
270      */

271     private static final int FLAG_REQUEST_TERMINATED = 1 << 12;
272
273     /**
274      * Flag that is set if this is a persistent connection, and the
275      * connection should be re-used.
276      */

277     private static final int FLAG_PERSISTENT = 1 << 14;
278
279     /**
280      * If this flag is set it means that the request has been dispatched,
281      * and will not be ending when the call stack returns.
282      * <p>
283      * This could be because it is being dispatched to a worker thread from
284      * an IO thread, or because resume(Reads/Writes) has been called.
285      */

286     private static final int FLAG_DISPATCHED = 1 << 15;
287
288     /**
289      * Flag that is set if the {@link #requestURI} field contains the hostname.
290      */

291     private static final int FLAG_URI_CONTAINS_HOST = 1 << 16;
292
293     /**
294      * If this flag is set then the request is current running through a
295      * handler chain.
296      * <p>
297      * This will be true most of the time, this only time this will return
298      * false is when performing async operations outside the scope of a call to
299      * {@link Connectors#executeRootHandler(HttpHandler, HttpServerExchange)},
300      * such as when performing async IO.
301      * <p>
302      * If this is true then when the call stack returns the exchange will either be dispatched,
303      * or the exchange will be ended.
304      */

305     private static final int FLAG_IN_CALL = 1 << 17;
306     /**
307      * Flag that indicates that reads should be resumed when the call stack returns.
308      */

309     private static final int FLAG_SHOULD_RESUME_READS = 1 << 18;
310
311     /**
312      * Flag that indicates that writes should be resumed when the call stack returns
313      */

314     private static final int FLAG_SHOULD_RESUME_WRITES = 1 << 19;
315
316     /**
317      * Flag that indicates that the request channel has been reset, and {@link #getRequestChannel()} can be called again
318      */

319     private static final int FLAG_REQUEST_RESET= 1 << 20;
320
321     /**
322      * The source address for the request. If this is null then the actual source address from the channel is used
323      */

324     private InetSocketAddress sourceAddress;
325
326     /**
327      * The destination address for the request. If this is null then the actual source address from the channel is used
328      */

329     private InetSocketAddress destinationAddress;
330
331     public HttpServerExchange(final ServerConnection connection, long maxEntitySize) {
332         this(connection, new HeaderMap(), new HeaderMap(), maxEntitySize);
333     }
334
335     public HttpServerExchange(final ServerConnection connection) {
336         this(connection, 0);
337     }
338
339     public HttpServerExchange(final ServerConnection connection, final HeaderMap requestHeaders, final HeaderMap responseHeaders,  long maxEntitySize) {
340         this.connection = connection;
341         this.maxEntitySize = maxEntitySize;
342         this.requestHeaders = requestHeaders;
343         this.responseHeaders = responseHeaders;
344     }
345
346     /**
347      * Get the request protocol string.  Normally this is one of the strings listed in {@link Protocols}.
348      *
349      * @return the request protocol string
350      */

351     public HttpString getProtocol() {
352         return protocol;
353     }
354
355     /**
356      * Sets the http protocol
357      *
358      * @param protocol
359      */

360     public HttpServerExchange setProtocol(final HttpString protocol) {
361         this.protocol = protocol;
362         return this;
363     }
364
365     /**
366      * Determine whether this request conforms to HTTP 0.9.
367      *
368      * @return {@code trueif the request protocol is equal to {@link Protocols#HTTP_0_9}, {@code false} otherwise
369      */

370     public boolean isHttp09() {
371         return protocol.equals(Protocols.HTTP_0_9);
372     }
373
374     /**
375      * Determine whether this request conforms to HTTP 1.0.
376      *
377      * @return {@code trueif the request protocol is equal to {@link Protocols#HTTP_1_0}, {@code false} otherwise
378      */

379     public boolean isHttp10() {
380         return protocol.equals(Protocols.HTTP_1_0);
381     }
382
383     /**
384      * Determine whether this request conforms to HTTP 1.1.
385      *
386      * @return {@code trueif the request protocol is equal to {@link Protocols#HTTP_1_1}, {@code false} otherwise
387      */

388     public boolean isHttp11() {
389         return protocol.equals(Protocols.HTTP_1_1);
390     }
391
392     public boolean isSecure() {
393         Boolean secure = getAttachment(SECURE_REQUEST);
394         if(secure != null && secure) {
395             return true;
396         }
397         String scheme = getRequestScheme();
398         if (scheme != null && scheme.equalsIgnoreCase(HTTPS)) {
399             return true;
400         }
401         return false;
402     }
403
404     /**
405      * Get the HTTP request method.  Normally this is one of the strings listed in {@link io.undertow.util.Methods}.
406      *
407      * @return the HTTP request method
408      */

409     public HttpString getRequestMethod() {
410         return requestMethod;
411     }
412
413     /**
414      * Set the HTTP request method.
415      *
416      * @param requestMethod the HTTP request method
417      */

418     public HttpServerExchange setRequestMethod(final HttpString requestMethod) {
419         this.requestMethod = requestMethod;
420         return this;
421     }
422
423     /**
424      * Get the request URI scheme.  Normally this is one of {@code http} or {@code https}.
425      *
426      * @return the request URI scheme
427      */

428     public String getRequestScheme() {
429         return requestScheme;
430     }
431
432     /**
433      * Set the request URI scheme.
434      *
435      * @param requestScheme the request URI scheme
436      */

437     public HttpServerExchange setRequestScheme(final String requestScheme) {
438         this.requestScheme = requestScheme;
439         return this;
440     }
441
442     /**
443      * The original request URI. This will include the host name, protocol etc
444      * if it was specified by the client.
445      * <p>
446      * This is not decoded in any way, and does not include the query string.
447      * <p>
448      * Examples:
449      * GET http://localhost:8080/myFile.jsf?foo=bar HTTP/1.1 -&gt; 'http://localhost:8080/myFile.jsf'
450      * POST /my+File.jsf?foo=bar HTTP/1.1 -&gt; '/my+File.jsf'
451      */

452     public String getRequestURI() {
453         return requestURI;
454     }
455
456     /**
457      * Sets the request URI
458      *
459      * @param requestURI The new request URI
460      */

461     public HttpServerExchange setRequestURI(final String requestURI) {
462         this.requestURI = requestURI;
463         return this;
464     }
465
466     /**
467      * Sets the request URI
468      *
469      * @param requestURI   The new request URI
470      * @param containsHost If this is true the request URI contains the host part
471      */

472     public HttpServerExchange setRequestURI(final String requestURI, boolean containsHost) {
473         this.requestURI = requestURI;
474         if (containsHost) {
475             this.state |= FLAG_URI_CONTAINS_HOST;
476         } else {
477             this.state &= ~FLAG_URI_CONTAINS_HOST;
478         }
479         return this;
480     }
481
482     /**
483      * If a request was submitted to the server with a full URI instead of just a path this
484      * will return true. For example:
485      * <p>
486      * GET http://localhost:8080/b/../my+File.jsf?foo=bar HTTP/1.1 -&gt; true
487      * POST /my+File.jsf?foo=bar HTTP/1.1 -&gt; false
488      *
489      * @return <code>true</code> If the request URI contains the host part of the URI
490      */

491     public boolean isHostIncludedInRequestURI() {
492         return anyAreSet(state, FLAG_URI_CONTAINS_HOST);
493     }
494
495
496     /**
497      * The request path. This will be decoded by the server, and does not include the query string.
498      * <p>
499      * This path is not canonicalised, so care must be taken to ensure that escape attacks are not possible.
500      * <p>
501      * Examples:
502      * GET http://localhost:8080/b/../my+File.jsf?foo=bar HTTP/1.1 -&gt; '/b/../my+File.jsf'
503      * POST /my+File.jsf?foo=bar HTTP/1.1 -&gt; '/my File.jsf'
504      */

505     public String getRequestPath() {
506         return requestPath;
507     }
508
509     /**
510      * Set the request URI path.
511      *
512      * @param requestPath the request URI path
513      */

514     public HttpServerExchange setRequestPath(final String requestPath) {
515         this.requestPath = requestPath;
516         return this;
517     }
518
519     /**
520      * Get the request relative path.  This is the path which should be evaluated by the current handler.
521      * <p>
522      * If the {@link io.undertow.server.handlers.CanonicalPathHandler} is installed in the current chain
523      * then this path with be canonicalized
524      *
525      * @return the request relative path
526      */

527     public String getRelativePath() {
528         return relativePath;
529     }
530
531     /**
532      * Set the request relative path.
533      *
534      * @param relativePath the request relative path
535      */

536     public HttpServerExchange setRelativePath(final String relativePath) {
537         this.relativePath = relativePath;
538         return this;
539     }
540
541     /**
542      * Get the resolved path.
543      *
544      * @return the resolved path
545      */

546     public String getResolvedPath() {
547         return resolvedPath;
548     }
549
550     /**
551      * Set the resolved path.
552      *
553      * @param resolvedPath the resolved path
554      */

555     public HttpServerExchange setResolvedPath(final String resolvedPath) {
556         this.resolvedPath = resolvedPath;
557         return this;
558     }
559
560     /**
561      *
562      * @return The query string, without the leading ?
563      */

564     public String getQueryString() {
565         return queryString;
566     }
567
568     public HttpServerExchange setQueryString(final String queryString) {
569         this.queryString = queryString;
570         return this;
571     }
572
573     /**
574      * Reconstructs the complete URL as seen by the user. This includes scheme, host name etc,
575      * but does not include query string.
576      * <p>
577      * This is not decoded.
578      */

579     public String getRequestURL() {
580         if (isHostIncludedInRequestURI()) {
581             return getRequestURI();
582         } else {
583             return getRequestScheme() + "://" + getHostAndPort() + getRequestURI();
584         }
585     }
586
587     /**
588      * Returns the request charset. If none was explicitly specified it will return
589      * "ISO-8859-1", which is the default charset for HTTP requests.
590      *
591      * @return The character encoding
592      */

593     public String getRequestCharset() {
594         return extractCharset(requestHeaders);
595     }
596
597     /**
598      * Returns the response charset. If none was explicitly specified it will return
599      * "ISO-8859-1", which is the default charset for HTTP requests.
600      *
601      * @return The character encoding
602      */

603     public String getResponseCharset() {
604         HeaderMap headers = responseHeaders;
605         return extractCharset(headers);
606     }
607
608     private String extractCharset(HeaderMap headers) {
609         String contentType = headers.getFirst(Headers.CONTENT_TYPE);
610         if (contentType != null) {
611             String value = Headers.extractQuotedValueFromHeader(contentType, "charset");
612             if (value != null) {
613                 return value;
614             }
615         }
616         return ISO_8859_1;
617     }
618
619     /**
620      * Return the host that this request was sent to, in general this will be the
621      * value of the Host header, minus the port specifier.
622      * <p>
623      * If this resolves to an IPv6 address it will not be enclosed by square brackets.
624      * Care must be taken when constructing URLs based on this method to ensure IPv6 URLs
625      * are handled correctly.
626      *
627      * @return The host part of the destination address
628      */

629     public String getHostName() {
630         String host = requestHeaders.getFirst(Headers.HOST);
631         if (host == null || "".equals(host.trim())) {
632             host = getDestinationAddress().getHostString();
633         } else {
634             if (host.startsWith("[")) {
635                 host = host.substring(1, host.indexOf(']'));
636             } else if (host.indexOf(':') != -1) {
637                 host = host.substring(0, host.indexOf(':'));
638             }
639         }
640         return host;
641     }
642
643     /**
644      * Return the host, and also the port if this request was sent to a non-standard port. In general
645      * this will just be the value of the Host header.
646      * <p>
647      * If this resolves to an IPv6 address it *will*  be enclosed by square brackets. The return
648      * value of this method is suitable for inclusion in a URL.
649      *
650      * @return The host and port part of the destination address
651      */

652     public String getHostAndPort() {
653         String host = requestHeaders.getFirst(Headers.HOST);
654         if (host == null || "".equals(host.trim())) {
655             InetSocketAddress address = getDestinationAddress();
656             host = NetworkUtils.formatPossibleIpv6Address(address.getHostString());
657             int port = address.getPort();
658             if (!((getRequestScheme().equals("http") && port == 80)
659                     || (getRequestScheme().equals("https") && port == 443))) {
660                 host = host + ":" + port;
661             }
662         }
663         return host;
664     }
665
666     /**
667      * Return the port that this request was sent to. In general this will be the value of the Host
668      * header, minus the host name.
669      *
670      * @return The port part of the destination address
671      */

672     public int getHostPort() {
673         String host = requestHeaders.getFirst(Headers.HOST);
674         if (host != null) {
675             //for ipv6 addresses we make sure we take out the first part, which can have multiple occurrences of :
676             final int colonIndex;
677             if (host.startsWith("[")) {
678                 colonIndex = host.indexOf(':', host.indexOf(']'));
679             } else {
680                 colonIndex = host.indexOf(':');
681             }
682             if (colonIndex != -1) {
683                 try {
684                     return Integer.parseInt(host.substring(colonIndex + 1));
685                 } catch (NumberFormatException ignore) {}
686             }
687             if (getRequestScheme().equals("https")) {
688                 return 443;
689             } else if (getRequestScheme().equals("http")) {
690                 return 80;
691             }
692
693         }
694         return getDestinationAddress().getPort();
695     }
696
697     /**
698      * Get the underlying HTTP connection.
699      *
700      * @return the underlying HTTP connection
701      */

702     public ServerConnection getConnection() {
703         return connection;
704     }
705
706     public boolean isPersistent() {
707         return anyAreSet(state, FLAG_PERSISTENT);
708     }
709
710     /**
711      *
712      * @return <code>true</code> If the current thread in the IO thread for the exchange
713      */

714     public boolean isInIoThread() {
715         return getIoThread() == Thread.currentThread();
716     }
717
718     /**
719      *
720      * @return True if this exchange represents an upgrade response
721      */

722     public boolean isUpgrade() {
723         return getStatusCode() == StatusCodes.SWITCHING_PROTOCOLS;
724     }
725
726     /**
727      *
728      * @return The number of bytes sent in the entity body
729      */

730     public long getResponseBytesSent() {
731         if(Connectors.isEntityBodyAllowed(this) && !getRequestMethod().equals(Methods.HEAD)) {
732             return responseBytesSent;
733         } else {
734             return 0; //body is not allowed, even if we attempt to write it will be ignored
735         }
736     }
737
738     /**
739      * Updates the number of response bytes sent. Used when compression is in use
740      * @param bytes The number of bytes to increase the response size by. May be negative
741      */

742     void updateBytesSent(long bytes) {
743         if(Connectors.isEntityBodyAllowed(this) && !getRequestMethod().equals(Methods.HEAD)) {
744             responseBytesSent += bytes;
745         }
746     }
747
748     public HttpServerExchange setPersistent(final boolean persistent) {
749         if (persistent) {
750             this.state = this.state | FLAG_PERSISTENT;
751         } else {
752             this.state = this.state & ~FLAG_PERSISTENT;
753         }
754         return this;
755     }
756
757     public boolean isDispatched() {
758         return anyAreSet(state, FLAG_DISPATCHED);
759     }
760
761     public HttpServerExchange unDispatch() {
762         state &= ~FLAG_DISPATCHED;
763         dispatchTask = null;
764         return this;
765     }
766
767     /**
768      * {@link #dispatch(Executor, Runnable)} should be used instead of this method, as it is hard to use safely.
769      *
770      * Use {@link io.undertow.util.SameThreadExecutor#INSTANCE} if you do not want to dispatch to another thread.
771      *
772      * @return this exchange
773      */

774     @Deprecated
775     public HttpServerExchange dispatch() {
776         state |= FLAG_DISPATCHED;
777         return this;
778     }
779
780     /**
781      * Dispatches this request to the XNIO worker thread pool. Once the call stack returns
782      * the given runnable will be submitted to the executor.
783      * <p>
784      * In general handlers should first check the value of {@link #isInIoThread()} before
785      * calling this method, and only dispatch if the request is actually running in the IO
786      * thread.
787      *
788      * @param runnable The task to run
789      * @throws IllegalStateException If this exchange has already been dispatched
790      */

791     public HttpServerExchange dispatch(final Runnable runnable) {
792         dispatch(null, runnable);
793         return this;
794     }
795
796     /**
797      * Dispatches this request to the given executor. Once the call stack returns
798      * the given runnable will be submitted to the executor.
799      * <p>
800      * In general handlers should first check the value of {@link #isInIoThread()} before
801      * calling this method, and only dispatch if the request is actually running in the IO
802      * thread.
803      *
804      * @param runnable The task to run
805      * @throws IllegalStateException If this exchange has already been dispatched
806      */

807     public HttpServerExchange dispatch(final Executor executor, final Runnable runnable) {
808         if (isInCall()) {
809             if (executor != null) {
810                 this.dispatchExecutor = executor;
811             }
812             state |= FLAG_DISPATCHED;
813             if(anyAreSet(state, FLAG_SHOULD_RESUME_READS | FLAG_SHOULD_RESUME_WRITES)) {
814                 throw UndertowMessages.MESSAGES.resumedAndDispatched();
815             }
816             this.dispatchTask = runnable;
817         } else {
818             if (executor == null) {
819                 getConnection().getWorker().execute(runnable);
820             } else {
821                 executor.execute(runnable);
822             }
823         }
824         return this;
825     }
826
827     public HttpServerExchange dispatch(final HttpHandler handler) {
828         dispatch(null, handler);
829         return this;
830     }
831
832     public HttpServerExchange dispatch(final Executor executor, final HttpHandler handler) {
833         final Runnable runnable = new Runnable() {
834             @Override
835             public void run() {
836                 Connectors.executeRootHandler(handler, HttpServerExchange.this);
837             }
838         };
839         dispatch(executor, runnable);
840         return this;
841     }
842
843     /**
844      * Sets the executor that is used for dispatch operations where no executor is specified.
845      *
846      * @param executor The executor to use
847      */

848     public HttpServerExchange setDispatchExecutor(final Executor executor) {
849         if (executor == null) {
850             dispatchExecutor = null;
851         } else {
852             dispatchExecutor = executor;
853         }
854         return this;
855     }
856
857     /**
858      * Gets the current executor that is used for dispatch operations. This may be null
859      *
860      * @return The current dispatch executor
861      */

862     public Executor getDispatchExecutor() {
863         return dispatchExecutor;
864     }
865
866     /**
867      * @return The current dispatch task
868      */

869     Runnable getDispatchTask() {
870         return dispatchTask;
871     }
872
873     boolean isInCall() {
874         return anyAreSet(state, FLAG_IN_CALL);
875     }
876
877     HttpServerExchange setInCall(boolean value) {
878         if (value) {
879             state |= FLAG_IN_CALL;
880         } else {
881             state &= ~FLAG_IN_CALL;
882         }
883         return this;
884     }
885
886
887     /**
888      * Upgrade the channel to a raw socket. This method set the response code to 101, and then marks both the
889      * request and response as terminated, which means that once the current request is completed the raw channel
890      * can be obtained from {@link io.undertow.server.protocol.http.HttpServerConnection#getChannel()}
891      *
892      * @throws IllegalStateException if a response or upgrade was already sent, or if the request body is already being
893      *                               read
894      */

895     public HttpServerExchange upgradeChannel(final HttpUpgradeListener listener) {
896         if (!connection.isUpgradeSupported()) {
897             throw UndertowMessages.MESSAGES.upgradeNotSupported();
898         }
899         if(!getRequestHeaders().contains(Headers.UPGRADE)) {
900             throw UndertowMessages.MESSAGES.notAnUpgradeRequest();
901         }
902         UndertowLogger.REQUEST_LOGGER.debugf("Upgrading request %s"this);
903         connection.setUpgradeListener(listener);
904         setStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
905         getResponseHeaders().put(Headers.CONNECTION, Headers.UPGRADE_STRING);
906         return this;
907     }
908
909     /**
910      * Upgrade the channel to a raw socket. This method set the response code to 101, and then marks both the
911      * request and response as terminated, which means that once the current request is completed the raw channel
912      * can be obtained from {@link io.undertow.server.protocol.http.HttpServerConnection#getChannel()}
913      *
914      * @param productName the product name to report to the client
915      * @throws IllegalStateException if a response or upgrade was already sent, or if the request body is already being
916      *                               read
917      */

918     public HttpServerExchange upgradeChannel(String productName, final HttpUpgradeListener listener) {
919         if (!connection.isUpgradeSupported()) {
920             throw UndertowMessages.MESSAGES.upgradeNotSupported();
921         }
922         UndertowLogger.REQUEST_LOGGER.debugf("Upgrading request %s"this);
923         connection.setUpgradeListener(listener);
924         setStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
925         final HeaderMap headers = getResponseHeaders();
926         headers.put(Headers.UPGRADE, productName);
927         headers.put(Headers.CONNECTION, Headers.UPGRADE_STRING);
928         return this;
929     }
930
931     /**
932      *
933      * @param connectListener
934      * @return
935      */

936     public HttpServerExchange acceptConnectRequest(HttpUpgradeListener connectListener) {
937         if(!getRequestMethod().equals(Methods.CONNECT)) {
938             throw UndertowMessages.MESSAGES.notAConnectRequest();
939         }
940         connection.setConnectListener(connectListener);
941         return this;
942     }
943
944
945     public HttpServerExchange addExchangeCompleteListener(final ExchangeCompletionListener listener) {
946         if(isComplete() || this.exchangeCompletionListenersCount == -1) {
947             throw UndertowMessages.MESSAGES.exchangeAlreadyComplete();
948         }
949         final int exchangeCompletionListenersCount = this.exchangeCompletionListenersCount++;
950         ExchangeCompletionListener[] exchangeCompleteListeners = this.exchangeCompleteListeners;
951         if (exchangeCompleteListeners == null || exchangeCompleteListeners.length == exchangeCompletionListenersCount) {
952             ExchangeCompletionListener[] old = exchangeCompleteListeners;
953             this.exchangeCompleteListeners = exchangeCompleteListeners = new ExchangeCompletionListener[exchangeCompletionListenersCount + 2];
954             if(old != null) {
955                 System.arraycopy(old, 0, exchangeCompleteListeners, 0, exchangeCompletionListenersCount);
956             }
957         }
958         exchangeCompleteListeners[exchangeCompletionListenersCount] = listener;
959         return this;
960     }
961
962     public HttpServerExchange addDefaultResponseListener(final DefaultResponseListener listener) {
963         int i = 0;
964         if(defaultResponseListeners == null) {
965             defaultResponseListeners = new DefaultResponseListener[2];
966         } else {
967             while (i != defaultResponseListeners.length && defaultResponseListeners[i] != null) {
968                 ++i;
969             }
970             if (i == defaultResponseListeners.length) {
971                 DefaultResponseListener[] old = defaultResponseListeners;
972                 defaultResponseListeners = new DefaultResponseListener[defaultResponseListeners.length + 2];
973                 System.arraycopy(old, 0, defaultResponseListeners, 0, old.length);
974             }
975         }
976         defaultResponseListeners[i] = listener;
977         return this;
978     }
979
980     /**
981      * Get the source address of the HTTP request.
982      *
983      * @return the source address of the HTTP request
984      */

985     public InetSocketAddress getSourceAddress() {
986         if (sourceAddress != null) {
987             return sourceAddress;
988         }
989         return connection.getPeerAddress(InetSocketAddress.class);
990     }
991
992     /**
993      * Sets the source address of the HTTP request. If this is not explicitly set
994      * the actual source address of the channel is used.
995      *
996      * @param sourceAddress The address
997      */

998     public HttpServerExchange setSourceAddress(InetSocketAddress sourceAddress) {
999         this.sourceAddress = sourceAddress;
1000         return this;
1001     }
1002
1003     /**
1004      * Get the destination address of the HTTP request.
1005      *
1006      * @return the destination address of the HTTP request
1007      */

1008     public InetSocketAddress getDestinationAddress() {
1009         if (destinationAddress != null) {
1010             return destinationAddress;
1011         }
1012         return connection.getLocalAddress(InetSocketAddress.class);
1013     }
1014
1015     /**
1016      * Sets the destination address of the HTTP request. If this is not explicitly set
1017      * the actual destination address of the channel is used.
1018      *
1019      * @param destinationAddress The address
1020      */

1021     public HttpServerExchange setDestinationAddress(InetSocketAddress destinationAddress) {
1022         this.destinationAddress = destinationAddress;
1023         return this;
1024     }
1025
1026     /**
1027      * Get the request headers.
1028      *
1029      * @return the request headers
1030      */

1031     public HeaderMap getRequestHeaders() {
1032         return requestHeaders;
1033     }
1034
1035     /**
1036      * @return The content length of the request, or <code>-1</code> if it has not been set
1037      */

1038     public long getRequestContentLength() {
1039         String contentLengthString = requestHeaders.getFirst(Headers.CONTENT_LENGTH);
1040         if (contentLengthString == null) {
1041             return -1;
1042         }
1043         return Long.parseLong(contentLengthString);
1044     }
1045
1046     /**
1047      * Get the response headers.
1048      *
1049      * @return the response headers
1050      */

1051     public HeaderMap getResponseHeaders() {
1052         return responseHeaders;
1053     }
1054
1055     /**
1056      * @return The content length of the response, or <code>-1</code> if it has not been set
1057      */

1058     public long getResponseContentLength() {
1059         String contentLengthString = responseHeaders.getFirst(Headers.CONTENT_LENGTH);
1060         if (contentLengthString == null) {
1061             return -1;
1062         }
1063         return Long.parseLong(contentLengthString);
1064     }
1065
1066     /**
1067      * Sets the response content length
1068      *
1069      * @param length The content length
1070      */

1071     public HttpServerExchange setResponseContentLength(long length) {
1072         if (length == -1) {
1073             responseHeaders.remove(Headers.CONTENT_LENGTH);
1074         } else {
1075             responseHeaders.put(Headers.CONTENT_LENGTH, Long.toString(length));
1076         }
1077         return this;
1078     }
1079
1080     /**
1081      * Returns a mutable map of query parameters.
1082      *
1083      * @return The query parameters
1084      */

1085     public Map<String, Deque<String>> getQueryParameters() {
1086         if (queryParameters == null) {
1087             queryParameters = new TreeMap<>();
1088         }
1089         return queryParameters;
1090     }
1091
1092     public HttpServerExchange addQueryParam(final String name, final String param) {
1093         if (queryParameters == null) {
1094             queryParameters = new TreeMap<>();
1095         }
1096         Deque<String> list = queryParameters.get(name);
1097         if (list == null) {
1098             queryParameters.put(name, list = new ArrayDeque<>(2));
1099         }
1100         list.add(param);
1101         return this;
1102     }
1103
1104
1105     /**
1106      * Returns a mutable map of path parameters
1107      *
1108      * @return The path parameters
1109      */

1110     public Map<String, Deque<String>> getPathParameters() {
1111         if (pathParameters == null) {
1112             pathParameters = new TreeMap<>();
1113         }
1114         return pathParameters;
1115     }
1116
1117     public HttpServerExchange addPathParam(final String name, final String param) {
1118         if (pathParameters == null) {
1119             pathParameters = new TreeMap<>();
1120         }
1121         Deque<String> list = pathParameters.get(name);
1122         if (list == null) {
1123             pathParameters.put(name, list = new ArrayDeque<>(2));
1124         }
1125         list.add(param);
1126         return this;
1127     }
1128
1129     /**
1130      * @return A mutable map of request cookies
1131      */

1132     public Map<String, Cookie> getRequestCookies() {
1133         if (requestCookies == null) {
1134             requestCookies = Cookies.parseRequestCookies(
1135                     getConnection().getUndertowOptions().get(UndertowOptions.MAX_COOKIES, 200),
1136                     getConnection().getUndertowOptions().get(UndertowOptions.ALLOW_EQUALS_IN_COOKIE_VALUE, false),
1137                     requestHeaders.get(Headers.COOKIE));
1138         }
1139         return requestCookies;
1140     }
1141
1142     /**
1143      * Sets a response cookie
1144      *
1145      * @param cookie The cookie
1146      */

1147     public HttpServerExchange setResponseCookie(final Cookie cookie) {
1148         if (getConnection().getUndertowOptions().get(UndertowOptions.ENABLE_RFC6265_COOKIE_VALIDATION, UndertowOptions.DEFAULT_ENABLE_RFC6265_COOKIE_VALIDATION)) {
1149             if (cookie.getValue() != null && !cookie.getValue().isEmpty()) {
1150                 Rfc6265CookieSupport.validateCookieValue(cookie.getValue());
1151             }
1152             if (cookie.getPath() != null && !cookie.getPath().isEmpty()) {
1153                 Rfc6265CookieSupport.validatePath(cookie.getPath());
1154             }
1155             if (cookie.getDomain() != null && !cookie.getDomain().isEmpty()) {
1156                 Rfc6265CookieSupport.validateDomain(cookie.getDomain());
1157             }
1158         }
1159         if (responseCookies == null) {
1160             responseCookies = new TreeMap<>(); //hashmap is slow to allocate in JDK7
1161         }
1162         responseCookies.put(cookie.getName(), cookie);
1163         return this;
1164     }
1165
1166     /**
1167      * @return A mutable map of response cookies
1168      */

1169     public Map<String, Cookie> getResponseCookies() {
1170         if (responseCookies == null) {
1171             responseCookies = new TreeMap<>();
1172         }
1173         return responseCookies;
1174     }
1175
1176     /**
1177      * For internal use only
1178      *
1179      * @return The response cookies, or null if they have not been set yet
1180      */

1181     Map<String, Cookie> getResponseCookiesInternal() {
1182         return responseCookies;
1183     }
1184
1185     /**
1186      * @return <code>true</code> If the response has already been started
1187      */

1188     public boolean isResponseStarted() {
1189         return allAreSet(state, FLAG_RESPONSE_SENT);
1190     }
1191
1192     /**
1193      * Get the inbound request.  If there is no request body, calling this method
1194      * may cause the next request to immediately be processed.  The {@link StreamSourceChannel#close()} or {@link StreamSourceChannel#shutdownReads()}
1195      * method must be called at some point after the request is processed to prevent resource leakage and to allow
1196      * the next request to proceed.  Any unread content will be discarded.
1197      *
1198      * @return the channel for the inbound request, or {@code nullif another party already acquired the channel
1199      */

1200     public StreamSourceChannel getRequestChannel() {
1201         if (requestChannel != null) {
1202             if(anyAreSet(state, FLAG_REQUEST_RESET)) {
1203                 state &= ~FLAG_REQUEST_RESET;
1204                 return requestChannel;
1205             }
1206             return null;
1207         }
1208         if (anyAreSet(state, FLAG_REQUEST_TERMINATED)) {
1209             return requestChannel = new ReadDispatchChannel(new ConduitStreamSourceChannel(Configurable.EMPTY, new EmptyStreamSourceConduit(getIoThread())));
1210         }
1211         final ConduitWrapper<StreamSourceConduit>[] wrappers = this.requestWrappers;
1212         final ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
1213         if (wrappers != null) {
1214             this.requestWrappers = null;
1215             final WrapperConduitFactory<StreamSourceConduit> factory = new WrapperConduitFactory<>(wrappers, requestWrapperCount, sourceChannel.getConduit(), this);
1216             sourceChannel.setConduit(factory.create());
1217         }
1218         return requestChannel = new ReadDispatchChannel(sourceChannel);
1219     }
1220
1221     void resetRequestChannel() {
1222         state |= FLAG_REQUEST_RESET;
1223     }
1224
1225     public boolean isRequestChannelAvailable() {
1226         return requestChannel == null || anyAreSet(state, FLAG_REQUEST_RESET);
1227     }
1228
1229     /**
1230      * Returns true if the completion handler for this exchange has been invoked, and the request is considered
1231      * finished.
1232      */

1233     public boolean isComplete() {
1234         return allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED);
1235     }
1236
1237     /**
1238      * Returns true if all data has been read from the request, or if there
1239      * was not data.
1240      *
1241      * @return true if the request is complete
1242      */

1243     public boolean isRequestComplete() {
1244         PooledByteBuffer[] data = getAttachment(BUFFERED_REQUEST_DATA);
1245         if(data != null) {
1246             return false;
1247         }
1248         return allAreSet(state, FLAG_REQUEST_TERMINATED);
1249     }
1250
1251     /**
1252      * @return true if the responses is complete
1253      */

1254     public boolean isResponseComplete() {
1255         return allAreSet(state, FLAG_RESPONSE_TERMINATED);
1256     }
1257
1258     /**
1259      * Force the codec to treat the request as fully read.  Should only be invoked by handlers which downgrade
1260      * the socket or implement a transfer coding.
1261      */

1262     void terminateRequest() {
1263         int oldVal = state;
1264         if (allAreSet(oldVal, FLAG_REQUEST_TERMINATED)) {
1265             // idempotent
1266             return;
1267         }
1268         if (requestChannel != null) {
1269             requestChannel.requestDone();
1270         }
1271         this.state = oldVal | FLAG_REQUEST_TERMINATED;
1272         if (anyAreSet(oldVal, FLAG_RESPONSE_TERMINATED)) {
1273             invokeExchangeCompleteListeners();
1274         }
1275     }
1276
1277     private void invokeExchangeCompleteListeners() {
1278         if (exchangeCompletionListenersCount > 0) {
1279             int i = exchangeCompletionListenersCount - 1;
1280             ExchangeCompletionListener next = exchangeCompleteListeners[i];
1281             exchangeCompletionListenersCount = -1;
1282             next.exchangeEvent(thisnew ExchangeCompleteNextListener(exchangeCompleteListeners, this, i));
1283         } else if (exchangeCompletionListenersCount == 0) {
1284             exchangeCompletionListenersCount = -1;
1285             connection.exchangeComplete(this);
1286         }
1287     }
1288
1289     /**
1290      * Get the response channel. The channel must be closed and fully flushed before the next response can be started.
1291      * In order to close the channel you must first call {@link org.xnio.channels.StreamSinkChannel#shutdownWrites()},
1292      * and then call {@link org.xnio.channels.StreamSinkChannel#flush()} until it returns true. Alternatively you can
1293      * call {@link #endExchange()}, which will close the channel as part of its cleanup.
1294      * <p>
1295      * Closing a fixed-length response before the corresponding number of bytes has been written will cause the connection
1296      * to be reset and subsequent requests to fail; thus it is important to ensure that the proper content length is
1297      * delivered when one is specified.  The response channel may not be writable until after the response headers have
1298      * been sent.
1299      * <p>
1300      * If this method is not called then an empty or default response body will be used, depending on the response code set.
1301      * <p>
1302      * The returned channel will begin to write out headers when the first write request is initiated, or when
1303      * {@link org.xnio.channels.StreamSinkChannel#shutdownWrites()} is called on the channel with no content being written.
1304      * Once the channel is acquired, however, the response code and headers may not be modified.
1305      * <p>
1306      *
1307      * @return the response channel, or {@code nullif another party already acquired the channel
1308      */

1309     public StreamSinkChannel getResponseChannel() {
1310         if (responseChannel != null) {
1311             return null;
1312         }
1313         final ConduitWrapper<StreamSinkConduit>[] wrappers = responseWrappers;
1314         this.responseWrappers = null;
1315         final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
1316         if (sinkChannel == null) {
1317             return null;
1318         }
1319         if(wrappers != null) {
1320             final WrapperStreamSinkConduitFactory factory = new WrapperStreamSinkConduitFactory(wrappers, responseWrapperCount, this, sinkChannel.getConduit());
1321             sinkChannel.setConduit(factory.create());
1322         } else {
1323             sinkChannel.setConduit(connection.getSinkConduit(this, sinkChannel.getConduit()));
1324         }
1325         this.responseChannel = new WriteDispatchChannel(sinkChannel);
1326         this.startResponse();
1327         return responseChannel;
1328     }
1329
1330     /**
1331      * Get the response sender.
1332      * <p>
1333      * For blocking exchanges this will return a sender that uses the underlying output stream.
1334      *
1335      * @return the response sender, or {@code nullif another party already acquired the channel or the sender
1336      * @see #getResponseChannel()
1337      */

1338     public Sender getResponseSender() {
1339         if (blockingHttpExchange != null) {
1340             return blockingHttpExchange.getSender();
1341         }
1342         if (sender != null) {
1343             return sender;
1344         }
1345         return sender = new AsyncSenderImpl(this);
1346     }
1347
1348     public Receiver getRequestReceiver() {
1349         if(blockingHttpExchange != null) {
1350             return blockingHttpExchange.getReceiver();
1351         }
1352         if(receiver != null) {
1353             return receiver;
1354         }
1355         return receiver = new AsyncReceiverImpl(this);
1356     }
1357
1358     /**
1359      * @return <code>true</code> if {@link #getResponseChannel()} has not been called
1360      */

1361     public boolean isResponseChannelAvailable() {
1362         return responseChannel == null;
1363     }
1364
1365
1366     /**
1367      * Get the status code.
1368      *
1369      * @see #getStatusCode()
1370      * @return the status code
1371      */

1372     @Deprecated
1373     public int getResponseCode() {
1374         return state & MASK_RESPONSE_CODE;
1375     }
1376
1377     /**
1378      * Change the status code for this response.  If not specified, the code will be a {@code 200}.  Setting
1379      * the status code after the response headers have been transmitted has no effect.
1380      *
1381      * @see #setStatusCode(int)
1382      * @param statusCode the new code
1383      * @throws IllegalStateException if a response or upgrade was already sent
1384      */

1385     @Deprecated
1386     public HttpServerExchange setResponseCode(final int statusCode) {
1387         return setStatusCode(statusCode);
1388     }
1389
1390     /**
1391      * Get the status code.
1392      *
1393      * @return the status code
1394      */

1395     public int getStatusCode() {
1396         return state & MASK_RESPONSE_CODE;
1397     }
1398
1399     /**
1400      * Change the status code for this response.  If not specified, the code will be a {@code 200}.  Setting
1401      * the status code after the response headers have been transmitted has no effect.
1402      *
1403      * @param statusCode the new code
1404      * @throws IllegalStateException if a response or upgrade was already sent
1405      */

1406     public HttpServerExchange setStatusCode(final int statusCode) {
1407         if (statusCode < 0 || statusCode > 999) {
1408             throw new IllegalArgumentException("Invalid response code");
1409         }
1410         int oldVal = state;
1411         if (allAreSet(oldVal, FLAG_RESPONSE_SENT)) {
1412             throw UndertowMessages.MESSAGES.responseAlreadyStarted();
1413         }
1414         if(statusCode >= 500) {
1415             if(UndertowLogger.ERROR_RESPONSE.isDebugEnabled()) {
1416                 UndertowLogger.ERROR_RESPONSE.debugf(new RuntimeException(), "Setting error code %s for exchange %s", statusCode, this);
1417             }
1418         }
1419         this.state = oldVal & ~MASK_RESPONSE_CODE | statusCode & MASK_RESPONSE_CODE;
1420         return this;
1421     }
1422
1423     /**
1424      * Sets the HTTP reason phrase. Depending on the protocol this may or may not be honoured. In particular HTTP2
1425      * has removed support for the reason phrase.
1426      *
1427      * This method should only be used to interact with legacy frameworks that give special meaning to the reason phrase.
1428      *
1429      * @param message The status message
1430      * @return this exchange
1431      */

1432     public HttpServerExchange setReasonPhrase(String message) {
1433         putAttachment(REASON_PHRASE, message);
1434         return this;
1435     }
1436
1437     /**
1438      *
1439      * @return The current reason phrase
1440      */

1441     public String getReasonPhrase() {
1442         return getAttachment(REASON_PHRASE);
1443     }
1444
1445     /**
1446      * Adds a {@link ConduitWrapper} to the request wrapper chain.
1447      *
1448      * @param wrapper the wrapper
1449      */

1450     public HttpServerExchange addRequestWrapper(final ConduitWrapper<StreamSourceConduit> wrapper) {
1451         ConduitWrapper<StreamSourceConduit>[] wrappers = requestWrappers;
1452         if (requestChannel != null) {
1453             throw UndertowMessages.MESSAGES.requestChannelAlreadyProvided();
1454         }
1455         if (wrappers == null) {
1456             wrappers = requestWrappers = new ConduitWrapper[2];
1457         } else if (wrappers.length == requestWrapperCount) {
1458             requestWrappers = new ConduitWrapper[wrappers.length + 2];
1459             System.arraycopy(wrappers, 0, requestWrappers, 0, wrappers.length);
1460             wrappers = requestWrappers;
1461         }
1462         wrappers[requestWrapperCount++] = wrapper;
1463         return this;
1464     }
1465
1466     /**
1467      * Adds a {@link ConduitWrapper} to the response wrapper chain.
1468      *
1469      * @param wrapper the wrapper
1470      */

1471     public HttpServerExchange addResponseWrapper(final ConduitWrapper<StreamSinkConduit> wrapper) {
1472         ConduitWrapper<StreamSinkConduit>[] wrappers = responseWrappers;
1473         if (responseChannel != null) {
1474             throw UndertowMessages.MESSAGES.responseChannelAlreadyProvided();
1475         }
1476         if(wrappers == null) {
1477             this.responseWrappers = wrappers = new ConduitWrapper[2];
1478         } else if (wrappers.length == responseWrapperCount) {
1479             responseWrappers = new ConduitWrapper[wrappers.length + 2];
1480             System.arraycopy(wrappers, 0, responseWrappers, 0, wrappers.length);
1481             wrappers = responseWrappers;
1482         }
1483         wrappers[responseWrapperCount++] = wrapper;
1484         return this;
1485     }
1486
1487     /**
1488      * Calling this method puts the exchange in blocking mode, and creates a
1489      * {@link BlockingHttpExchange} object to store the streams.
1490      * <p>
1491      * When an exchange is in blocking mode the input stream methods become
1492      * available, other than that there is presently no major difference
1493      * between blocking an non-blocking modes.
1494      *
1495      * @return The existing blocking exchange, if any
1496      */

1497     public BlockingHttpExchange startBlocking() {
1498         final BlockingHttpExchange old = this.blockingHttpExchange;
1499         blockingHttpExchange = new DefaultBlockingHttpExchange(this);
1500         return old;
1501     }
1502
1503     /**
1504      * Calling this method puts the exchange in blocking mode, using the given
1505      * blocking exchange as the source of the streams.
1506      * <p>
1507      * When an exchange is in blocking mode the input stream methods become
1508      * available, other than that there is presently no major difference
1509      * between blocking an non-blocking modes.
1510      * <p>
1511      * Note that this method may be called multiple times with different
1512      * exchange objects, to allow handlers to modify the streams
1513      * that are being used.
1514      *
1515      * @return The existing blocking exchange, if any
1516      */

1517     public BlockingHttpExchange startBlocking(final BlockingHttpExchange httpExchange) {
1518         final BlockingHttpExchange old = this.blockingHttpExchange;
1519         blockingHttpExchange = httpExchange;
1520         return old;
1521     }
1522
1523     /**
1524      * Returns true if {@link #startBlocking()} or {@link #startBlocking(BlockingHttpExchange)} has been called.
1525      *
1526      * @return <code>true</code> If this is a blocking HTTP server exchange
1527      */

1528     public boolean isBlocking() {
1529         return blockingHttpExchange != null;
1530     }
1531
1532     /**
1533      * @return The input stream
1534      * @throws IllegalStateException if {@link #startBlocking()} has not been called
1535      */

1536     public InputStream getInputStream() {
1537         if (blockingHttpExchange == null) {
1538             throw UndertowMessages.MESSAGES.startBlockingHasNotBeenCalled();
1539         }
1540         return blockingHttpExchange.getInputStream();
1541     }
1542
1543     /**
1544      * @return The output stream
1545      * @throws IllegalStateException if {@link #startBlocking()} has not been called
1546      */

1547     public OutputStream getOutputStream() {
1548         if (blockingHttpExchange == null) {
1549             throw UndertowMessages.MESSAGES.startBlockingHasNotBeenCalled();
1550         }
1551         return blockingHttpExchange.getOutputStream();
1552     }
1553
1554     /**
1555      * Force the codec to treat the response as fully written.  Should only be invoked by handlers which downgrade
1556      * the socket or implement a transfer coding.
1557      */

1558     HttpServerExchange terminateResponse() {
1559         int oldVal = state;
1560         if (allAreSet(oldVal, FLAG_RESPONSE_TERMINATED)) {
1561             // idempotent
1562             return this;
1563         }
1564         if(responseChannel != null) {
1565             responseChannel.responseDone();
1566         }
1567         this.state = oldVal | FLAG_RESPONSE_TERMINATED;
1568         if (anyAreSet(oldVal, FLAG_REQUEST_TERMINATED)) {
1569             invokeExchangeCompleteListeners();
1570         }
1571         return this;
1572     }
1573
1574     /**
1575      * @return The request start time using the JVM's high-resolution time source,
1576      * in nanoseconds, or -1 if this was not recorded
1577      * @see UndertowOptions#RECORD_REQUEST_START_TIME
1578      * @see Connectors#setRequestStartTime(HttpServerExchange)
1579      */

1580     public long getRequestStartTime() {
1581         return requestStartTime;
1582     }
1583
1584
1585     HttpServerExchange setRequestStartTime(long requestStartTime) {
1586         this.requestStartTime = requestStartTime;
1587         return this;
1588     }
1589
1590     /**
1591      * Ends the exchange by fully draining the request channel, and flushing the response channel.
1592      * <p>
1593      * This can result in handoff to an XNIO worker, so after this method is called the exchange should
1594      * not be modified by the caller.
1595      * <p>
1596      * If the exchange is already complete this method is a noop
1597      */

1598     public HttpServerExchange endExchange() {
1599         final int state = this.state;
1600         if (allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED)) {
1601             if(blockingHttpExchange != null) {
1602                 //we still have to close the blocking exchange in this case,
1603                 IoUtils.safeClose(blockingHttpExchange);
1604             }
1605             return this;
1606         }
1607         if(defaultResponseListeners != null) {
1608             int i = defaultResponseListeners.length - 1;
1609             while (i >= 0) {
1610                 DefaultResponseListener listener = defaultResponseListeners[i];
1611                 if (listener != null) {
1612                     defaultResponseListeners[i] = null;
1613                     try {
1614                         if (listener.handleDefaultResponse(this)) {
1615                             return this;
1616                         }
1617                     } catch (Throwable e) {
1618                         UndertowLogger.REQUEST_LOGGER.debug("Exception running default response listener", e);
1619                     }
1620                 }
1621                 i--;
1622             }
1623         }
1624
1625         if (anyAreClear(state, FLAG_REQUEST_TERMINATED)) {
1626             connection.terminateRequestChannel(this);
1627         }
1628
1629         if (blockingHttpExchange != null) {
1630             try {
1631                 //TODO: can we end up in this situation in a IO thread?
1632                 blockingHttpExchange.close();
1633             } catch (IOException e) {
1634                 UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
1635                 IoUtils.safeClose(connection);
1636             } catch (Throwable t) {
1637                 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1638                 IoUtils.safeClose(connection);
1639             }
1640         }
1641
1642         //417 means that we are rejecting the request
1643         //so the client should not actually send any data
1644         if (anyAreClear(state, FLAG_REQUEST_TERMINATED)) {
1645
1646             //not really sure what the best thing to do here is
1647             //for now we are just going to drain the channel
1648             if (requestChannel == null) {
1649                 getRequestChannel();
1650             }
1651             int totalRead = 0;
1652             for (; ; ) {
1653                 try {
1654                     long read = Channels.drain(requestChannel, Long.MAX_VALUE);
1655                     totalRead += read;
1656                     if (read == 0) {
1657                         //if the response code is 417 this is a rejected continuation request.
1658                         //however there is a chance the client could have sent the data anyway
1659                         //so we attempt to drain, and if we have not drained anything then we
1660                         //assume the server has not sent any data
1661
1662                         if (getStatusCode() != StatusCodes.EXPECTATION_FAILED || totalRead > 0) {
1663                             requestChannel.getReadSetter().set(ChannelListeners.drainListener(Long.MAX_VALUE,
1664                                     new ChannelListener<StreamSourceChannel>() {
1665                                         @Override
1666                                         public void handleEvent(final StreamSourceChannel channel) {
1667                                             if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1668                                                 closeAndFlushResponse();
1669                                             }
1670                                         }
1671                                     }, new ChannelExceptionHandler<StreamSourceChannel>() {
1672                                         @Override
1673                                         public void handleException(final StreamSourceChannel channel, final IOException e) {
1674
1675                                             //make sure the listeners have been invoked
1676                                             //unless the connection has been killed this is a no-op
1677                                             terminateRequest();
1678                                             terminateResponse();
1679                                             UndertowLogger.REQUEST_LOGGER.debug("Exception draining request stream", e);
1680                                             IoUtils.safeClose(connection);
1681                                         }
1682                                     }
1683                             ));
1684                             requestChannel.resumeReads();
1685                             return this;
1686                         } else {
1687                             break;
1688                         }
1689                     } else if (read == -1) {
1690                         break;
1691                     }
1692                 } catch (Throwable t) {
1693                     if (t instanceof IOException) {
1694                         UndertowLogger.REQUEST_IO_LOGGER.ioException((IOException) t);
1695                     } else {
1696                         UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1697                     }
1698                     invokeExchangeCompleteListeners();
1699                     IoUtils.safeClose(connection);
1700                     return this;
1701                 }
1702
1703             }
1704         }
1705         if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1706             closeAndFlushResponse();
1707         }
1708         return this;
1709     }
1710
1711     private void closeAndFlushResponse() {
1712         if(!connection.isOpen()) {
1713             //not much point trying to flush
1714
1715             //make sure the listeners have been invoked
1716             terminateRequest();
1717             terminateResponse();
1718             return;
1719         }
1720         try {
1721             if (isResponseChannelAvailable()) {
1722                 if(!getRequestMethod().equals(Methods.CONNECT) && !(getRequestMethod().equals(Methods.HEAD) && getResponseHeaders().contains(Headers.CONTENT_LENGTH)) && Connectors.isEntityBodyAllowed(this)) {
1723                     //according to
1724                     getResponseHeaders().put(Headers.CONTENT_LENGTH, "0");
1725                 }
1726                 getResponseChannel();
1727             }
1728             responseChannel.shutdownWrites();
1729             if (!responseChannel.flush()) {
1730                 responseChannel.getWriteSetter().set(ChannelListeners.flushingChannelListener(
1731                         new ChannelListener<StreamSinkChannel>() {
1732                             @Override
1733                             public void handleEvent(final StreamSinkChannel channel) {
1734                                 channel.suspendWrites();
1735                                 channel.getWriteSetter().set(null);
1736                                 //defensive programming, should never happen
1737                                 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1738                                     //make sure the listeners have been invoked
1739                                     invokeExchangeCompleteListeners();
1740                                     UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, HttpServerExchange.this);
1741                                     IoUtils.safeClose(connection);
1742                                 }
1743                             }
1744                         }, new ChannelExceptionHandler<Channel>() {
1745                             @Override
1746                             public void handleException(final Channel channel, final IOException exception) {
1747                                 //make sure the listeners have been invoked
1748                                 invokeExchangeCompleteListeners();
1749                                 UndertowLogger.REQUEST_LOGGER.debug("Exception ending request", exception);
1750                                 IoUtils.safeClose(connection);
1751                             }
1752                         }
1753                 ));
1754                 responseChannel.resumeWrites();
1755             } else {
1756                 //defensive programming, should never happen
1757                 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1758                     //make sure the listeners have been invoked
1759                     invokeExchangeCompleteListeners();
1760                     UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, this);
1761                     IoUtils.safeClose(connection);
1762                 }
1763             }
1764         } catch (Throwable t) {
1765             if (t instanceof IOException) {
1766                 UndertowLogger.REQUEST_IO_LOGGER.ioException((IOException) t);
1767             } else {
1768                 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1769             }
1770             invokeExchangeCompleteListeners();
1771
1772             IoUtils.safeClose(connection);
1773         }
1774     }
1775
1776     /**
1777      * Transmit the response headers. After this method successfully returns,
1778      * the response channel may become writable.
1779      * <p>
1780      * If this method fails the request and response channels will be closed.
1781      * <p>
1782      * This method runs asynchronously. If the channel is writable it will
1783      * attempt to write as much of the response header as possible, and then
1784      * queue the rest in a listener and return.
1785      * <p>
1786      * If future handlers in the chain attempt to write before this is finished
1787      * XNIO will just magically sort it out so it works. This is not actually
1788      * implemented yet, so we just terminate the connection straight away at
1789      * the moment.
1790      * <p>
1791      * TODO: make this work properly
1792      *
1793      * @throws IllegalStateException if the response headers were already sent
1794      */

1795     HttpServerExchange startResponse() throws IllegalStateException {
1796         int oldVal = state;
1797         if (allAreSet(oldVal, FLAG_RESPONSE_SENT)) {
1798             throw UndertowMessages.MESSAGES.responseAlreadyStarted();
1799         }
1800         this.state = oldVal | FLAG_RESPONSE_SENT;
1801
1802         log.tracef("Starting to write response for %s"this);
1803         return this;
1804     }
1805
1806     public XnioIoThread getIoThread() {
1807         return connection.getIoThread();
1808     }
1809
1810     /**
1811      * @return The maximum entity size for this exchange
1812      */

1813     public long getMaxEntitySize() {
1814         return maxEntitySize;
1815     }
1816
1817     /**
1818      * Sets the max entity size for this exchange. This cannot be modified after the request channel has been obtained.
1819      *
1820      * @param maxEntitySize The max entity size
1821      */

1822     public HttpServerExchange setMaxEntitySize(final long maxEntitySize) {
1823         if (!isRequestChannelAvailable()) {
1824             throw UndertowMessages.MESSAGES.requestChannelAlreadyProvided();
1825         }
1826         this.maxEntitySize = maxEntitySize;
1827         connection.maxEntitySizeUpdated(this);
1828         return this;
1829     }
1830
1831     public SecurityContext getSecurityContext() {
1832         return securityContext;
1833     }
1834
1835     public void setSecurityContext(SecurityContext securityContext) {
1836         SecurityManager sm = System.getSecurityManager();
1837         if(sm != null) {
1838             sm.checkPermission(SET_SECURITY_CONTEXT);
1839         }
1840         this.securityContext = securityContext;
1841     }
1842
1843     /**
1844      * Adds a listener that will be invoked on response commit
1845      *
1846      * @param listener The response listener
1847      */

1848     public void addResponseCommitListener(final ResponseCommitListener listener) {
1849
1850         //technically it is possible to modify the exchange after the response conduit has been created
1851         //as the response channel should not be retrieved until it is about to be written to
1852         //if we get complaints about this we can add support for it, however it makes the exchange bigger and the connectors more complex
1853         addResponseWrapper(new ConduitWrapper<StreamSinkConduit>() {
1854             @Override
1855             public StreamSinkConduit wrap(ConduitFactory<StreamSinkConduit> factory, HttpServerExchange exchange) {
1856                 listener.beforeCommit(exchange);
1857                 return factory.create();
1858             }
1859         });
1860     }
1861
1862     /**
1863      * Actually resumes reads or writes, if the relevant method has been called.
1864      *
1865      * @return <code>true</code> if reads or writes were resumed
1866      */

1867     boolean runResumeReadWrite() {
1868         boolean ret = false;
1869         if(anyAreSet(state, FLAG_SHOULD_RESUME_WRITES)) {
1870             responseChannel.runResume();
1871             ret = true;
1872         }
1873         if(anyAreSet(state, FLAG_SHOULD_RESUME_READS)) {
1874             requestChannel.runResume();
1875             ret = true;
1876         }
1877         return ret;
1878     }
1879
1880     boolean isResumed() {
1881         return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES | FLAG_SHOULD_RESUME_READS);
1882     }
1883
1884     private static class ExchangeCompleteNextListener implements ExchangeCompletionListener.NextListener {
1885         private final ExchangeCompletionListener[] list;
1886         private final HttpServerExchange exchange;
1887         private int i;
1888
1889         ExchangeCompleteNextListener(final ExchangeCompletionListener[] list, final HttpServerExchange exchange, int i) {
1890             this.list = list;
1891             this.exchange = exchange;
1892             this.i = i;
1893         }
1894
1895         @Override
1896         public void proceed() {
1897             if (--i >= 0) {
1898                 final ExchangeCompletionListener next = list[i];
1899                 next.exchangeEvent(exchange, this);
1900             } else if(i == -1) {
1901                 exchange.connection.exchangeComplete(exchange);
1902             }
1903         }
1904     }
1905
1906     private static class DefaultBlockingHttpExchange implements BlockingHttpExchange {
1907
1908         private InputStream inputStream;
1909         private UndertowOutputStream outputStream;
1910         private Sender sender;
1911         private final HttpServerExchange exchange;
1912
1913         DefaultBlockingHttpExchange(final HttpServerExchange exchange) {
1914             this.exchange = exchange;
1915         }
1916
1917         public InputStream getInputStream() {
1918             if (inputStream == null) {
1919                 inputStream = new UndertowInputStream(exchange);
1920             }
1921             return inputStream;
1922         }
1923
1924         public UndertowOutputStream getOutputStream() {
1925             if (outputStream == null) {
1926                 outputStream = new UndertowOutputStream(exchange);
1927             }
1928             return outputStream;
1929         }
1930
1931         @Override
1932         public Sender getSender() {
1933             if (sender == null) {
1934                 sender = new BlockingSenderImpl(exchange, getOutputStream());
1935             }
1936             return sender;
1937         }
1938
1939         @Override
1940         public void close() throws IOException {
1941             try {
1942                 getInputStream().close();
1943             } finally {
1944                 getOutputStream().close();
1945             }
1946         }
1947
1948         @Override
1949         public Receiver getReceiver() {
1950             return new BlockingReceiverImpl(exchange, getInputStream());
1951         }
1952     }
1953
1954     /**
1955      * Channel implementation that is actually provided to clients of the exchange.
1956      * <p>
1957      * We do not provide the underlying conduit channel, as this is shared between requests, so we need to make sure that after this request
1958      * is done the the channel cannot affect the next request.
1959      * <p>
1960      * It also delays a wakeup/resumesWrites calls until the current call stack has returned, thus ensuring that only 1 thread is
1961      * active in the exchange at any one time.
1962      */

1963     private class WriteDispatchChannel extends DetachableStreamSinkChannel implements StreamSinkChannel {
1964
1965         private boolean wakeup;
1966
1967         WriteDispatchChannel(final ConduitStreamSinkChannel delegate) {
1968             super(delegate);
1969         }
1970
1971         @Override
1972         protected boolean isFinished() {
1973             return allAreSet(state, FLAG_RESPONSE_TERMINATED);
1974         }
1975
1976         @Override
1977         public void resumeWrites() {
1978             if (isInCall()) {
1979                 state |= FLAG_SHOULD_RESUME_WRITES;
1980                 if(anyAreSet(state, FLAG_DISPATCHED)) {
1981                     throw UndertowMessages.MESSAGES.resumedAndDispatched();
1982                 }
1983             } else if(!isFinished()){
1984                 delegate.resumeWrites();
1985             }
1986         }
1987
1988         @Override
1989         public void suspendWrites() {
1990             state &= ~FLAG_SHOULD_RESUME_WRITES;
1991             super.suspendWrites();
1992         }
1993
1994         @Override
1995         public void wakeupWrites() {
1996             if (isFinished()) {
1997                 return;
1998             }
1999             if (isInCall()) {
2000                 wakeup = true;
2001                 state |= FLAG_SHOULD_RESUME_WRITES;
2002                 if(anyAreSet(state, FLAG_DISPATCHED)) {
2003                     throw UndertowMessages.MESSAGES.resumedAndDispatched();
2004                 }
2005             } else {
2006                 delegate.wakeupWrites();
2007             }
2008         }
2009
2010         @Override
2011         public boolean isWriteResumed() {
2012             return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed();
2013         }
2014
2015         public void runResume() {
2016             if (isWriteResumed()) {
2017                 if(isFinished()) {
2018                     invokeListener();
2019                 } else {
2020                     if (wakeup) {
2021                         wakeup = false;
2022                         state &= ~FLAG_SHOULD_RESUME_WRITES;
2023                         delegate.wakeupWrites();
2024                     } else {
2025                         state &= ~FLAG_SHOULD_RESUME_WRITES;
2026                         delegate.resumeWrites();
2027                     }
2028                 }
2029             } else if(wakeup) {
2030                 wakeup = false;
2031                 invokeListener();
2032             }
2033         }
2034
2035         private void invokeListener() {
2036             if(writeSetter != null) {
2037                 super.getIoThread().execute(new Runnable() {
2038                     @Override
2039                     public void run() {
2040                         ChannelListeners.invokeChannelListener(WriteDispatchChannel.this, writeSetter.get());
2041                     }
2042                 });
2043             }
2044         }
2045
2046         @Override
2047         public void awaitWritable() throws IOException {
2048             if(Thread.currentThread() == super.getIoThread()) {
2049                 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2050             }
2051             super.awaitWritable();
2052         }
2053
2054         @Override
2055         public void awaitWritable(long time, TimeUnit timeUnit) throws IOException {
2056             if(Thread.currentThread() == super.getIoThread()) {
2057                 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2058             }
2059             super.awaitWritable(time, timeUnit);
2060         }
2061
2062         @Override
2063         public long transferFrom(FileChannel src, long position, long count) throws IOException {
2064             long l = super.transferFrom(src, position, count);
2065             if(l > 0) {
2066                 responseBytesSent += l;
2067             }
2068             return l;
2069         }
2070
2071         @Override
2072         public long transferFrom(StreamSourceChannel source, long count, ByteBuffer throughBuffer) throws IOException {
2073             long l = super.transferFrom(source, count, throughBuffer);
2074             if(l > 0) {
2075                 responseBytesSent += l;
2076             }
2077             return l;
2078         }
2079
2080         @Override
2081         public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
2082             long l = super.write(srcs, offset, length);
2083             responseBytesSent += l;
2084             return l;
2085         }
2086
2087         @Override
2088         public long write(ByteBuffer[] srcs) throws IOException {
2089             long l = super.write(srcs);
2090             responseBytesSent += l;
2091             return l;
2092         }
2093
2094         @Override
2095         public int writeFinal(ByteBuffer src) throws IOException {
2096             int l = super.writeFinal(src);
2097             responseBytesSent += l;
2098             return l;
2099         }
2100
2101         @Override
2102         public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
2103             long l = super.writeFinal(srcs, offset, length);
2104             responseBytesSent += l;
2105             return l;
2106         }
2107
2108         @Override
2109         public long writeFinal(ByteBuffer[] srcs) throws IOException {
2110             long l = super.writeFinal(srcs);
2111             responseBytesSent += l;
2112             return l;
2113         }
2114
2115         @Override
2116         public int write(ByteBuffer src) throws IOException {
2117             int l = super.write(src);
2118             responseBytesSent += l;
2119             return l;
2120         }
2121     }
2122
2123     /**
2124      * Channel implementation that is actually provided to clients of the exchange. We do not provide the underlying
2125      * conduit channel, as this will become the next requests conduit channel, so if a thread is still hanging onto this
2126      * exchange it can result in problems.
2127      * <p>
2128      * It also delays a readResume call until the current call stack has returned, thus ensuring that only 1 thread is
2129      * active in the exchange at any one time.
2130      * <p>
2131      * It also handles buffered request data.
2132      */

2133     private final class ReadDispatchChannel extends DetachableStreamSourceChannel implements StreamSourceChannel {
2134
2135         private boolean wakeup = true;
2136         private boolean readsResumed = false;
2137
2138
2139         ReadDispatchChannel(final ConduitStreamSourceChannel delegate) {
2140             super(delegate);
2141         }
2142
2143         @Override
2144         protected boolean isFinished() {
2145             return allAreSet(state, FLAG_REQUEST_TERMINATED);
2146         }
2147
2148         @Override
2149         public void resumeReads() {
2150             readsResumed = true;
2151             if (isInCall()) {
2152                 state |= FLAG_SHOULD_RESUME_READS;
2153                 if(anyAreSet(state, FLAG_DISPATCHED)) {
2154                     throw UndertowMessages.MESSAGES.resumedAndDispatched();
2155                 }
2156             } else if (!isFinished()) {
2157                 delegate.resumeReads();
2158             }
2159
2160         }
2161
2162         public void wakeupReads() {
2163             if (isInCall()) {
2164                 wakeup = true;
2165                 state |= FLAG_SHOULD_RESUME_READS;
2166                 if(anyAreSet(state, FLAG_DISPATCHED)) {
2167                     throw UndertowMessages.MESSAGES.resumedAndDispatched();
2168                 }
2169             } else {
2170                 if(isFinished()) {
2171                     invokeListener();
2172                 } else {
2173                     delegate.wakeupReads();
2174                 }
2175             }
2176         }
2177
2178         private void invokeListener() {
2179             if(readSetter != null) {
2180                 super.getIoThread().execute(new Runnable() {
2181                     @Override
2182                     public void run() {
2183                         ChannelListeners.invokeChannelListener(ReadDispatchChannel.this, readSetter.get());
2184                     }
2185                 });
2186             }
2187         }
2188
2189         public void requestDone() {
2190             if(delegate instanceof ConduitStreamSourceChannel) {
2191                 ((ConduitStreamSourceChannel)delegate).setReadListener(null);
2192                 ((ConduitStreamSourceChannel)delegate).setCloseListener(null);
2193             } else {
2194                 delegate.getReadSetter().set(null);
2195                 delegate.getCloseSetter().set(null);
2196             }
2197         }
2198
2199         @Override
2200         public long transferTo(long position, long count, FileChannel target) throws IOException {
2201             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2202             if (buffered == null) {
2203                 return super.transferTo(position, count, target);
2204             }
2205             return target.transferFrom(this, position, count);
2206         }
2207
2208         @Override
2209         public void awaitReadable() throws IOException {
2210             if(Thread.currentThread() == super.getIoThread()) {
2211                 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2212             }
2213             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2214             if (buffered == null) {
2215                 super.awaitReadable();
2216             }
2217         }
2218
2219         @Override
2220         public void suspendReads() {
2221             readsResumed = false;
2222             state &= ~(FLAG_SHOULD_RESUME_READS);
2223             super.suspendReads();
2224         }
2225
2226         @Override
2227         public long transferTo(long count, ByteBuffer throughBuffer, StreamSinkChannel target) throws IOException {
2228             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2229             if (buffered == null) {
2230                 return super.transferTo(count, throughBuffer, target);
2231             }
2232             //make sure there is no garbage in throughBuffer
2233             throughBuffer.position(0);
2234             throughBuffer.limit(0);
2235             long copied = 0;
2236             for (int i = 0; i < buffered.length; ++i) {
2237                 PooledByteBuffer pooled = buffered[i];
2238                 if (pooled != null) {
2239                     final ByteBuffer buf = pooled.getBuffer();
2240                     if (buf.hasRemaining()) {
2241                         int res = target.write(buf);
2242
2243                         if (!buf.hasRemaining()) {
2244                             pooled.close();
2245                             buffered[i] = null;
2246                         }
2247                         if (res == 0) {
2248                             return copied;
2249                         } else {
2250                             copied += res;
2251                         }
2252                     } else {
2253                         pooled.close();
2254                         buffered[i] = null;
2255                     }
2256                 }
2257             }
2258             removeAttachment(BUFFERED_REQUEST_DATA);
2259             if (copied == 0) {
2260                 return super.transferTo(count, throughBuffer, target);
2261             } else {
2262                 return copied;
2263             }
2264         }
2265
2266         @Override
2267         public void awaitReadable(long time, TimeUnit timeUnit) throws IOException {
2268             if(Thread.currentThread() == super.getIoThread()) {
2269                 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2270             }
2271             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2272             if (buffered == null) {
2273                 super.awaitReadable(time, timeUnit);
2274             }
2275         }
2276
2277         @Override
2278         public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
2279             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2280             if (buffered == null) {
2281                 return super.read(dsts, offset, length);
2282             }
2283             long copied = 0;
2284             for (int i = 0; i < buffered.length; ++i) {
2285                 PooledByteBuffer pooled = buffered[i];
2286                 if (pooled != null) {
2287                     final ByteBuffer buf = pooled.getBuffer();
2288                     if (buf.hasRemaining()) {
2289                         copied += Buffers.copy(dsts, offset, length, buf);
2290                         if (!buf.hasRemaining()) {
2291                             pooled.close();
2292                             buffered[i] = null;
2293                         }
2294                         if (!Buffers.hasRemaining(dsts, offset, length)) {
2295                             return copied;
2296                         }
2297                     } else {
2298                         pooled.close();
2299                         buffered[i] = null;
2300                     }
2301                 }
2302             }
2303             removeAttachment(BUFFERED_REQUEST_DATA);
2304             if (copied == 0) {
2305                 return super.read(dsts, offset, length);
2306             } else {
2307                 return copied;
2308             }
2309         }
2310
2311         @Override
2312         public long read(ByteBuffer[] dsts) throws IOException {
2313             return read(dsts, 0, dsts.length);
2314         }
2315
2316         @Override
2317         public boolean isOpen() {
2318             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2319             if (buffered != null) {
2320                 return true;
2321             }
2322             return super.isOpen();
2323         }
2324
2325         @Override
2326         public void close() throws IOException {
2327             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2328             if (buffered != null) {
2329                 for (PooledByteBuffer pooled : buffered) {
2330                     if (pooled != null) {
2331                         pooled.close();
2332                     }
2333                 }
2334             }
2335             removeAttachment(BUFFERED_REQUEST_DATA);
2336             super.close();
2337         }
2338
2339         @Override
2340         public boolean isReadResumed() {
2341             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2342             if (buffered != null) {
2343                 return readsResumed;
2344             }
2345             if(isFinished()) {
2346                 return false;
2347             }
2348             return anyAreSet(state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed();
2349         }
2350
2351         @Override
2352         public int read(ByteBuffer dst) throws IOException {
2353             PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2354             if (buffered == null) {
2355                 return super.read(dst);
2356             }
2357             int copied = 0;
2358             for (int i = 0; i < buffered.length; ++i) {
2359                 PooledByteBuffer pooled = buffered[i];
2360                 if (pooled != null) {
2361                     final ByteBuffer buf = pooled.getBuffer();
2362                     if (buf.hasRemaining()) {
2363                         copied += Buffers.copy(dst, buf);
2364                         if (!buf.hasRemaining()) {
2365                             pooled.close();
2366                             buffered[i] = null;
2367                         }
2368                         if (!dst.hasRemaining()) {
2369                             return copied;
2370                         }
2371                     } else {
2372                         pooled.close();
2373                         buffered[i] = null;
2374                     }
2375                 }
2376             }
2377             removeAttachment(BUFFERED_REQUEST_DATA);
2378             if (copied == 0) {
2379                 return super.read(dst);
2380             } else {
2381                 return copied;
2382             }
2383         }
2384
2385         public void runResume() {
2386             if (isReadResumed()) {
2387                 if(isFinished()) {
2388                     invokeListener();
2389                 } else {
2390                     if (wakeup) {
2391                         wakeup = false;
2392                         state &= ~FLAG_SHOULD_RESUME_READS;
2393                         delegate.wakeupReads();
2394                     } else {
2395                         state &= ~FLAG_SHOULD_RESUME_READS;
2396                         delegate.resumeReads();
2397                     }
2398                 }
2399             } else if(wakeup) {
2400                 wakeup = false;
2401                 invokeListener();
2402             }
2403         }
2404     }
2405
2406     public static class WrapperStreamSinkConduitFactory implements ConduitFactory<StreamSinkConduit> {
2407
2408         private final HttpServerExchange exchange;
2409         private final ConduitWrapper<StreamSinkConduit>[] wrappers;
2410         private int position;
2411         private final StreamSinkConduit first;
2412
2413
2414         public WrapperStreamSinkConduitFactory(ConduitWrapper<StreamSinkConduit>[] wrappers, int wrapperCount, HttpServerExchange exchange, StreamSinkConduit first) {
2415             this.wrappers = wrappers;
2416             this.exchange = exchange;
2417             this.first = first;
2418             this.position = wrapperCount - 1;
2419         }
2420
2421         @Override
2422         public StreamSinkConduit create() {
2423             if (position == -1) {
2424                 return exchange.getConnection().getSinkConduit(exchange, first);
2425             } else {
2426                 return wrappers[position--].wrap(this, exchange);
2427             }
2428         }
2429     }
2430
2431     public static class WrapperConduitFactory<T extends Conduit> implements ConduitFactory<T> {
2432
2433         private final HttpServerExchange exchange;
2434         private final ConduitWrapper<T>[] wrappers;
2435         private int position;
2436         private T first;
2437
2438
2439         public WrapperConduitFactory(ConduitWrapper<T>[] wrappers, int wrapperCount, T first, HttpServerExchange exchange) {
2440             this.wrappers = wrappers;
2441             this.exchange = exchange;
2442             this.position = wrapperCount - 1;
2443             this.first = first;
2444         }
2445
2446         @Override
2447         public T create() {
2448             if (position == -1) {
2449                 return first;
2450             } else {
2451                 return wrappers[position--].wrap(this, exchange);
2452             }
2453         }
2454     }
2455
2456     @Override
2457     public String toString() {
2458         return "HttpServerExchange{ " + getRequestMethod().toString() + " " + getRequestURI() + '}';
2459     }
2460 }
2461