/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2012 Red Hat, Inc., and individual contributors
 * as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
19 package org.xnio.nio;
20
21 import static org.xnio.IoUtils.safeClose;
22 import static org.xnio.nio.Log.log;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.net.Inet6Address;
27 import java.net.InetAddress;
28 import java.net.InetSocketAddress;
29 import java.net.StandardProtocolFamily;
30 import java.nio.channels.DatagramChannel;
31 import java.nio.channels.Selector;
32 import java.nio.channels.ServerSocketChannel;
33 import java.util.LinkedHashSet;
34 import java.util.List;
35 import java.util.Set;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.ThreadLocalRandom;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41 import java.util.concurrent.locks.LockSupport;
42
43 import org.wildfly.common.net.CidrAddressTable;
44 import org.xnio.Bits;
45 import org.xnio.ChannelListener;
46 import org.xnio.ChannelListeners;
47 import org.xnio.ManagementRegistration;
48 import org.xnio.ClosedWorkerException;
49 import org.xnio.IoUtils;
50 import org.xnio.Option;
51 import org.xnio.OptionMap;
52 import org.xnio.Options;
53 import org.xnio.StreamConnection;
54 import org.xnio.XnioWorker;
55 import org.xnio.channels.AcceptingChannel;
56 import org.xnio.channels.MulticastMessageChannel;
57 import org.xnio.management.XnioServerMXBean;
58 import org.xnio.management.XnioWorkerMXBean;
59
60 /**
61  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
62  */

