1 /*
2  * JBoss, Home of Professional Open Source.
3  *
4  * Copyright 2011 Red Hat, Inc. and/or its affiliates, and individual
5  * contributors as indicated by the @author tags.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */

19
20 package org.xnio;
21
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.SocketAddress;
26 import java.security.PrivilegedAction;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.AbstractExecutorService;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39 import java.util.zip.Deflater;
40 import java.util.zip.Inflater;
41
42 import org.jboss.threads.EnhancedQueueExecutor;
43 import org.wildfly.common.Assert;
44 import org.wildfly.common.context.ContextManager;
45 import org.wildfly.common.context.Contextual;
46 import org.wildfly.common.net.CidrAddress;
47 import org.wildfly.common.net.CidrAddressTable;
48 import org.xnio._private.Messages;
49 import org.xnio.channels.AcceptingChannel;
50 import org.xnio.channels.AssembledConnectedMessageChannel;
51 import org.xnio.channels.AssembledConnectedStreamChannel;
52 import org.xnio.channels.BoundChannel;
53 import org.xnio.channels.Configurable;
54 import org.xnio.channels.ConnectedMessageChannel;
55 import org.xnio.channels.ConnectedStreamChannel;
56 import org.xnio.channels.MulticastMessageChannel;
57 import org.xnio.channels.StreamChannel;
58 import org.xnio.channels.StreamSinkChannel;
59 import org.xnio.channels.StreamSourceChannel;
60 import org.xnio.conduits.ConduitStreamSinkChannel;
61 import org.xnio.conduits.ConduitStreamSourceChannel;
62 import org.xnio.conduits.DeflatingStreamSinkConduit;
63 import org.xnio.conduits.InflatingStreamSourceConduit;
64 import org.xnio.conduits.StreamSinkChannelWrappingConduit;
65 import org.xnio.conduits.StreamSourceChannelWrappingConduit;
66 import org.xnio.management.XnioServerMXBean;
67 import org.xnio.management.XnioWorkerMXBean;
68
69 import static java.lang.Math.max;
70 import static java.security.AccessController.doPrivileged;
71 import org.jboss.logging.Logger;
72 import static org.xnio.IoUtils.safeClose;
73 import static org.xnio._private.Messages.msg;
74
75 /**
76  * A worker for I/O channel notification.
77  *
78  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
79  *
80  * @since 3.0
81  */

82 @SuppressWarnings("unused")
83 public abstract class XnioWorker extends AbstractExecutorService implements Configurable, ExecutorService, XnioIoFactory, Contextual<XnioWorker> {
84
85     private final Xnio xnio;
86     private final TaskPool taskPool;
87     private final String name;
88     private final Runnable terminationTask;
89     private final CidrAddressTable<InetSocketAddress> bindAddressTable;
90
91     private volatile int taskSeq;
92
93     private static final AtomicIntegerFieldUpdater<XnioWorker> taskSeqUpdater = AtomicIntegerFieldUpdater.newUpdater(XnioWorker.class"taskSeq");
94
95     private static final AtomicInteger seq = new AtomicInteger(1);
96
97     private static final RuntimePermission CREATE_WORKER_PERMISSION = new RuntimePermission("createXnioWorker");
98
99     private int getNextSeq() {
100         return taskSeqUpdater.incrementAndGet(this);
101     }
102
103     private static final Logger log = Logger.getLogger("org.xnio");
104
105     /**
106      * Construct a new instance.  Intended to be called only from implementations.
107      *
108      * @param builder the worker builder
109      */

110     protected XnioWorker(final Builder builder) {
111         this.xnio = builder.xnio;
112         this.terminationTask = builder.terminationTask;
113         final SecurityManager sm = System.getSecurityManager();
114         if (sm != null) {
115             sm.checkPermission(CREATE_WORKER_PERMISSION);
116         }
117         String workerName = builder.getWorkerName();
118         if (workerName == null) {
119             workerName = "XNIO-" + seq.getAndIncrement();
120         }
121         name = workerName;
122         final boolean markThreadAsDaemon = builder.isDaemon();
123         bindAddressTable = builder.getBindAddressConfigurations();
124         final Runnable terminationTask = new Runnable() {
125             public void run() {
126                 taskPoolTerminated();
127             }
128         };
129         final ExecutorService executorService = builder.getExternalExecutorService();
130         if (executorService != null) {
131             if (executorService instanceof EnhancedQueueExecutor) {
132                 taskPool = new ExternalTaskPool(
133                         new EnhancedQueueExecutorTaskPool((EnhancedQueueExecutor) executorService));
134             } else if (executorService instanceof ThreadPoolExecutor) {
135                 taskPool = new ExternalTaskPool(new ThreadPoolExecutorTaskPool((ThreadPoolExecutor) executorService));
136             } else {
137                 taskPool = new ExternalTaskPool(new ExecutorServiceTaskPool(executorService));
138             }
139         } else if (EnhancedQueueExecutor.DISABLE_HINT) {
140             final int poolSize = max(builder.getMaxWorkerPoolSize(), builder.getCoreWorkerPoolSize());
141             taskPool = new ThreadPoolExecutorTaskPool(new DefaultThreadPoolExecutor(
142                 poolSize,
143                 poolSize,
144                 builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS,
145                 new LinkedBlockingDeque<>(),
146                 new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon),
147                 terminationTask));
148         } else {
149             taskPool = new EnhancedQueueExecutorTaskPool(new EnhancedQueueExecutor.Builder()
150                 .setCorePoolSize(builder.getCoreWorkerPoolSize())
151                 .setMaximumPoolSize(builder.getMaxWorkerPoolSize())
152                 .setKeepAliveTime(builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS)
153                 .setThreadFactory(new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon))
154                 .setTerminationTask(terminationTask)
155                 .setRegisterMBean(true)
156                 .setMBeanName(workerName)
157                 .build()
158             );
159         }
160     }
161
162     //==================================================
163     //
164     // Context methods
165     //
166     //==================================================
167
168     private static final ContextManager<XnioWorker> CONTEXT_MANAGER = doPrivileged((PrivilegedAction<ContextManager<XnioWorker>>) () -> new ContextManager<XnioWorker>(XnioWorker.class"org.xnio.worker"));
169
170     static {
171         doPrivileged((PrivilegedAction<Void>) () -> {
172             CONTEXT_MANAGER.setGlobalDefaultSupplier(() -> DefaultXnioWorkerHolder.INSTANCE);
173             return null;
174         });
175     }
176
177     /**
178      * Get the context manager for XNIO workers.
179      *
180      * @return the context manager (not {@code null})
181      */

182     public static ContextManager<XnioWorker> getContextManager() {
183         return CONTEXT_MANAGER;
184     }
185
186     /**
187      * Get the instance context manager for XNIO workers by delegating to {@link #getContextManager()}.
188      *
189      * @return the context manager (not {@code null})
190      */

