1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.server.protocol.http;
20
21 import java.io.IOException;
22 import java.nio.channels.Channel;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.TimeUnit;
28
29 import org.xnio.ChannelExceptionHandler;
30 import org.xnio.ChannelListener;
31 import org.xnio.ChannelListeners;
32 import org.xnio.channels.StreamSinkChannel;
33 import io.undertow.UndertowMessages;
34 import io.undertow.io.IoCallback;
35 import io.undertow.server.HttpHandler;
36 import io.undertow.server.HttpServerExchange;
37 import io.undertow.util.AttachmentKey;
38 import io.undertow.util.HeaderMap;
39 import io.undertow.util.Headers;
40 import io.undertow.util.HttpString;
41 import io.undertow.util.Protocols;
42 import io.undertow.util.StatusCodes;
43
44 /**
45  * Class that provides support for dealing with HTTP 100 (Continue) responses.
46  * <p>
47  * Note that if a client is pipelining some requests and sending continue for others this
48  * could cause problems if the pipelining buffer is enabled.
49  *
50  * @author Stuart Douglas
51  */

52 public class HttpContinue {
53
54     private static final Set<HttpString> COMPATIBLE_PROTOCOLS;
55
56     static {
57         Set<HttpString> compat = new HashSet<>();
58         compat.add(Protocols.HTTP_1_1);
59         compat.add(Protocols.HTTP_2_0);
60         COMPATIBLE_PROTOCOLS = Collections.unmodifiableSet(compat);
61     }
62
63     public static final String CONTINUE = "100-continue";
64
65     private static final AttachmentKey<Boolean> ALREADY_SENT = AttachmentKey.create(Boolean.class);
66
67     /**
68      * Returns true if this exchange requires the server to send a 100 (Continue) response.
69      *
70      * @param exchange The exchange
71      * @return <code>true</code> if the server needs to send a continue response
72      */

73     public static boolean requiresContinueResponse(final HttpServerExchange exchange) {
74         if (!COMPATIBLE_PROTOCOLS.contains(exchange.getProtocol()) || exchange.isResponseStarted() || !exchange.getConnection().isContinueResponseSupported() || exchange.getAttachment(ALREADY_SENT) != null) {
75             return false;
76         }
77
78         HeaderMap requestHeaders = exchange.getRequestHeaders();
79         return requiresContinueResponse(requestHeaders);
80     }
81
82     public static boolean requiresContinueResponse(HeaderMap requestHeaders) {
83         List<String> expect = requestHeaders.get(Headers.EXPECT);
84         if (expect != null) {
85             for (String header : expect) {
86                 if (header.equalsIgnoreCase(CONTINUE)) {
87                     return true;
88                 }
89             }
90         }
91         return false;
92     }
93
94     public static boolean isContinueResponseSent(HttpServerExchange exchange) {
95         return exchange.getAttachment(ALREADY_SENT) != null;
96     }
97
98     /**
99      * Sends a continuation using async IO, and calls back when it is complete.
100      *
101      * @param exchange The exchange
102      * @param callback The completion callback
103      */

104     public static void sendContinueResponse(final HttpServerExchange exchange, final IoCallback callback) {
105         if (!exchange.isResponseChannelAvailable()) {
106             callback.onException(exchange, null, UndertowMessages.MESSAGES.cannotSendContinueResponse());
107             return;
108         }
109         internalSendContinueResponse(exchange, callback);
110     }
111
112     /**
113      * Creates a response sender that can be used to send a HTTP 100-continue response.
114      *
115      * @param exchange The exchange
116      * @return The response sender
117      */

118     public static ContinueResponseSender createResponseSender(final HttpServerExchange exchange) throws IOException {
119         if (!exchange.isResponseChannelAvailable()) {
120             throw UndertowMessages.MESSAGES.cannotSendContinueResponse();
121         }
122         if(exchange.getAttachment(ALREADY_SENT) != null) {
123
124             return new ContinueResponseSender() {
125                 @Override
126                 public boolean send() throws IOException {
127                     return true;
128                 }
129
130                 @Override
131                 public void awaitWritable() throws IOException {
132
133                 }
134
135                 @Override
136                 public void awaitWritable(long time, TimeUnit timeUnit) throws IOException {
137
138                 }
139             };
140         }
141
142         HttpServerExchange newExchange = exchange.getConnection().sendOutOfBandResponse(exchange);
143         exchange.putAttachment(ALREADY_SENT, true);
144         newExchange.setStatusCode(StatusCodes.CONTINUE);
145         newExchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 0);
146         final StreamSinkChannel responseChannel = newExchange.getResponseChannel();
147         return new ContinueResponseSender() {
148             boolean shutdown = false;
149
150             @Override
151             public boolean send() throws IOException {
152                 if (!shutdown) {
153                     shutdown = true;
154                     responseChannel.shutdownWrites();
155                 }
156                 return responseChannel.flush();
157             }
158
159             @Override
160             public void awaitWritable() throws IOException {
161                 responseChannel.awaitWritable();
162             }
163
164             @Override
165             public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
166                 responseChannel.awaitWritable(time, timeUnit);
167             }
168         };
169     }
170
171     /**
172      * Marks a continue response as already having been sent. In general this should only be used
173      * by low level handlers than need fine grained control over the continue response.
174      *
175      * @param exchange The exchange
176      */

