1
19
20 package org.xnio;
21
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.SocketAddress;
26 import java.security.PrivilegedAction;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.AbstractExecutorService;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39 import java.util.zip.Deflater;
40 import java.util.zip.Inflater;
41
42 import org.jboss.threads.EnhancedQueueExecutor;
43 import org.wildfly.common.Assert;
44 import org.wildfly.common.context.ContextManager;
45 import org.wildfly.common.context.Contextual;
46 import org.wildfly.common.net.CidrAddress;
47 import org.wildfly.common.net.CidrAddressTable;
48 import org.xnio._private.Messages;
49 import org.xnio.channels.AcceptingChannel;
50 import org.xnio.channels.AssembledConnectedMessageChannel;
51 import org.xnio.channels.AssembledConnectedStreamChannel;
52 import org.xnio.channels.BoundChannel;
53 import org.xnio.channels.Configurable;
54 import org.xnio.channels.ConnectedMessageChannel;
55 import org.xnio.channels.ConnectedStreamChannel;
56 import org.xnio.channels.MulticastMessageChannel;
57 import org.xnio.channels.StreamChannel;
58 import org.xnio.channels.StreamSinkChannel;
59 import org.xnio.channels.StreamSourceChannel;
60 import org.xnio.conduits.ConduitStreamSinkChannel;
61 import org.xnio.conduits.ConduitStreamSourceChannel;
62 import org.xnio.conduits.DeflatingStreamSinkConduit;
63 import org.xnio.conduits.InflatingStreamSourceConduit;
64 import org.xnio.conduits.StreamSinkChannelWrappingConduit;
65 import org.xnio.conduits.StreamSourceChannelWrappingConduit;
66 import org.xnio.management.XnioServerMXBean;
67 import org.xnio.management.XnioWorkerMXBean;
68
69 import static java.lang.Math.max;
70 import static java.security.AccessController.doPrivileged;
71 import org.jboss.logging.Logger;
72 import static org.xnio.IoUtils.safeClose;
73 import static org.xnio._private.Messages.msg;
74
75
82 @SuppressWarnings("unused")
83 public abstract class XnioWorker extends AbstractExecutorService implements Configurable, ExecutorService, XnioIoFactory, Contextual<XnioWorker> {
84
85 private final Xnio xnio;
86 private final TaskPool taskPool;
87 private final String name;
88 private final Runnable terminationTask;
89 private final CidrAddressTable<InetSocketAddress> bindAddressTable;
90
91 private volatile int taskSeq;
92
93 private static final AtomicIntegerFieldUpdater<XnioWorker> taskSeqUpdater = AtomicIntegerFieldUpdater.newUpdater(XnioWorker.class, "taskSeq");
94
95 private static final AtomicInteger seq = new AtomicInteger(1);
96
97 private static final RuntimePermission CREATE_WORKER_PERMISSION = new RuntimePermission("createXnioWorker");
98
99 private int getNextSeq() {
100 return taskSeqUpdater.incrementAndGet(this);
101 }
102
103 private static final Logger log = Logger.getLogger("org.xnio");
104
105
/**
 * Construct a new worker from the given builder.
 * <p>
 * If a security manager is installed, the caller must hold the {@code createXnioWorker} runtime permission.
 * When the builder carries no worker name, a name of the form {@code XNIO-n} is generated.  The task pool is
 * chosen as follows: an externally supplied executor is wrapped (and flagged as external); otherwise a
 * {@code ThreadPoolExecutor}-based pool is used when {@code EnhancedQueueExecutor.DISABLE_HINT} is set, and an
 * {@code EnhancedQueueExecutor} is built in all other cases.
 *
 * @param builder the builder holding the worker configuration
 */
protected XnioWorker(final Builder builder) {
    this.xnio = builder.xnio;
    this.terminationTask = builder.terminationTask;

    final SecurityManager securityManager = System.getSecurityManager();
    if (securityManager != null) {
        securityManager.checkPermission(CREATE_WORKER_PERMISSION);
    }

    final String configuredName = builder.getWorkerName();
    final String workerName = configuredName != null ? configuredName : "XNIO-" + seq.getAndIncrement();
    this.name = workerName;

    final boolean daemonThreads = builder.isDaemon();
    this.bindAddressTable = builder.getBindAddressConfigurations();

    // Invoked by internally created pools once they have terminated
    final Runnable poolTerminationHook = this::taskPoolTerminated;

    final ExecutorService external = builder.getExternalExecutorService();
    if (external instanceof EnhancedQueueExecutor) {
        this.taskPool = new ExternalTaskPool(
            new EnhancedQueueExecutorTaskPool((EnhancedQueueExecutor) external));
    } else if (external instanceof ThreadPoolExecutor) {
        this.taskPool = new ExternalTaskPool(new ThreadPoolExecutorTaskPool((ThreadPoolExecutor) external));
    } else if (external != null) {
        this.taskPool = new ExternalTaskPool(new ExecutorServiceTaskPool(external));
    } else if (EnhancedQueueExecutor.DISABLE_HINT) {
        // Core and maximum size are both set to the larger of the two configured sizes
        final int fixedSize = max(builder.getMaxWorkerPoolSize(), builder.getCoreWorkerPoolSize());
        final WorkerThreadFactory threadFactory = new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), daemonThreads);
        this.taskPool = new ThreadPoolExecutorTaskPool(new DefaultThreadPoolExecutor(
            fixedSize,
            fixedSize,
            builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(),
            threadFactory,
            poolTerminationHook));
    } else {
        final EnhancedQueueExecutor.Builder executorBuilder = new EnhancedQueueExecutor.Builder()
            .setCorePoolSize(builder.getCoreWorkerPoolSize())
            .setMaximumPoolSize(builder.getMaxWorkerPoolSize())
            .setKeepAliveTime(builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS)
            .setThreadFactory(new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), daemonThreads))
            .setTerminationTask(poolTerminationHook)
            .setRegisterMBean(true)
            .setMBeanName(workerName);
        this.taskPool = new EnhancedQueueExecutorTaskPool(executorBuilder.build());
    }
}
161
162
163
164
165
166
167
/**
 * The context manager for XNIO workers, created under a privileged action and registered under the
 * name {@code org.xnio.worker}.
 */
