1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18 package io.undertow.websockets.jsr;
19
20 import io.undertow.protocols.ssl.UndertowXnioSsl;
21 import io.undertow.server.HttpServerExchange;
22 import io.undertow.server.HttpUpgradeListener;
23 import io.undertow.servlet.api.ClassIntrospecter;
24 import io.undertow.servlet.api.InstanceFactory;
25 import io.undertow.servlet.api.InstanceHandle;
26 import io.undertow.servlet.api.ThreadSetupHandler;
27 import io.undertow.servlet.spec.ServletContextImpl;
28 import io.undertow.servlet.util.ConstructorInstanceFactory;
29 import io.undertow.servlet.util.ImmediateInstanceHandle;
30 import io.undertow.servlet.websockets.ServletWebSocketHttpExchange;
31 import io.undertow.util.CopyOnWriteMap;
32 import io.undertow.util.PathTemplate;
33 import io.undertow.util.StatusCodes;
34 import io.undertow.websockets.WebSocketExtension;
35 import io.undertow.websockets.client.WebSocketClient;
36 import io.undertow.websockets.client.WebSocketClientNegotiation;
37 import io.undertow.websockets.core.WebSocketChannel;
38 import io.undertow.websockets.core.protocol.Handshake;
39 import io.undertow.websockets.extensions.ExtensionHandshake;
40 import io.undertow.websockets.jsr.annotated.AnnotatedEndpointFactory;
41 import io.undertow.websockets.jsr.handshake.HandshakeUtil;
42 import io.undertow.websockets.jsr.handshake.JsrHybi07Handshake;
43 import io.undertow.websockets.jsr.handshake.JsrHybi08Handshake;
44 import io.undertow.websockets.jsr.handshake.JsrHybi13Handshake;
45 import org.xnio.IoFuture;
46 import org.xnio.IoUtils;
47 import io.undertow.connector.ByteBufferPool;
48
49 import org.xnio.OptionMap;
50 import org.xnio.StreamConnection;
51 import org.xnio.XnioWorker;
52 import org.xnio.http.UpgradeFailedException;
53 import org.xnio.ssl.XnioSsl;
54
55 import javax.net.ssl.SSLContext;
56 import javax.servlet.DispatcherType;
57 import javax.servlet.ServletException;
58 import javax.servlet.http.HttpServletRequest;
59 import javax.servlet.http.HttpServletResponse;
60 import javax.websocket.ClientEndpoint;
61 import javax.websocket.ClientEndpointConfig;
62 import javax.websocket.CloseReason;
63 import javax.websocket.DeploymentException;
64 import javax.websocket.Endpoint;
65 import javax.websocket.Extension;
66 import javax.websocket.HandshakeResponse;
67 import javax.websocket.Session;
68 import javax.websocket.server.ServerContainer;
69 import javax.websocket.server.ServerEndpoint;
70 import javax.websocket.server.ServerEndpointConfig;
71 import java.io.Closeable;
72 import java.io.IOException;
73 import java.net.InetSocketAddress;
74 import java.net.URI;
75 import java.nio.channels.ClosedChannelException;
76 import java.security.NoSuchAlgorithmException;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.Collections;
80 import java.util.HashMap;
81 import java.util.HashSet;
82 import java.util.List;
83 import java.util.Map;
84 import java.util.ServiceLoader;
85 import java.util.Set;
86 import java.util.TreeMap;
87 import java.util.TreeSet;
88 import java.util.concurrent.Executor;
89 import java.util.concurrent.TimeUnit;
90 import java.util.function.Supplier;
91
92 import static java.lang.System.*;
93
94
95 /**
96  * {@link ServerContainer} implementation which allows to deploy endpoints for a server.
97  *
98  * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
99  */