177     public static void markContinueResponseSent(HttpServerExchange exchange) {
178         exchange.putAttachment(ALREADY_SENT, true);
179     }
180
181     /**
182      * Sends a continue response using blocking IO
183      *
184      * @param exchange The exchange
185      */

186     public static void sendContinueResponseBlocking(final HttpServerExchange exchange) throws IOException {
187         if (!exchange.isResponseChannelAvailable()) {
188             throw UndertowMessages.MESSAGES.cannotSendContinueResponse();
189         }
190         if(exchange.getAttachment(ALREADY_SENT) != null) {
191             return;
192         }
193         HttpServerExchange newExchange = exchange.getConnection().sendOutOfBandResponse(exchange);
194         exchange.putAttachment(ALREADY_SENT, true);
195         newExchange.setStatusCode(StatusCodes.CONTINUE);
196         newExchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 0);
197         newExchange.startBlocking();
198         newExchange.getOutputStream().close();
199         newExchange.getInputStream().close();
200     }
201
202     /**
203      * Sets a 417 response code and ends the exchange.
204      *
205      * @param exchange The exchange to reject
206      */

207     public static void rejectExchange(final HttpServerExchange exchange) {
208         exchange.setStatusCode(StatusCodes.EXPECTATION_FAILED);
209         exchange.setPersistent(false);
210         exchange.endExchange();
211     }
212
213
214     private static void internalSendContinueResponse(final HttpServerExchange exchange, final IoCallback callback) {
215         if(exchange.getAttachment(ALREADY_SENT) != null) {
216             callback.onComplete(exchange, null);
217             return;
218         }
219         HttpServerExchange newExchange = exchange.getConnection().sendOutOfBandResponse(exchange);
220         exchange.putAttachment(ALREADY_SENT, true);
221         newExchange.setStatusCode(StatusCodes.CONTINUE);
222         newExchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 0);
223         final StreamSinkChannel responseChannel = newExchange.getResponseChannel();
224         try {
225             responseChannel.shutdownWrites();
226             if (!responseChannel.flush()) {
227                 responseChannel.getWriteSetter().set(ChannelListeners.flushingChannelListener(
228                         new ChannelListener<StreamSinkChannel>() {
229                             @Override
230                             public void handleEvent(StreamSinkChannel channel) {
231                                 channel.suspendWrites();
232                                 exchange.dispatch(new HttpHandler() {
233                                     @Override
234                                     public void handleRequest(HttpServerExchange exchange) throws Exception {
235                                         callback.onComplete(exchange, null);
236                                     }
237                                 });
238                             }
239                         }, new ChannelExceptionHandler<Channel>() {
240                             @Override
241                             public void handleException(Channel channel, final IOException e) {
242                                 exchange.dispatch(new HttpHandler() {
243                                     @Override
244                                     public void handleRequest(HttpServerExchange exchange) throws Exception {
245                                         callback.onException(exchange, null, e);
246                                     }
247                                 });
248                             }
249                         }));
250                 responseChannel.resumeWrites();
251                 exchange.dispatch();
252             } else {
253                 callback.onComplete(exchange, null);
254             }
255         } catch (IOException e) {
256             callback.onException(exchange, null, e);
257         }
258     }
259
260     /**
261      * A continue response that is in the process of being sent.
262      */

263     public interface ContinueResponseSender {
264
265         /**
266          * Continue sending the response.
267          *
268          * @return true if the response is fully sent, false otherwise.
269          */

270         boolean send() throws IOException;
271
272         void awaitWritable() throws IOException;
273
274         void awaitWritable(long time, final TimeUnit timeUnit) throws IOException;
275
276     }
277
278 }
279