191     public ContextManager<XnioWorker> getInstanceContextManager() {
192         return getContextManager();
193     }
194
195     //==================================================
196     //
197     // Stream methods
198     //
199     //==================================================
200
201     // Servers
202
203     /**
204      * Create a stream server, for TCP or UNIX domain servers.  The type of server is determined by the bind address.
205      *
206      * @param bindAddress the address to bind to
207      * @param acceptListener the initial accept listener
208      * @param optionMap the initial configuration for the server
209      * @return the acceptor
210      * @throws IOException if the server could not be created
211      */

212     @Deprecated
213     public AcceptingChannel<? extends ConnectedStreamChannel> createStreamServer(SocketAddress bindAddress, ChannelListener<? super AcceptingChannel<ConnectedStreamChannel>> acceptListener, OptionMap optionMap) throws IOException {
214         final AcceptingChannel<StreamConnection> server = createStreamConnectionServer(bindAddress, null, optionMap);
215         final AcceptingChannel<ConnectedStreamChannel> acceptingChannel = new AcceptingChannel<ConnectedStreamChannel>() {
216             public ConnectedStreamChannel accept() throws IOException {
217                 final StreamConnection connection = server.accept();
218                 return connection == null ? null : new AssembledConnectedStreamChannel(connection, connection.getSourceChannel(), connection.getSinkChannel());
219             }
220
221             public ChannelListener.Setter<? extends AcceptingChannel<ConnectedStreamChannel>> getAcceptSetter() {
222                 return ChannelListeners.getDelegatingSetter(server.getAcceptSetter(), this);
223             }
224
225             public ChannelListener.Setter<? extends AcceptingChannel<ConnectedStreamChannel>> getCloseSetter() {
226                 return ChannelListeners.getDelegatingSetter(server.getCloseSetter(), this);
227             }
228
229             public SocketAddress getLocalAddress() {
230                 return server.getLocalAddress();
231             }
232
233             public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
234                 return server.getLocalAddress(type);
235             }
236
237             public void suspendAccepts() {
238                 server.suspendAccepts();
239             }
240
241             public void resumeAccepts() {
242                 server.resumeAccepts();
243             }
244
245             public boolean isAcceptResumed() {
246                 return server.isAcceptResumed();
247             }
248
249             public void wakeupAccepts() {
250                 server.wakeupAccepts();
251             }
252
253             public void awaitAcceptable() throws IOException {
254                 server.awaitAcceptable();
255             }
256
257             public void awaitAcceptable(final long time, final TimeUnit timeUnit) throws IOException {
258                 server.awaitAcceptable(time, timeUnit);
259             }
260
261             public XnioWorker getWorker() {
262                 return server.getWorker();
263             }
264
265             @Deprecated
266             public XnioExecutor getAcceptThread() {
267                 return server.getAcceptThread();
268             }
269
270             public XnioIoThread getIoThread() {
271                 return server.getIoThread();
272             }
273
274             public void close() throws IOException {
275                 server.close();
276             }
277
278             public boolean isOpen() {
279                 return server.isOpen();
280             }
281
282             public boolean supportsOption(final Option<?> option) {
283                 return server.supportsOption(option);
284             }
285
286             public <T> T getOption(final Option<T> option) throws IOException {
287                 return server.getOption(option);
288             }
289
290             public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
291                 return server.setOption(option, value);
292             }
293         };
294         acceptingChannel.getAcceptSetter().set(acceptListener);
295         return acceptingChannel;
296     }
297
298     /**
299      * Create a stream server, for TCP or UNIX domain servers.  The type of server is determined by the bind address.
300      *
301      * @param bindAddress the address to bind to
302      * @param acceptListener the initial accept listener
303      * @param optionMap the initial configuration for the server
304      * @return the acceptor
305      * @throws IOException if the server could not be created
306      */

307     public AcceptingChannel<StreamConnection> createStreamConnectionServer(SocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
308         Assert.checkNotNullParam("bindAddress", bindAddress);
309         if (bindAddress instanceof InetSocketAddress) {
310             return createTcpConnectionServer((InetSocketAddress) bindAddress, acceptListener, optionMap);
311         } else if (bindAddress instanceof LocalSocketAddress) {
312             return createLocalStreamConnectionServer((LocalSocketAddress) bindAddress, acceptListener, optionMap);
313         } else {
314             throw msg.badSockType(bindAddress.getClass());
315         }
316     }
317
318     /**
319      * Implementation helper method to create a TCP stream server.
320      *
321      * @param bindAddress the address to bind to
322      * @param acceptListener the initial accept listener
323      * @param optionMap the initial configuration for the server
324      * @return the acceptor
325      * @throws IOException if the server could not be created
326      */

327     protected AcceptingChannel<StreamConnection> createTcpConnectionServer(InetSocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
328         throw msg.unsupported("createTcpConnectionServer");
329     }
330
331     /**
332      * Implementation helper method to create a UNIX domain stream server.
333      *
334      * @param bindAddress the address to bind to
335      * @param acceptListener the initial accept listener
336      * @param optionMap the initial configuration for the server
337      * @return the acceptor
338      * @throws IOException if the server could not be created
339      */

340     protected AcceptingChannel<StreamConnection> createLocalStreamConnectionServer(LocalSocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
341         throw msg.unsupported("createLocalStreamConnectionServer");
342     }
343
344     // Connectors
345
346     /**
347      * Connect to a remote stream server.  The protocol family is determined by the type of the socket address given.
348      *
349      * @param destination the destination address
350      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
351      * @param optionMap the option map
352      * @return the future result of this operation
353      */

354     @Deprecated
355     public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, OptionMap optionMap) {
356         final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
357         final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
358         final IoFuture<StreamConnection> future = openStreamConnection(destination, nestedOpenListener, optionMap);
359         future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
360         futureResult.addCancelHandler(future);
361         return futureResult.getIoFuture();
362     }
363
364     /**
365      * Connect to a remote stream server.  The protocol family is determined by the type of the socket address given.
366      *
367      * @param destination the destination address
368      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
369      * @param bindListener the listener which will be notified when the channel is bound, or {@code nullfor none
370      * @param optionMap the option map
371      * @return the future result of this operation
372      */

373     @Deprecated
374     public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
375         final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
376         final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
377         final IoFuture<StreamConnection> future = openStreamConnection(destination, nestedOpenListener, bindListener, optionMap);
378         future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
379         futureResult.addCancelHandler(future);
380         return futureResult.getIoFuture();
381     }
382
383     /**
384      * Connect to a remote stream server.  The protocol family is determined by the type of the socket addresses given
385      * (which must match).
386      *
387      * @param bindAddress the local address to bind to
388      * @param destination the destination address
389      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
390      * @param bindListener the listener which will be notified when the channel is bound, or {@code nullfor none
391      * @param optionMap the option map
392      * @return the future result of this operation
393      */

