/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2014 Red Hat, Inc., and individual contributors
 * as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

18
19 package io.undertow.server.protocol.http;
20
21 import io.undertow.UndertowLogger;
22 import io.undertow.UndertowMessages;
23 import io.undertow.UndertowOptions;
24 import io.undertow.conduits.ReadDataStreamSourceConduit;
25 import io.undertow.connector.PooledByteBuffer;
26 import io.undertow.protocols.http2.Http2Channel;
27 import io.undertow.server.ConnectorStatisticsImpl;
28 import io.undertow.server.Connectors;
29 import io.undertow.server.HttpServerExchange;
30 import io.undertow.server.protocol.ParseTimeoutUpdater;
31 import io.undertow.server.protocol.http2.Http2ReceiveListener;
32 import io.undertow.util.ClosingChannelExceptionHandler;
33 import io.undertow.util.ConnectionUtils;
34 import io.undertow.util.HeaderValues;
35 import io.undertow.util.Headers;
36 import io.undertow.util.HttpString;
37 import io.undertow.util.Methods;
38 import io.undertow.util.Protocols;
39 import io.undertow.util.StringWriteChannelListener;
40 import org.xnio.ChannelListener;
41 import org.xnio.ChannelListeners;
42 import org.xnio.IoUtils;
43 import org.xnio.StreamConnection;
44 import org.xnio.channels.StreamSinkChannel;
45 import org.xnio.channels.StreamSourceChannel;
46 import org.xnio.conduits.ConduitStreamSinkChannel;
47 import org.xnio.conduits.ConduitStreamSourceChannel;
48
49 import java.io.IOException;
50 import java.nio.ByteBuffer;
51 import java.util.concurrent.Executor;
52 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53
54 /**
55  * Listener which reads requests and headers off of an HTTP stream.
56  *
57  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
58  */