63 final class NioXnioWorker extends XnioWorker {
64
65     private static final int CLOSE_REQ = (1 << 31);
66     private static final int CLOSE_COMP = (1 << 30);
67     private final long workerStackSize;
68
69     private volatile int state = 1;
70
71     private final WorkerThread[] workerThreads;
72     private final WorkerThread acceptThread;
73     private final NioWorkerMetrics metrics;
74
75     @SuppressWarnings("unused")
76     private volatile Thread shutdownWaiter;
77
78     private static final AtomicReferenceFieldUpdater<NioXnioWorker, Thread> shutdownWaiterUpdater = AtomicReferenceFieldUpdater.newUpdater(NioXnioWorker.class, Thread.class"shutdownWaiter");
79
80     private static final AtomicIntegerFieldUpdater<NioXnioWorker> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(NioXnioWorker.class"state");
81
82     @SuppressWarnings("deprecation")
83     NioXnioWorker(final Builder builder) {
84         super(builder);
85         final NioXnio xnio = (NioXnio) builder.getXnio();
86         final int threadCount = builder.getWorkerIoThreads();
87         this.workerStackSize = builder.getWorkerStackSize();
88         final String workerName = getName();
89         WorkerThread[] workerThreads;
90         workerThreads = new WorkerThread[threadCount];
91         final ThreadGroup threadGroup = builder.getThreadGroup();
92         final boolean markWorkerThreadAsDaemon = builder.isDaemon();
93         boolean ok = false;
94         try {
95             for (int i = 0; i < threadCount; i++) {
96                 final Selector threadSelector;
97                 try {
98                     threadSelector = xnio.mainSelectorCreator.open();
99                 } catch (IOException e) {
100                     throw Log.log.unexpectedSelectorOpenProblem(e);
101                 }
102                 final WorkerThread workerThread = new WorkerThread(this, threadSelector, String.format("%s I/O-%d", workerName, Integer.valueOf(i + 1)), threadGroup, workerStackSize, i);
103                 // Mark as daemon if the Options.THREAD_DAEMON has been set
104                 if (markWorkerThreadAsDaemon) {
105                     workerThread.setDaemon(true);
106                 }
107                 workerThreads[i] = workerThread;
108             }
109             final Selector threadSelector;
110             try {
111                 threadSelector = xnio.mainSelectorCreator.open();
112             } catch (IOException e) {
113                 throw Log.log.unexpectedSelectorOpenProblem(e);
114             }
115             acceptThread = new WorkerThread(this, threadSelector, String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
116             if (markWorkerThreadAsDaemon) {
117                 acceptThread.setDaemon(true);
118             }
119             ok = true;
120         } finally {
121             if (! ok) {
122                 for (WorkerThread worker : workerThreads) {
123                     if (worker != null) safeClose(worker.getSelector());
124                 }
125             }
126         }
127         this.workerThreads = workerThreads;
128         this.metrics = new NioWorkerMetrics(workerName);
129         metrics.register();
130     }
131
132     void start() {
133         for (WorkerThread worker : workerThreads) {
134             openResourceUnconditionally();
135             worker.start();
136         }
137         openResourceUnconditionally();
138         acceptThread.start();
139     }
140
141     protected CidrAddressTable<InetSocketAddress> getBindAddressTable() {
142         return super.getBindAddressTable();
143     }
144
145     protected WorkerThread chooseThread() {
146         return getIoThread(ThreadLocalRandom.current().nextInt());
147     }
148
149     public WorkerThread getIoThread(final int hashCode) {
150         final WorkerThread[] workerThreads = this.workerThreads;
151         final int length = workerThreads.length;
152         if (length == 0) {
153             throw log.noThreads();
154         }
155         if (length == 1) {
156             return workerThreads[0];
157         }
158         return workerThreads[Math.abs(hashCode % length)];
159     }
160
161     public int getIoThreadCount() {
162         return workerThreads.length;
163     }
164
165     WorkerThread[] getAll() {
166         return workerThreads;
167     }
168
169     protected AcceptingChannel<StreamConnection> createTcpConnectionServer(final InetSocketAddress bindAddress, final ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, final OptionMap optionMap) throws IOException {
170         checkShutdown();
171         boolean ok = false;
172         final ServerSocketChannel channel = ServerSocketChannel.open();
173         try {
174             if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
175             channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
176             channel.configureBlocking(false);
177             if (optionMap.contains(Options.BACKLOG)) {
178                 channel.socket().bind(bindAddress, optionMap.get(Options.BACKLOG, 128));
179             } else {
180                 channel.socket().bind(bindAddress);
181             }
182             if (false) {
183                 final NioTcpServer server = new NioTcpServer(this, channel, optionMap, false);
184                 server.setAcceptListener(acceptListener);
185                 ok = true;
186                 return server;
187             } else {
188                 final QueuedNioTcpServer2 server = new QueuedNioTcpServer2(new NioTcpServer(this, channel, optionMap, true));
189                 server.setAcceptListener(acceptListener);
190                 ok = true;
191                 return server;
192             }
193         } finally {
194             if (! ok) {
195                 IoUtils.safeClose(channel);
196             }
197         }
198     }
199
200
201     /** {@inheritDoc} */
202     public MulticastMessageChannel createUdpServer(final InetSocketAddress bindAddress, final ChannelListener<? super MulticastMessageChannel> bindListener, final OptionMap optionMap) throws IOException {
203         checkShutdown();
204         final DatagramChannel channel;
205         if (bindAddress != null) {
206             InetAddress address = bindAddress.getAddress();
207             if (address instanceof Inet6Address) {
208                 channel = DatagramChannel.open(StandardProtocolFamily.INET6);
209             } else {
210                 channel = DatagramChannel.open(StandardProtocolFamily.INET);
211             }
212         } else {
213             channel = DatagramChannel.open();
214         }
215         channel.configureBlocking(false);
216         if (optionMap.contains(Options.BROADCAST)) channel.socket().setBroadcast(optionMap.get(Options.BROADCAST, false));
217         if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
218         if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
219         channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
220         if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
221         channel.socket().bind(bindAddress);
222         final NioUdpChannel udpChannel = new NioUdpChannel(this, channel);
223         ChannelListeners.invokeChannelListener(udpChannel, bindListener);
224         return udpChannel;
225     }
226
227     public boolean isShutdown() {
228         return (state & CLOSE_REQ) != 0;
229     }
230
231     public boolean isTerminated() {
232         return (state & CLOSE_COMP) != 0;
233     }
234
235     /**
236      * Open a resource unconditionally (i.e. accepting a connection on an open server).
237      */

238     void openResourceUnconditionally() {
239         int oldState = stateUpdater.getAndIncrement(this);
240         if (log.isTraceEnabled()) {
241             log.tracef("CAS %s %08x -> %08x"this, Integer.valueOf(oldState), Integer.valueOf(oldState + 1));
242         }
243     }
244
245     void checkShutdown() throws ClosedWorkerException {
246         if (isShutdown())
247             throw log.workerShutDown();
248     }
249
250     void closeResource() {
251         int oldState = stateUpdater.decrementAndGet(this);
252         if (log.isTraceEnabled()) {
253             log.tracef("CAS %s %08x -> %08x"this, Integer.valueOf(oldState + 1), Integer.valueOf(oldState));
254         }
255         while (oldState == CLOSE_REQ) {
256             if (stateUpdater.compareAndSet(this, CLOSE_REQ, CLOSE_REQ | CLOSE_COMP)) {
257                 log.tracef("CAS %s %08x -> %08x (close complete)"this, Integer.valueOf(CLOSE_REQ), Integer.valueOf(CLOSE_REQ | CLOSE_COMP));
258                 safeUnpark(shutdownWaiterUpdater.getAndSet(thisnull));
259                 final Runnable task = getTerminationTask();
260                 if (task != nulltry {
261                     task.run();
262                 } catch (Throwable ignored) {}
263                 return;
264             }
265             oldState = state;
266         }
267     }
268
269     public void shutdown() {
270         int oldState;
271         oldState = state;
272         while ((oldState & CLOSE_REQ) == 0) {
273             // need to do the close ourselves...
274             if (! stateUpdater.compareAndSet(this, oldState, oldState | CLOSE_REQ)) {
275                 // changed in the meantime
276                 oldState = state;
277                 continue;
278             }
279             log.tracef("Initiating shutdown of %s"this);
280             for (WorkerThread worker : workerThreads) {
281                 worker.shutdown();
282             }
283             acceptThread.shutdown();
284             shutDownTaskPool();
285             return;
286         }
287         log.tracef("Idempotent shutdown of %s"this);
288         return;
289     }
290
291     public List<Runnable> shutdownNow() {
292         shutdown();
293         return shutDownTaskPoolNow();
294     }
295
296     public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
297         int oldState = state;
298         if (Bits.allAreSet(oldState, CLOSE_COMP)) {
299             return true;
300         }
301         long then = System.nanoTime();
302         long duration = unit.toNanos(timeout);
303         final Thread myThread = Thread.currentThread();
304         while (Bits.allAreClear(oldState = state, CLOSE_COMP)) {
305             final Thread oldThread = shutdownWaiterUpdater.getAndSet(this, myThread);
306             try {
307                 if (Bits.allAreSet(oldState = state, CLOSE_COMP)) {
308                     break;
309                 }
310                 LockSupport.parkNanos(this, duration);
311                 if (Thread.interrupted()) {
312                     throw new InterruptedException();
313                 }
314                 long now = System.nanoTime();
315                 duration -= now - then;
316                 if (duration < 0L) {
317                     oldState = state;
318                     break;
319                 }
320             } finally {
321                 safeUnpark(oldThread);
322             }
323         }
324         return Bits.allAreSet(oldState, CLOSE_COMP);
325     }
326
327     public void awaitTermination() throws InterruptedException {
328         int oldState = state;
329         if (Bits.allAreSet(oldState, CLOSE_COMP)) {
330             return;
331         }
332         final Thread myThread = Thread.currentThread();
333         while (Bits.allAreClear(state, CLOSE_COMP)) {
334             final Thread oldThread = shutdownWaiterUpdater.getAndSet(this, myThread);
335             try {
336                 if (Bits.allAreSet(state, CLOSE_COMP)) {
337                     break;
338                 }
339                 LockSupport.park(this);
340                 if (Thread.interrupted()) {
341                     throw new InterruptedException();
342                 }
343             } finally {
344                 safeUnpark(oldThread);
345             }
346         }
347     }
348
349     private static void safeUnpark(final Thread waiter) {
350         if (waiter != null) LockSupport.unpark(waiter);
351     }
352
353     protected void taskPoolTerminated() {
354         safeClose(metrics);
355         closeResource();
356     }
357
358     @Override
359     public <T> T getOption(Option<T> option) throws IOException {
360         if (option.equals(Options.WORKER_IO_THREADS)) {
361             return option.cast(workerThreads.length);
362         } else if (option.equals(Options.STACK_SIZE)) {
363             return option.cast(workerStackSize);
364         } else {
365             return super.getOption(option);
366         }
367     }
368
369     public NioXnio getXnio() {
370         return (NioXnio) super.getXnio();
371     }
372
373     WorkerThread getAcceptThread() {
374         return acceptThread;
375     }
376
377     @Override
378     public XnioWorkerMXBean getMXBean() {
379         return metrics;
380     }
381
382     @Override
383     protected ManagementRegistration registerServerMXBean(XnioServerMXBean serverMXBean) {
384         return metrics.registerServerMXBean(serverMXBean);
385     }
386
387     private class NioWorkerMetrics implements XnioWorkerMXBean,Closeable {
388         private final String workerName;
389         private final CopyOnWriteArrayList<XnioServerMXBean> serverMetrics = new CopyOnWriteArrayList<>();
390         private Closeable mbeanHandle;
391
392         private NioWorkerMetrics(String workerName) {
393             this.workerName = workerName;
394         }
395
396         public String getProviderName() {
397             return "nio";
398         }
399
400         public String getName() {
401             return workerName;
402         }
403
404         public boolean isShutdownRequested() {
405             return isShutdown();
406         }
407
408         public int getCoreWorkerPoolSize() {
409             return NioXnioWorker.this.getCoreWorkerPoolSize();
410         }
411
412         public int getMaxWorkerPoolSize() {
413             return NioXnioWorker.this.getMaxWorkerPoolSize();
414         }
415
416         public int getWorkerPoolSize() {
417             return NioXnioWorker.this.getWorkerPoolSize();
418         }
419
420         public int getBusyWorkerThreadCount() {
421             return NioXnioWorker.this.getBusyWorkerThreadCount();
422         }
423
424         public int getIoThreadCount() {
425             return NioXnioWorker.this.getIoThreadCount();
426         }
427
428         public int getWorkerQueueSize() {
429             return NioXnioWorker.this.getWorkerQueueSize();
430         }
431
432         private ManagementRegistration registerServerMXBean(XnioServerMXBean serverMXBean){
433             serverMetrics.addIfAbsent(serverMXBean);
434             final Closeable handle = NioXnio.register(serverMXBean);
435             return () -> {
436                 serverMetrics.remove(serverMXBean);
437                 safeClose(handle);
438             };
439         }
440
441         public Set<XnioServerMXBean> getServerMXBeans() {
442             return new LinkedHashSet<>(serverMetrics);
443         }
444         private void register(){
445             this.mbeanHandle = NioXnio.register(this);
446         }
447
448         @Override
449         public void close() throws IOException {
450             safeClose(mbeanHandle);
451             serverMetrics.clear();
452         }
453     }
454 }
455