394     @Deprecated
395     public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
396         final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
397         final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
398         final IoFuture<StreamConnection> future = openStreamConnection(bindAddress, destination, nestedOpenListener, bindListener, optionMap);
399         future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
400         futureResult.addCancelHandler(future);
401         return futureResult.getIoFuture();
402     }
403
404     public IoFuture<StreamConnection> openStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, OptionMap optionMap) {
405         return chooseThread().openStreamConnection(destination, openListener, optionMap);
406     }
407
408     public IoFuture<StreamConnection> openStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
409         return chooseThread().openStreamConnection(destination, openListener, bindListener, optionMap);
410     }
411
412     public IoFuture<StreamConnection> openStreamConnection(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
413         return chooseThread().openStreamConnection(bindAddress, destination, openListener, bindListener, optionMap);
414     }
415
416     // Acceptors
417
418     /**
419      * Accept a stream connection at a destination address.  If a wildcard address is specified, then a destination address
420      * is chosen in a manner specific to the OS and/or channel type.
421      *
422      * @param destination the destination (bind) address
423      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
424      * @param bindListener the listener which will be notified when the acceptor is bound, or {@code nullfor none
425      * @param optionMap the option map
426      * @return the future connection
427      */

428     @Deprecated
429     public IoFuture<ConnectedStreamChannel> acceptStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
430         final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
431         final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
432         final IoFuture<StreamConnection> future = acceptStreamConnection(destination, nestedOpenListener, bindListener, optionMap);
433         future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
434         futureResult.addCancelHandler(future);
435         return futureResult.getIoFuture();
436     }
437
438     public IoFuture<StreamConnection> acceptStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
439         return chooseThread().acceptStreamConnection(destination, openListener, bindListener, optionMap);
440     }
441
442     //==================================================
443     //
444     // Message (datagram) channel methods
445     //
446     //==================================================
447
448     /**
449      * Connect to a remote datagram server.  The protocol family is determined by the type of the socket address given.
450      *
451      * @param destination the destination address
452      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
453      * @param bindListener the listener which will be notified when the channel is bound, or {@code nullfor none
454      * @param optionMap the option map
455      * @return the future result of this operation
456      */

457     @Deprecated
458     // FIXME XNIO-192 invoke bind listener
459     public IoFuture<ConnectedMessageChannel> connectDatagram(SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
460         final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
461         final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
462         final IoFuture<MessageConnection> future = openMessageConnection(destination, nestedOpenListener, optionMap);
463         future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
464         futureResult.addCancelHandler(future);
465         return futureResult.getIoFuture();
466     }
467
468     /**
469      * Connect to a remote datagram server.  The protocol family is determined by the type of the socket addresses given
470      * (which must match).
471      *
472      * @param bindAddress the local address to bind to
473      * @param destination the destination address
474      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
475      * @param bindListener the listener which will be notified when the channel is bound, or {@code nullfor none
476      * @param optionMap the option map
477      * @return the future result of this operation
478      */

479     @Deprecated
480     // FIXME bindAddress is now ignored
481     public IoFuture<ConnectedMessageChannel> connectDatagram(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
482         final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
483         final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
484         final IoFuture<MessageConnection> future = openMessageConnection(destination, nestedOpenListener, optionMap);
485         future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
486         futureResult.addCancelHandler(future);
487         return futureResult.getIoFuture();
488     }
489
490     public IoFuture<MessageConnection> openMessageConnection(final SocketAddress destination, final ChannelListener<? super MessageConnection> openListener, final OptionMap optionMap) {
491         return chooseThread().openMessageConnection(destination, openListener, optionMap);
492     }
493
494     // Acceptors
495
496     /**
497      * Accept a message connection at a destination address.  If a wildcard address is specified, then a destination address
498      * is chosen in a manner specific to the OS and/or channel type.
499      *
500      * @param destination the destination (bind) address
501      * @param openListener the listener which will be notified when the channel is open, or {@code nullfor none
502      * @param bindListener the listener which will be notified when the acceptor is bound, or {@code nullfor none
503      * @param optionMap the option map
504      * @return the future connection
505      */

506     @Deprecated
507     public IoFuture<ConnectedMessageChannel> acceptDatagram(SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
508         final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
509         final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
510         final IoFuture<MessageConnection> future = acceptMessageConnection(destination, nestedOpenListener, bindListener, optionMap);
511         future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
512         futureResult.addCancelHandler(future);
513         return futureResult.getIoFuture();
514     }
515
516     public IoFuture<MessageConnection> acceptMessageConnection(final SocketAddress destination, final ChannelListener<? super MessageConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
517         return chooseThread().acceptMessageConnection(destination, openListener, bindListener, optionMap);
518     }
519
520     //==================================================
521     //
522     // UDP methods
523     //
524     //==================================================
525
526     /**
527      * Create a UDP server.  The UDP server can be configured to be multicast-capable; this should only be
528      * done if multicast is needed, since some providers have a performance penalty associated with multicast.
529      * The provider's default executor will be used to execute listener methods.
530      *
531      * @param bindAddress the bind address
532      * @param bindListener the initial open-connection listener
533      * @param optionMap the initial configuration for the server
534      * @return the UDP server channel
535      * @throws java.io.IOException if the server could not be created
536      *
537      * @since 3.0
538      */

539     public MulticastMessageChannel createUdpServer(InetSocketAddress bindAddress, ChannelListener<? super MulticastMessageChannel> bindListener, OptionMap optionMap) throws IOException {
540         throw msg.unsupported("createUdpServer");
541     }
542
543     /**
544      * Create a UDP server.  The UDP server can be configured to be multicast-capable; this should only be
545      * done if multicast is needed, since some providers have a performance penalty associated with multicast.
546      * The provider's default executor will be used to execute listener methods.
547      *
548      * @param bindAddress the bind address
549      * @param optionMap the initial configuration for the server
550      * @return the UDP server channel
551      * @throws java.io.IOException if the server could not be created
552      *
553      * @since 3.0
554      */

555     public MulticastMessageChannel createUdpServer(InetSocketAddress bindAddress, OptionMap optionMap) throws IOException {
556         return createUdpServer(bindAddress, ChannelListeners.nullChannelListener(), optionMap);
557     }
558
559     //==================================================
560     //
561     // Stream pipe methods
562     //
563     //==================================================
564
565     /**
566      * Open a bidirectional stream pipe.
567      *
568      * @param leftOpenListener the left-hand open listener
569      * @param rightOpenListener the right-hand open listener
570      * @param optionMap the pipe channel configuration
571      * @throws java.io.IOException if the pipe could not be created
572      * @deprecated Users should prefer the simpler {@link #createFullDuplexPipe()} instead.
573      */