private static final ContextManager<XnioWorker> CONTEXT_MANAGER = doPrivileged(
    (PrivilegedAction<ContextManager<XnioWorker>>) () -> new ContextManager<>(XnioWorker.class, "org.xnio.worker")
);

static {
    // The global default is resolved lazily through the holder class
    final PrivilegedAction<Void> installDefault = () -> {
        CONTEXT_MANAGER.setGlobalDefaultSupplier(() -> DefaultXnioWorkerHolder.INSTANCE);
        return null;
    };
    doPrivileged(installDefault);
}
176
177
182 public static ContextManager<XnioWorker> getContextManager() {
183 return CONTEXT_MANAGER;
184 }
185
186
191 public ContextManager<XnioWorker> getInstanceContextManager() {
192 return getContextManager();
193 }
194
195
196
197
198
199
200
201
202
203
212 @Deprecated
213 public AcceptingChannel<? extends ConnectedStreamChannel> createStreamServer(SocketAddress bindAddress, ChannelListener<? super AcceptingChannel<ConnectedStreamChannel>> acceptListener, OptionMap optionMap) throws IOException {
214 final AcceptingChannel<StreamConnection> server = createStreamConnectionServer(bindAddress, null, optionMap);
215 final AcceptingChannel<ConnectedStreamChannel> acceptingChannel = new AcceptingChannel<ConnectedStreamChannel>() {
216 public ConnectedStreamChannel accept() throws IOException {
217 final StreamConnection connection = server.accept();
218 return connection == null ? null : new AssembledConnectedStreamChannel(connection, connection.getSourceChannel(), connection.getSinkChannel());
219 }
220
221 public ChannelListener.Setter<? extends AcceptingChannel<ConnectedStreamChannel>> getAcceptSetter() {
222 return ChannelListeners.getDelegatingSetter(server.getAcceptSetter(), this);
223 }
224
225 public ChannelListener.Setter<? extends AcceptingChannel<ConnectedStreamChannel>> getCloseSetter() {
226 return ChannelListeners.getDelegatingSetter(server.getCloseSetter(), this);
227 }
228
229 public SocketAddress getLocalAddress() {
230 return server.getLocalAddress();
231 }
232
233 public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
234 return server.getLocalAddress(type);
235 }
236
237 public void suspendAccepts() {
238 server.suspendAccepts();
239 }
240
241 public void resumeAccepts() {
242 server.resumeAccepts();
243 }
244
245 public boolean isAcceptResumed() {
246 return server.isAcceptResumed();
247 }
248
249 public void wakeupAccepts() {
250 server.wakeupAccepts();
251 }
252
253 public void awaitAcceptable() throws IOException {
254 server.awaitAcceptable();
255 }
256
257 public void awaitAcceptable(final long time, final TimeUnit timeUnit) throws IOException {
258 server.awaitAcceptable(time, timeUnit);
259 }
260
261 public XnioWorker getWorker() {
262 return server.getWorker();
263 }
264
265 @Deprecated
266 public XnioExecutor getAcceptThread() {
267 return server.getAcceptThread();
268 }
269
270 public XnioIoThread getIoThread() {
271 return server.getIoThread();
272 }
273
274 public void close() throws IOException {
275 server.close();
276 }
277
278 public boolean isOpen() {
279 return server.isOpen();
280 }
281
282 public boolean supportsOption(final Option<?> option) {
283 return server.supportsOption(option);
284 }
285
286 public <T> T getOption(final Option<T> option) throws IOException {
287 return server.getOption(option);
288 }
289
290 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
291 return server.setOption(option, value);
292 }
293 };
294 acceptingChannel.getAcceptSetter().set(acceptListener);
295 return acceptingChannel;
296 }
297
298
307 public AcceptingChannel<StreamConnection> createStreamConnectionServer(SocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
308 Assert.checkNotNullParam("bindAddress", bindAddress);
309 if (bindAddress instanceof InetSocketAddress) {
310 return createTcpConnectionServer((InetSocketAddress) bindAddress, acceptListener, optionMap);
311 } else if (bindAddress instanceof LocalSocketAddress) {
312 return createLocalStreamConnectionServer((LocalSocketAddress) bindAddress, acceptListener, optionMap);
313 } else {
314 throw msg.badSockType(bindAddress.getClass());
315 }
316 }
317
318
327 protected AcceptingChannel<StreamConnection> createTcpConnectionServer(InetSocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
328 throw msg.unsupported("createTcpConnectionServer");
329 }
330
331
340 protected AcceptingChannel<StreamConnection> createLocalStreamConnectionServer(LocalSocketAddress bindAddress, ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, OptionMap optionMap) throws IOException {
341 throw msg.unsupported("createLocalStreamConnectionServer");
342 }
343
344
345
346
354 @Deprecated
355 public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, OptionMap optionMap) {
356 final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
357 final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
358 final IoFuture<StreamConnection> future = openStreamConnection(destination, nestedOpenListener, optionMap);
359 future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
360 futureResult.addCancelHandler(future);
361 return futureResult.getIoFuture();
362 }
363
364
373 @Deprecated
374 public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
375 final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
376 final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
377 final IoFuture<StreamConnection> future = openStreamConnection(destination, nestedOpenListener, bindListener, optionMap);
378 future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
379 futureResult.addCancelHandler(future);
380 return futureResult.getIoFuture();
381 }
382
383
394 @Deprecated
395 public IoFuture<ConnectedStreamChannel> connectStream(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
396 final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
397 final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
398 final IoFuture<StreamConnection> future = openStreamConnection(bindAddress, destination, nestedOpenListener, bindListener, optionMap);
399 future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
400 futureResult.addCancelHandler(future);
401 return futureResult.getIoFuture();
402 }
403
404 public IoFuture<StreamConnection> openStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, OptionMap optionMap) {
405 return chooseThread().openStreamConnection(destination, openListener, optionMap);
406 }
407
408 public IoFuture<StreamConnection> openStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
409 return chooseThread().openStreamConnection(destination, openListener, bindListener, optionMap);
410 }
411
412 public IoFuture<StreamConnection> openStreamConnection(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
413 return chooseThread().openStreamConnection(bindAddress, destination, openListener, bindListener, optionMap);
414 }
415
416
417
418
428 @Deprecated
429 public IoFuture<ConnectedStreamChannel> acceptStream(SocketAddress destination, ChannelListener<? super ConnectedStreamChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
430 final FutureResult<ConnectedStreamChannel> futureResult = new FutureResult<ConnectedStreamChannel>();
431 final ChannelListener<StreamConnection> nestedOpenListener = new StreamConnectionWrapListener(futureResult, openListener);
432 final IoFuture<StreamConnection> future = acceptStreamConnection(destination, nestedOpenListener, bindListener, optionMap);
433 future.addNotifier(STREAM_WRAPPING_HANDLER, futureResult);
434 futureResult.addCancelHandler(future);
435 return futureResult.getIoFuture();
436 }
437
438 public IoFuture<StreamConnection> acceptStreamConnection(SocketAddress destination, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
439 return chooseThread().acceptStreamConnection(destination, openListener, bindListener, optionMap);
440 }
441
442
443
444
445
446
447
448
457 @Deprecated
458
459 public IoFuture<ConnectedMessageChannel> connectDatagram(SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
460 final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
461 final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
462 final IoFuture<MessageConnection> future = openMessageConnection(destination, nestedOpenListener, optionMap);
463 future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
464 futureResult.addCancelHandler(future);
465 return futureResult.getIoFuture();
466 }
467
468
479 @Deprecated
480
481 public IoFuture<ConnectedMessageChannel> connectDatagram(SocketAddress bindAddress, SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
482 final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
483 final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
484 final IoFuture<MessageConnection> future = openMessageConnection(destination, nestedOpenListener, optionMap);
485 future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
486 futureResult.addCancelHandler(future);
487 return futureResult.getIoFuture();
488 }
489
490 public IoFuture<MessageConnection> openMessageConnection(final SocketAddress destination, final ChannelListener<? super MessageConnection> openListener, final OptionMap optionMap) {
491 return chooseThread().openMessageConnection(destination, openListener, optionMap);
492 }
493
494
495
496
506 @Deprecated
507 public IoFuture<ConnectedMessageChannel> acceptDatagram(SocketAddress destination, ChannelListener<? super ConnectedMessageChannel> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap) {
508 final FutureResult<ConnectedMessageChannel> futureResult = new FutureResult<ConnectedMessageChannel>();
509 final ChannelListener<MessageConnection> nestedOpenListener = new MessageConnectionWrapListener(futureResult, openListener);
510 final IoFuture<MessageConnection> future = acceptMessageConnection(destination, nestedOpenListener, bindListener, optionMap);
511 future.addNotifier(MESSAGE_WRAPPING_HANDLER, futureResult);
512 futureResult.addCancelHandler(future);
513 return futureResult.getIoFuture();
514 }
515
516 public IoFuture<MessageConnection> acceptMessageConnection(final SocketAddress destination, final ChannelListener<? super MessageConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
517 return chooseThread().acceptMessageConnection(destination, openListener, bindListener, optionMap);
518 }
519
520
521
522
523
524
525
526
539 public MulticastMessageChannel createUdpServer(InetSocketAddress bindAddress, ChannelListener<? super MulticastMessageChannel> bindListener, OptionMap optionMap) throws IOException {
540 throw msg.unsupported("createUdpServer");
541 }
542
543
555 public MulticastMessageChannel createUdpServer(InetSocketAddress bindAddress, OptionMap optionMap) throws IOException {
556 return createUdpServer(bindAddress, ChannelListeners.nullChannelListener(), optionMap);
557 }
558
559
560
561
562
563
564
565
574 @Deprecated
575 public void createPipe(ChannelListener<? super StreamChannel> leftOpenListener, ChannelListener<? super StreamChannel> rightOpenListener, final OptionMap optionMap) throws IOException {
576 final ChannelPipe<StreamChannel, StreamChannel> pipe = createFullDuplexPipe();
577 final boolean establishWriting = optionMap.get(Options.WORKER_ESTABLISH_WRITING, false);
578 final StreamChannel left = pipe.getLeftSide();
579 XnioExecutor leftExec = establishWriting ? left.getWriteThread() : left.getReadThread();
580 final StreamChannel right = pipe.getRightSide();
581 XnioExecutor rightExec = establishWriting ? right.getWriteThread() : right.getReadThread();
582
583
584 leftExec.execute(ChannelListeners.getChannelListenerTask(left, leftOpenListener));
585
586
587 rightExec.execute(ChannelListeners.getChannelListenerTask(right, rightOpenListener));
588 }
589
590
599 @Deprecated
600 public void createOneWayPipe(ChannelListener<? super StreamSourceChannel> sourceListener, ChannelListener<? super StreamSinkChannel> sinkListener, final OptionMap optionMap) throws IOException {
601 final ChannelPipe<StreamSourceChannel, StreamSinkChannel> pipe = createHalfDuplexPipe();
602 final StreamSourceChannel left = pipe.getLeftSide();
603 XnioExecutor leftExec = left.getReadThread();
604 final StreamSinkChannel right = pipe.getRightSide();
605 XnioExecutor rightExec = right.getWriteThread();
606
607
608 leftExec.execute(ChannelListeners.getChannelListenerTask(left, sourceListener));
609
610
611 rightExec.execute(ChannelListeners.getChannelListenerTask(right, sinkListener));
612 }
613
614
615
616
617
618
619
620
628 public StreamSourceChannel getInflatingChannel(final StreamSourceChannel delegate, OptionMap options) throws IOException {
629 final boolean nowrap;
630 switch (options.get(Options.COMPRESSION_TYPE, CompressionType.DEFLATE)) {
631 case DEFLATE: nowrap = false; break;
632 case GZIP: nowrap = true; break;
633 default: throw msg.badCompressionFormat();
634 }
635 return getInflatingChannel(delegate, new Inflater(nowrap));
636 }
637
638
646 protected StreamSourceChannel getInflatingChannel(final StreamSourceChannel delegate, final Inflater inflater) throws IOException {
647 return new ConduitStreamSourceChannel(Configurable.EMPTY, new InflatingStreamSourceConduit(new StreamSourceChannelWrappingConduit(delegate), inflater));
648 }
649
650
658 public StreamSinkChannel getDeflatingChannel(final StreamSinkChannel delegate, final OptionMap options) throws IOException {
659 final int level = options.get(Options.COMPRESSION_LEVEL, -1);
660 final boolean nowrap;
661 switch (options.get(Options.COMPRESSION_TYPE, CompressionType.DEFLATE)) {
662 case DEFLATE: nowrap = false; break;
663 case GZIP: nowrap = true; break;
664 default: throw msg.badCompressionFormat();
665 }
666 return getDeflatingChannel(delegate, new Deflater(level, nowrap));
667 }
668
669
677 protected StreamSinkChannel getDeflatingChannel(final StreamSinkChannel delegate, final Deflater deflater) throws IOException {
678 return new ConduitStreamSinkChannel(Configurable.EMPTY, new DeflatingStreamSinkConduit(new StreamSinkChannelWrappingConduit(delegate), deflater));
679 }
680
681 public ChannelPipe<StreamChannel, StreamChannel> createFullDuplexPipe() throws IOException {
682 return chooseThread().createFullDuplexPipe();
683 }
684
685 public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection() throws IOException {
686 return chooseThread().createFullDuplexPipeConnection();
687 }
688
689 public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe() throws IOException {
690 return chooseThread().createHalfDuplexPipe();
691 }
692
693 public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection(final XnioIoFactory peer) throws IOException {
694 return chooseThread().createFullDuplexPipeConnection(peer);
695 }
696
697 public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe(final XnioIoFactory peer) throws IOException {
698 return chooseThread().createHalfDuplexPipe(peer);
699 }
700
701
702
703
704
705
706
707
712 public abstract void shutdown();
713
714
721 public abstract List<Runnable> shutdownNow();
722
723
729 public abstract boolean isShutdown();
730
731
737 public abstract boolean isTerminated();
738
739
747 public abstract boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException;
748
749
754 public abstract void awaitTermination() throws InterruptedException;
755
756
757
758
759
760
761
762
767 public final XnioIoThread getIoThread() {
768 return chooseThread();
769 }
770
771
777 public abstract XnioIoThread getIoThread(int hashCode);
778
779
784 protected Runnable getTerminationTask() {
785 return terminationTask;
786 }
787
788
791 protected void taskPoolTerminated() {}
792
793
797 protected void shutDownTaskPool() {
798 if (isTaskPoolExternal()) {
799 taskPoolTerminated();
800 } else {
801 doPrivileged(new PrivilegedAction<Object>() {
802 public Object run() {
803 taskPool.shutdown();
804 return null;
805 }
806 });
807 }
808 }
809
810
815 protected List<Runnable> shutDownTaskPoolNow() {
816 if (! isTaskPoolExternal()) return doPrivileged(new PrivilegedAction<List<Runnable>>() {
817 public List<Runnable> run() {
818 return taskPool.shutdownNow();
819 }
820 });
821 return Collections.emptyList();
822 }
823
824
830 protected boolean isTaskPoolExternal() {
831 return taskPool instanceof ExternalTaskPool;
832 }
833
834
839 public void execute(final Runnable command) {
840 taskPool.execute(command);
841 }
842
843
848 public abstract int getIoThreadCount();
849
850
851
852
853
854
855
856 private final static Set<Option<?>> OPTIONS = Option.setBuilder()
857 .add(Options.WORKER_TASK_CORE_THREADS)
858 .add(Options.WORKER_TASK_MAX_THREADS)
859 .add(Options.WORKER_TASK_KEEPALIVE)
860 .create();
861
862 private final static Set<Option<?>> EXTERNAL_POOL_OPTIONS = Option.setBuilder()
863 .create();
864
865 public boolean supportsOption(final Option<?> option) {
866 return taskPool instanceof ExternalTaskPool ? EXTERNAL_POOL_OPTIONS.contains(option) : OPTIONS.contains(option);
867 }
868
869 public <T> T getOption(final Option<T> option) throws IOException {
870 if (! supportsOption(option)) {
871 return null;
872 } else if (option.equals(Options.WORKER_TASK_CORE_THREADS)) {
873 return option.cast(Integer.valueOf(taskPool.getCorePoolSize()));
874 } else if (option.equals(Options.WORKER_TASK_MAX_THREADS)) {
875 return option.cast(Integer.valueOf(taskPool.getMaximumPoolSize()));
876 } else if (option.equals(Options.WORKER_TASK_KEEPALIVE)) {
877 return option.cast(Integer.valueOf((int) Math.min((long) Integer.MAX_VALUE, taskPool.getKeepAliveTime(TimeUnit.MILLISECONDS))));
878 } else {
879 return null;
880 }
881 }
882
883 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
884 if (! supportsOption(option)) {
885 return null;
886 } else if (option.equals(Options.WORKER_TASK_CORE_THREADS)) {
887 final int old = taskPool.getCorePoolSize();
888 taskPool.setCorePoolSize(Options.WORKER_TASK_CORE_THREADS.cast(value).intValue());
889 return option.cast(Integer.valueOf(old));
890 } else if (option.equals(Options.WORKER_TASK_MAX_THREADS)) {
891 final int old = taskPool.getMaximumPoolSize();
892 taskPool.setMaximumPoolSize(Options.WORKER_TASK_MAX_THREADS.cast(value).intValue());
893 return option.cast(Integer.valueOf(old));
894 } else if (option.equals(Options.WORKER_TASK_KEEPALIVE)) {
895 final long old = taskPool.getKeepAliveTime(TimeUnit.MILLISECONDS);
896 taskPool.setKeepAliveTime(Options.WORKER_TASK_KEEPALIVE.cast(value).intValue(), TimeUnit.MILLISECONDS);
897 return option.cast(Integer.valueOf((int) Math.min((long) Integer.MAX_VALUE, old)));
898 } else {
899 return null;
900 }
901 }
902
903
904
905
906
907
908
909
914 public Xnio getXnio() {
915 return xnio;
916 }
917
918
923 public String getName() {
924 return name;
925 }
926
927
928
929
930
931
932
933
938 protected abstract XnioIoThread chooseThread();
939
940
945 protected final int getCoreWorkerPoolSize() {
946 return taskPool.getCorePoolSize();
947 }
948
949
954 protected final int getBusyWorkerThreadCount() {
955 return taskPool.getActiveCount();
956 }
957
958
963 protected final int getWorkerPoolSize() {
964 return taskPool.getPoolSize();
965 }
966
967
972 protected final int getMaxWorkerPoolSize() {
973 return taskPool.getMaximumPoolSize();
974 }
975
976
981 protected final int getWorkerQueueSize() {
982 return taskPool.getQueueSize();
983 }
984
985
986
987
988
989
990
991
996 protected CidrAddressTable<InetSocketAddress> getBindAddressTable() {
997 return bindAddressTable;
998 }
999
1000
1005 public InetSocketAddress getBindAddress(InetAddress destination) {
1006 return bindAddressTable.get(destination);
1007 }
1008
1009
1010
1011
1012
1013
1014
1015 public abstract XnioWorkerMXBean getMXBean();
1016
1017 protected abstract ManagementRegistration registerServerMXBean(XnioServerMXBean metrics);
1018
1019
1020
1021
1022
1023
1024
1025
1028 public static class Builder {
1029 private final Xnio xnio;
1030 private ExecutorService externalExecutorService;
1031 private Runnable terminationTask;
1032 private String workerName;
1033 private int coreWorkerPoolSize = 4;
1034 private int maxWorkerPoolSize = 16;
1035 private ThreadGroup threadGroup;
1036 private boolean daemon;
1037 private int workerKeepAlive = 60_000;
1038 private int workerIoThreads = 1;
1039 private long workerStackSize = 0L;
1040 private CidrAddressTable<InetSocketAddress> bindAddressConfigurations = new CidrAddressTable<>();
1041
1042
1047 protected Builder(final Xnio xnio) {
1048 this.xnio = xnio;
1049 }
1050
1051 public Xnio getXnio() {
1052 return xnio;
1053 }
1054
1055 public Builder populateFromOptions(OptionMap optionMap) {
1056 setWorkerName(optionMap.get(Options.WORKER_NAME));
1057 setCoreWorkerPoolSize(optionMap.get(Options.WORKER_TASK_CORE_THREADS, coreWorkerPoolSize));
1058 setMaxWorkerPoolSize(optionMap.get(Options.WORKER_TASK_MAX_THREADS, maxWorkerPoolSize));
1059 setDaemon(optionMap.get(Options.THREAD_DAEMON, daemon));
1060 setWorkerKeepAlive(optionMap.get(Options.WORKER_TASK_KEEPALIVE, workerKeepAlive));
1061 if (optionMap.contains(Options.WORKER_IO_THREADS)) {
1062 setWorkerIoThreads(optionMap.get(Options.WORKER_IO_THREADS, 1));
1063 } else if (optionMap.contains(Options.WORKER_READ_THREADS) || optionMap.contains(Options.WORKER_WRITE_THREADS)) {
1064 setWorkerIoThreads(max(optionMap.get(Options.WORKER_READ_THREADS, 1), optionMap.get(Options.WORKER_WRITE_THREADS, 1)));
1065 }
1066 setWorkerStackSize(optionMap.get(Options.STACK_SIZE, workerStackSize));
1067 return this;
1068 }
1069
1070 public Builder addBindAddressConfiguration(CidrAddress cidrAddress, InetAddress bindAddress) {
1071 return addBindAddressConfiguration(cidrAddress, new InetSocketAddress(bindAddress, 0));
1072 }
1073
1074 public Builder addBindAddressConfiguration(CidrAddress cidrAddress, InetSocketAddress bindAddress) {
1075 final Class<? extends InetAddress> networkAddrClass = cidrAddress.getNetworkAddress().getClass();
1076 if (bindAddress.isUnresolved()) {
1077 throw Messages.msg.addressUnresolved(bindAddress);
1078 }
1079 if (networkAddrClass != bindAddress.getAddress().getClass()) {
1080 throw Messages.msg.mismatchAddressType(networkAddrClass, bindAddress.getAddress().getClass());
1081 }
1082 bindAddressConfigurations.put(cidrAddress, bindAddress);
1083 return this;
1084 }
1085
1086 public Builder setBindAddressConfigurations(CidrAddressTable<InetSocketAddress> newTable) {
1087 bindAddressConfigurations = newTable;
1088 return this;
1089 }
1090
1091 public CidrAddressTable<InetSocketAddress> getBindAddressConfigurations() {
1092 return bindAddressConfigurations;
1093 }
1094
1095 public Runnable getTerminationTask() {
1096 return terminationTask;
1097 }
1098
1099 public Builder setTerminationTask(final Runnable terminationTask) {
1100 this.terminationTask = terminationTask;
1101 return this;
1102 }
1103
1104 public String getWorkerName() {
1105 return workerName;
1106 }
1107
1108 public Builder setWorkerName(final String workerName) {
1109 this.workerName = workerName;
1110 return this;
1111 }
1112
1113 public int getCoreWorkerPoolSize() {
1114 return coreWorkerPoolSize;
1115 }
1116
1117 public Builder setCoreWorkerPoolSize(final int coreWorkerPoolSize) {
1118 Assert.checkMinimumParameter("coreWorkerPoolSize", 0, coreWorkerPoolSize);
1119 this.coreWorkerPoolSize = coreWorkerPoolSize;
1120 return this;
1121 }
1122
1123 public int getMaxWorkerPoolSize() {
1124 return maxWorkerPoolSize;
1125 }
1126
1127 public Builder setMaxWorkerPoolSize(final int maxWorkerPoolSize) {
1128 Assert.checkMinimumParameter("maxWorkerPoolSize", 0, maxWorkerPoolSize);
1129 this.maxWorkerPoolSize = maxWorkerPoolSize;
1130 return this;
1131 }
1132
1133 public ThreadGroup getThreadGroup() {
1134 return threadGroup;
1135 }
1136
1137 public Builder setThreadGroup(final ThreadGroup threadGroup) {
1138 this.threadGroup = threadGroup;
1139 return this;
1140 }
1141
1142 public boolean isDaemon() {
1143 return daemon;
1144 }
1145
1146 public Builder setDaemon(final boolean daemon) {
1147 this.daemon = daemon;
1148 return this;
1149 }
1150
1151 public long getWorkerKeepAlive() {
1152 return workerKeepAlive;
1153 }
1154
1155 public Builder setWorkerKeepAlive(final int workerKeepAlive) {
1156 Assert.checkMinimumParameter("workerKeepAlive", 0, workerKeepAlive);
1157 this.workerKeepAlive = workerKeepAlive;
1158 return this;
1159 }
1160
1161 public int getWorkerIoThreads() {
1162 return workerIoThreads;
1163 }
1164
1165 public Builder setWorkerIoThreads(final int workerIoThreads) {
1166 Assert.checkMinimumParameter("workerIoThreads", 0, workerIoThreads);
1167 this.workerIoThreads = workerIoThreads;
1168 return this;
1169 }
1170
1171 public long getWorkerStackSize() {
1172 return workerStackSize;
1173 }
1174
1175 public Builder setWorkerStackSize(final long workerStackSize) {
1176 Assert.checkMinimumParameter("workerStackSize", 0, workerStackSize);
1177 this.workerStackSize = workerStackSize;
1178 return this;
1179 }
1180
1181 public ExecutorService getExternalExecutorService() {
1182 return externalExecutorService;
1183 }
1184
1185 public Builder setExternalExecutorService(final ExecutorService executorService) {
1186 this.externalExecutorService = executorService;
1187 return this;
1188 }
1189
1190 public XnioWorker build() {
1191 return xnio.build(this);
1192 }
1193 }
1194
1195
1196
1197
1198
1199
1200
1201 static class StreamConnectionWrapListener implements ChannelListener<StreamConnection> {
1202
1203 private final FutureResult<ConnectedStreamChannel> futureResult;
1204 private final ChannelListener<? super ConnectedStreamChannel> openListener;
1205
1206 public StreamConnectionWrapListener(final FutureResult<ConnectedStreamChannel> futureResult, final ChannelListener<? super ConnectedStreamChannel> openListener) {
1207 this.futureResult = futureResult;
1208 this.openListener = openListener;
1209 }
1210
1211 public void handleEvent(final StreamConnection channel) {
1212 final AssembledConnectedStreamChannel assembledChannel = new AssembledConnectedStreamChannel(channel, channel.getSourceChannel(), channel.getSinkChannel());
1213 if (!futureResult.setResult(assembledChannel)) {
1214 safeClose(assembledChannel);
1215 } else {
1216 ChannelListeners.invokeChannelListener(assembledChannel, openListener);
1217 }
1218 }
1219 }
1220
1221 static class MessageConnectionWrapListener implements ChannelListener<MessageConnection> {
1222
1223 private final FutureResult<ConnectedMessageChannel> futureResult;
1224 private final ChannelListener<? super ConnectedMessageChannel> openListener;
1225
1226 public MessageConnectionWrapListener(final FutureResult<ConnectedMessageChannel> futureResult, final ChannelListener<? super ConnectedMessageChannel> openListener) {
1227 this.futureResult = futureResult;
1228 this.openListener = openListener;
1229 }
1230
1231 public void handleEvent(final MessageConnection channel) {
1232 final AssembledConnectedMessageChannel assembledChannel = new AssembledConnectedMessageChannel(channel, channel.getSourceChannel(), channel.getSinkChannel());
1233 if (!futureResult.setResult(assembledChannel)) {
1234 safeClose(assembledChannel);
1235 } else {
1236 ChannelListeners.invokeChannelListener(assembledChannel, openListener);
1237 }
1238 }
1239 }
1240
1241 private static final IoFuture.HandlingNotifier<StreamConnection, FutureResult<ConnectedStreamChannel>> STREAM_WRAPPING_HANDLER = new IoFuture.HandlingNotifier<StreamConnection, FutureResult<ConnectedStreamChannel>>() {
1242 public void handleCancelled(final FutureResult<ConnectedStreamChannel> attachment) {
1243 attachment.setCancelled();
1244 }
1245
1246 public void handleFailed(final IOException exception, final FutureResult<ConnectedStreamChannel> attachment) {
1247 attachment.setException(exception);
1248 }
1249 };
1250
1251 private static final IoFuture.HandlingNotifier<MessageConnection, FutureResult<ConnectedMessageChannel>> MESSAGE_WRAPPING_HANDLER = new IoFuture.HandlingNotifier<MessageConnection, FutureResult<ConnectedMessageChannel>>() {
1252 public void handleCancelled(final FutureResult<ConnectedMessageChannel> attachment) {
1253 attachment.setCancelled();
1254 }
1255
1256 public void handleFailed(final IOException exception, final FutureResult<ConnectedMessageChannel> attachment) {
1257 attachment.setException(exception);
1258 }
1259 };
1260
1261 class WorkerThreadFactory implements ThreadFactory {
1262
1263 private final ThreadGroup threadGroup;
1264 private final long stackSize;
1265 private final boolean markThreadAsDaemon;
1266
1267 WorkerThreadFactory(final ThreadGroup threadGroup, final long stackSize, final boolean markThreadAsDaemon) {
1268 this.threadGroup = threadGroup;
1269 this.stackSize = stackSize;
1270 this.markThreadAsDaemon = markThreadAsDaemon;
1271 }
1272
1273 public Thread newThread(final Runnable r) {
1274 return doPrivileged(new PrivilegedAction<Thread>() {
1275 public Thread run() {
1276 final Thread taskThread = new Thread(threadGroup, r, name + " task-" + getNextSeq(), stackSize);
1277
1278 if (markThreadAsDaemon) {
1279 taskThread.setDaemon(true);
1280 }
1281 return taskThread;
1282 }
1283 });
1284 }
1285 }
1286
/**
 * Abstraction over a pool that executes tasks. The implementations in this file wrap a
 * {@code ThreadPoolExecutor}, an {@code EnhancedQueueExecutor}, a plain
 * {@code ExecutorService}, or another {@code TaskPool}.
 */
