1
18
19 package org.xnio.nio;
20
21 import static org.xnio.IoUtils.safeClose;
22 import static org.xnio.nio.Log.log;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.net.Inet6Address;
27 import java.net.InetAddress;
28 import java.net.InetSocketAddress;
29 import java.net.StandardProtocolFamily;
30 import java.nio.channels.DatagramChannel;
31 import java.nio.channels.Selector;
32 import java.nio.channels.ServerSocketChannel;
33 import java.util.LinkedHashSet;
34 import java.util.List;
35 import java.util.Set;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.ThreadLocalRandom;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41 import java.util.concurrent.locks.LockSupport;
42
43 import org.wildfly.common.net.CidrAddressTable;
44 import org.xnio.Bits;
45 import org.xnio.ChannelListener;
46 import org.xnio.ChannelListeners;
47 import org.xnio.ManagementRegistration;
48 import org.xnio.ClosedWorkerException;
49 import org.xnio.IoUtils;
50 import org.xnio.Option;
51 import org.xnio.OptionMap;
52 import org.xnio.Options;
53 import org.xnio.StreamConnection;
54 import org.xnio.XnioWorker;
55 import org.xnio.channels.AcceptingChannel;
56 import org.xnio.channels.MulticastMessageChannel;
57 import org.xnio.management.XnioServerMXBean;
58 import org.xnio.management.XnioWorkerMXBean;
59
60
63 final class NioXnioWorker extends XnioWorker {
64
65 private static final int CLOSE_REQ = (1 << 31);
66 private static final int CLOSE_COMP = (1 << 30);
67 private final long workerStackSize;
68
69 private volatile int state = 1;
70
71 private final WorkerThread[] workerThreads;
72 private final WorkerThread acceptThread;
73 private final NioWorkerMetrics metrics;
74
75 @SuppressWarnings("unused")
76 private volatile Thread shutdownWaiter;
77
78 private static final AtomicReferenceFieldUpdater<NioXnioWorker, Thread> shutdownWaiterUpdater = AtomicReferenceFieldUpdater.newUpdater(NioXnioWorker.class, Thread.class, "shutdownWaiter");
79
80 private static final AtomicIntegerFieldUpdater<NioXnioWorker> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(NioXnioWorker.class, "state");
81
82 @SuppressWarnings("deprecation")
83 NioXnioWorker(final Builder builder) {
84 super(builder);
85 final NioXnio xnio = (NioXnio) builder.getXnio();
86 final int threadCount = builder.getWorkerIoThreads();
87 this.workerStackSize = builder.getWorkerStackSize();
88 final String workerName = getName();
89 WorkerThread[] workerThreads;
90 workerThreads = new WorkerThread[threadCount];
91 final ThreadGroup threadGroup = builder.getThreadGroup();
92 final boolean markWorkerThreadAsDaemon = builder.isDaemon();
93 boolean ok = false;
94 try {
95 for (int i = 0; i < threadCount; i++) {
96 final Selector threadSelector;
97 try {
98 threadSelector = xnio.mainSelectorCreator.open();
99 } catch (IOException e) {
100 throw Log.log.unexpectedSelectorOpenProblem(e);
101 }
102 final WorkerThread workerThread = new WorkerThread(this, threadSelector, String.format("%s I/O-%d", workerName, Integer.valueOf(i + 1)), threadGroup, workerStackSize, i);
103
104 if (markWorkerThreadAsDaemon) {
105 workerThread.setDaemon(true);
106 }
107 workerThreads[i] = workerThread;
108 }
109 final Selector threadSelector;
110 try {
111 threadSelector = xnio.mainSelectorCreator.open();
112 } catch (IOException e) {
113 throw Log.log.unexpectedSelectorOpenProblem(e);
114 }
115 acceptThread = new WorkerThread(this, threadSelector, String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
116 if (markWorkerThreadAsDaemon) {
117 acceptThread.setDaemon(true);
118 }
119 ok = true;
120 } finally {
121 if (! ok) {
122 for (WorkerThread worker : workerThreads) {
123 if (worker != null) safeClose(worker.getSelector());
124 }
125 }
126 }
127 this.workerThreads = workerThreads;
128 this.metrics = new NioWorkerMetrics(workerName);
129 metrics.register();
130 }
131
132 void start() {
133 for (WorkerThread worker : workerThreads) {
134 openResourceUnconditionally();
135 worker.start();
136 }
137 openResourceUnconditionally();
138 acceptThread.start();
139 }
140
141 protected CidrAddressTable<InetSocketAddress> getBindAddressTable() {
142 return super.getBindAddressTable();
143 }
144
145 protected WorkerThread chooseThread() {
146 return getIoThread(ThreadLocalRandom.current().nextInt());
147 }
148
149 public WorkerThread getIoThread(final int hashCode) {
150 final WorkerThread[] workerThreads = this.workerThreads;
151 final int length = workerThreads.length;
152 if (length == 0) {
153 throw log.noThreads();
154 }
155 if (length == 1) {
156 return workerThreads[0];
157 }
158 return workerThreads[Math.abs(hashCode % length)];
159 }
160
161 public int getIoThreadCount() {
162 return workerThreads.length;
163 }
164
165 WorkerThread[] getAll() {
166 return workerThreads;
167 }
168
169 protected AcceptingChannel<StreamConnection> createTcpConnectionServer(final InetSocketAddress bindAddress, final ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, final OptionMap optionMap) throws IOException {
170 checkShutdown();
171 boolean ok = false;
172 final ServerSocketChannel channel = ServerSocketChannel.open();
173 try {
174 if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
175 channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
176 channel.configureBlocking(false);
177 if (optionMap.contains(Options.BACKLOG)) {
178 channel.socket().bind(bindAddress, optionMap.get(Options.BACKLOG, 128));
179 } else {
180 channel.socket().bind(bindAddress);
181 }
182 if (false) {
183 final NioTcpServer server = new NioTcpServer(this, channel, optionMap, false);
184 server.setAcceptListener(acceptListener);
185 ok = true;
186 return server;
187 } else {
188 final QueuedNioTcpServer2 server = new QueuedNioTcpServer2(new NioTcpServer(this, channel, optionMap, true));
189 server.setAcceptListener(acceptListener);
190 ok = true;
191 return server;
192 }
193 } finally {
194 if (! ok) {
195 IoUtils.safeClose(channel);
196 }
197 }
198 }
199
200
201
202 public MulticastMessageChannel createUdpServer(final InetSocketAddress bindAddress, final ChannelListener<? super MulticastMessageChannel> bindListener, final OptionMap optionMap) throws IOException {
203 checkShutdown();
204 final DatagramChannel channel;
205 if (bindAddress != null) {
206 InetAddress address = bindAddress.getAddress();
207 if (address instanceof Inet6Address) {
208 channel = DatagramChannel.open(StandardProtocolFamily.INET6);
209 } else {
210 channel = DatagramChannel.open(StandardProtocolFamily.INET);
211 }
212 } else {
213 channel = DatagramChannel.open();
214 }
215 channel.configureBlocking(false);
216 if (optionMap.contains(Options.BROADCAST)) channel.socket().setBroadcast(optionMap.get(Options.BROADCAST, false));
217 if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
218 if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
219 channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
220 if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
221 channel.socket().bind(bindAddress);
222 final NioUdpChannel udpChannel = new NioUdpChannel(this, channel);
223 ChannelListeners.invokeChannelListener(udpChannel, bindListener);
224 return udpChannel;
225 }
226
227 public boolean isShutdown() {
228 return (state & CLOSE_REQ) != 0;
229 }
230
231 public boolean isTerminated() {
232 return (state & CLOSE_COMP) != 0;
233 }
234
235
238 void openResourceUnconditionally() {
239 int oldState = stateUpdater.getAndIncrement(this);
240 if (log.isTraceEnabled()) {
241 log.tracef("CAS %s %08x -> %08x", this, Integer.valueOf(oldState), Integer.valueOf(oldState + 1));
242 }
243 }
244
245 void checkShutdown() throws ClosedWorkerException {
246 if (isShutdown())
247 throw log.workerShutDown();
248 }
249
250 void closeResource() {
251 int oldState = stateUpdater.decrementAndGet(this);
252 if (log.isTraceEnabled()) {
253 log.tracef("CAS %s %08x -> %08x", this, Integer.valueOf(oldState + 1), Integer.valueOf(oldState));
254 }
255 while (oldState == CLOSE_REQ) {
256 if (stateUpdater.compareAndSet(this, CLOSE_REQ, CLOSE_REQ | CLOSE_COMP)) {
257 log.tracef("CAS %s %08x -> %08x (close complete)", this, Integer.valueOf(CLOSE_REQ), Integer.valueOf(CLOSE_REQ | CLOSE_COMP));
258 safeUnpark(shutdownWaiterUpdater.getAndSet(this, null));
259 final Runnable task = getTerminationTask();
260 if (task != null) try {
261 task.run();
262 } catch (Throwable ignored) {}
263 return;
264 }
265 oldState = state;
266 }
267 }
268
269 public void shutdown() {
270 int oldState;
271 oldState = state;
272 while ((oldState & CLOSE_REQ) == 0) {
273
274 if (! stateUpdater.compareAndSet(this, oldState, oldState | CLOSE_REQ)) {
275
276 oldState = state;
277 continue;
278 }
279 log.tracef("Initiating shutdown of %s", this);
280 for (WorkerThread worker : workerThreads) {
281 worker.shutdown();
282 }
283 acceptThread.shutdown();
284 shutDownTaskPool();
285 return;
286 }
287 log.tracef("Idempotent shutdown of %s", this);
288 return;
289 }
290
291 public List<Runnable> shutdownNow() {
292 shutdown();
293 return shutDownTaskPoolNow();
294 }
295
296 public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
297 int oldState = state;
298 if (Bits.allAreSet(oldState, CLOSE_COMP)) {
299 return true;
300 }
301 long then = System.nanoTime();
302 long duration = unit.toNanos(timeout);
303 final Thread myThread = Thread.currentThread();
304 while (Bits.allAreClear(oldState = state, CLOSE_COMP)) {
305 final Thread oldThread = shutdownWaiterUpdater.getAndSet(this, myThread);
306 try {
307 if (Bits.allAreSet(oldState = state, CLOSE_COMP)) {
308 break;
309 }
310 LockSupport.parkNanos(this, duration);
311 if (Thread.interrupted()) {
312 throw new InterruptedException();
313 }
314 long now = System.nanoTime();
315 duration -= now - then;
316 if (duration < 0L) {
317 oldState = state;
318 break;
319 }
320 } finally {
321 safeUnpark(oldThread);
322 }
323 }
324 return Bits.allAreSet(oldState, CLOSE_COMP);
325 }
326
327 public void awaitTermination() throws InterruptedException {
328 int oldState = state;
329 if (Bits.allAreSet(oldState, CLOSE_COMP)) {
330 return;
331 }
332 final Thread myThread = Thread.currentThread();
333 while (Bits.allAreClear(state, CLOSE_COMP)) {
334 final Thread oldThread = shutdownWaiterUpdater.getAndSet(this, myThread);
335 try {
336 if (Bits.allAreSet(state, CLOSE_COMP)) {
337 break;
338 }
339 LockSupport.park(this);
340 if (Thread.interrupted()) {
341 throw new InterruptedException();
342 }
343 } finally {
344 safeUnpark(oldThread);
345 }
346 }
347 }
348
349 private static void safeUnpark(final Thread waiter) {
350 if (waiter != null) LockSupport.unpark(waiter);
351 }
352
353 protected void taskPoolTerminated() {
354 safeClose(metrics);
355 closeResource();
356 }
357
358 @Override
359 public <T> T getOption(Option<T> option) throws IOException {
360 if (option.equals(Options.WORKER_IO_THREADS)) {
361 return option.cast(workerThreads.length);
362 } else if (option.equals(Options.STACK_SIZE)) {
363 return option.cast(workerStackSize);
364 } else {
365 return super.getOption(option);
366 }
367 }
368
369 public NioXnio getXnio() {
370 return (NioXnio) super.getXnio();
371 }
372
373 WorkerThread getAcceptThread() {
374 return acceptThread;
375 }
376
377 @Override
378 public XnioWorkerMXBean getMXBean() {
379 return metrics;
380 }
381
382 @Override
383 protected ManagementRegistration registerServerMXBean(XnioServerMXBean serverMXBean) {
384 return metrics.registerServerMXBean(serverMXBean);
385 }
386
387 private class NioWorkerMetrics implements XnioWorkerMXBean,Closeable {
388 private final String workerName;
389 private final CopyOnWriteArrayList<XnioServerMXBean> serverMetrics = new CopyOnWriteArrayList<>();
390 private Closeable mbeanHandle;
391
392 private NioWorkerMetrics(String workerName) {
393 this.workerName = workerName;
394 }
395
396 public String getProviderName() {
397 return "nio";
398 }
399
400 public String getName() {
401 return workerName;
402 }
403
404 public boolean isShutdownRequested() {
405 return isShutdown();
406 }
407
408 public int getCoreWorkerPoolSize() {
409 return NioXnioWorker.this.getCoreWorkerPoolSize();
410 }
411
412 public int getMaxWorkerPoolSize() {
413 return NioXnioWorker.this.getMaxWorkerPoolSize();
414 }
415
416 public int getWorkerPoolSize() {
417 return NioXnioWorker.this.getWorkerPoolSize();
418 }
419
420 public int getBusyWorkerThreadCount() {
421 return NioXnioWorker.this.getBusyWorkerThreadCount();
422 }
423
424 public int getIoThreadCount() {
425 return NioXnioWorker.this.getIoThreadCount();
426 }
427
428 public int getWorkerQueueSize() {
429 return NioXnioWorker.this.getWorkerQueueSize();
430 }
431
432 private ManagementRegistration registerServerMXBean(XnioServerMXBean serverMXBean){
433 serverMetrics.addIfAbsent(serverMXBean);
434 final Closeable handle = NioXnio.register(serverMXBean);
435 return () -> {
436 serverMetrics.remove(serverMXBean);
437 safeClose(handle);
438 };
439 }
440
441 public Set<XnioServerMXBean> getServerMXBeans() {
442 return new LinkedHashSet<>(serverMetrics);
443 }
444 private void register(){
445 this.mbeanHandle = NioXnio.register(this);
446 }
447
448 @Override
449 public void close() throws IOException {
450 safeClose(mbeanHandle);
451 serverMetrics.clear();
452 }
453 }
454 }
455