574     @Deprecated
575     public void createPipe(ChannelListener<? super StreamChannel> leftOpenListener, ChannelListener<? super StreamChannel> rightOpenListener, final OptionMap optionMap) throws IOException {
576         final ChannelPipe<StreamChannel, StreamChannel> pipe = createFullDuplexPipe();
577         final boolean establishWriting = optionMap.get(Options.WORKER_ESTABLISH_WRITING, false);
578         final StreamChannel left = pipe.getLeftSide();
579         XnioExecutor leftExec = establishWriting ? left.getWriteThread() : left.getReadThread();
580         final StreamChannel right = pipe.getRightSide();
581         XnioExecutor rightExec = establishWriting ? right.getWriteThread() : right.getReadThread();
582         // not unsafe - http://youtrack.jetbrains.net/issue/IDEA-59290
583         //noinspection unchecked
584         leftExec.execute(ChannelListeners.getChannelListenerTask(left, leftOpenListener));
585         // not unsafe - http://youtrack.jetbrains.net/issue/IDEA-59290
586         //noinspection unchecked
587         rightExec.execute(ChannelListeners.getChannelListenerTask(right, rightOpenListener));
588     }
589
590     /**
591      * Open a unidirectional stream pipe.
592      *
593      * @param sourceListener the source open listener
594      * @param sinkListener the sink open listener
595      * @param optionMap the pipe channel configuration
596      * @throws java.io.IOException if the pipe could not be created
597      * @deprecated Users should prefer the simpler {@link #createHalfDuplexPipe()} instead.
598      */

599     @Deprecated
600     public void createOneWayPipe(ChannelListener<? super StreamSourceChannel> sourceListener, ChannelListener<? super StreamSinkChannel> sinkListener, final OptionMap optionMap) throws IOException {
601         final ChannelPipe<StreamSourceChannel, StreamSinkChannel> pipe = createHalfDuplexPipe();
602         final StreamSourceChannel left = pipe.getLeftSide();
603         XnioExecutor leftExec = left.getReadThread();
604         final StreamSinkChannel right = pipe.getRightSide();
605         XnioExecutor rightExec = right.getWriteThread();
606         // not unsafe - http://youtrack.jetbrains.net/issue/IDEA-59290
607         //noinspection unchecked
608         leftExec.execute(ChannelListeners.getChannelListenerTask(left, sourceListener));
609         // not unsafe - http://youtrack.jetbrains.net/issue/IDEA-59290
610         //noinspection unchecked
611         rightExec.execute(ChannelListeners.getChannelListenerTask(right, sinkListener));
612     }
613
614     //==================================================
615     //
616     // Compression methods
617     //
618     //==================================================
619
620     /**
621      * Create a stream channel that decompresses the source data according to the configuration in the given option map.
622      *
623      * @param delegate the compressed channel
624      * @param options the configuration options for the channel
625      * @return a decompressed channel
626      * @throws IOException if the channel could not be constructed
627      */

628     public StreamSourceChannel getInflatingChannel(final StreamSourceChannel delegate, OptionMap options) throws IOException {
629         final boolean nowrap;
630         switch (options.get(Options.COMPRESSION_TYPE, CompressionType.DEFLATE)) {
631             case DEFLATE: nowrap = falsebreak;
632             case GZIP: nowrap = truebreak;
633             defaultthrow msg.badCompressionFormat();
634         }
635         return getInflatingChannel(delegate, new Inflater(nowrap));
636     }
637
638     /**
639      * Create a stream channel that decompresses the source data according to the configuration in the given inflater.
640      *
641      * @param delegate the compressed channel
642      * @param inflater the inflater to use
643      * @return a decompressed channel
644      * @throws IOException if the channel could not be constructed
645      */

646     protected StreamSourceChannel getInflatingChannel(final StreamSourceChannel delegate, final Inflater inflater) throws IOException {
647         return new ConduitStreamSourceChannel(Configurable.EMPTY, new InflatingStreamSourceConduit(new StreamSourceChannelWrappingConduit(delegate), inflater));
648     }
649
650     /**
651      * Create a stream channel that compresses to the destination according to the configuration in the given option map.
652      *
653      * @param delegate the channel to compress to
654      * @param options the configuration options for the channel
655      * @return a compressed channel
656      * @throws IOException if the channel could not be constructed
657      */

658     public StreamSinkChannel getDeflatingChannel(final StreamSinkChannel delegate, final OptionMap options) throws IOException {
659         final int level = options.get(Options.COMPRESSION_LEVEL, -1);
660         final boolean nowrap;
661         switch (options.get(Options.COMPRESSION_TYPE, CompressionType.DEFLATE)) {
662             case DEFLATE: nowrap = falsebreak;
663             case GZIP: nowrap = truebreak;
664             defaultthrow msg.badCompressionFormat();
665         }
666         return getDeflatingChannel(delegate, new Deflater(level, nowrap));
667     }
668
669     /**
670      * Create a stream channel that compresses to the destination according to the configuration in the given inflater.
671      *
672      * @param delegate the channel to compress to
673      * @param deflater the deflater to use
674      * @return a compressed channel
675      * @throws IOException if the channel could not be constructed
676      */

677     protected StreamSinkChannel getDeflatingChannel(final StreamSinkChannel delegate, final Deflater deflater) throws IOException {
678         return new ConduitStreamSinkChannel(Configurable.EMPTY, new DeflatingStreamSinkConduit(new StreamSinkChannelWrappingConduit(delegate), deflater));
679     }
680
681     public ChannelPipe<StreamChannel, StreamChannel> createFullDuplexPipe() throws IOException {
682         return chooseThread().createFullDuplexPipe();
683     }
684
685     public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection() throws IOException {
686         return chooseThread().createFullDuplexPipeConnection();
687     }
688
689     public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe() throws IOException {
690         return chooseThread().createHalfDuplexPipe();
691     }
692
693     public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection(final XnioIoFactory peer) throws IOException {
694         return chooseThread().createFullDuplexPipeConnection(peer);
695     }
696
697     public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe(final XnioIoFactory peer) throws IOException {
698         return chooseThread().createHalfDuplexPipe(peer);
699     }
700
701     //==================================================
702     //
703     // State methods
704     //
705     //==================================================
706
707     /**
708      * Shut down this worker.  This method returns immediately.  Upon return worker shutdown will have
709      * commenced but not necessarily completed.  When worker shutdown is complete, the termination task (if one was
710      * defined) will be executed.
711      */

712     public abstract void shutdown();
713
714     /**
715      * Immediately terminate the worker.  Any outstanding tasks are collected and returned in a list.  Upon return
716      * worker shutdown will have commenced but not necessarily completed; however the worker will only complete its
717      * current tasks instead of completing all tasks.
718      *
719      * @return the list of outstanding tasks
720      */

721     public abstract List<Runnable> shutdownNow();
722
723     /**
724      * Determine whether the worker has been shut down.  Will return {@code true} once either shutdown method has
725      * been called.
726      *
727      * @return {@code true} the worker has been shut down
728      */

729     public abstract boolean isShutdown();
730
731     /**
732      * Determine whether the worker has terminated.  Will return {@code true} once all worker threads are exited
733      * (with the possible exception of the thread running the termination task, if any).
734      *
735      * @return {@code trueif the worker is terminated
736      */

