1
18 package io.undertow.websockets.jsr;
19
20 import io.undertow.protocols.ssl.UndertowXnioSsl;
21 import io.undertow.server.HttpServerExchange;
22 import io.undertow.server.HttpUpgradeListener;
23 import io.undertow.servlet.api.ClassIntrospecter;
24 import io.undertow.servlet.api.InstanceFactory;
25 import io.undertow.servlet.api.InstanceHandle;
26 import io.undertow.servlet.api.ThreadSetupHandler;
27 import io.undertow.servlet.spec.ServletContextImpl;
28 import io.undertow.servlet.util.ConstructorInstanceFactory;
29 import io.undertow.servlet.util.ImmediateInstanceHandle;
30 import io.undertow.servlet.websockets.ServletWebSocketHttpExchange;
31 import io.undertow.util.CopyOnWriteMap;
32 import io.undertow.util.PathTemplate;
33 import io.undertow.util.StatusCodes;
34 import io.undertow.websockets.WebSocketExtension;
35 import io.undertow.websockets.client.WebSocketClient;
36 import io.undertow.websockets.client.WebSocketClientNegotiation;
37 import io.undertow.websockets.core.WebSocketChannel;
38 import io.undertow.websockets.core.protocol.Handshake;
39 import io.undertow.websockets.extensions.ExtensionHandshake;
40 import io.undertow.websockets.jsr.annotated.AnnotatedEndpointFactory;
41 import io.undertow.websockets.jsr.handshake.HandshakeUtil;
42 import io.undertow.websockets.jsr.handshake.JsrHybi07Handshake;
43 import io.undertow.websockets.jsr.handshake.JsrHybi08Handshake;
44 import io.undertow.websockets.jsr.handshake.JsrHybi13Handshake;
45 import org.xnio.IoFuture;
46 import org.xnio.IoUtils;
47 import io.undertow.connector.ByteBufferPool;
48
49 import org.xnio.OptionMap;
50 import org.xnio.StreamConnection;
51 import org.xnio.XnioWorker;
52 import org.xnio.http.UpgradeFailedException;
53 import org.xnio.ssl.XnioSsl;
54
55 import javax.net.ssl.SSLContext;
56 import javax.servlet.DispatcherType;
57 import javax.servlet.ServletException;
58 import javax.servlet.http.HttpServletRequest;
59 import javax.servlet.http.HttpServletResponse;
60 import javax.websocket.ClientEndpoint;
61 import javax.websocket.ClientEndpointConfig;
62 import javax.websocket.CloseReason;
63 import javax.websocket.DeploymentException;
64 import javax.websocket.Endpoint;
65 import javax.websocket.Extension;
66 import javax.websocket.HandshakeResponse;
67 import javax.websocket.Session;
68 import javax.websocket.server.ServerContainer;
69 import javax.websocket.server.ServerEndpoint;
70 import javax.websocket.server.ServerEndpointConfig;
71 import java.io.Closeable;
72 import java.io.IOException;
73 import java.net.InetSocketAddress;
74 import java.net.URI;
75 import java.nio.channels.ClosedChannelException;
76 import java.security.NoSuchAlgorithmException;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.Collections;
80 import java.util.HashMap;
81 import java.util.HashSet;
82 import java.util.List;
83 import java.util.Map;
84 import java.util.ServiceLoader;
85 import java.util.Set;
86 import java.util.TreeMap;
87 import java.util.TreeSet;
88 import java.util.concurrent.Executor;
89 import java.util.concurrent.TimeUnit;
90 import java.util.function.Supplier;
91
92 import static java.lang.System.*;
93
94
95
100 public class ServerWebSocketContainer implements ServerContainer, Closeable {
101
102 public static final String TIMEOUT = "io.undertow.websocket.CONNECT_TIMEOUT";
103 public static final int DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS = 10;
104
105 private final ClassIntrospecter classIntrospecter;
106
107 private final Map<Class<?>, ConfiguredClientEndpoint> clientEndpoints = new CopyOnWriteMap<>();
108
109 private final List<ConfiguredServerEndpoint> configuredServerEndpoints = new ArrayList<>();
110 private final Set<Class<?>> annotatedEndpointClasses = new HashSet<>();
111
112
116 private final TreeSet<PathTemplate> seenPaths = new TreeSet<>();
117
118 private final Supplier<XnioWorker> xnioWorker;
119 private final ByteBufferPool bufferPool;
120 private final boolean dispatchToWorker;
121 private final InetSocketAddress clientBindAddress;
122 private final WebSocketReconnectHandler webSocketReconnectHandler;
123
124 private volatile long defaultAsyncSendTimeout;
125 private volatile long defaultMaxSessionIdleTimeout;
126 private volatile int defaultMaxBinaryMessageBufferSize;
127 private volatile int defaultMaxTextMessageBufferSize;
128 private volatile boolean deploymentComplete = false;
129 private final List<DeploymentException> deploymentExceptions = new ArrayList<>();
130
131 private ServletContextImpl contextToAddFilter = null;
132
133 private final List<WebsocketClientSslProvider> clientSslProviders;
134 private final List<PauseListener> pauseListeners = new ArrayList<>();
135 private final List<Extension> installedExtensions;
136
137 private final ThreadSetupHandler.Action<Void, Runnable> invokeEndpointTask;
138
139 private volatile boolean closed = false;
140
/**
 * Creates a container that discovers client SSL providers through this class's own class loader,
 * with no client bind address and no reconnect handler.
 *
 * @param clientMode accepted for signature compatibility; this constructor does not read it
 */
public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter,
                                final Supplier<XnioWorker> xnioWorker,
                                ByteBufferPool bufferPool,
                                List<ThreadSetupHandler> threadSetupHandlers,
                                boolean dispatchToWorker,
                                boolean clientMode) {
    this(classIntrospecter,
            ServerWebSocketContainer.class.getClassLoader(),
            xnioWorker,
            bufferPool,
            threadSetupHandlers,
            dispatchToWorker,
            null,
            null);
}
144
/**
 * Creates a container that discovers client SSL providers through {@code classLoader},
 * with no client bind address and no reconnect handler.
 */
