1
18
19 package org.xnio.nio;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.Pipe;
27 import java.nio.channels.SelectableChannel;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.nio.channels.spi.AbstractSelectableChannel;
33 import java.security.AccessController;
34 import java.util.ArrayDeque;
35 import java.util.Arrays;
36 import java.util.Iterator;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.TreeSet;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import org.jboss.logging.Logger;
46 import org.xnio.Cancellable;
47 import org.xnio.ChannelListener;
48 import org.xnio.ChannelListeners;
49 import org.xnio.ChannelPipe;
50 import org.xnio.ClosedWorkerException;
51 import org.xnio.FailedIoFuture;
52 import org.xnio.FinishedIoFuture;
53 import org.xnio.FutureResult;
54 import org.xnio.IoFuture;
55 import org.xnio.Option;
56 import org.xnio.OptionMap;
57 import org.xnio.Options;
58 import org.xnio.ReadPropertyAction;
59 import org.xnio.StreamConnection;
60 import org.xnio.XnioExecutor;
61 import org.xnio.XnioIoFactory;
62 import org.xnio.XnioIoThread;
63 import org.xnio.XnioWorker;
64 import org.xnio.channels.BoundChannel;
65 import org.xnio.channels.StreamSinkChannel;
66 import org.xnio.channels.StreamSourceChannel;
67
68 import static java.lang.System.identityHashCode;
69 import static java.lang.System.nanoTime;
70 import static java.util.concurrent.locks.LockSupport.park;
71 import static java.util.concurrent.locks.LockSupport.unpark;
72 import static org.xnio.IoUtils.safeClose;
73 import static org.xnio.nio.Log.log;
74 import static org.xnio.nio.Log.selectorLog;
75
76
79 final class WorkerThread extends XnioIoThread implements XnioExecutor {
/** Cap, in milliseconds, applied to a requested delay before it is scaled by 1,000,000 into nanoseconds. */
private static final long LONGEST_DELAY = 9223372036853L;
/** Class name handed to the logger as the caller-facing class for key-cancellation trace messages. */
private static final String FQCN = WorkerThread.class.getName();
/** Value of the {@code xnio.nio.old-locking} property; selects the barrier-task code paths for cross-thread key changes. */
private static final boolean OLD_LOCKING;
/** Value of the {@code xnio.nio.thread-safe-selection-keys} property; allows registering channels from other threads without a barrier task. */
private static final boolean THREAD_SAFE_SELECTION_KEYS;
/** {@code nanoTime()} sampled at class initialization; timer deadlines are stored relative to this origin. */
private static final long START_TIME = nanoTime();

/** Selector polled by {@link #run()}. */
private final Selector selector;
/** Monitor held around accesses to {@link #selectorWorkQueue} and {@link #delayWorkQueue}. */
private final Object workLock = new Object();

/** Tasks waiting to be run by this thread. */
private final Queue<Runnable> selectorWorkQueue = new ArrayDeque<>();
/** Pending timers, ordered by {@code TimeKey.compareTo}. */
private final TreeSet<TimeKey> delayWorkQueue = new TreeSet<>();

/** Shutdown flag in the top bit; while shutting down, {@link #run()} stores the remaining key count in the other bits. */
private volatile int state;

/** Bit of {@link #state} that marks shutdown as requested. */
private static final int SHUTDOWN = 1 << 31;

/** Atomic updater used for compare-and-set transitions of {@link #state}. */
private static final AtomicIntegerFieldUpdater<WorkerThread> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(WorkerThread.class, "state");

static {
    // Both switches default to "false" when the property is not set.
    final String oldLocking = AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.old-locking", "false"));
    OLD_LOCKING = Boolean.parseBoolean(oldLocking);
    final String threadSafeKeys = AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.thread-safe-selection-keys", "false"));
    THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(threadSafeKeys);
}
102
/**
 * Creates a worker I/O thread that polls the given selector.
 *
 * @param worker the owning worker, passed to the superclass
 * @param selector the selector this thread will poll
 * @param name the thread name, passed to the superclass
 * @param group the thread group, passed to the superclass
 * @param stackSize the requested stack size, passed to the superclass
 * @param number the index of this thread, passed to the superclass
 */
WorkerThread(
        final NioXnioWorker worker,
        final Selector selector,
        final String name,
        final ThreadGroup group,
        final long stackSize,
        final int number) {
    super(worker, number, group, name, stackSize);
    this.selector = selector;
}
107
108 static WorkerThread getCurrent() {
109 final Thread thread = currentThread();
110 return thread instanceof WorkerThread ? (WorkerThread) thread : null;
111 }
112
113 public NioXnioWorker getWorker() {
114 return (NioXnioWorker) super.getWorker();
115 }
116
117 protected IoFuture<StreamConnection> acceptTcpStreamConnection(final InetSocketAddress destination, final ChannelListener<? super StreamConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
118 try {
119 getWorker().checkShutdown();
120 } catch (ClosedWorkerException e) {
121 return new FailedIoFuture<StreamConnection>(e);
122 }
123 final FutureResult<StreamConnection> futureResult = new FutureResult<StreamConnection>(this);
124 try {
125 boolean ok = false;
126 final ServerSocketChannel serverChannel = ServerSocketChannel.open();
127 try {
128 serverChannel.configureBlocking(false);
129 if (optionMap.contains(Options.RECEIVE_BUFFER)) {
130 serverChannel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
131 }
132 serverChannel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
133 serverChannel.bind(destination);
134 if (bindListener != null) ChannelListeners.invokeChannelListener(new BoundChannel() {
135 public SocketAddress getLocalAddress() {
136 return serverChannel.socket().getLocalSocketAddress();
137 }
138
139 public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
140 final SocketAddress address = getLocalAddress();
141 return type.isInstance(address) ? type.cast(address) : null;
142 }
143
144 public ChannelListener.Setter<? extends BoundChannel> getCloseSetter() {
145 return new ChannelListener.SimpleSetter<BoundChannel>();
146 }
147
148 public XnioWorker getWorker() {
149 return WorkerThread.this.getWorker();
150 }
151
152 public XnioIoThread getIoThread() {
153 return WorkerThread.this;
154 }
155
156 public void close() throws IOException {
157 serverChannel.close();
158 }
159
160 public boolean isOpen() {
161 return serverChannel.isOpen();
162 }
163
164 public boolean supportsOption(final Option<?> option) {
165 return false;
166 }
167
168 public <T> T getOption(final Option<T> option) throws IOException {
169 return null;
170 }
171
172 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
173 return null;
174 }
175 }, bindListener);
176 final SelectionKey key = this.registerChannel(serverChannel);
177 final NioHandle handle = new NioHandle(this, key) {
178 void handleReady(final int ops) {
179 boolean ok = false;
180 try {
181 final SocketChannel channel = serverChannel.accept();
182 if (channel == null) {
183 ok = true;
184 return;
185 } else {
186 safeClose(serverChannel);
187 }
188 try {
189 channel.configureBlocking(false);
190 if (optionMap.contains(Options.TCP_OOB_INLINE)) channel.socket().setOOBInline(optionMap.get(Options.TCP_OOB_INLINE, false));
191 if (optionMap.contains(Options.TCP_NODELAY)) channel.socket().setTcpNoDelay(optionMap.get(Options.TCP_NODELAY, false));
192 if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
193 if (optionMap.contains(Options.CLOSE_ABORT)) channel.socket().setSoLinger(optionMap.get(Options.CLOSE_ABORT, false), 0);
194 if (optionMap.contains(Options.KEEP_ALIVE)) channel.socket().setKeepAlive(optionMap.get(Options.KEEP_ALIVE, false));
195 if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
196 final SelectionKey selectionKey = WorkerThread.this.registerChannel(channel);
197 final NioSocketStreamConnection connection = new NioSocketStreamConnection(WorkerThread.this, selectionKey, null);
198 if (optionMap.contains(Options.READ_TIMEOUT)) connection.setOption(Options.READ_TIMEOUT, optionMap.get(Options.READ_TIMEOUT, 0));
199 if (optionMap.contains(Options.WRITE_TIMEOUT)) connection.setOption(Options.WRITE_TIMEOUT, optionMap.get(Options.WRITE_TIMEOUT, 0));
200 if (futureResult.setResult(connection)) {
201 ok = true;
202 ChannelListeners.invokeChannelListener(connection, openListener);
203 }
204 } finally {
205 if (! ok) safeClose(channel);
206 }
207 } catch (IOException e) {
208 futureResult.setException(e);
209 } finally {
210 if (! ok) {
211 safeClose(serverChannel);
212 }
213 }
214 }
215
216 void terminated() {
217 }
218
219 void forceTermination() {
220 futureResult.setCancelled();
221 }
222 };
223 key.attach(handle);
224 handle.resume(SelectionKey.OP_ACCEPT);
225 ok = true;
226 futureResult.addCancelHandler(new Cancellable() {
227 public Cancellable cancel() {
228 if (futureResult.setCancelled()) {
229 safeClose(serverChannel);
230 }
231 return this;
232 }
233 });
234 return futureResult.getIoFuture();
235 } finally {
236 if (! ok) safeClose(serverChannel);
237 }
238 } catch (IOException e) {
239 return new FailedIoFuture<StreamConnection>(e);
240 }
241 }
242
243 protected IoFuture<StreamConnection> openTcpStreamConnection(final InetSocketAddress bindAddress, final InetSocketAddress destinationAddress, final ChannelListener<? super StreamConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
244 try {
245 getWorker().checkShutdown();
246 } catch (ClosedWorkerException e) {
247 return new FailedIoFuture<StreamConnection>(e);
248 }
249 try {
250 final SocketChannel channel = SocketChannel.open();
251 boolean ok = false;
252 try {
253 channel.configureBlocking(false);
254 if (optionMap.contains(Options.TCP_OOB_INLINE)) channel.socket().setOOBInline(optionMap.get(Options.TCP_OOB_INLINE, false));
255 if (optionMap.contains(Options.TCP_NODELAY)) channel.socket().setTcpNoDelay(optionMap.get(Options.TCP_NODELAY, false));
256 if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
257 if (optionMap.contains(Options.CLOSE_ABORT)) channel.socket().setSoLinger(optionMap.get(Options.CLOSE_ABORT, false), 0);
258 if (optionMap.contains(Options.KEEP_ALIVE)) channel.socket().setKeepAlive(optionMap.get(Options.KEEP_ALIVE, false));
259 if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
260 if (optionMap.contains(Options.REUSE_ADDRESSES)) channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, false));
261 if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
262 final SelectionKey key = registerChannel(channel);
263 final NioSocketStreamConnection connection = new NioSocketStreamConnection(this, key, null);
264 if (optionMap.contains(Options.READ_TIMEOUT)) connection.setOption(Options.READ_TIMEOUT, optionMap.get(Options.READ_TIMEOUT, 0));
265 if (optionMap.contains(Options.WRITE_TIMEOUT)) connection.setOption(Options.WRITE_TIMEOUT, optionMap.get(Options.WRITE_TIMEOUT, 0));
266 if (bindAddress != null || bindListener != null) {
267 channel.socket().bind(bindAddress);
268 ChannelListeners.invokeChannelListener(connection, bindListener);
269 }
270 if (channel.connect(destinationAddress)) {
271 selectorLog.tracef("Synchronous connect");
272 execute(ChannelListeners.getChannelListenerTask(connection, openListener));
273 final FinishedIoFuture<StreamConnection> finishedIoFuture = new FinishedIoFuture<StreamConnection>(connection);
274 ok = true;
275 return finishedIoFuture;
276 }
277 selectorLog.tracef("Asynchronous connect");
278 final FutureResult<StreamConnection> futureResult = new FutureResult<StreamConnection>(this);
279 final ConnectHandle connectHandle = new ConnectHandle(this, key, futureResult, connection, openListener);
280 key.attach(connectHandle);
281 futureResult.addCancelHandler(new Cancellable() {
282 public Cancellable cancel() {
283 if (futureResult.setCancelled()) {
284 safeClose(connection);
285 }
286 return this;
287 }
288 });
289 connectHandle.resume(SelectionKey.OP_CONNECT);
290 ok = true;
291 return futureResult.getIoFuture();
292 } finally {
293 if (! ok) safeClose(channel);
294 }
295 } catch (IOException e) {
296 return new FailedIoFuture<StreamConnection>(e);
297 }
298 }
299
300 WorkerThread getNextThread() {
301 final WorkerThread[] all = getWorker().getAll();
302 final int number = getNumber();
303 if (number == all.length - 1) {
304 return all[0];
305 } else {
306 return all[number + 1];
307 }
308 }
309
310 static final class ConnectHandle extends NioHandle {
311
312 private final FutureResult<StreamConnection> futureResult;
313 private final NioSocketStreamConnection connection;
314 private final ChannelListener<? super StreamConnection> openListener;
315
316 ConnectHandle(final WorkerThread workerThread, final SelectionKey selectionKey, final FutureResult<StreamConnection> futureResult, final NioSocketStreamConnection connection, final ChannelListener<? super StreamConnection> openListener) {
317 super(workerThread, selectionKey);
318 this.futureResult = futureResult;
319 this.connection = connection;
320 this.openListener = openListener;
321 }
322
323 void handleReady(final int ops) {
324 final SocketChannel channel = getChannel();
325 boolean ok = false;
326 try {
327 if (channel.finishConnect()) {
328 selectorLog.tracef("handleReady connect finished");
329 suspend(SelectionKey.OP_CONNECT);
330 getSelectionKey().attach(connection.getConduit());
331 if (futureResult.setResult(connection)) {
332 ok = true;
333 ChannelListeners.invokeChannelListener(connection, openListener);
334 }
335 }
336 } catch (IOException e) {
337 selectorLog.tracef("ConnectHandle.handleReady Exception, %s", e);
338 futureResult.setException(e);
339 } finally {
340 if (!ok) {
341 selectorLog.tracef("!OK, closing connection");
342 safeClose(connection);
343 }
344 }
345 }
346
347 private SocketChannel getChannel() {
348 return (SocketChannel) getSelectionKey().channel();
349 }
350
351 void forceTermination() {
352 futureResult.setCancelled();
353 safeClose(getChannel());
354 }
355
356 void terminated() {
357 }
358 }
359
360 private static WorkerThread getPeerThread(final XnioIoFactory peer) throws ClosedWorkerException {
361 final WorkerThread peerThread;
362 if (peer instanceof NioXnioWorker) {
363 final NioXnioWorker peerWorker = (NioXnioWorker) peer;
364 peerWorker.checkShutdown();
365 peerThread = peerWorker.chooseThread();
366 } else if (peer instanceof WorkerThread) {
367 peerThread = (WorkerThread) peer;
368 peerThread.getWorker().checkShutdown();
369 } else {
370 throw log.notNioProvider();
371 }
372 return peerThread;
373 }
374
375 public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection(XnioIoFactory peer) throws IOException {
376 getWorker().checkShutdown();
377 boolean ok = false;
378 final Pipe topPipe = Pipe.open();
379 try {
380 topPipe.source().configureBlocking(false);
381 topPipe.sink().configureBlocking(false);
382 final Pipe bottomPipe = Pipe.open();
383 try {
384 bottomPipe.source().configureBlocking(false);
385 bottomPipe.sink().configureBlocking(false);
386 final WorkerThread peerThread = getPeerThread(peer);
387 final SelectionKey topSourceKey = registerChannel(topPipe.source());
388 final SelectionKey topSinkKey = peerThread.registerChannel(topPipe.sink());
389 final SelectionKey bottomSourceKey = peerThread.registerChannel(bottomPipe.source());
390 final SelectionKey bottomSinkKey = registerChannel(bottomPipe.sink());
391 final NioPipeStreamConnection leftConnection = new NioPipeStreamConnection(this, bottomSourceKey, topSinkKey);
392 final NioPipeStreamConnection rightConnection = new NioPipeStreamConnection(this, topSourceKey, bottomSinkKey);
393 final ChannelPipe<StreamConnection, StreamConnection> result = new ChannelPipe<StreamConnection, StreamConnection>(leftConnection, rightConnection);
394 ok = true;
395 return result;
396 } finally {
397 if (! ok) {
398 safeClose(bottomPipe.sink());
399 safeClose(bottomPipe.source());
400 }
401 }
402 } finally {
403 if (! ok) {
404 safeClose(topPipe.sink());
405 safeClose(topPipe.source());
406 }
407 }
408 }
409
410 public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe(final XnioIoFactory peer) throws IOException {
411 getWorker().checkShutdown();
412 final Pipe pipe = Pipe.open();
413 boolean ok = false;
414 try {
415 pipe.source().configureBlocking(false);
416 pipe.sink().configureBlocking(false);
417 final WorkerThread peerThread = getPeerThread(peer);
418 final SelectionKey readKey = registerChannel(pipe.source());
419 final SelectionKey writeKey = peerThread.registerChannel(pipe.sink());
420 final NioPipeStreamConnection leftConnection = new NioPipeStreamConnection(this, readKey, null);
421 final NioPipeStreamConnection rightConnection = new NioPipeStreamConnection(this, null, writeKey);
422 leftConnection.writeClosed();
423 rightConnection.readClosed();
424 final ChannelPipe<StreamSourceChannel,StreamSinkChannel> result = new ChannelPipe<StreamSourceChannel, StreamSinkChannel>(leftConnection.getSourceChannel(), rightConnection.getSinkChannel());
425 ok = true;
426 return result;
427 } finally {
428 if (! ok) {
429 safeClose(pipe.sink());
430 safeClose(pipe.source());
431 }
432 }
433 }
434
435 volatile boolean polling;
436
437 public void run() {
438 final Selector selector = this.selector;
439 try {
440 log.tracef("Starting worker thread %s", this);
441 final Object lock = workLock;
442 final Queue<Runnable> workQueue = selectorWorkQueue;
443 final TreeSet<TimeKey> delayQueue = delayWorkQueue;
444 log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector);
445 Runnable task;
446 Iterator<TimeKey> iterator;
447 long delayTime = Long.MAX_VALUE;
448 Set<SelectionKey> selectedKeys;
449 SelectionKey[] keys = new SelectionKey[16];
450 int oldState;
451 int keyCount;
452 for (;;) {
453
454 do {
455 synchronized (lock) {
456 task = workQueue.poll();
457 if (task == null) {
458 iterator = delayQueue.iterator();
459 delayTime = Long.MAX_VALUE;
460 if (iterator.hasNext()) {
461 final long now = nanoTime();
462 do {
463 final TimeKey key = iterator.next();
464 if (key.deadline <= (now - START_TIME)) {
465 workQueue.add(key.command);
466 iterator.remove();
467 } else {
468 delayTime = key.deadline - (now - START_TIME);
469
470 break;
471 }
472 } while (iterator.hasNext());
473 }
474 task = workQueue.poll();
475 }
476 }
477
478 Thread.interrupted();
479 safeRun(task);
480 } while (task != null);
481
482 oldState = state;
483 if ((oldState & SHUTDOWN) != 0) {
484 synchronized (lock) {
485 keyCount = selector.keys().size();
486 state = keyCount | SHUTDOWN;
487 if (keyCount == 0 && workQueue.isEmpty()) {
488
489 return;
490 }
491 }
492 synchronized (selector) {
493 final Set<SelectionKey> keySet = selector.keys();
494 synchronized (keySet) {
495 keys = keySet.toArray(keys);
496 Arrays.fill(keys, keySet.size(), keys.length, null);
497 }
498 }
499
500 for (int i = 0; i < keys.length; i++) {
501 final SelectionKey key = keys[i];
502 if (key == null) break;
503 keys[i] = null;
504 final NioHandle attachment = (NioHandle) key.attachment();
505 if (attachment != null) {
506 safeClose(key.channel());
507 attachment.forceTermination();
508 }
509 }
510 Arrays.fill(keys, 0, keys.length, null);
511 }
512
513 Thread.interrupted();
514
515 try {
516 if ((oldState & SHUTDOWN) != 0) {
517 selectorLog.tracef("Beginning select on %s (shutdown in progress)", selector);
518 selector.selectNow();
519 } else if (delayTime == Long.MAX_VALUE) {
520 selectorLog.tracef("Beginning select on %s", selector);
521 polling = true;
522 try {
523 Runnable item = null;
524 synchronized (lock) {
525 item = workQueue.peek();
526 }
527 if (item != null) {
528 log.tracef("SelectNow, queue is not empty");
529 selector.selectNow();
530 } else {
531 log.tracef("Select, queue is empty");
532 selector.select();
533 }
534 } finally {
535 polling = false;
536 }
537 } else {
538 final long millis = 1L + delayTime / 1000000L;
539 selectorLog.tracef("Beginning select on %s (with timeout)", selector);
540 polling = true;
541 try {
542 Runnable item = null;
543 synchronized (lock) {
544 item = workQueue.peek();
545 }
546 if (item != null) {
547 log.tracef("SelectNow, queue is not empty");
548 selector.selectNow();
549 } else {
550 log.tracef("Select, queue is empty");
551 selector.select(millis);
552 }
553 } finally {
554 polling = false;
555 }
556 }
557 } catch (CancelledKeyException ignored) {
558
559 selectorLog.trace("Spurious cancelled key exception");
560 } catch (IOException e) {
561 selectorLog.selectionError(e);
562
563 }
564 selectorLog.tracef("Selected on %s", selector);
565
566 synchronized (selector) {
567 selectedKeys = selector.selectedKeys();
568 synchronized (selectedKeys) {
569
570 keys = selectedKeys.toArray(keys);
571 Arrays.fill(keys, selectedKeys.size(), keys.length, null);
572 selectedKeys.clear();
573 }
574 }
575 for (int i = 0; i < keys.length; i++) {
576 final SelectionKey key = keys[i];
577 if (key == null) break;
578 keys[i] = null;
579 final int ops;
580 try {
581 ops = key.interestOps();
582 if (ops != 0) {
583 selectorLog.tracef("Selected key %s for %s", key, key.channel());
584 final NioHandle handle = (NioHandle) key.attachment();
585 if (handle == null) {
586 cancelKey(key, false);
587 } else {
588
589 Thread.interrupted();
590 selectorLog.tracef("Calling handleReady key %s for %s", key.readyOps(), key.channel());
591 handle.handleReady(key.readyOps());
592 }
593 }
594 } catch (CancelledKeyException ignored) {
595 selectorLog.tracef("Skipping selection of cancelled key %s", key);
596 } catch (Throwable t) {
597 selectorLog.tracef(t, "Unexpected failure of selection of key %s", key);
598 }
599 }
600
601 }
602 } finally {
603 log.tracef("Shutting down channel thread \"%s\"", this);
604 safeClose(selector);
605 getWorker().closeResource();
606 }
607 }
608
609 private static void safeRun(final Runnable command) {
610 if (command != null) try {
611 log.tracef("Running task %s", command);
612 command.run();
613 } catch (Throwable t) {
614 log.taskFailed(command, t);
615 }
616 }
617
618 public void execute(final Runnable command) {
619 if ((state & SHUTDOWN) != 0) {
620 throw log.threadExiting();
621 }
622 synchronized (workLock) {
623 selectorWorkQueue.add(command);
624 log.tracef("Added task %s", command);
625 }
626 if (polling) {
627 selector.wakeup();
628 } else {
629 log.tracef("Not polling, no wakeup");
630 }
631 }
632
633 void shutdown() {
634 int oldState;
635 do {
636 oldState = state;
637 if ((oldState & SHUTDOWN) != 0) {
638
639 return;
640 }
641 } while (! stateUpdater.compareAndSet(this, oldState, oldState | SHUTDOWN));
642 if(currentThread() != this) {
643 selector.wakeup();
644 }
645 }
646
647 public Key executeAfter(final Runnable command, final long time, final TimeUnit unit) {
648 final long millis = unit.toMillis(time);
649 if ((state & SHUTDOWN) != 0) {
650 throw log.threadExiting();
651 }
652 if (millis <= 0) {
653 execute(command);
654 return Key.IMMEDIATE;
655 }
656 final long deadline = (nanoTime() - START_TIME) + Math.min(millis, LONGEST_DELAY) * 1000000L;
657 final TimeKey key = new TimeKey(deadline, command);
658 synchronized (workLock) {
659 final TreeSet<TimeKey> queue = delayWorkQueue;
660 queue.add(key);
661 if (queue.iterator().next() == key) {
662
663 if (polling) {
664 selector.wakeup();
665 }
666 }
667 return key;
668 }
669 }
670
671 class RepeatKey implements Key, Runnable {
672 private final Runnable command;
673 private final long millis;
674 private final AtomicReference<Key> current = new AtomicReference<>();
675
676 RepeatKey(final Runnable command, final long millis) {
677 this.command = command;
678 this.millis = millis;
679 }
680
681 public boolean remove() {
682 final Key removed = current.getAndSet(this);
683
684 assert removed != null;
685 return removed != this && removed.remove();
686 }
687
688 void setFirst(Key key) {
689 current.compareAndSet(null, key);
690 }
691
692 public void run() {
693 try {
694 command.run();
695 } finally {
696 Key o, n;
697 o = current.get();
698 if (o != this) {
699 n = executeAfter(this, millis, TimeUnit.MILLISECONDS);
700 if (!current.compareAndSet(o, n)) {
701 n.remove();
702 }
703 }
704 }
705 }
706 }
707
708 public Key executeAtInterval(final Runnable command, final long time, final TimeUnit unit) {
709 final long millis = unit.toMillis(time);
710 final RepeatKey repeatKey = new RepeatKey(command, millis);
711 final Key firstKey = executeAfter(repeatKey, millis, TimeUnit.MILLISECONDS);
712 repeatKey.setFirst(firstKey);
713 return repeatKey;
714 }
715
716 SelectionKey registerChannel(final AbstractSelectableChannel channel) throws ClosedChannelException {
717 if (currentThread() == this) {
718 return channel.register(selector, 0);
719 } else if (THREAD_SAFE_SELECTION_KEYS) {
720 try {
721 return channel.register(selector, 0);
722 } finally {
723 if (polling) selector.wakeup();
724 }
725 } else {
726 final SynchTask task = new SynchTask();
727 queueTask(task);
728 try {
729
730 selector.wakeup();
731 return channel.register(selector, 0);
732 } finally {
733 task.done();
734 }
735 }
736 }
737
738 void queueTask(final Runnable task) {
739 synchronized (workLock) {
740 selectorWorkQueue.add(task);
741 }
742 }
743
744 void cancelKey(final SelectionKey key, final boolean block) {
745 assert key.selector() == selector;
746 final SelectableChannel channel = key.channel();
747 if (currentThread() == this) {
748 log.logf(FQCN, Logger.Level.TRACE, null, "Cancelling key %s of %s (same thread)", key, channel);
749 try {
750 key.cancel();
751 try {
752 selector.selectNow();
753 } catch (IOException e) {
754 log.selectionError(e);
755 }
756 } catch (Throwable t) {
757 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (same thread)", key, channel);
758 }
759 } else if (OLD_LOCKING) {
760 log.logf(FQCN, Logger.Level.TRACE, null, "Cancelling key %s of %s (same thread, old locking)", key, channel);
761 final SynchTask task = new SynchTask();
762 queueTask(task);
763 try {
764
765 selector.wakeup();
766 key.cancel();
767 } catch (Throwable t) {
768 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (same thread, old locking)", key, channel);
769 } finally {
770 task.done();
771 }
772 } else {
773 log.logf(FQCN, Logger.Level.TRACE, null, "Cancelling key %s of %s (other thread)", key, channel);
774 try {
775 key.cancel();
776 if (block) {
777 final SelectNowTask task = new SelectNowTask();
778 queueTask(task);
779 selector.wakeup();
780
781 task.doWait();
782 } else {
783 selector.wakeup();
784 }
785 } catch (Throwable t) {
786 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (other thread)", key, channel);
787 }
788 }
789 }
790
791 void setOps(final SelectionKey key, final int ops) {
792 if (currentThread() == this) {
793 try {
794 synchronized(key) {
795 key.interestOps(key.interestOps() | ops);
796 }
797 } catch (CancelledKeyException ignored) {}
798 } else if (OLD_LOCKING) {
799 final SynchTask task = new SynchTask();
800 queueTask(task);
801 try {
802
803 selector.wakeup();
804 synchronized(key) {
805 key.interestOps(key.interestOps() | ops);
806 }
807 } catch (CancelledKeyException ignored) {
808 } finally {
809 task.done();
810 }
811 } else {
812 try {
813 synchronized(key) {
814 key.interestOps(key.interestOps() | ops);
815 }
816 if (polling) selector.wakeup();
817 } catch (CancelledKeyException ignored) {
818 }
819 }
820 }
821
822 void clearOps(final SelectionKey key, final int ops) {
823 if (currentThread() == this || ! OLD_LOCKING) {
824 try {
825 synchronized(key) {
826 key.interestOps(key.interestOps() & ~ops);
827 }
828 } catch (CancelledKeyException ignored) {}
829 } else {
830 final SynchTask task = new SynchTask();
831 queueTask(task);
832 try {
833
834 selector.wakeup();
835 synchronized(key) {
836 key.interestOps(key.interestOps() & ~ops);
837 }
838 } catch (CancelledKeyException ignored) {
839 } finally {
840 task.done();
841 }
842 }
843 }
844
845 Selector getSelector() {
846 return selector;
847 }
848
849 public boolean equals(final Object obj) {
850 return obj == this;
851 }
852
853 public int hashCode() {
854 return identityHashCode(this);
855 }
856
857 static final AtomicLong seqGen = new AtomicLong();
858
859 final class TimeKey implements XnioExecutor.Key, Comparable<TimeKey> {
860 private final long deadline;
861 private final long seq = seqGen.incrementAndGet();
862 private final Runnable command;
863
864 TimeKey(final long deadline, final Runnable command) {
865 this.deadline = deadline;
866 this.command = command;
867 }
868
869 public boolean remove() {
870 synchronized (workLock) {
871 return delayWorkQueue.remove(this);
872 }
873 }
874
875 public int compareTo(final TimeKey o) {
876 int r = Long.signum(deadline - o.deadline);
877 if (r == 0) r = Long.signum(seq - o.seq);
878 return r;
879 }
880 }
881
882 final class SynchTask implements Runnable {
883 volatile boolean done;
884
885 public void run() {
886 while (! done) {
887 park();
888 }
889 }
890
891 void done() {
892 done = true;
893 unpark(WorkerThread.this);
894 }
895 }
896
897 final class SelectNowTask implements Runnable {
898 final Thread thread = Thread.currentThread();
899 volatile boolean done;
900
901 void doWait() {
902 while (! done) {
903 park();
904 }
905 }
906
907 public void run() {
908 try {
909 selector.selectNow();
910 } catch (IOException ignored) {
911 }
912 done = true;
913 unpark(thread);
914 }
915 }
916 }
917