737     public abstract boolean isTerminated();
738
739     /**
740      * Wait for termination.
741      *
742      * @param timeout the amount of time to wait
743      * @param unit the unit of time
744      * @return {@code trueif termination completed before the timeout expired
745      * @throws InterruptedException if the operation was interrupted
746      */

747     public abstract boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException;
748
749     /**
750      * Wait for termination.
751      *
752      * @throws InterruptedException if the operation was interrupted
753      */

754     public abstract void awaitTermination() throws InterruptedException;
755
756     //==================================================
757     //
758     // Thread pool methods
759     //
760     //==================================================
761
762     /**
763      * Get an I/O thread from this worker.  The thread may be chosen based on arbitrary rules.
764      *
765      * @return the I/O thread
766      */

767     public final XnioIoThread getIoThread() {
768         return chooseThread();
769     }
770
771     /**
772      * Get an I/O thread from this worker.  The thread is chosen based on the given hash code.
773      *
774      * @param hashCode the hash code
775      * @return the thread
776      */

777     public abstract XnioIoThread getIoThread(int hashCode);
778
779     /**
780      * Get the user task to run once termination is complete.
781      *
782      * @return the termination task
783      */

784     protected Runnable getTerminationTask() {
785         return terminationTask;
786     }
787
788     /**
789      * Callback to indicate that the task thread pool has terminated.  Not called if the task pool is external.
790      */

791     protected void taskPoolTerminated() {}
792
793     /**
794      * Initiate shutdown of the task thread pool.  When all the tasks and threads have completed,
795      * the {@link #taskPoolTerminated()} method is called.
796      */

797     protected void shutDownTaskPool() {
798         if (isTaskPoolExternal()) {
799             taskPoolTerminated();
800         } else {
801             doPrivileged(new PrivilegedAction<Object>() {
802                 public Object run() {
803                     taskPool.shutdown();
804                     return null;
805                 }
806             });
807         }
808     }
809
810     /**
811      * Shut down the task thread pool immediately and return its pending tasks.
812      *
813      * @return the pending task list
814      */

815     protected List<Runnable> shutDownTaskPoolNow() {
816         if (! isTaskPoolExternal()) return doPrivileged(new PrivilegedAction<List<Runnable>>() {
817             public List<Runnable> run() {
818                 return taskPool.shutdownNow();
819             }
820         });
821         return Collections.emptyList();
822     }
823
824     /**
825      * Determine whether the worker task pool is managed externally.  Externally managed task pools will never
826      * respond to shut down requests.
827      *
828      * @return {@code trueif the task pool is externally managed, {@code false} otherwise
829      */

830     protected boolean isTaskPoolExternal() {
831         return taskPool instanceof ExternalTaskPool;
832     }
833
834     /**
835      * Execute a command in the task pool.
836      *
837      * @param command the command to run
838      */

839     public void execute(final Runnable command) {
840         taskPool.execute(command);
841     }
842
843     /**
844      * Get the number of I/O threads configured on this worker.
845      *
846      * @return the number of I/O threads configured on this worker
847      */

848     public abstract int getIoThreadCount();
849
850     //==================================================
851     //
852     // Configuration methods
853     //
854     //==================================================
855
856     private final static Set<Option<?>> OPTIONS = Option.setBuilder()
857             .add(Options.WORKER_TASK_CORE_THREADS)
858             .add(Options.WORKER_TASK_MAX_THREADS)
859             .add(Options.WORKER_TASK_KEEPALIVE)
860             .create();
861
862     private final static Set<Option<?>> EXTERNAL_POOL_OPTIONS = Option.setBuilder()
863             .create();
864
865     public boolean supportsOption(final Option<?> option) {
866         return taskPool instanceof ExternalTaskPool ? EXTERNAL_POOL_OPTIONS.contains(option) : OPTIONS.contains(option);
867     }
868
869     public <T> T getOption(final Option<T> option) throws IOException {
870         if (! supportsOption(option)) {
871             return null;
872         } else if (option.equals(Options.WORKER_TASK_CORE_THREADS)) {
873             return option.cast(Integer.valueOf(taskPool.getCorePoolSize()));
874         } else if (option.equals(Options.WORKER_TASK_MAX_THREADS)) {
875             return option.cast(Integer.valueOf(taskPool.getMaximumPoolSize()));
876         } else if (option.equals(Options.WORKER_TASK_KEEPALIVE)) {
877             return option.cast(Integer.valueOf((int) Math.min((long) Integer.MAX_VALUE, taskPool.getKeepAliveTime(TimeUnit.MILLISECONDS))));
878         } else {
879             return null;
880         }
881     }
882
883     public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
884         if (! supportsOption(option)) {
885             return null;
886         } else if (option.equals(Options.WORKER_TASK_CORE_THREADS)) {
887             final int old = taskPool.getCorePoolSize();
888             taskPool.setCorePoolSize(Options.WORKER_TASK_CORE_THREADS.cast(value).intValue());
889             return option.cast(Integer.valueOf(old));
890         } else if (option.equals(Options.WORKER_TASK_MAX_THREADS)) {
891             final int old = taskPool.getMaximumPoolSize();
892             taskPool.setMaximumPoolSize(Options.WORKER_TASK_MAX_THREADS.cast(value).intValue());
893             return option.cast(Integer.valueOf(old));
894         } else if (option.equals(Options.WORKER_TASK_KEEPALIVE)) {
895             final long old = taskPool.getKeepAliveTime(TimeUnit.MILLISECONDS);
896             taskPool.setKeepAliveTime(Options.WORKER_TASK_KEEPALIVE.cast(value).intValue(), TimeUnit.MILLISECONDS);
897             return option.cast(Integer.valueOf((int) Math.min((long) Integer.MAX_VALUE, old)));
898         } else {
899             return null;
900         }
901     }
902
903     //==================================================
904     //
905     // Accessor methods
906     //
907     //==================================================
908
909     /**
910      * Get the XNIO provider which produced this worker.
911      *
912      * @return the XNIO provider
913      */

914     public Xnio getXnio() {
915         return xnio;
916     }
917
918     /**
919      * Get the name of this worker.
920      *
921      * @return the name of the worker
922      */

923     public String getName() {
924         return name;
925     }
926
927     //==================================================
928     //
929     // SPI methods
930     //
931     //==================================================
932
933     /**
934      * Choose a thread randomly from this worker.
935      *
936      * @return the thread
937      */

938     protected abstract XnioIoThread chooseThread();
939
940     /**
941      * Get the core worker pool size.
942      *
943      * @return the core worker pool size
944      */

945     protected final int getCoreWorkerPoolSize() {
946         return taskPool.getCorePoolSize();
947     }
948
949     /**
950      * Get an estimate of the number of busy threads in the worker pool.
951      *
952      * @return the estimated number of busy threads in the worker pool
953      */