interface TaskPool {

    // ---- lifecycle and submission ----

    /** Requests shutdown of the pool. */
    void shutdown();

    /**
     * Requests immediate shutdown of the pool.
     *
     * @return the list of runnables reported by the implementation
     */
    List<Runnable> shutdownNow();

    /**
     * Submits a command for execution.
     *
     * @param command the task to run
     */
    void execute(Runnable command);

    // ---- sizing and keep-alive configuration ----

    /** @return the core pool size reported by the implementation */
    int getCorePoolSize();

    /** @param size the new core pool size */
    void setCorePoolSize(int size);

    /** @return the maximum pool size reported by the implementation */
    int getMaximumPoolSize();

    /** @param size the new maximum pool size */
    void setMaximumPoolSize(int size);

    /**
     * @param unit the unit the result is expressed in
     * @return the keep-alive time reported by the implementation
     */
    long getKeepAliveTime(TimeUnit unit);

    /**
     * @param time the new keep-alive time
     * @param unit the unit of {@code time}
     */
    void setKeepAliveTime(long time, TimeUnit unit);

    // ---- runtime metrics ----

    /** @return the active count reported by the implementation */
    int getActiveCount();

    /** @return the current pool size reported by the implementation */
    int getPoolSize();

    /** @return the queue size reported by the implementation */
    int getQueueSize();
}
1313
1314 static class DefaultThreadPoolExecutor extends ThreadPoolExecutor {
1315 private final Runnable terminationTask;
1316
1317 DefaultThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final Runnable terminationTask) {
1318 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
1319 this.terminationTask = terminationTask;
1320 }
1321
1322 protected void terminated() {
1323 terminationTask.run();
1324 }
1325
1326 public void setCorePoolSize(final int size) {
1327 setMaximumPoolSize(size);
1328 }
1329
1330 public void setMaximumPoolSize(final int size) {
1331 if (size > getCorePoolSize()) {
1332 super.setMaximumPoolSize(size);
1333 super.setCorePoolSize(size);
1334 } else {
1335 super.setCorePoolSize(size);
1336 super.setMaximumPoolSize(size);
1337 }
1338 }
1339 }
1340
1341 static class ThreadPoolExecutorTaskPool implements TaskPool {
1342 private final ThreadPoolExecutor delegate;
1343
1344 ThreadPoolExecutorTaskPool(final ThreadPoolExecutor delegate) {
1345 this.delegate = delegate;
1346 }
1347
1348 @Override
1349 public void shutdown() {
1350 delegate.shutdown();
1351 }
1352
1353 @Override
1354 public List<Runnable> shutdownNow() {
1355 return delegate.shutdownNow();
1356 }
1357
1358 @Override
1359 public void execute(final Runnable command) {
1360 delegate.execute(command);
1361 }
1362
1363 @Override
1364 public int getCorePoolSize() {
1365 return delegate.getCorePoolSize();
1366 }
1367
1368 @Override
1369 public int getMaximumPoolSize() {
1370 return delegate.getMaximumPoolSize();
1371 }
1372
1373 @Override
1374 public long getKeepAliveTime(final TimeUnit unit) {
1375 return delegate.getKeepAliveTime(unit);
1376 }
1377
1378 @Override
1379 public void setCorePoolSize(final int size) {
1380 delegate.setCorePoolSize(size);
1381 }
1382
1383 @Override
1384 public void setMaximumPoolSize(final int size) {
1385 delegate.setMaximumPoolSize(size);
1386 }
1387
1388 @Override
1389 public void setKeepAliveTime(final long time, final TimeUnit unit) {
1390 delegate.setKeepAliveTime(time, unit);
1391 }
1392
1393 @Override
1394 public int getActiveCount() {
1395 return delegate.getActiveCount();
1396 }
1397
1398 @Override
1399 public int getPoolSize() {
1400 return delegate.getPoolSize();
1401 }
1402
1403 @Override
1404 public int getQueueSize() {
1405 return delegate.getQueue().size();
1406 }
1407 }
1408
1409 static class EnhancedQueueExecutorTaskPool implements TaskPool {
1410 private final EnhancedQueueExecutor executor;
1411
1412 EnhancedQueueExecutorTaskPool(final EnhancedQueueExecutor executor) {
1413 this.executor = executor;
1414 }
1415
1416 public void shutdown() {
1417 executor.shutdown();
1418 }
1419
1420 public List<Runnable> shutdownNow() {
1421 return executor.shutdownNow();
1422 }
1423
1424 public void execute(final Runnable command) {
1425 executor.execute(command);
1426 }
1427
1428 public int getCorePoolSize() {
1429 return executor.getCorePoolSize();
1430 }
1431
1432 public int getMaximumPoolSize() {
1433 return executor.getMaximumPoolSize();
1434 }
1435
1436 public long getKeepAliveTime(final TimeUnit unit) {
1437 return executor.getKeepAliveTime(unit);
1438 }
1439
1440 public void setCorePoolSize(final int size) {
1441 executor.setCorePoolSize(size);
1442 }
1443
1444 public void setMaximumPoolSize(final int size) {
1445 executor.setMaximumPoolSize(size);
1446 }
1447
1448 public void setKeepAliveTime(final long time, final TimeUnit unit) {
1449 executor.setKeepAliveTime(time, unit);
1450 }
1451
1452 public int getActiveCount() {
1453 return executor.getActiveCount();
1454 }
1455
1456 public int getPoolSize() {
1457 return executor.getPoolSize();
1458 }
1459
1460 public int getQueueSize() {
1461 return executor.getQueueSize();
1462 }
1463 }
1464
1465 static class ExecutorServiceTaskPool implements TaskPool {
1466 private final ExecutorService delegate;
1467
1468 ExecutorServiceTaskPool(final ExecutorService delegate) {
1469 this.delegate = delegate;
1470 }
1471
1472 public void shutdown() {
1473 delegate.shutdown();
1474 }
1475
1476 public List<Runnable> shutdownNow() {
1477 return delegate.shutdownNow();
1478 }
1479
1480 public void execute(final Runnable command) {
1481 delegate.execute(command);
1482 }
1483
1484 public int getCorePoolSize() {
1485 return -1;
1486 }
1487
1488 public int getMaximumPoolSize() {
1489 return -1;
1490 }
1491
1492 public long getKeepAliveTime(final TimeUnit unit) {
1493 return -1;
1494 }
1495
1496 public void setCorePoolSize(final int size) {
1497 }
1498
1499 public void setMaximumPoolSize(final int size) {
1500 }
1501
1502 public void setKeepAliveTime(final long time, final TimeUnit unit) {
1503 }
1504
1505 public int getActiveCount() {
1506 return -1;
1507 }
1508
1509 public int getPoolSize() {
1510 return -1;
1511 }
1512
1513 public int getQueueSize() {
1514 return -1;
1515 }
1516 }
1517
1518 static class ExternalTaskPool implements TaskPool {
1519 private final TaskPool delegate;
1520
1521 ExternalTaskPool(final TaskPool delegate) {
1522 this.delegate = delegate;
1523 }
1524
1525 @Override
1526 public void shutdown() {
1527
1528 }
1529
1530 @Override
1531 public List<Runnable> shutdownNow() {
1532 return Collections.emptyList();
1533 }
1534
1535 @Override
1536 public void execute(final Runnable command) {
1537 delegate.execute(command);
1538 }
1539
1540 @Override
1541 public int getCorePoolSize() {
1542 return delegate.getCorePoolSize();
1543 }
1544
1545 @Override
1546 public int getMaximumPoolSize() {
1547 return delegate.getMaximumPoolSize();
1548 }
1549
1550 @Override
1551 public long getKeepAliveTime(final TimeUnit unit) {
1552 return delegate.getKeepAliveTime(unit);
1553 }
1554
1555 @Override
1556 public void setCorePoolSize(final int size) {
1557 delegate.setCorePoolSize(size);
1558 }
1559
1560 @Override
1561 public void setMaximumPoolSize(final int size) {
1562 delegate.setMaximumPoolSize(size);
1563 }
1564
1565 @Override
1566 public void setKeepAliveTime(final long time, final TimeUnit unit) {
1567 delegate.setKeepAliveTime(time, unit);
1568 }
1569
1570 @Override
1571 public int getActiveCount() {
1572 return delegate.getActiveCount();
1573 }
1574
1575 @Override
1576 public int getPoolSize() {
1577 return delegate.getPoolSize();
1578 }
1579
1580 @Override
1581 public int getQueueSize() {
1582 return delegate.getQueueSize();
1583 }
1584 }
1585 }
1586
1587