59 final class HttpReadListener implements ChannelListener<ConduitStreamSourceChannel>, Runnable {
60
61     /**
62      * used for HTTP2 prior knowledge support
63      */

64     private static final HttpString PRI = new HttpString("PRI");
65     private static final byte[] PRI_EXPECTED = new byte[] {'S', 'M', '\r', '\n', '\r', '\n'};
66
67
68     private static final String BAD_REQUEST = "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
69
70     private final HttpServerConnection connection;
71     private final ParseState state;
72     private final HttpRequestParser parser;
73
74     private HttpServerExchange httpServerExchange;
75
76     private int read = 0;
77     private final int maxRequestSize;
78     private final long maxEntitySize;
79     private final boolean recordRequestStartTime;
80     private final boolean allowUnknownProtocols;
81     private final boolean requireHostHeader;
82
83     //0 = new request ok, reads resumed
84     //1 = request running, new request not ok
85     //2 = suspending/resuming in progress
86     @SuppressWarnings("unused")
87     private volatile int requestState;
88     private static final AtomicIntegerFieldUpdater<HttpReadListener> requestStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpReadListener.class"requestState");
89
90     private final ConnectorStatisticsImpl connectorStatistics;
91
92     private ParseTimeoutUpdater parseTimeoutUpdater;
93
94     HttpReadListener(final HttpServerConnection connection, final HttpRequestParser parser, ConnectorStatisticsImpl connectorStatistics) {
95         this.connection = connection;
96         this.parser = parser;
97         this.connectorStatistics = connectorStatistics;
98         this.maxRequestSize = connection.getUndertowOptions().get(UndertowOptions.MAX_HEADER_SIZE, UndertowOptions.DEFAULT_MAX_HEADER_SIZE);
99         this.maxEntitySize = connection.getUndertowOptions().get(UndertowOptions.MAX_ENTITY_SIZE, UndertowOptions.DEFAULT_MAX_ENTITY_SIZE);
100         this.recordRequestStartTime = connection.getUndertowOptions().get(UndertowOptions.RECORD_REQUEST_START_TIME, false);
101         this.requireHostHeader = connection.getUndertowOptions().get(UndertowOptions.REQUIRE_HOST_HTTP11, true);
102         this.allowUnknownProtocols = connection.getUndertowOptions().get(UndertowOptions.ALLOW_UNKNOWN_PROTOCOLS, false);
103         int requestParseTimeout = connection.getUndertowOptions().get(UndertowOptions.REQUEST_PARSE_TIMEOUT, -1);
104         int requestIdleTimeout = connection.getUndertowOptions().get(UndertowOptions.NO_REQUEST_TIMEOUT, -1);
105         if(requestIdleTimeout < 0 && requestParseTimeout < 0) {
106             this.parseTimeoutUpdater = null;
107         } else {
108             this.parseTimeoutUpdater = new ParseTimeoutUpdater(connection, requestParseTimeout, requestIdleTimeout);
109             connection.addCloseListener(parseTimeoutUpdater);
110         }
111         state = new ParseState(connection.getUndertowOptions().get(UndertowOptions.HTTP_HEADERS_CACHE_SIZE, UndertowOptions.DEFAULT_HTTP_HEADERS_CACHE_SIZE));
112     }
113
114     public void newRequest() {
115         state.reset();
116         read = 0;
117         if(parseTimeoutUpdater != null) {
118             parseTimeoutUpdater.connectionIdle();
119         }
120         connection.setCurrentExchange(null);
121     }
122
123     public void handleEvent(final ConduitStreamSourceChannel channel) {
124         while (requestStateUpdater.get(this) != 0) {
125             //if the CAS fails it is because another thread is in the process of changing state
126             //we just immediately retry
127             if (requestStateUpdater.compareAndSet(this, 1, 2)) {
128                 try {
129                     channel.suspendReads();
130                 } finally {
131                     requestStateUpdater.set(this, 1);
132                 }
133                 return;
134             }
135         }
136         handleEventWithNoRunningRequest(channel);
137     }
138
139     public void handleEventWithNoRunningRequest(final ConduitStreamSourceChannel channel) {
140         PooledByteBuffer existing = connection.getExtraBytes();
141         if ((existing == null && connection.getOriginalSourceConduit().isReadShutdown()) || connection.getOriginalSinkConduit().isWriteShutdown()) {
142             IoUtils.safeClose(connection);
143             channel.suspendReads();
144             return;
145         }
146
147         final PooledByteBuffer pooled = existing == null ? connection.getByteBufferPool().allocate() : existing;
148         final ByteBuffer buffer = pooled.getBuffer();
149         boolean free = true;
150
151         try {
152             int res;
153             boolean bytesRead = false;
154             do {
155                 if (existing == null) {
156                     buffer.clear();
157                     try {
158                         res = channel.read(buffer);
159                     } catch (IOException e) {
160                         UndertowLogger.REQUEST_IO_LOGGER.debug("Error reading request", e);
161                         IoUtils.safeClose(connection);
162                         return;
163                     }
164                 } else {
165                     res = buffer.remaining();
166                 }
167
168                 if (res <= 0) {
169                     if(bytesRead && parseTimeoutUpdater != null) {
170                         parseTimeoutUpdater.failedParse();
171                     }
172                     handleFailedRead(channel, res);
173                     return;
174                 } else {
175                     bytesRead = true;
176                 }
177                 if (existing != null) {
178                     existing = null;
179                     connection.setExtraBytes(null);
180                 } else {
181                     buffer.flip();
182                 }
183                 int begin = buffer.remaining();
184                 if(httpServerExchange == null) {
185                     httpServerExchange = new HttpServerExchange(connection, maxEntitySize);
186                 }
187                 parser.handle(buffer, state, httpServerExchange);
188                 if (buffer.hasRemaining()) {
189                     free = false;
190                     connection.setExtraBytes(pooled);
191                 }
192                 int total = read + (begin - buffer.remaining());
193                 read = total;
194                 if (read > maxRequestSize) {
195                     UndertowLogger.REQUEST_LOGGER.requestHeaderWasTooLarge(connection.getPeerAddress(), maxRequestSize);
196                     sendBadRequestAndClose(connection.getChannel(), null);
197                     return;
198                 }
199             } while (!state.isComplete());
200             if(parseTimeoutUpdater != null) {
201                 parseTimeoutUpdater.requestStarted();
202             }
203
204             final HttpServerExchange httpServerExchange = this.httpServerExchange;
205             httpServerExchange.setRequestScheme(connection.getSslSession() != null ? "https" : "http");
206             this.httpServerExchange = null;
207             requestStateUpdater.set(this, 1);
208
209             if (recordRequestStartTime) {
210                 Connectors.setRequestStartTime(httpServerExchange);
211             }
212
213             if(httpServerExchange.getProtocol() == Protocols.HTTP_2_0) {
214                 free = handleHttp2PriorKnowledge(pooled, httpServerExchange);
215                 return;
216             }
217
218             if(!allowUnknownProtocols) {
219                 HttpString protocol = httpServerExchange.getProtocol();
220                 if(protocol != Protocols.HTTP_1_1 && protocol != Protocols.HTTP_1_0 && protocol != Protocols.HTTP_0_9) {
221                     UndertowLogger.REQUEST_IO_LOGGER.debugf("Closing connection from %s due to unknown protocol %s", connection.getChannel().getPeerAddress(), protocol);
222                     sendBadRequestAndClose(connection.getChannel(), new IOException());
223                     return;
224                 }
225             }
226             HttpTransferEncoding.setupRequest(httpServerExchange);
227             connection.setCurrentExchange(httpServerExchange);
228             if(connectorStatistics != null) {
229                 connectorStatistics.setup(httpServerExchange);
230             }
231             if(connection.getSslSession() != null) {
232                 //TODO: figure out a better solution for this
233                 //in order to improve performance we do not generally suspend reads, instead we a CAS to detect when
234                 //data arrives while a request is running and suspend lazily, as suspend/resume is relatively expensive
235                 //however this approach does not work for SSL, as the underlying channel is not thread safe
236                 //so we just suspend every time (the overhead is likely much less than the general SSL overhead anyway)
237                 channel.suspendReads();
238             }
239
240             HeaderValues host = httpServerExchange.getRequestHeaders().get(Headers.HOST);
241             if(host != null && host.size() > 1) {
242                 sendBadRequestAndClose(connection.getChannel(), UndertowMessages.MESSAGES.moreThanOneHostHeader());
243                 return;
244             }
245             if(requireHostHeader && httpServerExchange.getProtocol().equals(Protocols.HTTP_1_1)) {
246                 if(host == null || host.size() ==0 || host.getFirst().isEmpty()) {
247                     sendBadRequestAndClose(connection.getChannel(), UndertowMessages.MESSAGES.noHostInHttp11Request());
248                     return;
249                 }
250             }
251             if(!Connectors.areRequestHeadersValid(httpServerExchange.getRequestHeaders())) {
252                 sendBadRequestAndClose(connection.getChannel(), UndertowMessages.MESSAGES.invalidHeaders());
253                 return;
254             }
255             Connectors.executeRootHandler(connection.getRootHandler(), httpServerExchange);
256         } catch (Throwable t) {
257             sendBadRequestAndClose(connection.getChannel(), t);
258             return;
259         } finally {
260             if (free) pooled.close();
261         }
262     }
263
264     private boolean handleHttp2PriorKnowledge(PooledByteBuffer pooled, HttpServerExchange httpServerExchange) throws IOException {
265         if(httpServerExchange.getRequestMethod().equals(PRI) && connection.getUndertowOptions().get(UndertowOptions.ENABLE_HTTP2, false)) {
266             handleHttp2PriorKnowledge(connection.getChannel(), connection, pooled);
267             return false;
268         } else {
269             sendBadRequestAndClose(connection.getChannel(), new IOException());
270             return true;
271         }
272     }
273
274     private void handleFailedRead(ConduitStreamSourceChannel channel, int res) {
275         if (res == 0) {
276             channel.setReadListener(this);
277             channel.resumeReads();
278         } else if (res == -1) {
279             IoUtils.safeClose(connection);
280         }
281     }
282
283     private void sendBadRequestAndClose(final StreamConnection connection, final Throwable exception) {
284         UndertowLogger.REQUEST_IO_LOGGER.failedToParseRequest(exception);
285         connection.getSourceChannel().suspendReads();
286         new StringWriteChannelListener(BAD_REQUEST) {
287             @Override
288             protected void writeDone(final StreamSinkChannel c) {
289                 super.writeDone(c);
290                 c.suspendWrites();
291                 IoUtils.safeClose(connection);
292             }
293
294             @Override
295             protected void handleError(StreamSinkChannel channel, IOException e) {
296                 IoUtils.safeClose(connection);
297             }
298         }.setup(connection.getSinkChannel());
299     }
300
301     public void exchangeComplete(final HttpServerExchange exchange) {
302         connection.clearChannel();
303         connection.setCurrentExchange(null);
304         final HttpServerConnection connection = this.connection;
305         if (exchange.isPersistent() && !isUpgradeOrConnect(exchange)) {
306             final StreamConnection channel = connection.getChannel();
307             if (connection.getExtraBytes() == null) {
308                 //if we are not pipelining we just register a listener
309                 //we have to resume from with the io thread
310                 if (exchange.isInIoThread()) {
311                     //no need for CAS, we are in the IO thread
312                     newRequest();
313                     channel.getSourceChannel().setReadListener(HttpReadListener.this);
314                     channel.getSourceChannel().resumeReads();
315                     requestStateUpdater.set(this, 0);
316                 } else {
317                     while (true) {
318                         if (connection.getOriginalSourceConduit().isReadShutdown() || connection.getOriginalSinkConduit().isWriteShutdown()) {
319                             channel.getSourceChannel().suspendReads();
320                             channel.getSinkChannel().suspendWrites();
321                             IoUtils.safeClose(connection);
322                             return;
323                         } else {
324                             if (requestStateUpdater.compareAndSet(this, 1, 2)) {
325                                 try {
326                                     newRequest();
327                                     channel.getSourceChannel().setReadListener(HttpReadListener.this);
328                                     channel.getSourceChannel().resumeReads();
329                                 } finally {
330                                     requestStateUpdater.set(this, 0);
331                                 }
332                                 break;
333                             }
334                         }
335                     }
336                 }
337             } else {
338                 if (exchange.isInIoThread()) {
339                     requestStateUpdater.set(this, 0); //no need to CAS, as we don't actually resume
340                     newRequest();
341                     //no need to suspend reads here, the task will always run before the read listener anyway
342                     channel.getIoThread().execute(this);
343                 } else {
344                     while (true) {
345                         if (connection.getOriginalSinkConduit().isWriteShutdown()) {
346                             channel.getSourceChannel().suspendReads();
347                             channel.getSinkChannel().suspendWrites();
348                             IoUtils.safeClose(connection);
349                             return;
350                         } else if (requestStateUpdater.compareAndSet(this, 1, 2)) {
351                             try {
352                                 newRequest();
353                                 channel.getSourceChannel().suspendReads();
354                             } finally {
355                                 requestStateUpdater.set(this, 0);
356                             }
357                             break;
358                         }
359                     }
360                     Executor executor = exchange.getDispatchExecutor();
361                     if (executor == null) {
362                         executor = exchange.getConnection().getWorker();
363                     }
364                     executor.execute(this);
365                 }
366             }
367         } else if (!exchange.isPersistent()) {
368             if (connection.getExtraBytes() != null) {
369                 connection.getExtraBytes().close();
370                 connection.setExtraBytes(null);
371             }
372             ConnectionUtils.cleanClose(connection.getChannel(), connection);
373         } else {
374             //upgrade or connect handling
375             if (connection.getExtraBytes() != null) {
376                 connection.getChannel().getSourceChannel().setConduit(new ReadDataStreamSourceConduit(connection.getChannel().getSourceChannel().getConduit(), connection));
377             }
378             try {
379                 if (!connection.getChannel().getSinkChannel().flush()) {
380                     connection.getChannel().getSinkChannel().setWriteListener(ChannelListeners.flushingChannelListener(new ChannelListener<ConduitStreamSinkChannel>() {
381                         @Override
382                         public void handleEvent(ConduitStreamSinkChannel conduitStreamSinkChannel) {
383                             connection.getUpgradeListener().handleUpgrade(connection.getChannel(), exchange);
384                         }
385                     }, new ClosingChannelExceptionHandler<ConduitStreamSinkChannel>(connection)));
386                     connection.getChannel().getSinkChannel().resumeWrites();
387                     return;
388                 }
389                 connection.getUpgradeListener().handleUpgrade(connection.getChannel(), exchange);
390             } catch (IOException e) {
391                 UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
392                 IoUtils.safeClose(connection);
393             } catch (Throwable t) {
394                 UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
395                 IoUtils.safeClose(connection);
396             }
397         }
398     }
399
400     private boolean isUpgradeOrConnect(HttpServerExchange exchange) {
401         return exchange.isUpgrade() || (exchange.getRequestMethod().equals(Methods.CONNECT) && ((HttpServerConnection)exchange.getConnection()).isConnectHandled() );
402     }
403
404     @Override
405     public void run() {
406         handleEvent(connection.getChannel().getSourceChannel());
407     }
408
409
410     private void handleHttp2PriorKnowledge(final StreamConnection connection, final HttpServerConnection serverConnection, PooledByteBuffer readData) throws IOException {
411
412         final ConduitStreamSourceChannel request = connection.getSourceChannel();
413
414         byte[] data = new byte[PRI_EXPECTED.length];
415         final ByteBuffer buffer = ByteBuffer.wrap(data);
416         if(readData.getBuffer().hasRemaining()) {
417             while (readData.getBuffer().hasRemaining() && buffer.hasRemaining()) {
418                 buffer.put(readData.getBuffer().get());
419             }
420         }
421         final PooledByteBuffer extraData;
422         if(readData.getBuffer().hasRemaining()) {
423             extraData = readData;
424         } else {
425             readData.close();
426             extraData = null;
427         }
428         if(!doHttp2PriRead(connection, buffer, serverConnection, extraData)) {
429             request.getReadSetter().set(new ChannelListener<StreamSourceChannel>() {
430                 @Override
431                 public void handleEvent(StreamSourceChannel channel) {
432                     try {
433                         doHttp2PriRead(connection, buffer, serverConnection, extraData);
434                     } catch (IOException e) {
435                         UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
436                         IoUtils.safeClose(connection);
437                     } catch (Throwable t) {
438                         UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
439                         IoUtils.safeClose(connection);
440                     }
441                 }
442             });
443             request.resumeReads();
444         }
445     }
446
447     private boolean doHttp2PriRead(StreamConnection connection, ByteBuffer buffer, HttpServerConnection serverConnection, PooledByteBuffer extraData) throws IOException {
448         if(buffer.hasRemaining()) {
449             int res = connection.getSourceChannel().read(buffer);
450             if (res == -1) {
451                 return true//fail
452             }
453             if (buffer.hasRemaining()) {
454                 return false;
455             }
456         }
457         buffer.flip();
458         for(int i = 0; i < PRI_EXPECTED.length; ++i) {
459             if(buffer.get() != PRI_EXPECTED[i]) {
460                 throw UndertowMessages.MESSAGES.http2PriRequestFailed();
461             }
462         }
463
464         Http2Channel channel = new Http2Channel(connection, null, serverConnection.getByteBufferPool(), extraData, falsefalsefalse, serverConnection.getUndertowOptions());
465         Http2ReceiveListener receiveListener = new Http2ReceiveListener(serverConnection.getRootHandler(), serverConnection.getUndertowOptions(), serverConnection.getBufferSize(), null);
466         channel.getReceiveSetter().set(receiveListener);
467         channel.resumeReceives();
468         return true;
469     }
470 }
471