public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter,
                                final ClassLoader classLoader,
                                Supplier<XnioWorker> xnioWorker,
                                ByteBufferPool bufferPool,
                                List<ThreadSetupHandler> threadSetupHandlers,
                                boolean dispatchToWorker) {
    this(classIntrospecter,
            classLoader,
            xnioWorker,
            bufferPool,
            threadSetupHandlers,
            dispatchToWorker,
            null,
            null);
}
148
/**
 * Creates a container with an explicit client bind address and reconnect handler and an
 * empty list of installed extensions.
 */
public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter,
                                final ClassLoader classLoader,
                                Supplier<XnioWorker> xnioWorker,
                                ByteBufferPool bufferPool,
                                List<ThreadSetupHandler> threadSetupHandlers,
                                boolean dispatchToWorker,
                                InetSocketAddress clientBindAddress,
                                WebSocketReconnectHandler reconnectHandler) {
    this(classIntrospecter,
            classLoader,
            xnioWorker,
            bufferPool,
            threadSetupHandlers,
            dispatchToWorker,
            clientBindAddress,
            reconnectHandler,
            Collections.emptyList());
}
152
/**
 * Primary constructor; every other constructor delegates here.
 *
 * <p>Copies {@code installedExtensions}, loads all {@link WebsocketClientSslProvider} services
 * visible to {@code classLoader}, and builds the endpoint invocation task by wrapping a plain
 * {@code Runnable.run()} call in each thread setup handler, in list order.
 */