100 public class ServerWebSocketContainer implements ServerContainer, Closeable {
101
102     public static final String TIMEOUT = "io.undertow.websocket.CONNECT_TIMEOUT";
103     public static final int DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS = 10;
104
105     private final ClassIntrospecter classIntrospecter;
106
107     private final Map<Class<?>, ConfiguredClientEndpoint> clientEndpoints = new CopyOnWriteMap<>();
108
109     private final List<ConfiguredServerEndpoint> configuredServerEndpoints = new ArrayList<>();
110     private final Set<Class<?>> annotatedEndpointClasses = new HashSet<>();
111
112     /**
113      * set of all deployed server endpoint paths. Due to the comparison function we can detect
114      * overlaps
115      */

116     private final TreeSet<PathTemplate> seenPaths = new TreeSet<>();
117
118     private final Supplier<XnioWorker> xnioWorker;
119     private final ByteBufferPool bufferPool;
120     private final boolean dispatchToWorker;
121     private final InetSocketAddress clientBindAddress;
122     private final WebSocketReconnectHandler webSocketReconnectHandler;
123
124     private volatile long defaultAsyncSendTimeout;
125     private volatile long defaultMaxSessionIdleTimeout;
126     private volatile int defaultMaxBinaryMessageBufferSize;
127     private volatile int defaultMaxTextMessageBufferSize;
128     private volatile boolean deploymentComplete = false;
129     private final List<DeploymentException> deploymentExceptions = new ArrayList<>();
130
131     private ServletContextImpl contextToAddFilter = null;
132
133     private final List<WebsocketClientSslProvider> clientSslProviders;
134     private final List<PauseListener> pauseListeners = new ArrayList<>();
135     private final List<Extension> installedExtensions;
136
137     private final ThreadSetupHandler.Action<Void, Runnable> invokeEndpointTask;
138
139     private volatile boolean closed = false;
140
141     public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, boolean clientMode) {
142         this(classIntrospecter, ServerWebSocketContainer.class.getClassLoader(), xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, nullnull);
143     }
144
145     public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker) {
146         this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, nullnull);
147     }
148
149     public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler) {
150         this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, clientBindAddress, reconnectHandler, Collections.emptyList());
151     }
152
153     public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler, List<Extension> installedExtensions) {
154         this.classIntrospecter = classIntrospecter;
155         this.bufferPool = bufferPool;
156         this.xnioWorker = xnioWorker;
157         this.dispatchToWorker = dispatchToWorker;
158         this.clientBindAddress = clientBindAddress;
159         this.installedExtensions = new ArrayList<>(installedExtensions);
160         List<WebsocketClientSslProvider> clientSslProviders = new ArrayList<>();
161         for (WebsocketClientSslProvider provider : ServiceLoader.load(WebsocketClientSslProvider.class, classLoader)) {
162             clientSslProviders.add(provider);
163         }
164
165         this.clientSslProviders = Collections.unmodifiableList(clientSslProviders);
166         this.webSocketReconnectHandler = reconnectHandler;
167         ThreadSetupHandler.Action<Void, Runnable> task = new ThreadSetupHandler.Action<Void, Runnable>() {
168             @Override
169             public Void call(HttpServerExchange exchange, Runnable context) throws Exception {
170                 context.run();
171                 return null;
172             }
173         };
174         for(ThreadSetupHandler handler : threadSetupHandlers) {
175             task = handler.create(task);
176         }
177         this.invokeEndpointTask = task;
178     }
179
180     @Override
181     public long getDefaultAsyncSendTimeout() {
182         return defaultAsyncSendTimeout;
183     }
184
185     @Override
186     public void setAsyncSendTimeout(long defaultAsyncSendTimeout) {
187         this.defaultAsyncSendTimeout = defaultAsyncSendTimeout;
188     }
189
190     public Session connectToServer(final Object annotatedEndpointInstance, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
191         if(closed) {
192             throw new ClosedChannelException();
193         }
194         ConfiguredClientEndpoint config = getClientEndpoint(annotatedEndpointInstance.getClass(), false);
195         if (config == null) {
196             throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(annotatedEndpointInstance.getClass());
197         }
198         Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance));
199         return connectToServerInternal(instance, config, connectionBuilder);
200     }
201
202     @Override
203     public Session connectToServer(final Object annotatedEndpointInstance, final URI path) throws DeploymentException, IOException {
204         if(closed) {
205             throw new ClosedChannelException();
206         }
207         ConfiguredClientEndpoint config = getClientEndpoint(annotatedEndpointInstance.getClass(), false);
208         if (config == null) {
209             throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(annotatedEndpointInstance.getClass());
210         }
211         Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance));
212         XnioSsl ssl = null;
213         for (WebsocketClientSslProvider provider : clientSslProviders) {
214             ssl = provider.getSsl(xnioWorker.get(), annotatedEndpointInstance, path);
215             if (ssl != null) {
216                 break;
217             }
218         }
219         if(ssl == null) {
220             try {
221                 ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
222             } catch (NoSuchAlgorithmException e) {
223                 //ignore
224             }
225         }
226         return connectToServerInternal(instance, ssl, config, path);
227     }
228
229     public Session connectToServer(Class<?> aClass, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
230         if(closed) {
231             throw new ClosedChannelException();
232         }
233         ConfiguredClientEndpoint config = getClientEndpoint(aClass, true);
234         if (config == null) {
235             throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(aClass);
236         }
237         try {
238             AnnotatedEndpointFactory factory = config.getFactory();
239             InstanceHandle<?> instance = config.getInstanceFactory().createInstance();
240             return connectToServerInternal(factory.createInstance(instance), config, connectionBuilder);
241         } catch (InstantiationException e) {
242             throw new RuntimeException(e);
243         }
244     }
245
246     @Override
247     public Session connectToServer(Class<?> aClass, URI uri) throws DeploymentException, IOException {
248         if(closed) {
249             throw new ClosedChannelException();
250         }
251         ConfiguredClientEndpoint config = getClientEndpoint(aClass, true);
252         if (config == null) {
253             throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(aClass);
254         }
255         try {
256             AnnotatedEndpointFactory factory = config.getFactory();
257
258
259             InstanceHandle<?> instance = config.getInstanceFactory().createInstance();
260             XnioSsl ssl = null;
261             for (WebsocketClientSslProvider provider : clientSslProviders) {
262                 ssl = provider.getSsl(xnioWorker.get(), aClass, uri);
263                 if (ssl != null) {
264                     break;
265                 }
266             }
267             if(ssl == null) {
268                 try {
269                     ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
270                 } catch (NoSuchAlgorithmException e) {
271                     //ignore
272                 }
273             }
274             return connectToServerInternal(factory.createInstance(instance), ssl, config, uri);
275         } catch (InstantiationException e) {
276             throw new RuntimeException(e);
277         }
278     }
279
280     @Override
281     public Session connectToServer(final Endpoint endpointInstance, final ClientEndpointConfig config, final URI path) throws DeploymentException, IOException {
282         if(closed) {
283             throw new ClosedChannelException();
284         }
285         ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build();
286         XnioSsl ssl = null;
287         for (WebsocketClientSslProvider provider : clientSslProviders) {
288             ssl = provider.getSsl(xnioWorker.get(), endpointInstance, cec, path);
289             if (ssl != null) {
290                 break;
291             }
292         }
293         if(ssl == null) {
294             try {
295                 ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
296             } catch (NoSuchAlgorithmException e) {
297                 //ignore
298             }
299         }
300         //in theory we should not be able to connect until the deployment is complete, but the definition of when a deployment is complete is a bit nebulous.
301         WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getPreferredSubprotocols(), toExtensionList(cec.getExtensions()), cec);
302
303
304         WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
305                 .setSsl(ssl)
306                 .setBindAddress(clientBindAddress)
307                 .setClientNegotiation(clientNegotiation);
308
309         return connectToServer(endpointInstance, config, connectionBuilder);
310     }
311
312     public Session connectToServer(final Endpoint endpointInstance, final ClientEndpointConfig config, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
313         if(closed) {
314             throw new ClosedChannelException();
315         }
316         ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build();
317
318         WebSocketClientNegotiation clientNegotiation = connectionBuilder.getClientNegotiation();
319
320         IoFuture<WebSocketChannel> session = connectionBuilder
321                 .connect();
322         Number timeout = (Number) cec.getUserProperties().get(TIMEOUT);
323         if(session.await(timeout == null ? DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS: timeout.intValue(), TimeUnit.SECONDS) == IoFuture.Status.WAITING) {
324             //add a notifier to close the channel if the connection actually completes
325             session.cancel();
326             session.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
327                 @Override
328                 public void handleDone(WebSocketChannel data, Object attachment) {
329                     IoUtils.safeClose(data);
330                 }
331             }, null);
332             throw JsrWebSocketMessages.MESSAGES.connectionTimedOut();
333         }
334         WebSocketChannel channel;
335         try {
336             channel = session.get();
337         } catch (UpgradeFailedException e) {
338             throw new DeploymentException(e.getMessage(), e);
339         }
340         EndpointSessionHandler sessionHandler = new EndpointSessionHandler(this);
341
342         final List<Extension> extensions = new ArrayList<>();
343         final Map<String, Extension> extMap = new HashMap<>();
344         for (Extension ext : cec.getExtensions()) {
345             extMap.put(ext.getName(), ext);
346         }
347         for (WebSocketExtension e : clientNegotiation.getSelectedExtensions()) {
348             Extension ext = extMap.get(e.getName());
349             if (ext == null) {
350                 throw JsrWebSocketMessages.MESSAGES.extensionWasNotPresentInClientHandshake(e.getName(), clientNegotiation.getSupportedExtensions());
351             }
352             extensions.add(ExtensionImpl.create(e));
353         }
354         ConfiguredClientEndpoint configured = clientEndpoints.get(endpointInstance.getClass());
355         Endpoint instance = endpointInstance;
356         if(configured == null) {
357             synchronized (clientEndpoints) {
358                 // make sure to create an instance of AnnotatedEndpoint if we have the annotation
359                 configured = getClientEndpoint(endpointInstance.getClass(), false);
360                 if(configured == null) {
361                     // if we don't, add an endpoint anyway to the list of clientEndpoints
362                     clientEndpoints.put(endpointInstance.getClass(), configured = new ConfiguredClientEndpoint());
363                 } else {
364                     // use the  factory in configured to reach the endpoint
365                     instance = configured.getFactory().createInstance(new ImmediateInstanceHandle<>(endpointInstance));
366                 }
367             }
368         }
369
370         EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, cec.getDecoders(), cec.getEncoders());
371         UndertowSession undertowSession = new UndertowSession(channel, connectionBuilder.getUri(), Collections.<String, String>emptyMap(), Collections.<String, List<String>>emptyMap(), sessionHandler, nullnew ImmediateInstanceHandle<>(endpointInstance), cec, connectionBuilder.getUri().getQuery(), encodingFactory.createEncoding(cec), configured, clientNegotiation.getSelectedSubProtocol(), extensions, connectionBuilder);
372         instance.onOpen(undertowSession, cec);
373         channel.resumeReceives();
374
375         return undertowSession;
376     }
377
378
379     @Override
380     public Session connectToServer(final Class<? extends Endpoint> endpointClass, final ClientEndpointConfig cec, final URI path) throws DeploymentException, IOException {
381         if(closed) {
382             throw new ClosedChannelException();
383         }
384         try {
385             Endpoint endpoint = classIntrospecter.createInstanceFactory(endpointClass).createInstance().getInstance();
386             return connectToServer(endpoint, cec, path);
387         } catch (InstantiationException | NoSuchMethodException e) {
388             throw new RuntimeException(e);
389         }
390     }
391
392
393     public void doUpgrade(HttpServletRequest request,
394                           HttpServletResponse response, final ServerEndpointConfig sec,
395                           Map<String,String> pathParams)
396             throws ServletException, IOException {
397         ServerEndpointConfig.Configurator configurator = sec.getConfigurator();
398         try {
399             EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, sec.getDecoders(), sec.getEncoders());
400             PathTemplate pt = PathTemplate.create(sec.getPath());
401
402             InstanceFactory<?> instanceFactory = null;
403             try {
404                 instanceFactory = classIntrospecter.createInstanceFactory(sec.getEndpointClass());
405             } catch (Exception e) {
406                 //so it is possible that this is still valid if a custom configurator is in use
407                 if (configurator == null || configurator.getClass() == ServerEndpointConfig.Configurator.class) {
408                     throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
409                 } else {
410                     instanceFactory = new InstanceFactory<Object>() {
411                         @Override
412                         public InstanceHandle<Object> createInstance() throws InstantiationException {
413                             throw JsrWebSocketMessages.MESSAGES.endpointDoesNotHaveAppropriateConstructor(sec.getEndpointClass());
414                         }
415                     };
416                 }
417             }
418             if (configurator == null) {
419                 configurator = DefaultContainerConfigurator.INSTANCE;
420             }
421
422             ServerEndpointConfig config = ServerEndpointConfig.Builder.create(sec.getEndpointClass(), sec.getPath())
423                     .decoders(sec.getDecoders())
424                     .encoders(sec.getEncoders())
425                     .subprotocols(sec.getSubprotocols())
426                     .extensions(sec.getExtensions())
427                     .configurator(configurator)
428                     .build();
429
430
431             AnnotatedEndpointFactory annotatedEndpointFactory = null;
432             if(!Endpoint.class.isAssignableFrom(sec.getEndpointClass())) {
433                 annotatedEndpointFactory = AnnotatedEndpointFactory.create(sec.getEndpointClass(), encodingFactory, pt.getParameterNames());
434             }
435
436
437             ConfiguredServerEndpoint confguredServerEndpoint;
438             if(annotatedEndpointFactory == null) {
439                 confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, null, encodingFactory);
440             } else {
441                 confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, null, encodingFactory, annotatedEndpointFactory, installedExtensions);
442             }
443             WebSocketHandshakeHolder hand;
444
445             WebSocketDeploymentInfo info = (WebSocketDeploymentInfo)request.getServletContext().getAttribute(WebSocketDeploymentInfo.ATTRIBUTE_NAME);
446             if (info == null || info.getExtensions() == null) {
447                 hand = ServerWebSocketContainer.handshakes(confguredServerEndpoint);
448             } else {
449                 hand = ServerWebSocketContainer.handshakes(confguredServerEndpoint, info.getExtensions());
450             }
451
452             final ServletWebSocketHttpExchange facade = new ServletWebSocketHttpExchange(request, response, new HashSet<WebSocketChannel>());
453             Handshake handshaker = null;
454             for (Handshake method : hand.handshakes) {
455                 if (method.matches(facade)) {
456                     handshaker = method;
457                     break;
458                 }
459             }
460
461             if (handshaker != null) {
462                 if(isClosed()) {
463                     response.sendError(StatusCodes.SERVICE_UNAVAILABLE);
464                     return;
465                 }
466                 facade.putAttachment(HandshakeUtil.PATH_PARAMS, pathParams);
467                 final Handshake selected = handshaker;
468                 facade.upgradeChannel(new HttpUpgradeListener() {
469                     @Override
470                     public void handleUpgrade(StreamConnection streamConnection, HttpServerExchange exchange) {
471                         WebSocketChannel channel = selected.createChannel(facade, streamConnection, facade.getBufferPool());
472                         new EndpointSessionHandler(ServerWebSocketContainer.this).onConnect(facade, channel);
473                     }
474                 });
475                 handshaker.handshake(facade);
476                 return;
477             }
478         } catch (Exception e) {
479             throw new ServletException(e);
480         }
481     }
482
483     private Session connectToServerInternal(final Endpoint endpointInstance, XnioSsl ssl, final ConfiguredClientEndpoint cec, final URI path) throws DeploymentException, IOException {
484         //in theory we should not be able to connect until the deployment is complete, but the definition of when a deployment is complete is a bit nebulous.
485         WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getConfig().getPreferredSubprotocols(), toExtensionList(cec.getConfig().getExtensions()), cec.getConfig());
486
487
488         WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
489                 .setSsl(ssl)
490                 .setBindAddress(clientBindAddress)
491                 .setClientNegotiation(clientNegotiation);
492         return connectToServerInternal(endpointInstance, cec, connectionBuilder);
493     }
494
495     private Session connectToServerInternal(final Endpoint endpointInstance, final ConfiguredClientEndpoint cec, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
496
497         IoFuture<WebSocketChannel> session = connectionBuilder
498                 .connect();
499         Number timeout = (Number) cec.getConfig().getUserProperties().get(TIMEOUT);
500         IoFuture.Status result = session.await(timeout == null ? DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS : timeout.intValue(), TimeUnit.SECONDS);
501         if(result == IoFuture.Status.WAITING) {
502             //add a notifier to close the channel if the connection actually completes
503
504             session.cancel();
505             session.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
506                 @Override
507                 public void handleDone(WebSocketChannel data, Object attachment) {
508                     IoUtils.safeClose(data);
509                 }
510             }, null);
511             throw JsrWebSocketMessages.MESSAGES.connectionTimedOut();
512         }
513
514         WebSocketChannel channel;
515         try {
516             channel = session.get();
517         } catch (UpgradeFailedException e) {
518             throw new DeploymentException(e.getMessage(), e);
519         }
520         EndpointSessionHandler sessionHandler = new EndpointSessionHandler(this);
521
522         final List<Extension> extensions = new ArrayList<>();
523         final Map<String, Extension> extMap = new HashMap<>();
524         for (Extension ext : cec.getConfig().getExtensions()) {
525             extMap.put(ext.getName(), ext);
526         }
527         String subProtocol = null;
528         if(connectionBuilder.getClientNegotiation() != null) {
529             for (WebSocketExtension e : connectionBuilder.getClientNegotiation().getSelectedExtensions()) {
530                 Extension ext = extMap.get(e.getName());
531                 if (ext == null) {
532                     throw JsrWebSocketMessages.MESSAGES.extensionWasNotPresentInClientHandshake(e.getName(), connectionBuilder.getClientNegotiation().getSupportedExtensions());
533                 }
534                 extensions.add(ExtensionImpl.create(e));
535             }
536             subProtocol = connectionBuilder.getClientNegotiation().getSelectedSubProtocol();
537         }
538
539         UndertowSession undertowSession = new UndertowSession(channel, connectionBuilder.getUri(), Collections.<String, String>emptyMap(), Collections.<String, List<String>>emptyMap(), sessionHandler, nullnew ImmediateInstanceHandle<>(endpointInstance), cec.getConfig(), connectionBuilder.getUri().getQuery(), cec.getEncodingFactory().createEncoding(cec.getConfig()), cec, subProtocol, extensions, connectionBuilder);
540         endpointInstance.onOpen(undertowSession, cec.getConfig());
541         channel.resumeReceives();
542
543         return undertowSession;
544     }
545
546     @Override
547     public long getDefaultMaxSessionIdleTimeout() {
548         return defaultMaxSessionIdleTimeout;
549     }
550
551     @Override
552     public void setDefaultMaxSessionIdleTimeout(final long timeout) {
553         this.defaultMaxSessionIdleTimeout = timeout;
554     }
555
556     @Override
557     public int getDefaultMaxBinaryMessageBufferSize() {
558         return defaultMaxBinaryMessageBufferSize;
559     }
560
561     @Override
562     public void setDefaultMaxBinaryMessageBufferSize(int defaultMaxBinaryMessageBufferSize) {
563         this.defaultMaxBinaryMessageBufferSize = defaultMaxBinaryMessageBufferSize;
564     }
565
566     @Override
567     public int getDefaultMaxTextMessageBufferSize() {
568         return defaultMaxTextMessageBufferSize;
569     }
570
571     @Override
572     public void setDefaultMaxTextMessageBufferSize(int defaultMaxTextMessageBufferSize) {
573         this.defaultMaxTextMessageBufferSize = defaultMaxTextMessageBufferSize;
574     }
575
576     @Override
577     public Set<Extension> getInstalledExtensions() {
578         return new HashSet<>(installedExtensions);
579     }
580
581     /**
582      * Runs a web socket invocation, setting up the threads and dispatching a thread pool
583      * <p>
584      * Unfortunately we need to dispatch to a thread pool, because there is a good chance that the endpoint
585      * will use blocking IO methods. We suspend recieves while this is in progress, to make sure that we do not have multiple
586      * methods invoked at once.
587      * <p>
588      *
589      * @param invocation The task to run
590      */