954     protected final int getBusyWorkerThreadCount() {
955         return taskPool.getActiveCount();
956     }
957
958     /**
959      * Get an estimate of the number of threads in the worker pool.
960      *
961      * @return the estimated number of threads in the worker pool
962      */

963     protected final int getWorkerPoolSize() {
964         return taskPool.getPoolSize();
965     }
966
967     /**
968      * Get the maximum worker pool size.
969      *
970      * @return the maximum worker pool size
971      */

972     protected final int getMaxWorkerPoolSize() {
973         return taskPool.getMaximumPoolSize();
974     }
975
976     /**
977      * Get an estimate of the number of tasks in the worker queue.
978      *
979      * @return the estimated number of tasks
980      */

981     protected final int getWorkerQueueSize() {
982         return taskPool.getQueueSize();
983     }
984
985     //==================================================
986     //
987     // Source address
988     //
989     //==================================================
990
991     /**
992      * Get the bind address table.
993      *
994      * @return the bind address table
995      */

996     protected CidrAddressTable<InetSocketAddress> getBindAddressTable() {
997         return bindAddressTable;
998     }
999
1000     /**
1001      * Get the expected bind address for the given destination, if any.
1002      *
1003      * @return the expected bind address for the given destination, or {@code nullif no explicit bind will be done
1004      */

1005     public InetSocketAddress getBindAddress(InetAddress destination) {
1006         return bindAddressTable.get(destination);
1007     }
1008
1009     //==================================================
1010     //
1011     // JMX
1012     //
1013     //==================================================
1014
1015     public abstract XnioWorkerMXBean getMXBean();
1016
1017     protected abstract ManagementRegistration registerServerMXBean(XnioServerMXBean metrics);
1018
1019     //==================================================
1020     //
1021     // Builder
1022     //
1023     //==================================================
1024
1025     /**
1026      * A builder which allows workers to be programmatically configured.
1027      */