public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler, List<Extension> installedExtensions) {
    this.classIntrospecter = classIntrospecter;
    this.bufferPool = bufferPool;
    this.xnioWorker = xnioWorker;
    this.dispatchToWorker = dispatchToWorker;
    this.clientBindAddress = clientBindAddress;
    this.installedExtensions = new ArrayList<>(installedExtensions);

    // Collect the providers into a private list and only publish a read-only view of it.
    final List<WebsocketClientSslProvider> discovered = new ArrayList<>();
    ServiceLoader.load(WebsocketClientSslProvider.class, classLoader).forEach(discovered::add);
    this.clientSslProviders = Collections.unmodifiableList(discovered);

    this.webSocketReconnectHandler = reconnectHandler;

    // Innermost action simply runs the invocation; each handler wraps the action built so far.
    ThreadSetupHandler.Action<Void, Runnable> chain = new ThreadSetupHandler.Action<Void, Runnable>() {
        @Override
        public Void call(HttpServerExchange exchange, Runnable context) throws Exception {
            context.run();
            return null;
        }
    };
    for (ThreadSetupHandler wrapper : threadSetupHandlers) {
        chain = wrapper.create(chain);
    }
    this.invokeEndpointTask = chain;
}
179
180 @Override
181 public long getDefaultAsyncSendTimeout() {
182 return defaultAsyncSendTimeout;
183 }
184
185 @Override
186 public void setAsyncSendTimeout(long defaultAsyncSendTimeout) {
187 this.defaultAsyncSendTimeout = defaultAsyncSendTimeout;
188 }
189
190 public Session connectToServer(final Object annotatedEndpointInstance, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
191 if(closed) {
192 throw new ClosedChannelException();
193 }
194 ConfiguredClientEndpoint config = getClientEndpoint(annotatedEndpointInstance.getClass(), false);
195 if (config == null) {
196 throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(annotatedEndpointInstance.getClass());
197 }
198 Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance));
199 return connectToServerInternal(instance, config, connectionBuilder);
200 }
201
202 @Override
203 public Session connectToServer(final Object annotatedEndpointInstance, final URI path) throws DeploymentException, IOException {
204 if(closed) {
205 throw new ClosedChannelException();
206 }
207 ConfiguredClientEndpoint config = getClientEndpoint(annotatedEndpointInstance.getClass(), false);
208 if (config == null) {
209 throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(annotatedEndpointInstance.getClass());
210 }
211 Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance));
212 XnioSsl ssl = null;
213 for (WebsocketClientSslProvider provider : clientSslProviders) {
214 ssl = provider.getSsl(xnioWorker.get(), annotatedEndpointInstance, path);
215 if (ssl != null) {
216 break;
217 }
218 }
219 if(ssl == null) {
220 try {
221 ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
222 } catch (NoSuchAlgorithmException e) {
223
224 }
225 }
226 return connectToServerInternal(instance, ssl, config, path);
227 }
228
229 public Session connectToServer(Class<?> aClass, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
230 if(closed) {
231 throw new ClosedChannelException();
232 }
233 ConfiguredClientEndpoint config = getClientEndpoint(aClass, true);
234 if (config == null) {
235 throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(aClass);
236 }
237 try {
238 AnnotatedEndpointFactory factory = config.getFactory();
239 InstanceHandle<?> instance = config.getInstanceFactory().createInstance();
240 return connectToServerInternal(factory.createInstance(instance), config, connectionBuilder);
241 } catch (InstantiationException e) {
242 throw new RuntimeException(e);
243 }
244 }
245
246 @Override
247 public Session connectToServer(Class<?> aClass, URI uri) throws DeploymentException, IOException {
248 if(closed) {
249 throw new ClosedChannelException();
250 }
251 ConfiguredClientEndpoint config = getClientEndpoint(aClass, true);
252 if (config == null) {
253 throw JsrWebSocketMessages.MESSAGES.notAValidClientEndpointType(aClass);
254 }
255 try {
256 AnnotatedEndpointFactory factory = config.getFactory();
257
258
259 InstanceHandle<?> instance = config.getInstanceFactory().createInstance();
260 XnioSsl ssl = null;
261 for (WebsocketClientSslProvider provider : clientSslProviders) {
262 ssl = provider.getSsl(xnioWorker.get(), aClass, uri);
263 if (ssl != null) {
264 break;
265 }
266 }
267 if(ssl == null) {
268 try {
269 ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
270 } catch (NoSuchAlgorithmException e) {
271
272 }
273 }
274 return connectToServerInternal(factory.createInstance(instance), ssl, config, uri);
275 } catch (InstantiationException e) {
276 throw new RuntimeException(e);
277 }
278 }
279
280 @Override
281 public Session connectToServer(final Endpoint endpointInstance, final ClientEndpointConfig config, final URI path) throws DeploymentException, IOException {
282 if(closed) {
283 throw new ClosedChannelException();
284 }
285 ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build();
286 XnioSsl ssl = null;
287 for (WebsocketClientSslProvider provider : clientSslProviders) {
288 ssl = provider.getSsl(xnioWorker.get(), endpointInstance, cec, path);
289 if (ssl != null) {
290 break;
291 }
292 }
293 if(ssl == null) {
294 try {
295 ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
296 } catch (NoSuchAlgorithmException e) {
297
298 }
299 }
300
301 WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getPreferredSubprotocols(), toExtensionList(cec.getExtensions()), cec);
302
303
304 WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
305 .setSsl(ssl)
306 .setBindAddress(clientBindAddress)
307 .setClientNegotiation(clientNegotiation);
308
309 return connectToServer(endpointInstance, config, connectionBuilder);
310 }
311
312 public Session connectToServer(final Endpoint endpointInstance, final ClientEndpointConfig config, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
313 if(closed) {
314 throw new ClosedChannelException();
315 }
316 ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build();
317
318 WebSocketClientNegotiation clientNegotiation = connectionBuilder.getClientNegotiation();
319
320 IoFuture<WebSocketChannel> session = connectionBuilder
321 .connect();
322 Number timeout = (Number) cec.getUserProperties().get(TIMEOUT);
323 if(session.await(timeout == null ? DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS: timeout.intValue(), TimeUnit.SECONDS) == IoFuture.Status.WAITING) {
324
325 session.cancel();
326 session.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
327 @Override
328 public void handleDone(WebSocketChannel data, Object attachment) {
329 IoUtils.safeClose(data);
330 }
331 }, null);
332 throw JsrWebSocketMessages.MESSAGES.connectionTimedOut();
333 }
334 WebSocketChannel channel;
335 try {
336 channel = session.get();
337 } catch (UpgradeFailedException e) {
338 throw new DeploymentException(e.getMessage(), e);
339 }
340 EndpointSessionHandler sessionHandler = new EndpointSessionHandler(this);
341
342 final List<Extension> extensions = new ArrayList<>();
343 final Map<String, Extension> extMap = new HashMap<>();
344 for (Extension ext : cec.getExtensions()) {
345 extMap.put(ext.getName(), ext);
346 }
347 for (WebSocketExtension e : clientNegotiation.getSelectedExtensions()) {
348 Extension ext = extMap.get(e.getName());
349 if (ext == null) {
350 throw JsrWebSocketMessages.MESSAGES.extensionWasNotPresentInClientHandshake(e.getName(), clientNegotiation.getSupportedExtensions());
351 }
352 extensions.add(ExtensionImpl.create(e));
353 }
354 ConfiguredClientEndpoint configured = clientEndpoints.get(endpointInstance.getClass());
355 Endpoint instance = endpointInstance;
356 if(configured == null) {
357 synchronized (clientEndpoints) {
358
359 configured = getClientEndpoint(endpointInstance.getClass(), false);
360 if(configured == null) {
361
362 clientEndpoints.put(endpointInstance.getClass(), configured = new ConfiguredClientEndpoint());
363 } else {
364
365 instance = configured.getFactory().createInstance(new ImmediateInstanceHandle<>(endpointInstance));
366 }
367 }
368 }
369
370 EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, cec.getDecoders(), cec.getEncoders());
371 UndertowSession undertowSession = new UndertowSession(channel, connectionBuilder.getUri(), Collections.<String, String>emptyMap(), Collections.<String, List<String>>emptyMap(), sessionHandler, null, new ImmediateInstanceHandle<>(endpointInstance), cec, connectionBuilder.getUri().getQuery(), encodingFactory.createEncoding(cec), configured, clientNegotiation.getSelectedSubProtocol(), extensions, connectionBuilder);
372 instance.onOpen(undertowSession, cec);
373 channel.resumeReceives();
374
375 return undertowSession;
376 }
377
378
379 @Override
380 public Session connectToServer(final Class<? extends Endpoint> endpointClass, final ClientEndpointConfig cec, final URI path) throws DeploymentException, IOException {
381 if(closed) {
382 throw new ClosedChannelException();
383 }
384 try {
385 Endpoint endpoint = classIntrospecter.createInstanceFactory(endpointClass).createInstance().getInstance();
386 return connectToServer(endpoint, cec, path);
387 } catch (InstantiationException | NoSuchMethodException e) {
388 throw new RuntimeException(e);
389 }
390 }
391
392
393 public void doUpgrade(HttpServletRequest request,
394 HttpServletResponse response, final ServerEndpointConfig sec,
395 Map<String,String> pathParams)
396 throws ServletException, IOException {
397 ServerEndpointConfig.Configurator configurator = sec.getConfigurator();
398 try {
399 EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, sec.getDecoders(), sec.getEncoders());
400 PathTemplate pt = PathTemplate.create(sec.getPath());
401
402 InstanceFactory<?> instanceFactory = null;
403 try {
404 instanceFactory = classIntrospecter.createInstanceFactory(sec.getEndpointClass());
405 } catch (Exception e) {
406
407 if (configurator == null || configurator.getClass() == ServerEndpointConfig.Configurator.class) {
408 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
409 } else {
410 instanceFactory = new InstanceFactory<Object>() {
411 @Override
412 public InstanceHandle<Object> createInstance() throws InstantiationException {
413 throw JsrWebSocketMessages.MESSAGES.endpointDoesNotHaveAppropriateConstructor(sec.getEndpointClass());
414 }
415 };
416 }
417 }
418 if (configurator == null) {
419 configurator = DefaultContainerConfigurator.INSTANCE;
420 }
421
422 ServerEndpointConfig config = ServerEndpointConfig.Builder.create(sec.getEndpointClass(), sec.getPath())
423 .decoders(sec.getDecoders())
424 .encoders(sec.getEncoders())
425 .subprotocols(sec.getSubprotocols())
426 .extensions(sec.getExtensions())
427 .configurator(configurator)
428 .build();
429
430
431 AnnotatedEndpointFactory annotatedEndpointFactory = null;
432 if(!Endpoint.class.isAssignableFrom(sec.getEndpointClass())) {
433 annotatedEndpointFactory = AnnotatedEndpointFactory.create(sec.getEndpointClass(), encodingFactory, pt.getParameterNames());
434 }
435
436
437 ConfiguredServerEndpoint confguredServerEndpoint;
438 if(annotatedEndpointFactory == null) {
439 confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, null, encodingFactory);
440 } else {
441 confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, null, encodingFactory, annotatedEndpointFactory, installedExtensions);
442 }
443 WebSocketHandshakeHolder hand;
444
445 WebSocketDeploymentInfo info = (WebSocketDeploymentInfo)request.getServletContext().getAttribute(WebSocketDeploymentInfo.ATTRIBUTE_NAME);
446 if (info == null || info.getExtensions() == null) {
447 hand = ServerWebSocketContainer.handshakes(confguredServerEndpoint);
448 } else {
449 hand = ServerWebSocketContainer.handshakes(confguredServerEndpoint, info.getExtensions());
450 }
451
452 final ServletWebSocketHttpExchange facade = new ServletWebSocketHttpExchange(request, response, new HashSet<WebSocketChannel>());
453 Handshake handshaker = null;
454 for (Handshake method : hand.handshakes) {
455 if (method.matches(facade)) {
456 handshaker = method;
457 break;
458 }
459 }
460
461 if (handshaker != null) {
462 if(isClosed()) {
463 response.sendError(StatusCodes.SERVICE_UNAVAILABLE);
464 return;
465 }
466 facade.putAttachment(HandshakeUtil.PATH_PARAMS, pathParams);
467 final Handshake selected = handshaker;
468 facade.upgradeChannel(new HttpUpgradeListener() {
469 @Override
470 public void handleUpgrade(StreamConnection streamConnection, HttpServerExchange exchange) {
471 WebSocketChannel channel = selected.createChannel(facade, streamConnection, facade.getBufferPool());
472 new EndpointSessionHandler(ServerWebSocketContainer.this).onConnect(facade, channel);
473 }
474 });
475 handshaker.handshake(facade);
476 return;
477 }
478 } catch (Exception e) {
479 throw new ServletException(e);
480 }
481 }
482
483 private Session connectToServerInternal(final Endpoint endpointInstance, XnioSsl ssl, final ConfiguredClientEndpoint cec, final URI path) throws DeploymentException, IOException {
484
485 WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getConfig().getPreferredSubprotocols(), toExtensionList(cec.getConfig().getExtensions()), cec.getConfig());
486
487
488 WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
489 .setSsl(ssl)
490 .setBindAddress(clientBindAddress)
491 .setClientNegotiation(clientNegotiation);
492 return connectToServerInternal(endpointInstance, cec, connectionBuilder);
493 }
494
495 private Session connectToServerInternal(final Endpoint endpointInstance, final ConfiguredClientEndpoint cec, WebSocketClient.ConnectionBuilder connectionBuilder) throws DeploymentException, IOException {
496
497 IoFuture<WebSocketChannel> session = connectionBuilder
498 .connect();
499 Number timeout = (Number) cec.getConfig().getUserProperties().get(TIMEOUT);
500 IoFuture.Status result = session.await(timeout == null ? DEFAULT_WEB_SOCKET_TIMEOUT_SECONDS : timeout.intValue(), TimeUnit.SECONDS);
501 if(result == IoFuture.Status.WAITING) {
502
503
504 session.cancel();
505 session.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
506 @Override
507 public void handleDone(WebSocketChannel data, Object attachment) {
508 IoUtils.safeClose(data);
509 }
510 }, null);
511 throw JsrWebSocketMessages.MESSAGES.connectionTimedOut();
512 }
513
514 WebSocketChannel channel;
515 try {
516 channel = session.get();
517 } catch (UpgradeFailedException e) {
518 throw new DeploymentException(e.getMessage(), e);
519 }
520 EndpointSessionHandler sessionHandler = new EndpointSessionHandler(this);
521
522 final List<Extension> extensions = new ArrayList<>();
523 final Map<String, Extension> extMap = new HashMap<>();
524 for (Extension ext : cec.getConfig().getExtensions()) {
525 extMap.put(ext.getName(), ext);
526 }
527 String subProtocol = null;
528 if(connectionBuilder.getClientNegotiation() != null) {
529 for (WebSocketExtension e : connectionBuilder.getClientNegotiation().getSelectedExtensions()) {
530 Extension ext = extMap.get(e.getName());
531 if (ext == null) {
532 throw JsrWebSocketMessages.MESSAGES.extensionWasNotPresentInClientHandshake(e.getName(), connectionBuilder.getClientNegotiation().getSupportedExtensions());
533 }
534 extensions.add(ExtensionImpl.create(e));
535 }
536 subProtocol = connectionBuilder.getClientNegotiation().getSelectedSubProtocol();
537 }
538
539 UndertowSession undertowSession = new UndertowSession(channel, connectionBuilder.getUri(), Collections.<String, String>emptyMap(), Collections.<String, List<String>>emptyMap(), sessionHandler, null, new ImmediateInstanceHandle<>(endpointInstance), cec.getConfig(), connectionBuilder.getUri().getQuery(), cec.getEncodingFactory().createEncoding(cec.getConfig()), cec, subProtocol, extensions, connectionBuilder);
540 endpointInstance.onOpen(undertowSession, cec.getConfig());
541 channel.resumeReceives();
542
543 return undertowSession;
544 }
545
546 @Override
547 public long getDefaultMaxSessionIdleTimeout() {
548 return defaultMaxSessionIdleTimeout;
549 }
550
551 @Override
552 public void setDefaultMaxSessionIdleTimeout(final long timeout) {
553 this.defaultMaxSessionIdleTimeout = timeout;
554 }
555
556 @Override
557 public int getDefaultMaxBinaryMessageBufferSize() {
558 return defaultMaxBinaryMessageBufferSize;
559 }
560
561 @Override
562 public void setDefaultMaxBinaryMessageBufferSize(int defaultMaxBinaryMessageBufferSize) {
563 this.defaultMaxBinaryMessageBufferSize = defaultMaxBinaryMessageBufferSize;
564 }
565
566 @Override
567 public int getDefaultMaxTextMessageBufferSize() {
568 return defaultMaxTextMessageBufferSize;
569 }
570
571 @Override
572 public void setDefaultMaxTextMessageBufferSize(int defaultMaxTextMessageBufferSize) {
573 this.defaultMaxTextMessageBufferSize = defaultMaxTextMessageBufferSize;
574 }
575
576 @Override
577 public Set<Extension> getInstalledExtensions() {
578 return new HashSet<>(installedExtensions);
579 }
580
581
591 public void invokeEndpointMethod(final Executor executor, final Runnable invocation) {
592 if (dispatchToWorker) {
593 executor.execute(new Runnable() {
594 @Override
595 public void run() {
596 invokeEndpointMethod(invocation);
597 }
598 });
599 } else {
600 invokeEndpointMethod(invocation);
601 }
602 }
603
604
608 public void invokeEndpointMethod(final Runnable invocation) {
609 try {
610 invokeEndpointTask.call(null, invocation);
611 } catch (Exception e) {
612 throw new RuntimeException(e);
613 }
614 }
615
616 @Override
617 public void addEndpoint(final Class<?> endpoint) throws DeploymentException {
618 if (deploymentComplete) {
619 throw JsrWebSocketMessages.MESSAGES.cannotAddEndpointAfterDeployment();
620 }
621
622
623 if(annotatedEndpointClasses.contains(endpoint)) {
624 return;
625 }
626 annotatedEndpointClasses.add(endpoint);
627 try {
628 addEndpointInternal(endpoint, true);
629 } catch (DeploymentException e) {
630 deploymentExceptions.add(e);
631 throw e;
632 }
633 }
634
635 private synchronized void addEndpointInternal(final Class<?> endpoint, boolean requiresCreation) throws DeploymentException {
636 ServerEndpoint serverEndpoint = endpoint.getAnnotation(ServerEndpoint.class);
637 ClientEndpoint clientEndpoint = endpoint.getAnnotation(ClientEndpoint.class);
638 if (serverEndpoint != null) {
639 JsrWebSocketLogger.ROOT_LOGGER.addingAnnotatedServerEndpoint(endpoint, serverEndpoint.value());
640 final PathTemplate template = PathTemplate.create(serverEndpoint.value());
641 if (seenPaths.contains(template)) {
642 PathTemplate existing = null;
643 for (PathTemplate p : seenPaths) {
644 if (p.compareTo(template) == 0) {
645 existing = p;
646 break;
647 }
648 }
649 throw JsrWebSocketMessages.MESSAGES.multipleEndpointsWithOverlappingPaths(template, existing);
650 }
651 seenPaths.add(template);
652 Class<? extends ServerEndpointConfig.Configurator> configuratorClass = serverEndpoint.configurator();
653
654 EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, serverEndpoint.decoders(), serverEndpoint.encoders());
655 AnnotatedEndpointFactory annotatedEndpointFactory = AnnotatedEndpointFactory.create(endpoint, encodingFactory, template.getParameterNames());
656 InstanceFactory<?> instanceFactory = null;
657 try {
658 instanceFactory = classIntrospecter.createInstanceFactory(endpoint);
659 } catch (Exception e) {
660
661 if(configuratorClass == ServerEndpointConfig.Configurator.class) {
662 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
663 } else {
664 instanceFactory = new InstanceFactory<Object>() {
665 @Override
666 public InstanceHandle<Object> createInstance() throws InstantiationException {
667 throw JsrWebSocketMessages.MESSAGES.endpointDoesNotHaveAppropriateConstructor(endpoint);
668 }
669 };
670 }
671 }
672 ServerEndpointConfig.Configurator configurator;
673 if (configuratorClass != ServerEndpointConfig.Configurator.class) {
674 try {
675 configurator = classIntrospecter.createInstanceFactory(configuratorClass).createInstance().getInstance();
676 } catch (InstantiationException | NoSuchMethodException e) {
677 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
678 }
679 } else {
680 configurator = DefaultContainerConfigurator.INSTANCE;
681 }
682
683 ServerEndpointConfig config = ServerEndpointConfig.Builder.create(endpoint, serverEndpoint.value())
684 .decoders(Arrays.asList(serverEndpoint.decoders()))
685 .encoders(Arrays.asList(serverEndpoint.encoders()))
686 .subprotocols(Arrays.asList(serverEndpoint.subprotocols()))
687 .extensions(Collections.<Extension>emptyList())
688 .configurator(configurator)
689 .build();
690
691
692 ConfiguredServerEndpoint confguredServerEndpoint = new ConfiguredServerEndpoint(config, instanceFactory, template, encodingFactory, annotatedEndpointFactory, installedExtensions);
693 configuredServerEndpoints.add(confguredServerEndpoint);
694 handleAddingFilterMapping();
695 } else if (clientEndpoint != null) {
696 JsrWebSocketLogger.ROOT_LOGGER.addingAnnotatedClientEndpoint(endpoint);
697 EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, clientEndpoint.decoders(), clientEndpoint.encoders());
698 InstanceFactory<?> instanceFactory;
699 try {
700 instanceFactory = classIntrospecter.createInstanceFactory(endpoint);
701 } catch (Exception e) {
702 try {
703 instanceFactory = new ConstructorInstanceFactory<>(endpoint.getConstructor());
704 } catch (NoSuchMethodException e1) {
705 if(requiresCreation) {
706 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
707 } else {
708 instanceFactory = new InstanceFactory<Object>() {
709 @Override
710 public InstanceHandle<Object> createInstance() throws InstantiationException {
711 throw new InstantiationException();
712 }
713 };
714 }
715 }
716 }
717 AnnotatedEndpointFactory factory = AnnotatedEndpointFactory.create(endpoint, encodingFactory, Collections.<String>emptySet());
718
719 ClientEndpointConfig.Configurator configurator = null;
720 try {
721 configurator = classIntrospecter.createInstanceFactory(clientEndpoint.configurator()).createInstance().getInstance();
722 } catch (InstantiationException | NoSuchMethodException e) {
723 throw JsrWebSocketMessages.MESSAGES.couldNotDeploy(e);
724 }
725 ClientEndpointConfig config = ClientEndpointConfig.Builder.create()
726 .decoders(Arrays.asList(clientEndpoint.decoders()))
727 .encoders(Arrays.asList(clientEndpoint.encoders()))
728 .preferredSubprotocols(Arrays.asList(clientEndpoint.subprotocols()))
729 .configurator(configurator)
730 .build();
731
732 ConfiguredClientEndpoint configuredClientEndpoint = new ConfiguredClientEndpoint(config, factory, encodingFactory, instanceFactory);
733 clientEndpoints.put(endpoint, configuredClientEndpoint);
734 } else {
735 throw JsrWebSocketMessages.MESSAGES.classWasNotAnnotated(endpoint);
736 }
737 }
738
739
740 private void handleAddingFilterMapping() {
741 if (contextToAddFilter != null) {
742 contextToAddFilter.getDeployment().getDeploymentInfo().addFilterUrlMapping(Bootstrap.FILTER_NAME, ", DispatcherType.REQUEST);
743 contextToAddFilter.getDeployment().getServletPaths().invalidate();
744 contextToAddFilter = null;
745 }
746 }
747
748 @Override
749 public void addEndpoint(final ServerEndpointConfig endpoint) throws DeploymentException {
750 if (deploymentComplete) {
751 throw JsrWebSocketMessages.MESSAGES.cannotAddEndpointAfterDeployment();
752 }
753 JsrWebSocketLogger.ROOT_LOGGER.addingProgramaticEndpoint(endpoint.getEndpointClass(), endpoint.getPath());
754 final PathTemplate template = PathTemplate.create(endpoint.getPath());
755 if (seenPaths.contains(template)) {
756 PathTemplate existing = null;
757 for (PathTemplate p : seenPaths) {
758 if (p.compareTo(template) == 0) {
759 existing = p;
760 break;
761 }
762 }
763 throw JsrWebSocketMessages.MESSAGES.multipleEndpointsWithOverlappingPaths(template, existing);
764 }
765 seenPaths.add(template);
766 EncodingFactory encodingFactory = EncodingFactory.createFactory(classIntrospecter, endpoint.getDecoders(), endpoint.getEncoders());
767
768 AnnotatedEndpointFactory annotatedEndpointFactory = null;
769 if(!Endpoint.class.isAssignableFrom(endpoint.getEndpointClass())) {
770
771 annotatedEndpointFactory = AnnotatedEndpointFactory.create(endpoint.getEndpointClass(), encodingFactory, template.getParameterNames());
772 }
773 ConfiguredServerEndpoint confguredServerEndpoint = new ConfiguredServerEndpoint(endpoint, null, template, encodingFactory, annotatedEndpointFactory, endpoint.getExtensions());
774 configuredServerEndpoints.add(confguredServerEndpoint);
775 handleAddingFilterMapping();
776 }
777
778
779 private ConfiguredClientEndpoint getClientEndpoint(final Class<?> endpointType, boolean requiresCreation) {
780 Class<?> type = endpointType;
781 while (type != Object.class && type != null && !type.isAnnotationPresent(ClientEndpoint.class)) {
782 type = type.getSuperclass();
783 }
784 if(type == Object.class || type == null) {
785 return null;
786 }
787
788 ConfiguredClientEndpoint existing = clientEndpoints.get(type);
789 if (existing != null) {
790 return existing;
791 }
792 synchronized (this) {
793 existing = clientEndpoints.get(type);
794 if (existing != null) {
795 return existing;
796 }
797 if (type.isAnnotationPresent(ClientEndpoint.class)) {
798 try {
799 addEndpointInternal(type, requiresCreation);
800 return clientEndpoints.get(type);
801 } catch (DeploymentException e) {
802 throw new RuntimeException(e);
803 }
804 }
805 return null;
806 }
807 }
808
809
810
811 public void validateDeployment() {
812 if(!deploymentExceptions.isEmpty()) {
813 RuntimeException e = JsrWebSocketMessages.MESSAGES.deploymentFailedDueToProgramaticErrors();
814 for(DeploymentException ex : deploymentExceptions) {
815 e.addSuppressed(ex);
816 }
817 throw e;
818 }
819 }
820
821 public void deploymentComplete() {
822 deploymentComplete = true;
823 validateDeployment();
824 }
825
826 public List<ConfiguredServerEndpoint> getConfiguredServerEndpoints() {
827 return configuredServerEndpoints;
828 }
829
830 public ServletContextImpl getContextToAddFilter() {
831 return contextToAddFilter;
832 }
833
834 public void setContextToAddFilter(ServletContextImpl contextToAddFilter) {
835 this.contextToAddFilter = contextToAddFilter;
836 }
837
838 public synchronized void close(int waitTime) {
839 doClose();
840
841 long end = currentTimeMillis() + waitTime;
842 for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
843 endpoint.awaitClose(end - System.currentTimeMillis());
844 }
845 }
846 @Override
847 public synchronized void close() {
848 close(10000);
849 }
850
851 public ByteBufferPool getBufferPool() {
852 return bufferPool;
853 }
854
855 public XnioWorker getXnioWorker() {
856 return xnioWorker.get();
857 }
858
859 private static List<WebSocketExtension> toExtensionList(final List<Extension> extensions) {
860 List<WebSocketExtension> ret = new ArrayList<>();
861 for (Extension e : extensions) {
862 final List<WebSocketExtension.Parameter> parameters = new ArrayList<>();
863 for (Extension.Parameter p : e.getParameters()) {
864 parameters.add(new WebSocketExtension.Parameter(p.getName(), p.getValue()));
865 }
866 ret.add(new WebSocketExtension(e.getName(), parameters));
867 }
868 return ret;
869 }
870
871 private static class ClientNegotiation extends WebSocketClientNegotiation {
872
873 private final ClientEndpointConfig config;
874
875 ClientNegotiation(List<String> supportedSubProtocols, List<WebSocketExtension> supportedExtensions, ClientEndpointConfig config) {
876 super(supportedSubProtocols, supportedExtensions);
877 this.config = config;
878 }
879
880 @Override
881 public void afterRequest(final Map<String, List<String>> headers) {
882
883 ClientEndpointConfig.Configurator configurator = config.getConfigurator();
884 if (configurator != null) {
885 final Map<String, List<String>> newHeaders = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
886 for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
887 ArrayList<String> arrayList = new ArrayList<>();
888 arrayList.addAll(entry.getValue());
889 newHeaders.put(entry.getKey(), arrayList);
890 }
891 configurator.afterResponse(new HandshakeResponse() {
892 @Override
893 public Map<String, List<String>> getHeaders() {
894 return newHeaders;
895 }
896 });
897 }
898 }
899
900 @Override
901 public void beforeRequest(Map<String, List<String>> headers) {
902 ClientEndpointConfig.Configurator configurator = config.getConfigurator();
903 if (configurator != null) {
904 final Map<String, List<String>> newHeaders = new HashMap<>();
905 for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
906 ArrayList<String> arrayList = new ArrayList<>();
907 arrayList.addAll(entry.getValue());
908 newHeaders.put(entry.getKey(), arrayList);
909 }
910 configurator.beforeRequest(newHeaders);
911 headers.clear();
912 for (Map.Entry<String, List<String>> entry : newHeaders.entrySet()) {
913 if (!entry.getValue().isEmpty()) {
914 headers.put(entry.getKey(), entry.getValue());
915 }
916 }
917 }
918 }
919 }
920
921 /**
922 * Pauses the container
923 * @param listener
924 */
925 public synchronized void pause(PauseListener listener) {
926 closed = true;
927 if(configuredServerEndpoints.isEmpty()) {
928 listener.paused();
929 return;
930 }
931 if(listener != null) {
932 pauseListeners.add(listener);
933 }
934 for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
935 for (final Session session : endpoint.getOpenSessions()) {
936 ((UndertowSession)session).getExecutor().execute(new Runnable() {
937 @Override
938 public void run() {
939 try {
940 session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, ""));
941 } catch (Exception e) {
942 JsrWebSocketLogger.ROOT_LOGGER.couldNotCloseOnUndeploy(e);
943 }
944 }
945 });
946 }
947 }
948
949 Runnable done = new Runnable() {
950
951 int count = configuredServerEndpoints.size();
952
953 @Override
954 public synchronized void run() {
955 List<PauseListener> copy = null;
956 synchronized (ServerWebSocketContainer.this) {
957 count--;
958 if (count == 0) {
959 copy = new ArrayList<>(pauseListeners);
960 pauseListeners.clear();
961 }
962 }
963 if(copy != null) {
964 for (PauseListener p : copy) {
965 p.paused();
966 }
967 }
968 }
969 };
970
971 for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
972 endpoint.notifyClosed(done);
973 }
974 }
975
976 private void doClose() {
977 closed = true;
978 for (ConfiguredServerEndpoint endpoint : configuredServerEndpoints) {
979 for (Session session : endpoint.getOpenSessions()) {
980 try {
981 session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, ""));
982 } catch (Exception e) {
983 JsrWebSocketLogger.ROOT_LOGGER.couldNotCloseOnUndeploy(e);
984 }
985 }
986 }
987 }
988
989 static WebSocketHandshakeHolder handshakes(ConfiguredServerEndpoint config) {
990 List<Handshake> handshakes = new ArrayList<>();
991 handshakes.add(new JsrHybi13Handshake(config));
992 handshakes.add(new JsrHybi08Handshake(config));
993 handshakes.add(new JsrHybi07Handshake(config));
994 return new WebSocketHandshakeHolder(handshakes, config);
995 }
996
997 static WebSocketHandshakeHolder handshakes(ConfiguredServerEndpoint config, List<ExtensionHandshake> extensions) {
998 List<Handshake> handshakes = new ArrayList<>();
999 Handshake jsrHybi13Handshake = new JsrHybi13Handshake(config);
1000 Handshake jsrHybi08Handshake = new JsrHybi08Handshake(config);
1001 Handshake jsrHybi07Handshake = new JsrHybi07Handshake(config);
1002 for (ExtensionHandshake extension : extensions) {
1003 jsrHybi13Handshake.addExtension(extension);
1004 jsrHybi08Handshake.addExtension(extension);
1005 jsrHybi07Handshake.addExtension(extension);
1006 }
1007 handshakes.add(jsrHybi13Handshake);
1008 handshakes.add(jsrHybi08Handshake);
1009 handshakes.add(jsrHybi07Handshake);
1010 return new WebSocketHandshakeHolder(handshakes, config);
1011 }
1012
1013 static final class WebSocketHandshakeHolder {
1014 final List<Handshake> handshakes;
1015 final ConfiguredServerEndpoint endpoint;
1016
1017 private WebSocketHandshakeHolder(List<Handshake> handshakes, ConfiguredServerEndpoint endpoint) {
1018 this.handshakes = handshakes;
1019 this.endpoint = endpoint;
1020 }
1021 }
1022
1025 public synchronized void resume() {
1026 closed = false;
1027 for(PauseListener p : pauseListeners) {
1028 p.resumed();
1029 }
1030 pauseListeners.clear();
1031 }
1032
1033 public WebSocketReconnectHandler getWebSocketReconnectHandler() {
1034 return webSocketReconnectHandler;
1035 }
1036
1037 public boolean isClosed() {
1038 return closed;
1039 }
1040
/**
 * Callback used to report pause and resume transitions of the container.
 */
public interface PauseListener {

    /**
     * Invoked once the container has finished pausing.
     */
    void paused();

    /**
     * Invoked when the container is resumed while this listener is still registered.
     */
    void resumed();
}
1046
1047 public boolean isDispatchToWorker() {
1048 return dispatchToWorker;
1049 }
1050 }
1051