591     public void invokeEndpointMethod(final Executor executor, final Runnable invocation) {
592         if (dispatchToWorker) {
593             executor.execute(new Runnable() {
594                 @Override
595                 public void run() {
596                     invokeEndpointMethod(invocation);
597                 }
598             });
599         } else {
600             invokeEndpointMethod(invocation);
601         }
602     }
603
604     /**
605      * Directly invokes an endpoint method, without dispatching to an executor
606      * @param invocation The invocation
607      */

608     public void invokeEndpointMethod(final Runnable invocation) {
609         try {
610             invokeEndpointTask.call(null, invocation);
611         } catch (Exception e) {
612             throw new RuntimeException(e);
613         }
614     }
615
616     @Override
617     public void addEndpoint(final Class<?> endpoint) throws DeploymentException {
618         if (deploymentComplete) {
619             throw JsrWebSocketMessages.MESSAGES.cannotAddEndpointAfterDeployment();
620         }
621         //work around a TCK7 problem
622         //if the class has already been added we just ignore it
623         if(annotatedEndpointClasses.contains(endpoint)) {
624             return;
625         }
626         annotatedEndpointClasses.add(endpoint);
627         try {
628             addEndpointInternal(endpoint, true);
629         } catch (DeploymentException e) {
630             deploymentExceptions.add(e);
631             throw e;
632         }
633     }
634
635     private synchronized void addEndpointInternal(final Class<?> endpoint, boolean requiresCreation) throws DeploymentException {
636         ServerEndpoint serverEndpoint = endpoint.getAnnotation(ServerEndpoint.class);
637         ClientEndpoint clientEndpoint = endpoint.getAnnotation(ClientEndpoint.class);
638         if (serverEndpoint != null) {
639             JsrWebSocketLogger.ROOT_LOGGER.addingAnnotatedServerEndpoint(endpoint, serverEndpoint.value());
640             final PathTemplate template = PathTemplate.create(serverEndpoint.value());
641             if (seenPaths.contains(template)) {
642                 PathTemplate existing = null;
643                 for (PathTemplate p : seenPaths) {
644                     if (p.compareTo(template) == 0) {
645                         existing = p;
646                         break;
647                     }
648                 }
649                 throw JsrWebSocketMessages.MESSAGES.multipleEndpointsWithOverlappingPaths(template, existing);
650             }
651             seenPaths.add(template);
652             Class<? extends ServerEndpointConfig.Configurator> configuratorClass = serverEndpoint.configurator();
653
654             EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, serverEndpoint.decoders(), serverEndpoint.encoders());
655             AnnotatedEndpointFactory annotatedEndpointFactory = AnnotatedEndpointFactory.create(endpoint, encodingFactory, template.getParameterNames());
656             InstanceFactory<?> instanceFactory = null;
657             try {
658                 instanceFactory = classIntrospecter.createInstanceFactory(endpoint);
659             } catch (Exception e) {
660                 //so it is possible that this is still valid if a custom configurator is in use
661                 if(configuratorClass == ServerEndpointConfig.Configurator.class) {
662                     throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
663                 } else {
664                     instanceFactory = new InstanceFactory<Object>() {
665                         @Override
666                         public InstanceHandle<Object> createInstance() throws InstantiationException {
667                             throw JsrWebSocketMessages.MESSAGES.endpointDoesNotHaveAppropriateConstructor(endpoint);
668                         }
669                     };
670                 }
671             }
672             ServerEndpointConfig.Configurator configurator;
673             if (configuratorClass != ServerEndpointConfig.Configurator.class) {
674                 try {
675                     configurator = classIntrospecter.createInstanceFactory(configuratorClass).createInstance().getInstance();
676                 } catch (InstantiationException | NoSuchMethodException e) {
677                     throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
678                 }
679             } else {
680                 configurator = DefaultContainerConfigurator.INSTANCE;
681             }
682
683             ServerEndpointConfig config = ServerEndpointConfig.Builder.create(endpoint, serverEndpoint.value())
684                     .decoders(Arrays.asList(serverEndpoint.decoders()))
685                     .encoders(Arrays.asList(serverEndpoint.encoders()))
686                     .subprotocols(Arrays.asList(serverEndpoint.subprotocols()))
687                     .extensions(Collections.<Extension>emptyList())
688                     .configurator(configurator)
689                     .build();
690
691
692             ConfiguredServerEndpoint confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, template, encodingFactory, annotatedEndpointFactory, installedExtensions);
693             configuredServerEndpoints.add(confguredServerEndpoint);
694             handleAddingFilterMapping();
695         } else if (clientEndpoint != null) {
696             JsrWebSocketLogger.ROOT_LOGGER.addingAnnotatedClientEndpoint(endpoint);
697             EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, clientEndpoint.decoders(), clientEndpoint.encoders());
698             InstanceFactory<?> instanceFactory;
699             try {
700                 instanceFactory = classIntrospecter.createInstanceFactory(endpoint);
701             } catch (Exception e) {
702                 try {
703                     instanceFactory = new ConstructorInstanceFactory<>(endpoint.getConstructor()); //this endpoint cannot be created by the container, the user will instantiate it
704                 } catch (NoSuchMethodException e1) {
705                     if(requiresCreation) {
706                         throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
707                     } else {
708                         instanceFactory = new InstanceFactory<Object>() {
709                             @Override
710                             public InstanceHandle<Object> createInstance() throws InstantiationException {
711                                 throw new InstantiationException();
712                             }
713                         };
714                     }
715                 }
716             }
717             AnnotatedEndpointFactory factory = AnnotatedEndpointFactory.create(endpoint, encodingFactory, Collections.<String>emptySet());
718
719             ClientEndpointConfig.Configurator configurator = null;
720             try {
721                 configurator = classIntrospecter.createInstanceFactory(clientEndpoint.configurator()).createInstance().getInstance();
722             } catch (InstantiationException | NoSuchMethodException e) {
723                 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
724             }
725             ClientEndpointConfig config = ClientEndpointConfig.Builder.create()
726                     .decoders(Arrays.asList(clientEndpoint.decoders()))
727                     .encoders(Arrays.asList(clientEndpoint.encoders()))
728                     .preferredSubprotocols(Arrays.asList(clientEndpoint.subprotocols()))
729                     .configurator(configurator)
730                     .build();
731
732             ConfiguredClientEndpoint configuredClientEndpoint = new ConfiguredClientEndpoint(config, factory, encodingFactory, instanceFactory);
733             clientEndpoints.put(endpoint, configuredClientEndpoint);
734         } else {
735             throw JsrWebSocketMessages.MESSAGES.classWasNotAnnotated(endpoint);
736         }
737     }
738
739
740     private void handleAddingFilterMapping() {
741         if (contextToAddFilter != null) {
742             contextToAddFilter.getDeployment().getDeploymentInfo().addFilterUrlMapping(Bootstrap.FILTER_NAME, "/*", DispatcherType.REQUEST);
743             contextToAddFilter.getDeployment().getServletPaths().invalidate();
744             contextToAddFilter = null;
745         }
746     }
747
748     @Override
749     public void addEndpoint(final ServerEndpointConfig endpoint) throws DeploymentException {
750         if (deploymentComplete) {
751             throw JsrWebSocketMessages.MESSAGES.cannotAddEndpointAfterDeployment();
752         }
753         JsrWebSocketLogger.ROOT_LOGGER.addingProgramaticEndpoint(endpoint.getEndpointClass(), endpoint.getPath());
754         final PathTemplate template = PathTemplate.create(endpoint.getPath());
755         if (seenPaths.contains(template)) {
756             PathTemplate existing = null;
757             for (PathTemplate p : seenPaths) {
758                 if (p.compareTo(template) == 0) {
759                     existing = p;
760                     break;
761                 }
762             }
763             throw JsrWebSocketMessages.MESSAGES.multipleEndpointsWithOverlappingPaths(template, existing);
764         }
765         seenPaths.add(template);
766         EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, endpoint.getDecoders(), endpoint.getEncoders());
767
768         AnnotatedEndpointFactory annotatedEndpointFactory = null;
769         if(!Endpoint.class.isAssignableFrom(endpoint.getEndpointClass())) {
770             // We may want to check that the path in @ServerEndpoint matches the specified path, and throw if they are not equivalent
771             annotatedEndpointFactory = AnnotatedEndpointFactory.create(endpoint.getEndpointClass(), encodingFactory, template.getParameterNames());
772         }
773         ConfiguredServerEndpoint confguredServerEndpoint = new ConfiguredServerEndpoint(endpoint, null, template, encodingFactory, annotatedEndpointFactory, endpoint.getExtensions());
774         configuredServerEndpoints.add(confguredServerEndpoint);
775         handleAddingFilterMapping();
776     }
777
778
779     private ConfiguredClientEndpoint getClientEndpoint(final Class<?> endpointType, boolean requiresCreation) {
780         Class<?> type = endpointType;
781         while (type != Object.class && type != null && !type.isAnnotationPresent(ClientEndpoint.class)) {
782             type = type.getSuperclass();
783         }
784         if(type == Object.class || type == null) {
785             return null;
786         }
787
788         ConfiguredClientEndpoint existing = clientEndpoints.get(type);
789         if (existing != null) {
790             return existing;
791         }
792         synchronized (this) {
793             existing = clientEndpoints.get(type);
794             if (existing != null) {
795                 return existing;
796             }
797             if (type.isAnnotationPresent(ClientEndpoint.class)) {
798                 try {
799                     addEndpointInternal(type, requiresCreation);
800                     return clientEndpoints.get(type);
801                 } catch (DeploymentException e) {
802                     throw new RuntimeException(e);
803                 }
804             }
805             return null;
806         }
807     }
808
809
810
811     public void validateDeployment() {
812         if(!deploymentExceptions.isEmpty()) {
813             RuntimeException e = JsrWebSocketMessages.MESSAGES.deploymentFailedDueToProgramaticErrors();
814             for(DeploymentException ex : deploymentExceptions) {
815                 e.addSuppressed(ex);
816             }
817             throw e;
818         }
819     }
820
821     public void deploymentComplete() {
822         deploymentComplete = true;
823         validateDeployment();
824     }
825
826     public List<ConfiguredServerEndpoint> getConfiguredServerEndpoints() {
827         return configuredServerEndpoints;
828     }
829
830     public ServletContextImpl getContextToAddFilter() {
831         return contextToAddFilter;
832     }
833
834     public void setContextToAddFilter(ServletContextImpl contextToAddFilter) {
835         this.contextToAddFilter = contextToAddFilter;
836     }
837
838     public synchronized void close(int waitTime) {
839         doClose();
840         //wait for them to close
841         long end = currentTimeMillis() + waitTime;
842         for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
843             endpoint.awaitClose(end - System.currentTimeMillis());
844         }
845     }
846     @Override
847     public synchronized void close() {
848         close(10000);
849     }
850
851     public ByteBufferPool getBufferPool() {
852         return bufferPool;
853     }
854
855     public XnioWorker getXnioWorker() {
856         return xnioWorker.get();
857     }
858
859     private static List<WebSocketExtension> toExtensionList(final List<Extension> extensions) {
860         List<WebSocketExtension> ret = new ArrayList<>();
861         for (Extension e : extensions) {
862             final List<WebSocketExtension.Parameter> parameters = new ArrayList<>();
863             for (Extension.Parameter p : e.getParameters()) {
864                 parameters.add(new WebSocketExtension.Parameter(p.getName(), p.getValue()));
865             }
866             ret.add(new WebSocketExtension(e.getName(), parameters));
867         }
868         return ret;
869     }
870
871     private static class ClientNegotiation extends WebSocketClientNegotiation {
872
873         private final ClientEndpointConfig config;
874
875         ClientNegotiation(List<String> supportedSubProtocols, List<WebSocketExtension> supportedExtensions, ClientEndpointConfig config) {
876             super(supportedSubProtocols, supportedExtensions);
877             this.config = config;
878         }
879
880         @Override
881         public void afterRequest(final Map<String, List<String>> headers) {
882
883             ClientEndpointConfig.Configurator configurator = config.getConfigurator();
884             if (configurator != null) {
885                 final Map<String, List<String>> newHeaders = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
886                 for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
887                     ArrayList<String> arrayList = new ArrayList<>();
888                     arrayList.addAll(entry.getValue());
889                     newHeaders.put(entry.getKey(), arrayList);
890                 }
891                 configurator.afterResponse(new HandshakeResponse() {
892                     @Override
893                     public Map<String, List<String>> getHeaders() {
894                         return newHeaders;
895                     }
896                 });
897             }
898         }
899
900         @Override
901         public void beforeRequest(Map<String, List<String>> headers) {
902             ClientEndpointConfig.Configurator configurator = config.getConfigurator();
903             if (configurator != null) {
904                 final Map<String, List<String>> newHeaders = new HashMap<>();
905                 for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
906                     ArrayList<String> arrayList = new ArrayList<>();
907                     arrayList.addAll(entry.getValue());
908                     newHeaders.put(entry.getKey(), arrayList);
909                 }
910                 configurator.beforeRequest(newHeaders);
911                 headers.clear(); //TODO: more efficient way
912                 for (Map.Entry<String, List<String>> entry : newHeaders.entrySet()) {
913                     if (!entry.getValue().isEmpty()) {
914                         headers.put(entry.getKey(), entry.getValue());
915                     }
916                 }
917             }
918         }
919     }
920
921     /**
922      * Pauses the container
923      * @param listener
924      */