1028     public static class Builder {
1029         private final Xnio xnio;
1030         private ExecutorService externalExecutorService;
1031         private Runnable terminationTask;
1032         private String workerName;
1033         private int coreWorkerPoolSize = 4;
1034         private int maxWorkerPoolSize = 16;
1035         private ThreadGroup threadGroup;
1036         private boolean daemon;
1037         private int workerKeepAlive = 60_000;
1038         private int workerIoThreads = 1;
1039         private long workerStackSize = 0L;
1040         private CidrAddressTable<InetSocketAddress> bindAddressConfigurations = new CidrAddressTable<>();
1041
1042         /**
1043          * Construct a new instance.
1044          *
1045          * @param xnio the XNIO instance (must not be {@code null})
1046          */

1047         protected Builder(final Xnio xnio) {
1048             this.xnio = xnio;
1049         }
1050
1051         public Xnio getXnio() {
1052             return xnio;
1053         }
1054
1055         public Builder populateFromOptions(OptionMap optionMap) {
1056             setWorkerName(optionMap.get(Options.WORKER_NAME));
1057             setCoreWorkerPoolSize(optionMap.get(Options.WORKER_TASK_CORE_THREADS, coreWorkerPoolSize));
1058             setMaxWorkerPoolSize(optionMap.get(Options.WORKER_TASK_MAX_THREADS, maxWorkerPoolSize));
1059             setDaemon(optionMap.get(Options.THREAD_DAEMON, daemon));
1060             setWorkerKeepAlive(optionMap.get(Options.WORKER_TASK_KEEPALIVE, workerKeepAlive));
1061             if (optionMap.contains(Options.WORKER_IO_THREADS)) {
1062                 setWorkerIoThreads(optionMap.get(Options.WORKER_IO_THREADS, 1));
1063             } else if (optionMap.contains(Options.WORKER_READ_THREADS) || optionMap.contains(Options.WORKER_WRITE_THREADS)) {
1064                 setWorkerIoThreads(max(optionMap.get(Options.WORKER_READ_THREADS, 1), optionMap.get(Options.WORKER_WRITE_THREADS, 1)));
1065             }
1066             setWorkerStackSize(optionMap.get(Options.STACK_SIZE, workerStackSize));
1067             return this;
1068         }
1069
1070         public Builder addBindAddressConfiguration(CidrAddress cidrAddress, InetAddress bindAddress) {
1071             return addBindAddressConfiguration(cidrAddress, new InetSocketAddress(bindAddress, 0));
1072         }
1073
1074         public Builder addBindAddressConfiguration(CidrAddress cidrAddress, InetSocketAddress bindAddress) {
1075             final Class<? extends InetAddress> networkAddrClass = cidrAddress.getNetworkAddress().getClass();
1076             if (bindAddress.isUnresolved()) {
1077                 throw Messages.msg.addressUnresolved(bindAddress);
1078             }
1079             if (networkAddrClass != bindAddress.getAddress().getClass()) {
1080                 throw Messages.msg.mismatchAddressType(networkAddrClass, bindAddress.getAddress().getClass());
1081             }
1082             bindAddressConfigurations.put(cidrAddress, bindAddress);
1083             return this;
1084         }
1085
1086         public Builder setBindAddressConfigurations(CidrAddressTable<InetSocketAddress> newTable) {
1087             bindAddressConfigurations = newTable;
1088             return this;
1089         }
1090
1091         public CidrAddressTable<InetSocketAddress> getBindAddressConfigurations() {
1092             return bindAddressConfigurations;
1093         }
1094
1095         public Runnable getTerminationTask() {
1096             return terminationTask;
1097         }
1098
1099         public Builder setTerminationTask(final Runnable terminationTask) {
1100             this.terminationTask = terminationTask;
1101             return this;
1102         }
1103
1104         public String getWorkerName() {
1105             return workerName;
1106         }
1107
1108         public Builder setWorkerName(final String workerName) {
1109             this.workerName = workerName;
1110             return this;
1111         }
1112
1113         public int getCoreWorkerPoolSize() {
1114             return coreWorkerPoolSize;
1115         }
1116
1117         public Builder setCoreWorkerPoolSize(final int coreWorkerPoolSize) {
1118             Assert.checkMinimumParameter("coreWorkerPoolSize", 0, coreWorkerPoolSize);
1119             this.coreWorkerPoolSize = coreWorkerPoolSize;
1120             return this;
1121         }
1122
1123         public int getMaxWorkerPoolSize() {
1124             return maxWorkerPoolSize;
1125         }
1126
1127         public Builder setMaxWorkerPoolSize(final int maxWorkerPoolSize) {
1128             Assert.checkMinimumParameter("maxWorkerPoolSize", 0, maxWorkerPoolSize);
1129             this.maxWorkerPoolSize = maxWorkerPoolSize;
1130             return this;
1131         }
1132
1133         public ThreadGroup getThreadGroup() {
1134             return threadGroup;
1135         }
1136
1137         public Builder setThreadGroup(final ThreadGroup threadGroup) {
1138             this.threadGroup = threadGroup;
1139             return this;
1140         }
1141
1142         public boolean isDaemon() {
1143             return daemon;
1144         }
1145
1146         public Builder setDaemon(final boolean daemon) {
1147             this.daemon = daemon;
1148             return this;
1149         }
1150
1151         public long getWorkerKeepAlive() {
1152             return workerKeepAlive;
1153         }
1154
1155         public Builder setWorkerKeepAlive(final int workerKeepAlive) {
1156             Assert.checkMinimumParameter("workerKeepAlive", 0, workerKeepAlive);
1157             this.workerKeepAlive = workerKeepAlive;
1158             return this;
1159         }
1160
1161         public int getWorkerIoThreads() {
1162             return workerIoThreads;
1163         }
1164
1165         public Builder setWorkerIoThreads(final int workerIoThreads) {
1166             Assert.checkMinimumParameter("workerIoThreads", 0, workerIoThreads);
1167             this.workerIoThreads = workerIoThreads;
1168             return this;
1169         }
1170
1171         public long getWorkerStackSize() {
1172             return workerStackSize;
1173         }
1174
1175         public Builder setWorkerStackSize(final long workerStackSize) {
1176             Assert.checkMinimumParameter("workerStackSize", 0, workerStackSize);
1177             this.workerStackSize = workerStackSize;
1178             return this;
1179         }
1180
1181         public ExecutorService getExternalExecutorService() {
1182             return externalExecutorService;
1183         }
1184
1185         public Builder setExternalExecutorService(final ExecutorService executorService) {
1186             this.externalExecutorService = executorService;
1187             return this;
1188         }
1189
1190         public XnioWorker build() {
1191             return xnio.build(this);
1192         }
1193     }
1194
1195     //==================================================
1196     //
1197     // Private
1198     //
1199     //==================================================
1200
1201     static class StreamConnectionWrapListener implements ChannelListener<StreamConnection> {
1202
1203         private final FutureResult<ConnectedStreamChannel> futureResult;
1204         private final ChannelListener<? super ConnectedStreamChannel> openListener;
1205
1206         public StreamConnectionWrapListener(final FutureResult<ConnectedStreamChannel> futureResult, final ChannelListener<? super ConnectedStreamChannel> openListener) {
1207             this.futureResult = futureResult;
1208             this.openListener = openListener;
1209         }
1210
1211         public void handleEvent(final StreamConnection channel) {
1212             final AssembledConnectedStreamChannel assembledChannel = new AssembledConnectedStreamChannel(channel, channel.getSourceChannel(), channel.getSinkChannel());
1213             if (!futureResult.setResult(assembledChannel)) {
1214                 safeClose(assembledChannel);
1215             } else {
1216                 ChannelListeners.invokeChannelListener(assembledChannel, openListener);
1217             }
1218         }
1219     }
1220
1221     static class MessageConnectionWrapListener implements ChannelListener<MessageConnection> {
1222
1223         private final FutureResult<ConnectedMessageChannel> futureResult;
1224         private final ChannelListener<? super ConnectedMessageChannel> openListener;
1225
1226         public MessageConnectionWrapListener(final FutureResult<ConnectedMessageChannel> futureResult, final ChannelListener<? super ConnectedMessageChannel> openListener) {
1227             this.futureResult = futureResult;
1228             this.openListener = openListener;
1229         }
1230
1231         public void handleEvent(final MessageConnection channel) {
1232             final AssembledConnectedMessageChannel assembledChannel = new AssembledConnectedMessageChannel(channel, channel.getSourceChannel(), channel.getSinkChannel());
1233             if (!futureResult.setResult(assembledChannel)) {
1234                 safeClose(assembledChannel);
1235             } else {
1236                 ChannelListeners.invokeChannelListener(assembledChannel, openListener);
1237             }
1238         }
1239     }
1240
1241     private static final IoFuture.HandlingNotifier<StreamConnection, FutureResult<ConnectedStreamChannel>> STREAM_WRAPPING_HANDLER = new IoFuture.HandlingNotifier<StreamConnection, FutureResult<ConnectedStreamChannel>>() {
1242         public void handleCancelled(final FutureResult<ConnectedStreamChannel> attachment) {
1243             attachment.setCancelled();
1244         }
1245
1246         public void handleFailed(final IOException exception, final FutureResult<ConnectedStreamChannel> attachment) {
1247             attachment.setException(exception);
1248         }
1249     };
1250
1251     private static final IoFuture.HandlingNotifier<MessageConnection, FutureResult<ConnectedMessageChannel>> MESSAGE_WRAPPING_HANDLER = new IoFuture.HandlingNotifier<MessageConnection, FutureResult<ConnectedMessageChannel>>() {
1252         public void handleCancelled(final FutureResult<ConnectedMessageChannel> attachment) {
1253             attachment.setCancelled();
1254         }
1255
1256         public void handleFailed(final IOException exception, final FutureResult<ConnectedMessageChannel> attachment) {
1257             attachment.setException(exception);
1258         }
1259     };
1260
1261     class WorkerThreadFactory implements ThreadFactory {
1262
1263         private final ThreadGroup threadGroup;
1264         private final long stackSize;
1265         private final boolean markThreadAsDaemon;
1266
1267         WorkerThreadFactory(final ThreadGroup threadGroup, final long stackSize, final boolean markThreadAsDaemon) {
1268             this.threadGroup = threadGroup;
1269             this.stackSize = stackSize;
1270             this.markThreadAsDaemon = markThreadAsDaemon;
1271         }
1272
1273         public Thread newThread(final Runnable r) {
1274             return doPrivileged(new PrivilegedAction<Thread>() {
1275                 public Thread run() {
1276                     final Thread taskThread = new Thread(threadGroup, r, name + " task-" + getNextSeq(), stackSize);
1277                     // Mark the thread as daemon if the Options.THREAD_DAEMON has been set
1278                     if (markThreadAsDaemon) {
1279                         taskThread.setDaemon(true);
1280                     }
1281                     return taskThread;
1282                 }
1283             });
1284         }
1285     }
1286
1287     interface TaskPool {
1288
1289         void shutdown();
1290
1291         List<Runnable> shutdownNow();
1292
1293         void execute(Runnable command);
1294
1295         int getCorePoolSize();
1296
1297         int getMaximumPoolSize();
1298
1299         long getKeepAliveTime(TimeUnit unit);
1300
1301         void setCorePoolSize(int size);
1302
1303         void setMaximumPoolSize(int size);
1304
1305         void setKeepAliveTime(long time, TimeUnit unit);
1306
1307         int getActiveCount();
1308
1309         int getPoolSize();
1310
1311         int getQueueSize();
1312     }
1313
1314     static class DefaultThreadPoolExecutor extends ThreadPoolExecutor {
1315         private final Runnable terminationTask;
1316
1317         DefaultThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final Runnable terminationTask) {
1318             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
1319             this.terminationTask = terminationTask;
1320         }
1321
1322         protected void terminated() {
1323             terminationTask.run();
1324         }
1325
1326         public void setCorePoolSize(final int size) {
1327             setMaximumPoolSize(size);
1328         }
1329
1330         public void setMaximumPoolSize(final int size) {
1331             if (size > getCorePoolSize()) {
1332                 super.setMaximumPoolSize(size);
1333                 super.setCorePoolSize(size);
1334             } else {
1335                 super.setCorePoolSize(size);
1336                 super.setMaximumPoolSize(size);
1337             }
1338         }
1339     }
1340
1341     static class ThreadPoolExecutorTaskPool implements TaskPool {
1342         private final ThreadPoolExecutor delegate;
1343
1344         ThreadPoolExecutorTaskPool(final ThreadPoolExecutor delegate) {
1345             this.delegate = delegate;
1346         }
1347
1348         @Override
1349         public void shutdown() {
1350             delegate.shutdown();
1351         }
1352
1353         @Override
1354         public List<Runnable> shutdownNow() {
1355             return delegate.shutdownNow();
1356         }
1357
1358         @Override
1359         public void execute(final Runnable command) {
1360             delegate.execute(command);
1361         }
1362
1363         @Override
1364         public int getCorePoolSize() {
1365             return delegate.getCorePoolSize();
1366         }
1367
1368         @Override
1369         public int getMaximumPoolSize() {
1370             return delegate.getMaximumPoolSize();
1371         }
1372
1373         @Override
1374         public long getKeepAliveTime(final TimeUnit unit) {
1375             return delegate.getKeepAliveTime(unit);
1376         }
1377
1378         @Override
1379         public void setCorePoolSize(final int size) {
1380             delegate.setCorePoolSize(size);
1381         }
1382
1383         @Override
1384         public void setMaximumPoolSize(final int size) {
1385             delegate.setMaximumPoolSize(size);
1386         }
1387
1388         @Override
1389         public void setKeepAliveTime(final long time, final TimeUnit unit) {
1390             delegate.setKeepAliveTime(time, unit);
1391         }
1392
1393         @Override
1394         public int getActiveCount() {
1395             return delegate.getActiveCount();
1396         }
1397
1398         @Override
1399         public int getPoolSize() {
1400             return delegate.getPoolSize();
1401         }
1402
1403         @Override
1404         public int getQueueSize() {
1405             return delegate.getQueue().size();
1406         }
1407     }
1408
1409     static class EnhancedQueueExecutorTaskPool implements TaskPool {
1410         private final EnhancedQueueExecutor executor;
1411
1412         EnhancedQueueExecutorTaskPool(final EnhancedQueueExecutor executor) {
1413             this.executor = executor;
1414         }
1415
1416         public void shutdown() {
1417             executor.shutdown();
1418         }
1419
1420         public List<Runnable> shutdownNow() {
1421             return executor.shutdownNow();
1422         }
1423
1424         public void execute(final Runnable command) {
1425             executor.execute(command);
1426         }
1427
1428         public int getCorePoolSize() {
1429             return executor.getCorePoolSize();
1430         }
1431
1432         public int getMaximumPoolSize() {
1433             return executor.getMaximumPoolSize();
1434         }
1435
1436         public long getKeepAliveTime(final TimeUnit unit) {
1437             return executor.getKeepAliveTime(unit);
1438         }
1439
1440         public void setCorePoolSize(final int size) {
1441             executor.setCorePoolSize(size);
1442         }
1443
1444         public void setMaximumPoolSize(final int size) {
1445             executor.setMaximumPoolSize(size);
1446         }
1447
1448         public void setKeepAliveTime(final long time, final TimeUnit unit) {
1449             executor.setKeepAliveTime(time, unit);
1450         }
1451
1452         public int getActiveCount() {
1453             return executor.getActiveCount();
1454         }
1455
1456         public int getPoolSize() {
1457             return executor.getPoolSize();
1458         }
1459
1460         public int getQueueSize() {
1461             return executor.getQueueSize();
1462         }
1463     }
1464
1465     static class ExecutorServiceTaskPool implements TaskPool {
1466         private final ExecutorService delegate;
1467
1468         ExecutorServiceTaskPool(final ExecutorService delegate) {
1469             this.delegate = delegate;
1470         }
1471
1472         public void shutdown() {
1473             delegate.shutdown();
1474         }
1475
1476         public List<Runnable> shutdownNow() {
1477             return delegate.shutdownNow();
1478         }
1479
1480         public void execute(final Runnable command) {
1481             delegate.execute(command);
1482         }
1483
1484         public int getCorePoolSize() {
1485             return -1;
1486         }
1487
1488         public int getMaximumPoolSize() {
1489             return -1;
1490         }
1491
1492         public long getKeepAliveTime(final TimeUnit unit) {
1493             return -1;
1494         }
1495
1496         public void setCorePoolSize(final int size) {
1497         }
1498
1499         public void setMaximumPoolSize(final int size) {
1500         }
1501
1502         public void setKeepAliveTime(final long time, final TimeUnit unit) {
1503         }
1504
1505         public int getActiveCount() {
1506             return -1;
1507         }
1508
1509         public int getPoolSize() {
1510             return -1;
1511         }
1512
1513         public int getQueueSize() {
1514             return -1;
1515         }
1516     }
1517
1518     static class ExternalTaskPool implements TaskPool {
1519         private final TaskPool delegate;
1520
1521         ExternalTaskPool(final TaskPool delegate) {
1522             this.delegate = delegate;
1523         }
1524
1525         @Override
1526         public void shutdown() {
1527             // no operation
1528         }
1529
1530         @Override
1531         public List<Runnable> shutdownNow() {
1532             return Collections.emptyList();
1533         }
1534
1535         @Override
1536         public void execute(final Runnable command) {
1537             delegate.execute(command);
1538         }
1539
1540         @Override
1541         public int getCorePoolSize() {
1542             return delegate.getCorePoolSize();
1543         }
1544
1545         @Override
1546         public int getMaximumPoolSize() {
1547             return delegate.getMaximumPoolSize();
1548         }
1549
1550         @Override
1551         public long getKeepAliveTime(final TimeUnit unit) {
1552             return delegate.getKeepAliveTime(unit);
1553         }
1554
1555         @Override
1556         public void setCorePoolSize(final int size) {
1557             delegate.setCorePoolSize(size);
1558         }
1559
1560         @Override
1561         public void setMaximumPoolSize(final int size) {
1562             delegate.setMaximumPoolSize(size);
1563         }
1564
1565         @Override
1566         public void setKeepAliveTime(final long time, final TimeUnit unit) {
1567             delegate.setKeepAliveTime(time, unit);
1568         }
1569
1570         @Override
1571         public int getActiveCount() {
1572             return delegate.getActiveCount();
1573         }
1574
1575         @Override
1576         public int getPoolSize() {
1577             return delegate.getPoolSize();
1578         }
1579
1580         @Override
1581         public int getQueueSize() {
1582             return delegate.getQueueSize();
1583         }
1584     }
1585 }
1586
1587