1
18
19 package io.undertow.server;
20
21 import io.undertow.UndertowLogger;
22 import io.undertow.UndertowMessages;
23 import io.undertow.UndertowOptions;
24 import io.undertow.channels.DetachableStreamSinkChannel;
25 import io.undertow.channels.DetachableStreamSourceChannel;
26 import io.undertow.conduits.EmptyStreamSourceConduit;
27 import io.undertow.connector.PooledByteBuffer;
28 import io.undertow.io.AsyncReceiverImpl;
29 import io.undertow.io.AsyncSenderImpl;
30 import io.undertow.io.BlockingReceiverImpl;
31 import io.undertow.io.BlockingSenderImpl;
32 import io.undertow.io.Receiver;
33 import io.undertow.io.Sender;
34 import io.undertow.io.UndertowInputStream;
35 import io.undertow.io.UndertowOutputStream;
36 import io.undertow.security.api.SecurityContext;
37 import io.undertow.server.handlers.Cookie;
38 import io.undertow.util.AbstractAttachable;
39 import io.undertow.util.AttachmentKey;
40 import io.undertow.util.ConduitFactory;
41 import io.undertow.util.Cookies;
42 import io.undertow.util.HeaderMap;
43 import io.undertow.util.Headers;
44 import io.undertow.util.HttpString;
45 import io.undertow.util.Methods;
46 import io.undertow.util.NetworkUtils;
47 import io.undertow.util.Protocols;
48 import io.undertow.util.Rfc6265CookieSupport;
49 import io.undertow.util.StatusCodes;
50 import org.jboss.logging.Logger;
51 import org.xnio.Buffers;
52 import org.xnio.ChannelExceptionHandler;
53 import org.xnio.ChannelListener;
54 import org.xnio.ChannelListeners;
55 import org.xnio.IoUtils;
56 import org.xnio.XnioIoThread;
57 import org.xnio.channels.Channels;
58 import org.xnio.channels.Configurable;
59 import org.xnio.channels.StreamSinkChannel;
60 import org.xnio.channels.StreamSourceChannel;
61 import org.xnio.conduits.Conduit;
62 import org.xnio.conduits.ConduitStreamSinkChannel;
63 import org.xnio.conduits.ConduitStreamSourceChannel;
64 import org.xnio.conduits.StreamSinkConduit;
65 import org.xnio.conduits.StreamSourceConduit;
66
67 import java.io.IOException;
68 import java.io.InputStream;
69 import java.io.OutputStream;
70 import java.net.InetSocketAddress;
71 import java.nio.ByteBuffer;
72 import java.nio.channels.Channel;
73 import java.nio.channels.FileChannel;
74 import java.util.ArrayDeque;
75 import java.util.Deque;
76 import java.util.Map;
77 import java.util.TreeMap;
78 import java.util.concurrent.Executor;
79 import java.util.concurrent.TimeUnit;
80
81 import static org.xnio.Bits.allAreSet;
82 import static org.xnio.Bits.anyAreClear;
83 import static org.xnio.Bits.anyAreSet;
84 import static org.xnio.Bits.intBitMask;
85
86
92 public final class HttpServerExchange extends AbstractAttachable {
93
94
95
// Logger obtained for this class.
private static final Logger log = Logger.getLogger(HttpServerExchange.class);

// Runtime permission named "io.undertow.SET_SECURITY_CONTEXT".
// NOTE(review): presumably checked where the security context is replaced; that code is not in view.
private static final RuntimePermission SET_SECURITY_CONTEXT = new RuntimePermission("io.undertow.SET_SECURITY_CONTEXT");
// Charset name that extractCharset falls back to when Content-Type declares no charset.
private static final String ISO_8859_1 = "ISO-8859-1";
// Scheme that isSecure() compares the request scheme against, ignoring case.
private static final String HTTPS = "https";
101
102
// Attachment holding the reason phrase stored by setReasonPhrase and read by getReasonPhrase.
private static final AttachmentKey<String> REASON_PHRASE = AttachmentKey.create(String.class);


// Attachment holding buffered request data; while it is present isRequestComplete() reports false.
static final AttachmentKey<PooledByteBuffer[]> BUFFERED_REQUEST_DATA = AttachmentKey.create(PooledByteBuffer[].class);


// Attachment key for a String-to-String map of request attributes.
// NOTE(review): not read or written in the visible part of this class.
public static final AttachmentKey<Map<String, String>> REQUEST_ATTRIBUTES = AttachmentKey.create(Map.class);


// Attachment key for a remote user name.
// NOTE(review): not read or written in the visible part of this class.
public static final AttachmentKey<String> REMOTE_USER = AttachmentKey.create(String.class);



// When this attachment is TRUE, isSecure() returns true regardless of the request scheme.
public static final AttachmentKey<Boolean> SECURE_REQUEST = AttachmentKey.create(Boolean.class);
128
// Connection this exchange belongs to; supplied to the constructor.
private final ServerConnection connection;
// Request and response header maps; supplied to (or created by) the constructors.
private final HeaderMap requestHeaders;
private final HeaderMap responseHeaders;

// Number of registered completion listeners; set to -1 once invokeExchangeCompleteListeners has run.
private int exchangeCompletionListenersCount = 0;
// Completion listeners, grown two slots at a time by addExchangeCompleteListener.
private ExchangeCompletionListener[] exchangeCompleteListeners;
// Default response listeners; slots are nulled out as endExchange consumes them.
private DefaultResponseListener[] defaultResponseListeners;

// Lazily created by getQueryParameters / addQueryParam.
private Map<String, Deque<String>> queryParameters;
// Lazily created by getPathParameters / addPathParam.
private Map<String, Deque<String>> pathParameters;

// Lazily parsed from the Cookie request header by getRequestCookies.
private Map<String, Cookie> requestCookies;
// Lazily created by setResponseCookie / getResponseCookies.
private Map<String, Cookie> responseCookies;


// Response channel; non-null once getResponseChannel has handed it out.
private WriteDispatchChannel responseChannel;

// Request channel; non-null once getRequestChannel has handed it out.
protected ReadDispatchChannel requestChannel;

// Non-null once startBlocking has been called with a non-null exchange.
private BlockingHttpExchange blockingHttpExchange;

// Request protocol; null until setProtocol is called.
private HttpString protocol;


// NOTE(review): accessors for the security context are outside the visible part of this class.
private SecurityContext securityContext;



// Packed state: the bits selected by MASK_RESPONSE_CODE hold the status code (initially 200),
// the remaining bits hold the FLAG_* values declared below.
private int state = 200;
private HttpString requestMethod = HttpString.EMPTY;
private String requestScheme;


// Request URI as stored by setRequestURI.
private String requestURI;


// Request path as stored by setRequestPath.
private String requestPath;


// Relative path as stored by setRelativePath.
private String relativePath;


// Resolved path as stored by setResolvedPath; starts out empty.
private String resolvedPath = "";


// Query string as stored by setQueryString; starts out empty.
private String queryString = "";

// Number of used slots in requestWrappers.
private int requestWrapperCount = 0;
// Request conduit wrappers; cleared once getRequestChannel has applied them.
private ConduitWrapper<StreamSourceConduit>[] requestWrappers;

// Number of used slots in responseWrappers.
private int responseWrapperCount = 0;
// Response conduit wrappers; cleared once getResponseChannel has applied them.
private ConduitWrapper<StreamSinkConduit>[] responseWrappers;

// Lazily created asynchronous sender / receiver (see getResponseSender / getRequestReceiver).
private Sender sender;
private Receiver receiver;

// Request start time; -1 until setRequestStartTime is called.
private long requestStartTime = -1;



// Maximum entity size supplied to the constructor.
// NOTE(review): the code enforcing it is not in view.
private long maxEntitySize;


// Task recorded by dispatch(Executor, Runnable) while inside a handler call; cleared by unDispatch.
private Runnable dispatchTask;


// Executor recorded by dispatch(Executor, Runnable) or setDispatchExecutor.
private Executor dispatchExecutor;


// Running total maintained by updateBytesSent.
private long responseBytesSent = 0;
253
254
// Mask selecting the status-code bits of `state` (see getStatusCode / setStatusCode).
// NOTE(review): presumably bits 0-9, which fits the 0..999 range accepted by setStatusCode.
private static final int MASK_RESPONSE_CODE = intBitMask(0, 9);


// Tested by isResponseStarted and setStatusCode; it is set outside the visible part of this class.
private static final int FLAG_RESPONSE_SENT = 1 << 10;


// Set by terminateResponse.
private static final int FLAG_RESPONSE_TERMINATED = 1 << 11;


// Set by terminateRequest.
private static final int FLAG_REQUEST_TERMINATED = 1 << 12;


// Set and cleared by setPersistent; read by isPersistent.
private static final int FLAG_PERSISTENT = 1 << 14;


// Set by the dispatch methods, cleared by unDispatch; read by isDispatched.
private static final int FLAG_DISPATCHED = 1 << 15;


// Set and cleared by setRequestURI(String, boolean); read by isHostIncludedInRequestURI.
private static final int FLAG_URI_CONTAINS_HOST = 1 << 16;


// Set and cleared by setInCall; read by isInCall.
private static final int FLAG_IN_CALL = 1 << 17;

// dispatch(Executor, Runnable) throws if this or FLAG_SHOULD_RESUME_WRITES is set while in a call.
// Both flags are set outside the visible part of this class.
private static final int FLAG_SHOULD_RESUME_READS = 1 << 18;


private static final int FLAG_SHOULD_RESUME_WRITES = 1 << 19;


// Set by resetRequestChannel; cleared by getRequestChannel when it hands the existing channel out again.
private static final int FLAG_REQUEST_RESET= 1 << 20;
320
321
// Optional override returned by getSourceAddress instead of the connection's peer address.
private InetSocketAddress sourceAddress;


// Optional override returned by getDestinationAddress instead of the connection's local address.
private InetSocketAddress destinationAddress;
330
/**
 * Creates an exchange on the given connection with newly created, empty
 * request and response header maps.
 *
 * @param connection    the connection the exchange belongs to
 * @param maxEntitySize the maximum entity size recorded on the exchange
 */
public HttpServerExchange(final ServerConnection connection, long maxEntitySize) {
    this(
            connection,
            new HeaderMap(),
            new HeaderMap(),
            maxEntitySize);
}
334
/**
 * Creates an exchange on the given connection with a maximum entity size of 0.
 *
 * @param connection the connection the exchange belongs to
 */
public HttpServerExchange(final ServerConnection connection) {
    this(connection, 0L);
}
338
/**
 * Creates an exchange that uses the supplied header maps.
 *
 * @param connection      the connection the exchange belongs to
 * @param requestHeaders  the map exposed by {@link #getRequestHeaders()}
 * @param responseHeaders the map exposed by {@link #getResponseHeaders()}
 * @param maxEntitySize   the maximum entity size recorded on the exchange
 */
public HttpServerExchange(final ServerConnection connection, final HeaderMap requestHeaders, final HeaderMap responseHeaders, long maxEntitySize) {
    this.requestHeaders = requestHeaders;
    this.responseHeaders = responseHeaders;
    this.maxEntitySize = maxEntitySize;
    this.connection = connection;
}
345
346
351 public HttpString getProtocol() {
352 return protocol;
353 }
354
355
360 public HttpServerExchange setProtocol(final HttpString protocol) {
361 this.protocol = protocol;
362 return this;
363 }
364
365
370 public boolean isHttp09() {
371 return protocol.equals(Protocols.HTTP_0_9);
372 }
373
374
379 public boolean isHttp10() {
380 return protocol.equals(Protocols.HTTP_1_0);
381 }
382
383
388 public boolean isHttp11() {
389 return protocol.equals(Protocols.HTTP_1_1);
390 }
391
392 public boolean isSecure() {
393 Boolean secure = getAttachment(SECURE_REQUEST);
394 if(secure != null && secure) {
395 return true;
396 }
397 String scheme = getRequestScheme();
398 if (scheme != null && scheme.equalsIgnoreCase(HTTPS)) {
399 return true;
400 }
401 return false;
402 }
403
404
409 public HttpString getRequestMethod() {
410 return requestMethod;
411 }
412
413
418 public HttpServerExchange setRequestMethod(final HttpString requestMethod) {
419 this.requestMethod = requestMethod;
420 return this;
421 }
422
423
428 public String getRequestScheme() {
429 return requestScheme;
430 }
431
432
437 public HttpServerExchange setRequestScheme(final String requestScheme) {
438 this.requestScheme = requestScheme;
439 return this;
440 }
441
442
452 public String getRequestURI() {
453 return requestURI;
454 }
455
456
461 public HttpServerExchange setRequestURI(final String requestURI) {
462 this.requestURI = requestURI;
463 return this;
464 }
465
466
472 public HttpServerExchange setRequestURI(final String requestURI, boolean containsHost) {
473 this.requestURI = requestURI;
474 if (containsHost) {
475 this.state |= FLAG_URI_CONTAINS_HOST;
476 } else {
477 this.state &= ~FLAG_URI_CONTAINS_HOST;
478 }
479 return this;
480 }
481
482
491 public boolean isHostIncludedInRequestURI() {
492 return anyAreSet(state, FLAG_URI_CONTAINS_HOST);
493 }
494
495
496
505 public String getRequestPath() {
506 return requestPath;
507 }
508
509
514 public HttpServerExchange setRequestPath(final String requestPath) {
515 this.requestPath = requestPath;
516 return this;
517 }
518
519
527 public String getRelativePath() {
528 return relativePath;
529 }
530
531
536 public HttpServerExchange setRelativePath(final String relativePath) {
537 this.relativePath = relativePath;
538 return this;
539 }
540
541
546 public String getResolvedPath() {
547 return resolvedPath;
548 }
549
550
555 public HttpServerExchange setResolvedPath(final String resolvedPath) {
556 this.resolvedPath = resolvedPath;
557 return this;
558 }
559
560
564 public String getQueryString() {
565 return queryString;
566 }
567
568 public HttpServerExchange setQueryString(final String queryString) {
569 this.queryString = queryString;
570 return this;
571 }
572
573
579 public String getRequestURL() {
580 if (isHostIncludedInRequestURI()) {
581 return getRequestURI();
582 } else {
583 return getRequestScheme() + ": + getHostAndPort() + getRequestURI();
584 }
585 }
586
587
593 public String getRequestCharset() {
594 return extractCharset(requestHeaders);
595 }
596
597
603 public String getResponseCharset() {
604 HeaderMap headers = responseHeaders;
605 return extractCharset(headers);
606 }
607
608 private String extractCharset(HeaderMap headers) {
609 String contentType = headers.getFirst(Headers.CONTENT_TYPE);
610 if (contentType != null) {
611 String value = Headers.extractQuotedValueFromHeader(contentType, "charset");
612 if (value != null) {
613 return value;
614 }
615 }
616 return ISO_8859_1;
617 }
618
619
629 public String getHostName() {
630 String host = requestHeaders.getFirst(Headers.HOST);
631 if (host == null || "".equals(host.trim())) {
632 host = getDestinationAddress().getHostString();
633 } else {
634 if (host.startsWith("[")) {
635 host = host.substring(1, host.indexOf(']'));
636 } else if (host.indexOf(':') != -1) {
637 host = host.substring(0, host.indexOf(':'));
638 }
639 }
640 return host;
641 }
642
643
652 public String getHostAndPort() {
653 String host = requestHeaders.getFirst(Headers.HOST);
654 if (host == null || "".equals(host.trim())) {
655 InetSocketAddress address = getDestinationAddress();
656 host = NetworkUtils.formatPossibleIpv6Address(address.getHostString());
657 int port = address.getPort();
658 if (!((getRequestScheme().equals("http") && port == 80)
659 || (getRequestScheme().equals("https") && port == 443))) {
660 host = host + ":" + port;
661 }
662 }
663 return host;
664 }
665
666
672 public int getHostPort() {
673 String host = requestHeaders.getFirst(Headers.HOST);
674 if (host != null) {
675
676 final int colonIndex;
677 if (host.startsWith("[")) {
678 colonIndex = host.indexOf(':', host.indexOf(']'));
679 } else {
680 colonIndex = host.indexOf(':');
681 }
682 if (colonIndex != -1) {
683 try {
684 return Integer.parseInt(host.substring(colonIndex + 1));
685 } catch (NumberFormatException ignore) {}
686 }
687 if (getRequestScheme().equals("https")) {
688 return 443;
689 } else if (getRequestScheme().equals("http")) {
690 return 80;
691 }
692
693 }
694 return getDestinationAddress().getPort();
695 }
696
697
702 public ServerConnection getConnection() {
703 return connection;
704 }
705
706 public boolean isPersistent() {
707 return anyAreSet(state, FLAG_PERSISTENT);
708 }
709
710
714 public boolean isInIoThread() {
715 return getIoThread() == Thread.currentThread();
716 }
717
718
722 public boolean isUpgrade() {
723 return getStatusCode() == StatusCodes.SWITCHING_PROTOCOLS;
724 }
725
726
730 public long getResponseBytesSent() {
731 if(Connectors.isEntityBodyAllowed(this) && !getRequestMethod().equals(Methods.HEAD)) {
732 return responseBytesSent;
733 } else {
734 return 0;
735 }
736 }
737
738
742 void updateBytesSent(long bytes) {
743 if(Connectors.isEntityBodyAllowed(this) && !getRequestMethod().equals(Methods.HEAD)) {
744 responseBytesSent += bytes;
745 }
746 }
747
748 public HttpServerExchange setPersistent(final boolean persistent) {
749 if (persistent) {
750 this.state = this.state | FLAG_PERSISTENT;
751 } else {
752 this.state = this.state & ~FLAG_PERSISTENT;
753 }
754 return this;
755 }
756
757 public boolean isDispatched() {
758 return anyAreSet(state, FLAG_DISPATCHED);
759 }
760
761 public HttpServerExchange unDispatch() {
762 state &= ~FLAG_DISPATCHED;
763 dispatchTask = null;
764 return this;
765 }
766
767
774 @Deprecated
775 public HttpServerExchange dispatch() {
776 state |= FLAG_DISPATCHED;
777 return this;
778 }
779
780
791 public HttpServerExchange dispatch(final Runnable runnable) {
792 dispatch(null, runnable);
793 return this;
794 }
795
796
807 public HttpServerExchange dispatch(final Executor executor, final Runnable runnable) {
808 if (isInCall()) {
809 if (executor != null) {
810 this.dispatchExecutor = executor;
811 }
812 state |= FLAG_DISPATCHED;
813 if(anyAreSet(state, FLAG_SHOULD_RESUME_READS | FLAG_SHOULD_RESUME_WRITES)) {
814 throw UndertowMessages.MESSAGES.resumedAndDispatched();
815 }
816 this.dispatchTask = runnable;
817 } else {
818 if (executor == null) {
819 getConnection().getWorker().execute(runnable);
820 } else {
821 executor.execute(runnable);
822 }
823 }
824 return this;
825 }
826
827 public HttpServerExchange dispatch(final HttpHandler handler) {
828 dispatch(null, handler);
829 return this;
830 }
831
832 public HttpServerExchange dispatch(final Executor executor, final HttpHandler handler) {
833 final Runnable runnable = new Runnable() {
834 @Override
835 public void run() {
836 Connectors.executeRootHandler(handler, HttpServerExchange.this);
837 }
838 };
839 dispatch(executor, runnable);
840 return this;
841 }
842
843
848 public HttpServerExchange setDispatchExecutor(final Executor executor) {
849 if (executor == null) {
850 dispatchExecutor = null;
851 } else {
852 dispatchExecutor = executor;
853 }
854 return this;
855 }
856
857
862 public Executor getDispatchExecutor() {
863 return dispatchExecutor;
864 }
865
866
869 Runnable getDispatchTask() {
870 return dispatchTask;
871 }
872
873 boolean isInCall() {
874 return anyAreSet(state, FLAG_IN_CALL);
875 }
876
877 HttpServerExchange setInCall(boolean value) {
878 if (value) {
879 state |= FLAG_IN_CALL;
880 } else {
881 state &= ~FLAG_IN_CALL;
882 }
883 return this;
884 }
885
886
887
895 public HttpServerExchange upgradeChannel(final HttpUpgradeListener listener) {
896 if (!connection.isUpgradeSupported()) {
897 throw UndertowMessages.MESSAGES.upgradeNotSupported();
898 }
899 if(!getRequestHeaders().contains(Headers.UPGRADE)) {
900 throw UndertowMessages.MESSAGES.notAnUpgradeRequest();
901 }
902 UndertowLogger.REQUEST_LOGGER.debugf("Upgrading request %s", this);
903 connection.setUpgradeListener(listener);
904 setStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
905 getResponseHeaders().put(Headers.CONNECTION, Headers.UPGRADE_STRING);
906 return this;
907 }
908
909
918 public HttpServerExchange upgradeChannel(String productName, final HttpUpgradeListener listener) {
919 if (!connection.isUpgradeSupported()) {
920 throw UndertowMessages.MESSAGES.upgradeNotSupported();
921 }
922 UndertowLogger.REQUEST_LOGGER.debugf("Upgrading request %s", this);
923 connection.setUpgradeListener(listener);
924 setStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
925 final HeaderMap headers = getResponseHeaders();
926 headers.put(Headers.UPGRADE, productName);
927 headers.put(Headers.CONNECTION, Headers.UPGRADE_STRING);
928 return this;
929 }
930
931
936 public HttpServerExchange acceptConnectRequest(HttpUpgradeListener connectListener) {
937 if(!getRequestMethod().equals(Methods.CONNECT)) {
938 throw UndertowMessages.MESSAGES.notAConnectRequest();
939 }
940 connection.setConnectListener(connectListener);
941 return this;
942 }
943
944
945 public HttpServerExchange addExchangeCompleteListener(final ExchangeCompletionListener listener) {
946 if(isComplete() || this.exchangeCompletionListenersCount == -1) {
947 throw UndertowMessages.MESSAGES.exchangeAlreadyComplete();
948 }
949 final int exchangeCompletionListenersCount = this.exchangeCompletionListenersCount++;
950 ExchangeCompletionListener[] exchangeCompleteListeners = this.exchangeCompleteListeners;
951 if (exchangeCompleteListeners == null || exchangeCompleteListeners.length == exchangeCompletionListenersCount) {
952 ExchangeCompletionListener[] old = exchangeCompleteListeners;
953 this.exchangeCompleteListeners = exchangeCompleteListeners = new ExchangeCompletionListener[exchangeCompletionListenersCount + 2];
954 if(old != null) {
955 System.arraycopy(old, 0, exchangeCompleteListeners, 0, exchangeCompletionListenersCount);
956 }
957 }
958 exchangeCompleteListeners[exchangeCompletionListenersCount] = listener;
959 return this;
960 }
961
962 public HttpServerExchange addDefaultResponseListener(final DefaultResponseListener listener) {
963 int i = 0;
964 if(defaultResponseListeners == null) {
965 defaultResponseListeners = new DefaultResponseListener[2];
966 } else {
967 while (i != defaultResponseListeners.length && defaultResponseListeners[i] != null) {
968 ++i;
969 }
970 if (i == defaultResponseListeners.length) {
971 DefaultResponseListener[] old = defaultResponseListeners;
972 defaultResponseListeners = new DefaultResponseListener[defaultResponseListeners.length + 2];
973 System.arraycopy(old, 0, defaultResponseListeners, 0, old.length);
974 }
975 }
976 defaultResponseListeners[i] = listener;
977 return this;
978 }
979
980
985 public InetSocketAddress getSourceAddress() {
986 if (sourceAddress != null) {
987 return sourceAddress;
988 }
989 return connection.getPeerAddress(InetSocketAddress.class);
990 }
991
992
998 public HttpServerExchange setSourceAddress(InetSocketAddress sourceAddress) {
999 this.sourceAddress = sourceAddress;
1000 return this;
1001 }
1002
1003
1008 public InetSocketAddress getDestinationAddress() {
1009 if (destinationAddress != null) {
1010 return destinationAddress;
1011 }
1012 return connection.getLocalAddress(InetSocketAddress.class);
1013 }
1014
1015
1021 public HttpServerExchange setDestinationAddress(InetSocketAddress destinationAddress) {
1022 this.destinationAddress = destinationAddress;
1023 return this;
1024 }
1025
1026
1031 public HeaderMap getRequestHeaders() {
1032 return requestHeaders;
1033 }
1034
1035
1038 public long getRequestContentLength() {
1039 String contentLengthString = requestHeaders.getFirst(Headers.CONTENT_LENGTH);
1040 if (contentLengthString == null) {
1041 return -1;
1042 }
1043 return Long.parseLong(contentLengthString);
1044 }
1045
1046
1051 public HeaderMap getResponseHeaders() {
1052 return responseHeaders;
1053 }
1054
1055
1058 public long getResponseContentLength() {
1059 String contentLengthString = responseHeaders.getFirst(Headers.CONTENT_LENGTH);
1060 if (contentLengthString == null) {
1061 return -1;
1062 }
1063 return Long.parseLong(contentLengthString);
1064 }
1065
1066
1071 public HttpServerExchange setResponseContentLength(long length) {
1072 if (length == -1) {
1073 responseHeaders.remove(Headers.CONTENT_LENGTH);
1074 } else {
1075 responseHeaders.put(Headers.CONTENT_LENGTH, Long.toString(length));
1076 }
1077 return this;
1078 }
1079
1080
1085 public Map<String, Deque<String>> getQueryParameters() {
1086 if (queryParameters == null) {
1087 queryParameters = new TreeMap<>();
1088 }
1089 return queryParameters;
1090 }
1091
1092 public HttpServerExchange addQueryParam(final String name, final String param) {
1093 if (queryParameters == null) {
1094 queryParameters = new TreeMap<>();
1095 }
1096 Deque<String> list = queryParameters.get(name);
1097 if (list == null) {
1098 queryParameters.put(name, list = new ArrayDeque<>(2));
1099 }
1100 list.add(param);
1101 return this;
1102 }
1103
1104
1105
1110 public Map<String, Deque<String>> getPathParameters() {
1111 if (pathParameters == null) {
1112 pathParameters = new TreeMap<>();
1113 }
1114 return pathParameters;
1115 }
1116
1117 public HttpServerExchange addPathParam(final String name, final String param) {
1118 if (pathParameters == null) {
1119 pathParameters = new TreeMap<>();
1120 }
1121 Deque<String> list = pathParameters.get(name);
1122 if (list == null) {
1123 pathParameters.put(name, list = new ArrayDeque<>(2));
1124 }
1125 list.add(param);
1126 return this;
1127 }
1128
1129
1132 public Map<String, Cookie> getRequestCookies() {
1133 if (requestCookies == null) {
1134 requestCookies = Cookies.parseRequestCookies(
1135 getConnection().getUndertowOptions().get(UndertowOptions.MAX_COOKIES, 200),
1136 getConnection().getUndertowOptions().get(UndertowOptions.ALLOW_EQUALS_IN_COOKIE_VALUE, false),
1137 requestHeaders.get(Headers.COOKIE));
1138 }
1139 return requestCookies;
1140 }
1141
1142
1147 public HttpServerExchange setResponseCookie(final Cookie cookie) {
1148 if (getConnection().getUndertowOptions().get(UndertowOptions.ENABLE_RFC6265_COOKIE_VALIDATION, UndertowOptions.DEFAULT_ENABLE_RFC6265_COOKIE_VALIDATION)) {
1149 if (cookie.getValue() != null && !cookie.getValue().isEmpty()) {
1150 Rfc6265CookieSupport.validateCookieValue(cookie.getValue());
1151 }
1152 if (cookie.getPath() != null && !cookie.getPath().isEmpty()) {
1153 Rfc6265CookieSupport.validatePath(cookie.getPath());
1154 }
1155 if (cookie.getDomain() != null && !cookie.getDomain().isEmpty()) {
1156 Rfc6265CookieSupport.validateDomain(cookie.getDomain());
1157 }
1158 }
1159 if (responseCookies == null) {
1160 responseCookies = new TreeMap<>();
1161 }
1162 responseCookies.put(cookie.getName(), cookie);
1163 return this;
1164 }
1165
1166
1169 public Map<String, Cookie> getResponseCookies() {
1170 if (responseCookies == null) {
1171 responseCookies = new TreeMap<>();
1172 }
1173 return responseCookies;
1174 }
1175
1176
1181 Map<String, Cookie> getResponseCookiesInternal() {
1182 return responseCookies;
1183 }
1184
1185
1188 public boolean isResponseStarted() {
1189 return allAreSet(state, FLAG_RESPONSE_SENT);
1190 }
1191
1192
1200 public StreamSourceChannel getRequestChannel() {
1201 if (requestChannel != null) {
1202 if(anyAreSet(state, FLAG_REQUEST_RESET)) {
1203 state &= ~FLAG_REQUEST_RESET;
1204 return requestChannel;
1205 }
1206 return null;
1207 }
1208 if (anyAreSet(state, FLAG_REQUEST_TERMINATED)) {
1209 return requestChannel = new ReadDispatchChannel(new ConduitStreamSourceChannel(Configurable.EMPTY, new EmptyStreamSourceConduit(getIoThread())));
1210 }
1211 final ConduitWrapper<StreamSourceConduit>[] wrappers = this.requestWrappers;
1212 final ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
1213 if (wrappers != null) {
1214 this.requestWrappers = null;
1215 final WrapperConduitFactory<StreamSourceConduit> factory = new WrapperConduitFactory<>(wrappers, requestWrapperCount, sourceChannel.getConduit(), this);
1216 sourceChannel.setConduit(factory.create());
1217 }
1218 return requestChannel = new ReadDispatchChannel(sourceChannel);
1219 }
1220
1221 void resetRequestChannel() {
1222 state |= FLAG_REQUEST_RESET;
1223 }
1224
1225 public boolean isRequestChannelAvailable() {
1226 return requestChannel == null || anyAreSet(state, FLAG_REQUEST_RESET);
1227 }
1228
1229
1233 public boolean isComplete() {
1234 return allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED);
1235 }
1236
1237
1243 public boolean isRequestComplete() {
1244 PooledByteBuffer[] data = getAttachment(BUFFERED_REQUEST_DATA);
1245 if(data != null) {
1246 return false;
1247 }
1248 return allAreSet(state, FLAG_REQUEST_TERMINATED);
1249 }
1250
1251
1254 public boolean isResponseComplete() {
1255 return allAreSet(state, FLAG_RESPONSE_TERMINATED);
1256 }
1257
1258
1262 void terminateRequest() {
1263 int oldVal = state;
1264 if (allAreSet(oldVal, FLAG_REQUEST_TERMINATED)) {
1265
1266 return;
1267 }
1268 if (requestChannel != null) {
1269 requestChannel.requestDone();
1270 }
1271 this.state = oldVal | FLAG_REQUEST_TERMINATED;
1272 if (anyAreSet(oldVal, FLAG_RESPONSE_TERMINATED)) {
1273 invokeExchangeCompleteListeners();
1274 }
1275 }
1276
1277 private void invokeExchangeCompleteListeners() {
1278 if (exchangeCompletionListenersCount > 0) {
1279 int i = exchangeCompletionListenersCount - 1;
1280 ExchangeCompletionListener next = exchangeCompleteListeners[i];
1281 exchangeCompletionListenersCount = -1;
1282 next.exchangeEvent(this, new ExchangeCompleteNextListener(exchangeCompleteListeners, this, i));
1283 } else if (exchangeCompletionListenersCount == 0) {
1284 exchangeCompletionListenersCount = -1;
1285 connection.exchangeComplete(this);
1286 }
1287 }
1288
1289
1309 public StreamSinkChannel getResponseChannel() {
1310 if (responseChannel != null) {
1311 return null;
1312 }
1313 final ConduitWrapper<StreamSinkConduit>[] wrappers = responseWrappers;
1314 this.responseWrappers = null;
1315 final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
1316 if (sinkChannel == null) {
1317 return null;
1318 }
1319 if(wrappers != null) {
1320 final WrapperStreamSinkConduitFactory factory = new WrapperStreamSinkConduitFactory(wrappers, responseWrapperCount, this, sinkChannel.getConduit());
1321 sinkChannel.setConduit(factory.create());
1322 } else {
1323 sinkChannel.setConduit(connection.getSinkConduit(this, sinkChannel.getConduit()));
1324 }
1325 this.responseChannel = new WriteDispatchChannel(sinkChannel);
1326 this.startResponse();
1327 return responseChannel;
1328 }
1329
1330
1338 public Sender getResponseSender() {
1339 if (blockingHttpExchange != null) {
1340 return blockingHttpExchange.getSender();
1341 }
1342 if (sender != null) {
1343 return sender;
1344 }
1345 return sender = new AsyncSenderImpl(this);
1346 }
1347
1348 public Receiver getRequestReceiver() {
1349 if(blockingHttpExchange != null) {
1350 return blockingHttpExchange.getReceiver();
1351 }
1352 if(receiver != null) {
1353 return receiver;
1354 }
1355 return receiver = new AsyncReceiverImpl(this);
1356 }
1357
1358
1361 public boolean isResponseChannelAvailable() {
1362 return responseChannel == null;
1363 }
1364
1365
1366
1372 @Deprecated
1373 public int getResponseCode() {
1374 return state & MASK_RESPONSE_CODE;
1375 }
1376
1377
1385 @Deprecated
1386 public HttpServerExchange setResponseCode(final int statusCode) {
1387 return setStatusCode(statusCode);
1388 }
1389
1390
1395 public int getStatusCode() {
1396 return state & MASK_RESPONSE_CODE;
1397 }
1398
1399
1406 public HttpServerExchange setStatusCode(final int statusCode) {
1407 if (statusCode < 0 || statusCode > 999) {
1408 throw new IllegalArgumentException("Invalid response code");
1409 }
1410 int oldVal = state;
1411 if (allAreSet(oldVal, FLAG_RESPONSE_SENT)) {
1412 throw UndertowMessages.MESSAGES.responseAlreadyStarted();
1413 }
1414 if(statusCode >= 500) {
1415 if(UndertowLogger.ERROR_RESPONSE.isDebugEnabled()) {
1416 UndertowLogger.ERROR_RESPONSE.debugf(new RuntimeException(), "Setting error code %s for exchange %s", statusCode, this);
1417 }
1418 }
1419 this.state = oldVal & ~MASK_RESPONSE_CODE | statusCode & MASK_RESPONSE_CODE;
1420 return this;
1421 }
1422
1423
1432 public HttpServerExchange setReasonPhrase(String message) {
1433 putAttachment(REASON_PHRASE, message);
1434 return this;
1435 }
1436
1437
1441 public String getReasonPhrase() {
1442 return getAttachment(REASON_PHRASE);
1443 }
1444
1445
1450 public HttpServerExchange addRequestWrapper(final ConduitWrapper<StreamSourceConduit> wrapper) {
1451 ConduitWrapper<StreamSourceConduit>[] wrappers = requestWrappers;
1452 if (requestChannel != null) {
1453 throw UndertowMessages.MESSAGES.requestChannelAlreadyProvided();
1454 }
1455 if (wrappers == null) {
1456 wrappers = requestWrappers = new ConduitWrapper[2];
1457 } else if (wrappers.length == requestWrapperCount) {
1458 requestWrappers = new ConduitWrapper[wrappers.length + 2];
1459 System.arraycopy(wrappers, 0, requestWrappers, 0, wrappers.length);
1460 wrappers = requestWrappers;
1461 }
1462 wrappers[requestWrapperCount++] = wrapper;
1463 return this;
1464 }
1465
1466
1471 public HttpServerExchange addResponseWrapper(final ConduitWrapper<StreamSinkConduit> wrapper) {
1472 ConduitWrapper<StreamSinkConduit>[] wrappers = responseWrappers;
1473 if (responseChannel != null) {
1474 throw UndertowMessages.MESSAGES.responseChannelAlreadyProvided();
1475 }
1476 if(wrappers == null) {
1477 this.responseWrappers = wrappers = new ConduitWrapper[2];
1478 } else if (wrappers.length == responseWrapperCount) {
1479 responseWrappers = new ConduitWrapper[wrappers.length + 2];
1480 System.arraycopy(wrappers, 0, responseWrappers, 0, wrappers.length);
1481 wrappers = responseWrappers;
1482 }
1483 wrappers[responseWrapperCount++] = wrapper;
1484 return this;
1485 }
1486
1487
1497 public BlockingHttpExchange startBlocking() {
1498 final BlockingHttpExchange old = this.blockingHttpExchange;
1499 blockingHttpExchange = new DefaultBlockingHttpExchange(this);
1500 return old;
1501 }
1502
1503
1517 public BlockingHttpExchange startBlocking(final BlockingHttpExchange httpExchange) {
1518 final BlockingHttpExchange old = this.blockingHttpExchange;
1519 blockingHttpExchange = httpExchange;
1520 return old;
1521 }
1522
1523
1528 public boolean isBlocking() {
1529 return blockingHttpExchange != null;
1530 }
1531
1532
1536 public InputStream getInputStream() {
1537 if (blockingHttpExchange == null) {
1538 throw UndertowMessages.MESSAGES.startBlockingHasNotBeenCalled();
1539 }
1540 return blockingHttpExchange.getInputStream();
1541 }
1542
1543
1547 public OutputStream getOutputStream() {
1548 if (blockingHttpExchange == null) {
1549 throw UndertowMessages.MESSAGES.startBlockingHasNotBeenCalled();
1550 }
1551 return blockingHttpExchange.getOutputStream();
1552 }
1553
1554
1558 HttpServerExchange terminateResponse() {
1559 int oldVal = state;
1560 if (allAreSet(oldVal, FLAG_RESPONSE_TERMINATED)) {
1561
1562 return this;
1563 }
1564 if(responseChannel != null) {
1565 responseChannel.responseDone();
1566 }
1567 this.state = oldVal | FLAG_RESPONSE_TERMINATED;
1568 if (anyAreSet(oldVal, FLAG_REQUEST_TERMINATED)) {
1569 invokeExchangeCompleteListeners();
1570 }
1571 return this;
1572 }
1573
1574
1580 public long getRequestStartTime() {
1581 return requestStartTime;
1582 }
1583
1584
1585 HttpServerExchange setRequestStartTime(long requestStartTime) {
1586 this.requestStartTime = requestStartTime;
1587 return this;
1588 }
1589
1590
1598 public HttpServerExchange endExchange() {
1599 final int state = this.state;
1600 if (allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED)) {
1601 if(blockingHttpExchange != null) {
1602
1603 IoUtils.safeClose(blockingHttpExchange);
1604 }
1605 return this;
1606 }
1607 if(defaultResponseListeners != null) {
1608 int i = defaultResponseListeners.length - 1;
1609 while (i >= 0) {
1610 DefaultResponseListener listener = defaultResponseListeners[i];
1611 if (listener != null) {
1612 defaultResponseListeners[i] = null;
1613 try {
1614 if (listener.handleDefaultResponse(this)) {
1615 return this;
1616 }
1617 } catch (Throwable e) {
1618 UndertowLogger.REQUEST_LOGGER.debug("Exception running default response listener", e);
1619 }
1620 }
1621 i--;
1622 }
1623 }
1624
1625 if (anyAreClear(state, FLAG_REQUEST_TERMINATED)) {
1626 connection.terminateRequestChannel(this);
1627 }
1628
1629 if (blockingHttpExchange != null) {
1630 try {
1631
1632 blockingHttpExchange.close();
1633 } catch (IOException e) {
1634 UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
1635 IoUtils.safeClose(connection);
1636 } catch (Throwable t) {
1637 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1638 IoUtils.safeClose(connection);
1639 }
1640 }
1641
1642
1643
1644 if (anyAreClear(state, FLAG_REQUEST_TERMINATED)) {
1645
1646
1647
1648 if (requestChannel == null) {
1649 getRequestChannel();
1650 }
1651 int totalRead = 0;
1652 for (; ; ) {
1653 try {
1654 long read = Channels.drain(requestChannel, Long.MAX_VALUE);
1655 totalRead += read;
1656 if (read == 0) {
1657
1658
1659
1660
1661
1662 if (getStatusCode() != StatusCodes.EXPECTATION_FAILED || totalRead > 0) {
1663 requestChannel.getReadSetter().set(ChannelListeners.drainListener(Long.MAX_VALUE,
1664 new ChannelListener<StreamSourceChannel>() {
1665 @Override
1666 public void handleEvent(final StreamSourceChannel channel) {
1667 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1668 closeAndFlushResponse();
1669 }
1670 }
1671 }, new ChannelExceptionHandler<StreamSourceChannel>() {
1672 @Override
1673 public void handleException(final StreamSourceChannel channel, final IOException e) {
1674
1675
1676
1677 terminateRequest();
1678 terminateResponse();
1679 UndertowLogger.REQUEST_LOGGER.debug("Exception draining request stream", e);
1680 IoUtils.safeClose(connection);
1681 }
1682 }
1683 ));
1684 requestChannel.resumeReads();
1685 return this;
1686 } else {
1687 break;
1688 }
1689 } else if (read == -1) {
1690 break;
1691 }
1692 } catch (Throwable t) {
1693 if (t instanceof IOException) {
1694 UndertowLogger.REQUEST_IO_LOGGER.ioException((IOException) t);
1695 } else {
1696 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1697 }
1698 invokeExchangeCompleteListeners();
1699 IoUtils.safeClose(connection);
1700 return this;
1701 }
1702
1703 }
1704 }
1705 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1706 closeAndFlushResponse();
1707 }
1708 return this;
1709 }
1710
1711 private void closeAndFlushResponse() {
1712 if(!connection.isOpen()) {
1713
1714
1715
1716 terminateRequest();
1717 terminateResponse();
1718 return;
1719 }
1720 try {
1721 if (isResponseChannelAvailable()) {
1722 if(!getRequestMethod().equals(Methods.CONNECT) && !(getRequestMethod().equals(Methods.HEAD) && getResponseHeaders().contains(Headers.CONTENT_LENGTH)) && Connectors.isEntityBodyAllowed(this)) {
1723
1724 getResponseHeaders().put(Headers.CONTENT_LENGTH, "0");
1725 }
1726 getResponseChannel();
1727 }
1728 responseChannel.shutdownWrites();
1729 if (!responseChannel.flush()) {
1730 responseChannel.getWriteSetter().set(ChannelListeners.flushingChannelListener(
1731 new ChannelListener<StreamSinkChannel>() {
1732 @Override
1733 public void handleEvent(final StreamSinkChannel channel) {
1734 channel.suspendWrites();
1735 channel.getWriteSetter().set(null);
1736
1737 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1738
1739 invokeExchangeCompleteListeners();
1740 UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, HttpServerExchange.this);
1741 IoUtils.safeClose(connection);
1742 }
1743 }
1744 }, new ChannelExceptionHandler<Channel>() {
1745 @Override
1746 public void handleException(final Channel channel, final IOException exception) {
1747
1748 invokeExchangeCompleteListeners();
1749 UndertowLogger.REQUEST_LOGGER.debug("Exception ending request", exception);
1750 IoUtils.safeClose(connection);
1751 }
1752 }
1753 ));
1754 responseChannel.resumeWrites();
1755 } else {
1756
1757 if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) {
1758
1759 invokeExchangeCompleteListeners();
1760 UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, this);
1761 IoUtils.safeClose(connection);
1762 }
1763 }
1764 } catch (Throwable t) {
1765 if (t instanceof IOException) {
1766 UndertowLogger.REQUEST_IO_LOGGER.ioException((IOException) t);
1767 } else {
1768 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
1769 }
1770 invokeExchangeCompleteListeners();
1771
1772 IoUtils.safeClose(connection);
1773 }
1774 }
1775
1776
1795 HttpServerExchange startResponse() throws IllegalStateException {
1796 int oldVal = state;
1797 if (allAreSet(oldVal, FLAG_RESPONSE_SENT)) {
1798 throw UndertowMessages.MESSAGES.responseAlreadyStarted();
1799 }
1800 this.state = oldVal | FLAG_RESPONSE_SENT;
1801
1802 log.tracef("Starting to write response for %s", this);
1803 return this;
1804 }
1805
1806 public XnioIoThread getIoThread() {
1807 return connection.getIoThread();
1808 }
1809
1810
1813 public long getMaxEntitySize() {
1814 return maxEntitySize;
1815 }
1816
1817
1822 public HttpServerExchange setMaxEntitySize(final long maxEntitySize) {
1823 if (!isRequestChannelAvailable()) {
1824 throw UndertowMessages.MESSAGES.requestChannelAlreadyProvided();
1825 }
1826 this.maxEntitySize = maxEntitySize;
1827 connection.maxEntitySizeUpdated(this);
1828 return this;
1829 }
1830
1831 public SecurityContext getSecurityContext() {
1832 return securityContext;
1833 }
1834
1835 public void setSecurityContext(SecurityContext securityContext) {
1836 SecurityManager sm = System.getSecurityManager();
1837 if(sm != null) {
1838 sm.checkPermission(SET_SECURITY_CONTEXT);
1839 }
1840 this.securityContext = securityContext;
1841 }
1842
1843
1848 public void addResponseCommitListener(final ResponseCommitListener listener) {
1849
1850
1851
1852
1853 addResponseWrapper(new ConduitWrapper<StreamSinkConduit>() {
1854 @Override
1855 public StreamSinkConduit wrap(ConduitFactory<StreamSinkConduit> factory, HttpServerExchange exchange) {
1856 listener.beforeCommit(exchange);
1857 return factory.create();
1858 }
1859 });
1860 }
1861
1862
1867 boolean runResumeReadWrite() {
1868 boolean ret = false;
1869 if(anyAreSet(state, FLAG_SHOULD_RESUME_WRITES)) {
1870 responseChannel.runResume();
1871 ret = true;
1872 }
1873 if(anyAreSet(state, FLAG_SHOULD_RESUME_READS)) {
1874 requestChannel.runResume();
1875 ret = true;
1876 }
1877 return ret;
1878 }
1879
1880 boolean isResumed() {
1881 return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES | FLAG_SHOULD_RESUME_READS);
1882 }
1883
1884 private static class ExchangeCompleteNextListener implements ExchangeCompletionListener.NextListener {
1885 private final ExchangeCompletionListener[] list;
1886 private final HttpServerExchange exchange;
1887 private int i;
1888
1889 ExchangeCompleteNextListener(final ExchangeCompletionListener[] list, final HttpServerExchange exchange, int i) {
1890 this.list = list;
1891 this.exchange = exchange;
1892 this.i = i;
1893 }
1894
1895 @Override
1896 public void proceed() {
1897 if (--i >= 0) {
1898 final ExchangeCompletionListener next = list[i];
1899 next.exchangeEvent(exchange, this);
1900 } else if(i == -1) {
1901 exchange.connection.exchangeComplete(exchange);
1902 }
1903 }
1904 }
1905
1906 private static class DefaultBlockingHttpExchange implements BlockingHttpExchange {
1907
1908 private InputStream inputStream;
1909 private UndertowOutputStream outputStream;
1910 private Sender sender;
1911 private final HttpServerExchange exchange;
1912
1913 DefaultBlockingHttpExchange(final HttpServerExchange exchange) {
1914 this.exchange = exchange;
1915 }
1916
1917 public InputStream getInputStream() {
1918 if (inputStream == null) {
1919 inputStream = new UndertowInputStream(exchange);
1920 }
1921 return inputStream;
1922 }
1923
1924 public UndertowOutputStream getOutputStream() {
1925 if (outputStream == null) {
1926 outputStream = new UndertowOutputStream(exchange);
1927 }
1928 return outputStream;
1929 }
1930
1931 @Override
1932 public Sender getSender() {
1933 if (sender == null) {
1934 sender = new BlockingSenderImpl(exchange, getOutputStream());
1935 }
1936 return sender;
1937 }
1938
1939 @Override
1940 public void close() throws IOException {
1941 try {
1942 getInputStream().close();
1943 } finally {
1944 getOutputStream().close();
1945 }
1946 }
1947
1948 @Override
1949 public Receiver getReceiver() {
1950 return new BlockingReceiverImpl(exchange, getInputStream());
1951 }
1952 }
1953
1954
1963 private class WriteDispatchChannel extends DetachableStreamSinkChannel implements StreamSinkChannel {
1964
1965 private boolean wakeup;
1966
1967 WriteDispatchChannel(final ConduitStreamSinkChannel delegate) {
1968 super(delegate);
1969 }
1970
1971 @Override
1972 protected boolean isFinished() {
1973 return allAreSet(state, FLAG_RESPONSE_TERMINATED);
1974 }
1975
1976 @Override
1977 public void resumeWrites() {
1978 if (isInCall()) {
1979 state |= FLAG_SHOULD_RESUME_WRITES;
1980 if(anyAreSet(state, FLAG_DISPATCHED)) {
1981 throw UndertowMessages.MESSAGES.resumedAndDispatched();
1982 }
1983 } else if(!isFinished()){
1984 delegate.resumeWrites();
1985 }
1986 }
1987
1988 @Override
1989 public void suspendWrites() {
1990 state &= ~FLAG_SHOULD_RESUME_WRITES;
1991 super.suspendWrites();
1992 }
1993
1994 @Override
1995 public void wakeupWrites() {
1996 if (isFinished()) {
1997 return;
1998 }
1999 if (isInCall()) {
2000 wakeup = true;
2001 state |= FLAG_SHOULD_RESUME_WRITES;
2002 if(anyAreSet(state, FLAG_DISPATCHED)) {
2003 throw UndertowMessages.MESSAGES.resumedAndDispatched();
2004 }
2005 } else {
2006 delegate.wakeupWrites();
2007 }
2008 }
2009
2010 @Override
2011 public boolean isWriteResumed() {
2012 return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed();
2013 }
2014
2015 public void runResume() {
2016 if (isWriteResumed()) {
2017 if(isFinished()) {
2018 invokeListener();
2019 } else {
2020 if (wakeup) {
2021 wakeup = false;
2022 state &= ~FLAG_SHOULD_RESUME_WRITES;
2023 delegate.wakeupWrites();
2024 } else {
2025 state &= ~FLAG_SHOULD_RESUME_WRITES;
2026 delegate.resumeWrites();
2027 }
2028 }
2029 } else if(wakeup) {
2030 wakeup = false;
2031 invokeListener();
2032 }
2033 }
2034
2035 private void invokeListener() {
2036 if(writeSetter != null) {
2037 super.getIoThread().execute(new Runnable() {
2038 @Override
2039 public void run() {
2040 ChannelListeners.invokeChannelListener(WriteDispatchChannel.this, writeSetter.get());
2041 }
2042 });
2043 }
2044 }
2045
2046 @Override
2047 public void awaitWritable() throws IOException {
2048 if(Thread.currentThread() == super.getIoThread()) {
2049 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2050 }
2051 super.awaitWritable();
2052 }
2053
2054 @Override
2055 public void awaitWritable(long time, TimeUnit timeUnit) throws IOException {
2056 if(Thread.currentThread() == super.getIoThread()) {
2057 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2058 }
2059 super.awaitWritable(time, timeUnit);
2060 }
2061
2062 @Override
2063 public long transferFrom(FileChannel src, long position, long count) throws IOException {
2064 long l = super.transferFrom(src, position, count);
2065 if(l > 0) {
2066 responseBytesSent += l;
2067 }
2068 return l;
2069 }
2070
2071 @Override
2072 public long transferFrom(StreamSourceChannel source, long count, ByteBuffer throughBuffer) throws IOException {
2073 long l = super.transferFrom(source, count, throughBuffer);
2074 if(l > 0) {
2075 responseBytesSent += l;
2076 }
2077 return l;
2078 }
2079
2080 @Override
2081 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
2082 long l = super.write(srcs, offset, length);
2083 responseBytesSent += l;
2084 return l;
2085 }
2086
2087 @Override
2088 public long write(ByteBuffer[] srcs) throws IOException {
2089 long l = super.write(srcs);
2090 responseBytesSent += l;
2091 return l;
2092 }
2093
2094 @Override
2095 public int writeFinal(ByteBuffer src) throws IOException {
2096 int l = super.writeFinal(src);
2097 responseBytesSent += l;
2098 return l;
2099 }
2100
2101 @Override
2102 public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
2103 long l = super.writeFinal(srcs, offset, length);
2104 responseBytesSent += l;
2105 return l;
2106 }
2107
2108 @Override
2109 public long writeFinal(ByteBuffer[] srcs) throws IOException {
2110 long l = super.writeFinal(srcs);
2111 responseBytesSent += l;
2112 return l;
2113 }
2114
2115 @Override
2116 public int write(ByteBuffer src) throws IOException {
2117 int l = super.write(src);
2118 responseBytesSent += l;
2119 return l;
2120 }
2121 }
2122
2123
2133 private final class ReadDispatchChannel extends DetachableStreamSourceChannel implements StreamSourceChannel {
2134
2135 private boolean wakeup = true;
2136 private boolean readsResumed = false;
2137
2138
2139 ReadDispatchChannel(final ConduitStreamSourceChannel delegate) {
2140 super(delegate);
2141 }
2142
2143 @Override
2144 protected boolean isFinished() {
2145 return allAreSet(state, FLAG_REQUEST_TERMINATED);
2146 }
2147
2148 @Override
2149 public void resumeReads() {
2150 readsResumed = true;
2151 if (isInCall()) {
2152 state |= FLAG_SHOULD_RESUME_READS;
2153 if(anyAreSet(state, FLAG_DISPATCHED)) {
2154 throw UndertowMessages.MESSAGES.resumedAndDispatched();
2155 }
2156 } else if (!isFinished()) {
2157 delegate.resumeReads();
2158 }
2159
2160 }
2161
2162 public void wakeupReads() {
2163 if (isInCall()) {
2164 wakeup = true;
2165 state |= FLAG_SHOULD_RESUME_READS;
2166 if(anyAreSet(state, FLAG_DISPATCHED)) {
2167 throw UndertowMessages.MESSAGES.resumedAndDispatched();
2168 }
2169 } else {
2170 if(isFinished()) {
2171 invokeListener();
2172 } else {
2173 delegate.wakeupReads();
2174 }
2175 }
2176 }
2177
2178 private void invokeListener() {
2179 if(readSetter != null) {
2180 super.getIoThread().execute(new Runnable() {
2181 @Override
2182 public void run() {
2183 ChannelListeners.invokeChannelListener(ReadDispatchChannel.this, readSetter.get());
2184 }
2185 });
2186 }
2187 }
2188
2189 public void requestDone() {
2190 if(delegate instanceof ConduitStreamSourceChannel) {
2191 ((ConduitStreamSourceChannel)delegate).setReadListener(null);
2192 ((ConduitStreamSourceChannel)delegate).setCloseListener(null);
2193 } else {
2194 delegate.getReadSetter().set(null);
2195 delegate.getCloseSetter().set(null);
2196 }
2197 }
2198
2199 @Override
2200 public long transferTo(long position, long count, FileChannel target) throws IOException {
2201 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2202 if (buffered == null) {
2203 return super.transferTo(position, count, target);
2204 }
2205 return target.transferFrom(this, position, count);
2206 }
2207
2208 @Override
2209 public void awaitReadable() throws IOException {
2210 if(Thread.currentThread() == super.getIoThread()) {
2211 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2212 }
2213 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2214 if (buffered == null) {
2215 super.awaitReadable();
2216 }
2217 }
2218
2219 @Override
2220 public void suspendReads() {
2221 readsResumed = false;
2222 state &= ~(FLAG_SHOULD_RESUME_READS);
2223 super.suspendReads();
2224 }
2225
2226 @Override
2227 public long transferTo(long count, ByteBuffer throughBuffer, StreamSinkChannel target) throws IOException {
2228 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2229 if (buffered == null) {
2230 return super.transferTo(count, throughBuffer, target);
2231 }
2232
2233 throughBuffer.position(0);
2234 throughBuffer.limit(0);
2235 long copied = 0;
2236 for (int i = 0; i < buffered.length; ++i) {
2237 PooledByteBuffer pooled = buffered[i];
2238 if (pooled != null) {
2239 final ByteBuffer buf = pooled.getBuffer();
2240 if (buf.hasRemaining()) {
2241 int res = target.write(buf);
2242
2243 if (!buf.hasRemaining()) {
2244 pooled.close();
2245 buffered[i] = null;
2246 }
2247 if (res == 0) {
2248 return copied;
2249 } else {
2250 copied += res;
2251 }
2252 } else {
2253 pooled.close();
2254 buffered[i] = null;
2255 }
2256 }
2257 }
2258 removeAttachment(BUFFERED_REQUEST_DATA);
2259 if (copied == 0) {
2260 return super.transferTo(count, throughBuffer, target);
2261 } else {
2262 return copied;
2263 }
2264 }
2265
2266 @Override
2267 public void awaitReadable(long time, TimeUnit timeUnit) throws IOException {
2268 if(Thread.currentThread() == super.getIoThread()) {
2269 throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
2270 }
2271 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2272 if (buffered == null) {
2273 super.awaitReadable(time, timeUnit);
2274 }
2275 }
2276
2277 @Override
2278 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
2279 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2280 if (buffered == null) {
2281 return super.read(dsts, offset, length);
2282 }
2283 long copied = 0;
2284 for (int i = 0; i < buffered.length; ++i) {
2285 PooledByteBuffer pooled = buffered[i];
2286 if (pooled != null) {
2287 final ByteBuffer buf = pooled.getBuffer();
2288 if (buf.hasRemaining()) {
2289 copied += Buffers.copy(dsts, offset, length, buf);
2290 if (!buf.hasRemaining()) {
2291 pooled.close();
2292 buffered[i] = null;
2293 }
2294 if (!Buffers.hasRemaining(dsts, offset, length)) {
2295 return copied;
2296 }
2297 } else {
2298 pooled.close();
2299 buffered[i] = null;
2300 }
2301 }
2302 }
2303 removeAttachment(BUFFERED_REQUEST_DATA);
2304 if (copied == 0) {
2305 return super.read(dsts, offset, length);
2306 } else {
2307 return copied;
2308 }
2309 }
2310
2311 @Override
2312 public long read(ByteBuffer[] dsts) throws IOException {
2313 return read(dsts, 0, dsts.length);
2314 }
2315
2316 @Override
2317 public boolean isOpen() {
2318 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2319 if (buffered != null) {
2320 return true;
2321 }
2322 return super.isOpen();
2323 }
2324
2325 @Override
2326 public void close() throws IOException {
2327 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2328 if (buffered != null) {
2329 for (PooledByteBuffer pooled : buffered) {
2330 if (pooled != null) {
2331 pooled.close();
2332 }
2333 }
2334 }
2335 removeAttachment(BUFFERED_REQUEST_DATA);
2336 super.close();
2337 }
2338
2339 @Override
2340 public boolean isReadResumed() {
2341 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2342 if (buffered != null) {
2343 return readsResumed;
2344 }
2345 if(isFinished()) {
2346 return false;
2347 }
2348 return anyAreSet(state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed();
2349 }
2350
2351 @Override
2352 public int read(ByteBuffer dst) throws IOException {
2353 PooledByteBuffer[] buffered = getAttachment(BUFFERED_REQUEST_DATA);
2354 if (buffered == null) {
2355 return super.read(dst);
2356 }
2357 int copied = 0;
2358 for (int i = 0; i < buffered.length; ++i) {
2359 PooledByteBuffer pooled = buffered[i];
2360 if (pooled != null) {
2361 final ByteBuffer buf = pooled.getBuffer();
2362 if (buf.hasRemaining()) {
2363 copied += Buffers.copy(dst, buf);
2364 if (!buf.hasRemaining()) {
2365 pooled.close();
2366 buffered[i] = null;
2367 }
2368 if (!dst.hasRemaining()) {
2369 return copied;
2370 }
2371 } else {
2372 pooled.close();
2373 buffered[i] = null;
2374 }
2375 }
2376 }
2377 removeAttachment(BUFFERED_REQUEST_DATA);
2378 if (copied == 0) {
2379 return super.read(dst);
2380 } else {
2381 return copied;
2382 }
2383 }
2384
2385 public void runResume() {
2386 if (isReadResumed()) {
2387 if(isFinished()) {
2388 invokeListener();
2389 } else {
2390 if (wakeup) {
2391 wakeup = false;
2392 state &= ~FLAG_SHOULD_RESUME_READS;
2393 delegate.wakeupReads();
2394 } else {
2395 state &= ~FLAG_SHOULD_RESUME_READS;
2396 delegate.resumeReads();
2397 }
2398 }
2399 } else if(wakeup) {
2400 wakeup = false;
2401 invokeListener();
2402 }
2403 }
2404 }
2405
2406 public static class WrapperStreamSinkConduitFactory implements ConduitFactory<StreamSinkConduit> {
2407
2408 private final HttpServerExchange exchange;
2409 private final ConduitWrapper<StreamSinkConduit>[] wrappers;
2410 private int position;
2411 private final StreamSinkConduit first;
2412
2413
2414 public WrapperStreamSinkConduitFactory(ConduitWrapper<StreamSinkConduit>[] wrappers, int wrapperCount, HttpServerExchange exchange, StreamSinkConduit first) {
2415 this.wrappers = wrappers;
2416 this.exchange = exchange;
2417 this.first = first;
2418 this.position = wrapperCount - 1;
2419 }
2420
2421 @Override
2422 public StreamSinkConduit create() {
2423 if (position == -1) {
2424 return exchange.getConnection().getSinkConduit(exchange, first);
2425 } else {
2426 return wrappers[position--].wrap(this, exchange);
2427 }
2428 }
2429 }
2430
2431 public static class WrapperConduitFactory<T extends Conduit> implements ConduitFactory<T> {
2432
2433 private final HttpServerExchange exchange;
2434 private final ConduitWrapper<T>[] wrappers;
2435 private int position;
2436 private T first;
2437
2438
2439 public WrapperConduitFactory(ConduitWrapper<T>[] wrappers, int wrapperCount, T first, HttpServerExchange exchange) {
2440 this.wrappers = wrappers;
2441 this.exchange = exchange;
2442 this.position = wrapperCount - 1;
2443 this.first = first;
2444 }
2445
2446 @Override
2447 public T create() {
2448 if (position == -1) {
2449 return first;
2450 } else {
2451 return wrappers[position--].wrap(this, exchange);
2452 }
2453 }
2454 }
2455
2456 @Override
2457 public String toString() {
2458 return "HttpServerExchange{ " + getRequestMethod().toString() + " " + getRequestURI() + '}';
2459 }
2460 }
2461