925     public synchronized void pause(PauseListener listener) {
926         closed = true;
927         if(configuredServerEndpoints.isEmpty()) {
928             listener.paused();
929             return;
930         }
931         if(listener != null) {
932             pauseListeners.add(listener);
933         }
934         for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
935             for (final Session session : endpoint.getOpenSessions()) {
936                 ((UndertowSession)session).getExecutor().execute(new Runnable() {
937                     @Override
938                     public void run() {
939                         try {
940                             session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, ""));
941                         } catch (Exception e) {
942                             JsrWebSocketLogger.ROOT_LOGGER.couldNotCloseOnUndeploy(e);
943                         }
944                     }
945                 });
946             }
947         }
948
949         Runnable done = new Runnable() {
950
951             int count = configuredServerEndpoints.size();
952
953             @Override
954             public synchronized void run() {
955                 List<PauseListener> copy = null;
956                 synchronized (ServerWebSocketContainer.this) {
957                     count--;
958                     if (count == 0) {
959                         copy = new ArrayList<>(pauseListeners);
960                         pauseListeners.clear();
961                     }
962                 }
963                 if(copy != null) {
964                     for (PauseListener p : copy) {
965                         p.paused();
966                     }
967                 }
968             }
969         };
970
971         for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
972             endpoint.notifyClosed(done);
973         }
974     }
975
976     private void doClose() {
977         closed = true;
978         for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
979             for (Session session : endpoint.getOpenSessions()) {
980                 try {
981                     session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, ""));
982                 } catch (Exception e) {
983                     JsrWebSocketLogger.ROOT_LOGGER.couldNotCloseOnUndeploy(e);
984                 }
985             }
986         }
987     }
988
989     static WebSocketHandshakeHolder handshakes(ConfiguredServerEndpoint config) {
990         List<Handshake> handshakes = new ArrayList<>();
991         handshakes.add(new JsrHybi13Handshake(config));
992         handshakes.add(new JsrHybi08Handshake(config));
993         handshakes.add(new JsrHybi07Handshake(config));
994         return new WebSocketHandshakeHolder(handshakes, config);
995     }
996
997     static WebSocketHandshakeHolder handshakes(ConfiguredServerEndpoint config, List<ExtensionHandshake> extensions) {
998         List<Handshake> handshakes = new ArrayList<>();
999         Handshake jsrHybi13Handshake = new JsrHybi13Handshake(config);
1000         Handshake jsrHybi08Handshake = new JsrHybi08Handshake(config);
1001         Handshake jsrHybi07Handshake = new JsrHybi07Handshake(config);
1002         for (ExtensionHandshake extension : extensions) {
1003             jsrHybi13Handshake.addExtension(extension);
1004             jsrHybi08Handshake.addExtension(extension);
1005             jsrHybi07Handshake.addExtension(extension);
1006         }
1007         handshakes.add(jsrHybi13Handshake);
1008         handshakes.add(jsrHybi08Handshake);
1009         handshakes.add(jsrHybi07Handshake);
1010         return new WebSocketHandshakeHolder(handshakes, config);
1011     }
1012
1013     static final class WebSocketHandshakeHolder {
1014         final List<Handshake> handshakes;
1015         final ConfiguredServerEndpoint endpoint;
1016
1017         private WebSocketHandshakeHolder(List<Handshake> handshakes, ConfiguredServerEndpoint endpoint) {
1018             this.handshakes = handshakes;
1019             this.endpoint = endpoint;
1020         }
1021     }
1022     /**
1023      * resumes a paused container
1024      */

1025     public synchronized void resume() {
1026         closed = false;
1027         for(PauseListener p : pauseListeners) {
1028             p.resumed();
1029         }
1030         pauseListeners.clear();
1031     }
1032
1033     public WebSocketReconnectHandler getWebSocketReconnectHandler() {
1034         return webSocketReconnectHandler;
1035     }
1036
1037     public boolean isClosed() {
1038         return closed;
1039     }
1040
1041     public interface PauseListener {
1042         void paused();
1043
1044         void resumed();
1045     }
1046
1047     public boolean isDispatchToWorker() {
1048         return dispatchToWorker;
1049     }
1050 }
1051