1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2012 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio.nio;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.Pipe;
27 import java.nio.channels.SelectableChannel;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.nio.channels.spi.AbstractSelectableChannel;
33 import java.security.AccessController;
34 import java.util.ArrayDeque;
35 import java.util.Arrays;
36 import java.util.Iterator;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.TreeSet;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import org.jboss.logging.Logger;
46 import org.xnio.Cancellable;
47 import org.xnio.ChannelListener;
48 import org.xnio.ChannelListeners;
49 import org.xnio.ChannelPipe;
50 import org.xnio.ClosedWorkerException;
51 import org.xnio.FailedIoFuture;
52 import org.xnio.FinishedIoFuture;
53 import org.xnio.FutureResult;
54 import org.xnio.IoFuture;
55 import org.xnio.Option;
56 import org.xnio.OptionMap;
57 import org.xnio.Options;
58 import org.xnio.ReadPropertyAction;
59 import org.xnio.StreamConnection;
60 import org.xnio.XnioExecutor;
61 import org.xnio.XnioIoFactory;
62 import org.xnio.XnioIoThread;
63 import org.xnio.XnioWorker;
64 import org.xnio.channels.BoundChannel;
65 import org.xnio.channels.StreamSinkChannel;
66 import org.xnio.channels.StreamSourceChannel;
67
68 import static java.lang.System.identityHashCode;
69 import static java.lang.System.nanoTime;
70 import static java.util.concurrent.locks.LockSupport.park;
71 import static java.util.concurrent.locks.LockSupport.unpark;
72 import static org.xnio.IoUtils.safeClose;
73 import static org.xnio.nio.Log.log;
74 import static org.xnio.nio.Log.selectorLog;
75
76 /**
77  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
78  */

79 final class WorkerThread extends XnioIoThread implements XnioExecutor {
80     private static final long LONGEST_DELAY = 9223372036853L;
81     private static final String FQCN = WorkerThread.class.getName();
82     private static final boolean OLD_LOCKING;
83     private static final boolean THREAD_SAFE_SELECTION_KEYS;
84     private static final long START_TIME = System.nanoTime();
85
86     private final Selector selector;
87     private final Object workLock = new Object();
88
89     private final Queue<Runnable> selectorWorkQueue = new ArrayDeque<Runnable>();
90     private final TreeSet<TimeKey> delayWorkQueue = new TreeSet<TimeKey>();
91
92     private volatile int state;
93
94     private static final int SHUTDOWN = (1 << 31);
95
96     private static final AtomicIntegerFieldUpdater<WorkerThread> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(WorkerThread.class"state");
97
98     static {
99         OLD_LOCKING = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.old-locking""false")));
100         THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.thread-safe-selection-keys""false")));
101     }
102
103     WorkerThread(final NioXnioWorker worker, final Selector selector, final String name, final ThreadGroup group, final long stackSize, final int number) {
104         super(worker, number, group, name, stackSize);
105         this.selector = selector;
106     }
107
108     static WorkerThread getCurrent() {
109         final Thread thread = currentThread();
110         return thread instanceof WorkerThread ? (WorkerThread) thread : null;
111     }
112
113     public NioXnioWorker getWorker() {
114         return (NioXnioWorker) super.getWorker();
115     }
116
117     protected IoFuture<StreamConnection> acceptTcpStreamConnection(final InetSocketAddress destination, final ChannelListener<? super StreamConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
118         try {
119             getWorker().checkShutdown();
120         } catch (ClosedWorkerException e) {
121             return new FailedIoFuture<StreamConnection>(e);
122         }
123         final FutureResult<StreamConnection> futureResult = new FutureResult<StreamConnection>(this);
124         try {
125             boolean ok = false;
126             final ServerSocketChannel serverChannel = ServerSocketChannel.open();
127             try {
128                 serverChannel.configureBlocking(false);
129                 if (optionMap.contains(Options.RECEIVE_BUFFER)) {
130                     serverChannel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
131                 }
132                 serverChannel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
133                 serverChannel.bind(destination);
134                 if (bindListener != null) ChannelListeners.invokeChannelListener(new BoundChannel() {
135                     public SocketAddress getLocalAddress() {
136                         return serverChannel.socket().getLocalSocketAddress();
137                     }
138
139                     public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
140                         final SocketAddress address = getLocalAddress();
141                         return type.isInstance(address) ? type.cast(address) : null;
142                     }
143
144                     public ChannelListener.Setter<? extends BoundChannel> getCloseSetter() {
145                         return new ChannelListener.SimpleSetter<BoundChannel>();
146                     }
147
148                     public XnioWorker getWorker() {
149                         return WorkerThread.this.getWorker();
150                     }
151
152                     public XnioIoThread getIoThread() {
153                         return WorkerThread.this;
154                     }
155
156                     public void close() throws IOException {
157                         serverChannel.close();
158                     }
159
160                     public boolean isOpen() {
161                         return serverChannel.isOpen();
162                     }
163
164                     public boolean supportsOption(final Option<?> option) {
165                         return false;
166                     }
167
168                     public <T> T getOption(final Option<T> option) throws IOException {
169                         return null;
170                     }
171
172                     public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
173                         return null;
174                     }
175                 }, bindListener);
176                 final SelectionKey key = this.registerChannel(serverChannel);
177                 final NioHandle handle = new NioHandle(this, key) {
178                     void handleReady(final int ops) {
179                         boolean ok = false;
180                         try {
181                             final SocketChannel channel = serverChannel.accept();
182                             if (channel == null) {
183                                 ok = true;
184                                 return;
185                             } else {
186                                 safeClose(serverChannel);
187                             }
188                             try {
189                                 channel.configureBlocking(false);
190                                 if (optionMap.contains(Options.TCP_OOB_INLINE)) channel.socket().setOOBInline(optionMap.get(Options.TCP_OOB_INLINE, false));
191                                 if (optionMap.contains(Options.TCP_NODELAY)) channel.socket().setTcpNoDelay(optionMap.get(Options.TCP_NODELAY, false));
192                                 if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
193                                 if (optionMap.contains(Options.CLOSE_ABORT)) channel.socket().setSoLinger(optionMap.get(Options.CLOSE_ABORT, false), 0);
194                                 if (optionMap.contains(Options.KEEP_ALIVE)) channel.socket().setKeepAlive(optionMap.get(Options.KEEP_ALIVE, false));
195                                 if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
196                                 final SelectionKey selectionKey = WorkerThread.this.registerChannel(channel);
197                                 final NioSocketStreamConnection connection = new NioSocketStreamConnection(WorkerThread.this, selectionKey, null);
198                                 if (optionMap.contains(Options.READ_TIMEOUT)) connection.setOption(Options.READ_TIMEOUT, optionMap.get(Options.READ_TIMEOUT, 0));
199                                 if (optionMap.contains(Options.WRITE_TIMEOUT)) connection.setOption(Options.WRITE_TIMEOUT, optionMap.get(Options.WRITE_TIMEOUT, 0));
200                                 if (futureResult.setResult(connection)) {
201                                     ok = true;
202                                     ChannelListeners.invokeChannelListener(connection, openListener);
203                                 }
204                             } finally {
205                                 if (! ok) safeClose(channel);
206                             }
207                         } catch (IOException e) {
208                             futureResult.setException(e);
209                         } finally {
210                             if (! ok) {
211                                 safeClose(serverChannel);
212                             }
213                         }
214                     }
215
216                     void terminated() {
217                     }
218
219                     void forceTermination() {
220                         futureResult.setCancelled();
221                     }
222                 };
223                 key.attach(handle);
224                 handle.resume(SelectionKey.OP_ACCEPT);
225                 ok = true;
226                 futureResult.addCancelHandler(new Cancellable() {
227                     public Cancellable cancel() {
228                         if (futureResult.setCancelled()) {
229                             safeClose(serverChannel);
230                         }
231                         return this;
232                     }
233                 });
234                 return futureResult.getIoFuture();
235             } finally {
236                 if (! ok) safeClose(serverChannel);
237             }
238         } catch (IOException e) {
239             return new FailedIoFuture<StreamConnection>(e);
240         }
241     }
242
243     protected IoFuture<StreamConnection> openTcpStreamConnection(final InetSocketAddress bindAddress, final InetSocketAddress destinationAddress, final ChannelListener<? super StreamConnection> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
244         try {
245             getWorker().checkShutdown();
246         } catch (ClosedWorkerException e) {
247             return new FailedIoFuture<StreamConnection>(e);
248         }
249         try {
250             final SocketChannel channel = SocketChannel.open();
251             boolean ok = false;
252             try {
253                 channel.configureBlocking(false);
254                 if (optionMap.contains(Options.TCP_OOB_INLINE)) channel.socket().setOOBInline(optionMap.get(Options.TCP_OOB_INLINE, false));
255                 if (optionMap.contains(Options.TCP_NODELAY)) channel.socket().setTcpNoDelay(optionMap.get(Options.TCP_NODELAY, false));
256                 if (optionMap.contains(Options.IP_TRAFFIC_CLASS)) channel.socket().setTrafficClass(optionMap.get(Options.IP_TRAFFIC_CLASS, -1));
257                 if (optionMap.contains(Options.CLOSE_ABORT)) channel.socket().setSoLinger(optionMap.get(Options.CLOSE_ABORT, false), 0);
258                 if (optionMap.contains(Options.KEEP_ALIVE)) channel.socket().setKeepAlive(optionMap.get(Options.KEEP_ALIVE, false));
259                 if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
260                 if (optionMap.contains(Options.REUSE_ADDRESSES)) channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, false));
261                 if (optionMap.contains(Options.SEND_BUFFER)) channel.socket().setSendBufferSize(optionMap.get(Options.SEND_BUFFER, -1));
262                 final SelectionKey key = registerChannel(channel);
263                 final NioSocketStreamConnection connection = new NioSocketStreamConnection(this, key, null);
264                 if (optionMap.contains(Options.READ_TIMEOUT)) connection.setOption(Options.READ_TIMEOUT, optionMap.get(Options.READ_TIMEOUT, 0));
265                 if (optionMap.contains(Options.WRITE_TIMEOUT)) connection.setOption(Options.WRITE_TIMEOUT, optionMap.get(Options.WRITE_TIMEOUT, 0));
266                 if (bindAddress != null || bindListener != null) {
267                     channel.socket().bind(bindAddress);
268                     ChannelListeners.invokeChannelListener(connection, bindListener);
269                 }
270                 if (channel.connect(destinationAddress)) {
271                     selectorLog.tracef("Synchronous connect");
272                     execute(ChannelListeners.getChannelListenerTask(connection, openListener));
273                     final FinishedIoFuture<StreamConnection> finishedIoFuture = new FinishedIoFuture<StreamConnection>(connection);
274                     ok = true;
275                     return finishedIoFuture;
276                 }
277                 selectorLog.tracef("Asynchronous connect");
278                 final FutureResult<StreamConnection> futureResult = new FutureResult<StreamConnection>(this);
279                 final ConnectHandle connectHandle = new ConnectHandle(this, key, futureResult, connection, openListener);
280                 key.attach(connectHandle);
281                 futureResult.addCancelHandler(new Cancellable() {
282                     public Cancellable cancel() {
283                         if (futureResult.setCancelled()) {
284                             safeClose(connection);
285                         }
286                         return this;
287                     }
288                 });
289                 connectHandle.resume(SelectionKey.OP_CONNECT);
290                 ok = true;
291                 return futureResult.getIoFuture();
292             } finally {
293                 if (! ok) safeClose(channel);
294             }
295         } catch (IOException e) {
296             return new FailedIoFuture<StreamConnection>(e);
297         }
298     }
299
300     WorkerThread getNextThread() {
301         final WorkerThread[] all = getWorker().getAll();
302         final int number = getNumber();
303         if (number == all.length - 1) {
304             return all[0];
305         } else {
306             return all[number + 1];
307         }
308     }
309
310     static final class ConnectHandle extends NioHandle {
311
312         private final FutureResult<StreamConnection> futureResult;
313         private final NioSocketStreamConnection connection;
314         private final ChannelListener<? super StreamConnection> openListener;
315
316         ConnectHandle(final WorkerThread workerThread, final SelectionKey selectionKey, final FutureResult<StreamConnection> futureResult, final NioSocketStreamConnection connection, final ChannelListener<? super StreamConnection> openListener) {
317             super(workerThread, selectionKey);
318             this.futureResult = futureResult;
319             this.connection = connection;
320             this.openListener = openListener;
321         }
322
323         void handleReady(final int ops) {
324             final SocketChannel channel = getChannel();
325             boolean ok = false;
326             try {
327                 if (channel.finishConnect()) {
328                     selectorLog.tracef("handleReady connect finished");
329                     suspend(SelectionKey.OP_CONNECT);
330                     getSelectionKey().attach(connection.getConduit());
331                     if (futureResult.setResult(connection)) {
332                         ok = true;
333                         ChannelListeners.invokeChannelListener(connection, openListener);
334                     }
335                 }
336             } catch (IOException e) {
337                 selectorLog.tracef("ConnectHandle.handleReady Exception, %s", e);
338                 futureResult.setException(e);
339             } finally {
340                 if (!ok) {
341                     selectorLog.tracef("!OK, closing connection");
342                     safeClose(connection);
343                 }
344             }
345         }
346
347         private SocketChannel getChannel() {
348             return (SocketChannel) getSelectionKey().channel();
349         }
350
351         void forceTermination() {
352             futureResult.setCancelled();
353             safeClose(getChannel());
354         }
355
356         void terminated() {
357         }
358     }
359
360     private static WorkerThread getPeerThread(final XnioIoFactory peer) throws ClosedWorkerException {
361         final WorkerThread peerThread;
362         if (peer instanceof NioXnioWorker) {
363             final NioXnioWorker peerWorker = (NioXnioWorker) peer;
364             peerWorker.checkShutdown();
365             peerThread = peerWorker.chooseThread();
366         } else if (peer instanceof WorkerThread) {
367             peerThread = (WorkerThread) peer;
368             peerThread.getWorker().checkShutdown();
369         } else {
370             throw log.notNioProvider();
371         }
372         return peerThread;
373     }
374
375     public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection(XnioIoFactory peer) throws IOException {
376         getWorker().checkShutdown();
377         boolean ok = false;
378         final Pipe topPipe = Pipe.open();
379         try {
380             topPipe.source().configureBlocking(false);
381             topPipe.sink().configureBlocking(false);
382             final Pipe bottomPipe = Pipe.open();
383             try {
384                 bottomPipe.source().configureBlocking(false);
385                 bottomPipe.sink().configureBlocking(false);
386                 final WorkerThread peerThread = getPeerThread(peer);
387                 final SelectionKey topSourceKey = registerChannel(topPipe.source());
388                 final SelectionKey topSinkKey = peerThread.registerChannel(topPipe.sink());
389                 final SelectionKey bottomSourceKey = peerThread.registerChannel(bottomPipe.source());
390                 final SelectionKey bottomSinkKey = registerChannel(bottomPipe.sink());
391                 final NioPipeStreamConnection leftConnection = new NioPipeStreamConnection(this, bottomSourceKey, topSinkKey);
392                 final NioPipeStreamConnection rightConnection = new NioPipeStreamConnection(this, topSourceKey, bottomSinkKey);
393                 final ChannelPipe<StreamConnection, StreamConnection> result = new ChannelPipe<StreamConnection, StreamConnection>(leftConnection, rightConnection);
394                 ok = true;
395                 return result;
396             } finally {
397                 if (! ok) {
398                     safeClose(bottomPipe.sink());
399                     safeClose(bottomPipe.source());
400                 }
401             }
402         } finally {
403             if (! ok) {
404                 safeClose(topPipe.sink());
405                 safeClose(topPipe.source());
406             }
407         }
408     }
409
410     public ChannelPipe<StreamSourceChannel, StreamSinkChannel> createHalfDuplexPipe(final XnioIoFactory peer) throws IOException {
411         getWorker().checkShutdown();
412         final Pipe pipe = Pipe.open();
413         boolean ok = false;
414         try {
415             pipe.source().configureBlocking(false);
416             pipe.sink().configureBlocking(false);
417             final WorkerThread peerThread = getPeerThread(peer);
418             final SelectionKey readKey = registerChannel(pipe.source());
419             final SelectionKey writeKey = peerThread.registerChannel(pipe.sink());
420             final NioPipeStreamConnection leftConnection = new NioPipeStreamConnection(this, readKey, null);
421             final NioPipeStreamConnection rightConnection = new NioPipeStreamConnection(thisnull, writeKey);
422             leftConnection.writeClosed();
423             rightConnection.readClosed();
424             final ChannelPipe<StreamSourceChannel,StreamSinkChannel> result = new ChannelPipe<StreamSourceChannel, StreamSinkChannel>(leftConnection.getSourceChannel(), rightConnection.getSinkChannel());
425             ok = true;
426             return result;
427         } finally {
428             if (! ok) {
429                 safeClose(pipe.sink());
430                 safeClose(pipe.source());
431             }
432         }
433     }
434
435     volatile boolean polling;
436
437     public void run() {
438         final Selector selector = this.selector;
439         try {
440             log.tracef("Starting worker thread %s"this);
441             final Object lock = workLock;
442             final Queue<Runnable> workQueue = selectorWorkQueue;
443             final TreeSet<TimeKey> delayQueue = delayWorkQueue;
444             log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector);
445             Runnable task;
446             Iterator<TimeKey> iterator;
447             long delayTime = Long.MAX_VALUE;
448             Set<SelectionKey> selectedKeys;
449             SelectionKey[] keys = new SelectionKey[16];
450             int oldState;
451             int keyCount;
452             for (;;) {
453                 // Run all tasks
454                 do {
455                     synchronized (lock) {
456                         task = workQueue.poll();
457                         if (task == null) {
458                             iterator = delayQueue.iterator();
459                             delayTime = Long.MAX_VALUE;
460                             if (iterator.hasNext()) {
461                                 final long now = nanoTime();
462                                 do {
463                                     final TimeKey key = iterator.next();
464                                     if (key.deadline <= (now - START_TIME)) {
465                                         workQueue.add(key.command);
466                                         iterator.remove();
467                                     } else {
468                                         delayTime = key.deadline - (now - START_TIME);
469                                         // the rest are in the future
470                                         break;
471                                     }
472                                 } while (iterator.hasNext());
473                             }
474                             task = workQueue.poll();
475                         }
476                     }
477                     // clear interrupt status
478                     Thread.interrupted();
479                     safeRun(task);
480                 } while (task != null);
481                 // all tasks have been run
482                 oldState = state;
483                 if ((oldState & SHUTDOWN) != 0) {
484                     synchronized (lock) {
485                         keyCount = selector.keys().size();
486                         state = keyCount | SHUTDOWN;
487                         if (keyCount == 0 && workQueue.isEmpty()) {
488                             // no keys or tasks left, shut down (delay tasks are discarded)
489                             return;
490                         }
491                     }
492                     synchronized (selector) {
493                         final Set<SelectionKey> keySet = selector.keys();
494                         synchronized (keySet) {
495                             keys = keySet.toArray(keys);
496                             Arrays.fill(keys, keySet.size(), keys.length, null);
497                         }
498                     }
499                     // shut em down
500                     for (int i = 0; i < keys.length; i++) {
501                         final SelectionKey key = keys[i];
502                         if (key == nullbreak//end of list
503                         keys[i] = null;
504                         final NioHandle attachment = (NioHandle) key.attachment();
505                         if (attachment != null) {
506                             safeClose(key.channel());
507                             attachment.forceTermination();
508                         }
509                     }
510                     Arrays.fill(keys, 0, keys.length, null);
511                 }
512                 // clear interrupt status
513                 Thread.interrupted();
514                 // perform select
515                 try {
516                     if ((oldState & SHUTDOWN) != 0) {
517                         selectorLog.tracef("Beginning select on %s (shutdown in progress)", selector);
518                         selector.selectNow();
519                     } else if (delayTime == Long.MAX_VALUE) {
520                         selectorLog.tracef("Beginning select on %s", selector);
521                         polling = true;
522                         try {
523                             Runnable item = null;
524                             synchronized (lock) {
525                                item =  workQueue.peek();
526                             }
527                             if (item != null) {
528                                 log.tracef("SelectNow, queue is not empty");
529                                 selector.selectNow();
530                             } else {
531                                 log.tracef("Select, queue is empty");
532                                 selector.select();
533                             }
534                         } finally {
535                             polling = false;
536                         }
537                     } else {
538                         final long millis = 1L + delayTime / 1000000L;
539                         selectorLog.tracef("Beginning select on %s (with timeout)", selector);
540                         polling = true;
541                         try {
542                             Runnable item = null;
543                             synchronized (lock) {
544                                item =  workQueue.peek();
545                             }
546                             if (item != null) {
547                                 log.tracef("SelectNow, queue is not empty");
548                                 selector.selectNow();
549                             } else {
550                                 log.tracef("Select, queue is empty");
551                                 selector.select(millis);
552                             }
553                         } finally {
554                             polling = false;
555                         }
556                     }
557                 } catch (CancelledKeyException ignored) {
558                     // Mac and other buggy implementations sometimes spits these out
559                     selectorLog.trace("Spurious cancelled key exception");
560                 } catch (IOException e) {
561                     selectorLog.selectionError(e);
562                     // hopefully transient; should never happen
563                 }
564                 selectorLog.tracef("Selected on %s", selector);
565                 // iterate the ready key set
566                 synchronized (selector) {
567                     selectedKeys = selector.selectedKeys();
568                     synchronized (selectedKeys) {
569                         // copy so that handlers can safely cancel keys
570                         keys = selectedKeys.toArray(keys);
571                         Arrays.fill(keys, selectedKeys.size(), keys.length, null);
572                         selectedKeys.clear();
573                     }
574                 }
575                 for (int i = 0; i < keys.length; i++) {
576                     final SelectionKey key = keys[i];
577                     if (key == nullbreak//end of list
578                     keys[i] = null;
579                     final int ops;
580                     try {
581                         ops = key.interestOps();
582                         if (ops != 0) {
583                             selectorLog.tracef("Selected key %s for %s", key, key.channel());
584                             final NioHandle handle = (NioHandle) key.attachment();
585                             if (handle == null) {
586                                 cancelKey(key, false);
587                             } else {
588                                 // clear interrupt status
589                                 Thread.interrupted();
590                                 selectorLog.tracef("Calling handleReady key %s for %s", key.readyOps(), key.channel());
591                                 handle.handleReady(key.readyOps());
592                             }
593                         }
594                     } catch (CancelledKeyException ignored) {
595                         selectorLog.tracef("Skipping selection of cancelled key %s", key);
596                     } catch (Throwable t) {
597                         selectorLog.tracef(t, "Unexpected failure of selection of key %s", key);
598                     }
599                 }
600                 // all selected keys invoked; loop back to run tasks
601             }
602         } finally {
603             log.tracef("Shutting down channel thread \"%s\""this);
604             safeClose(selector);
605             getWorker().closeResource();
606         }
607     }
608
609     private static void safeRun(final Runnable command) {
610         if (command != nulltry {
611             log.tracef("Running task %s", command);
612             command.run();
613         } catch (Throwable t) {
614             log.taskFailed(command, t);
615         }
616     }
617
618     public void execute(final Runnable command) {
619         if ((state & SHUTDOWN) != 0) {
620             throw log.threadExiting();
621         }
622         synchronized (workLock) {
623             selectorWorkQueue.add(command);
624             log.tracef("Added task %s", command);
625         }
626         if (polling) { // flag is always false if we're the same thread
627             selector.wakeup();
628         } else {
629             log.tracef("Not polling, no wakeup");
630         }
631     }
632
633     void shutdown() {
634         int oldState;
635         do {
636             oldState = state;
637             if ((oldState & SHUTDOWN) != 0) {
638                 // idempotent
639                 return;
640             }
641         } while (! stateUpdater.compareAndSet(this, oldState, oldState | SHUTDOWN));
642         if(currentThread() != this) {
643             selector.wakeup();
644         }
645     }
646
647     public Key executeAfter(final Runnable command, final long time, final TimeUnit unit) {
648         final long millis = unit.toMillis(time);
649         if ((state & SHUTDOWN) != 0) {
650             throw log.threadExiting();
651         }
652         if (millis <= 0) {
653             execute(command);
654             return Key.IMMEDIATE;
655         }
656         final long deadline = (nanoTime() - START_TIME) + Math.min(millis, LONGEST_DELAY) * 1000000L;
657         final TimeKey key = new TimeKey(deadline, command);
658         synchronized (workLock) {
659             final TreeSet<TimeKey> queue = delayWorkQueue;
660             queue.add(key);
661             if (queue.iterator().next() == key) {
662                 // we're the next one up; poke the selector to update its delay time
663                 if (polling) { // flag is always false if we're the same thread
664                     selector.wakeup();
665                 }
666             }
667             return key;
668         }
669     }
670
671     class RepeatKey implements Key, Runnable {
672         private final Runnable command;
673         private final long millis;
674         private final AtomicReference<Key> current = new AtomicReference<>();
675
676         RepeatKey(final Runnable command, final long millis) {
677             this.command = command;
678             this.millis = millis;
679         }
680
681         public boolean remove() {
682             final Key removed = current.getAndSet(this);
683             // removed key should not be null because remove cannot be called before it is populated.
684             assert removed != null;
685             return removed != this && removed.remove();
686         }
687
688         void setFirst(Key key) {
689             current.compareAndSet(null, key);
690         }
691
692         public void run() {
693             try {
694                 command.run();
695             } finally {
696                 Key o, n;
697                 o = current.get();
698                 if (o != this) {
699                     n = executeAfter(this, millis, TimeUnit.MILLISECONDS);
700                     if (!current.compareAndSet(o, n)) {
701                         n.remove();
702                     }
703                 }
704             }
705         }
706     }
707
708     public Key executeAtInterval(final Runnable command, final long time, final TimeUnit unit) {
709         final long millis = unit.toMillis(time);
710         final RepeatKey repeatKey = new RepeatKey(command, millis);
711         final Key firstKey = executeAfter(repeatKey, millis, TimeUnit.MILLISECONDS);
712         repeatKey.setFirst(firstKey);
713         return repeatKey;
714     }
715
716     SelectionKey registerChannel(final AbstractSelectableChannel channel) throws ClosedChannelException {
717         if (currentThread() == this) {
718             return channel.register(selector, 0);
719         } else if (THREAD_SAFE_SELECTION_KEYS) {
720             try {
721                 return channel.register(selector, 0);
722             } finally {
723                 if (polling) selector.wakeup();
724             }
725         } else {
726             final SynchTask task = new SynchTask();
727             queueTask(task);
728             try {
729                 // Prevent selector from sleeping until we're done!
730                 selector.wakeup();
731                 return channel.register(selector, 0);
732             } finally {
733                 task.done();
734             }
735         }
736     }
737
738     void queueTask(final Runnable task) {
739         synchronized (workLock) {
740             selectorWorkQueue.add(task);
741         }
742     }
743
744     void cancelKey(final SelectionKey key, final boolean block) {
745         assert key.selector() == selector;
746         final SelectableChannel channel = key.channel();
747         if (currentThread() == this) {
748             log.logf(FQCN, Logger.Level.TRACE, null"Cancelling key %s of %s (same thread)", key, channel);
749             try {
750                 key.cancel();
751                 try {
752                     selector.selectNow();
753                 } catch (IOException e) {
754                     log.selectionError(e);
755                 }
756             } catch (Throwable t) {
757                 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (same thread)", key, channel);
758             }
759         } else if (OLD_LOCKING) {
760             log.logf(FQCN, Logger.Level.TRACE, null"Cancelling key %s of %s (same thread, old locking)", key, channel);
761             final SynchTask task = new SynchTask();
762             queueTask(task);
763             try {
764                 // Prevent selector from sleeping until we're done!
765                 selector.wakeup();
766                 key.cancel();
767             } catch (Throwable t) {
768                 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (same thread, old locking)", key, channel);
769             } finally {
770                 task.done();
771             }
772         } else {
773             log.logf(FQCN, Logger.Level.TRACE, null"Cancelling key %s of %s (other thread)", key, channel);
774             try {
775                 key.cancel();
776                 if (block) {
777                     final SelectNowTask task = new SelectNowTask();
778                     queueTask(task);
779                     selector.wakeup();
780                     // block until the selector is actually deregistered
781                     task.doWait();
782                 } else {
783                     selector.wakeup();
784                 }
785             } catch (Throwable t) {
786                 log.logf(FQCN, Logger.Level.TRACE, t, "Error cancelling key %s of %s (other thread)", key, channel);
787             }
788         }
789     }
790
791     void setOps(final SelectionKey key, final int ops) {
792         if (currentThread() == this) {
793             try {
794                 synchronized(key) {
795                     key.interestOps(key.interestOps() | ops);
796                 }
797             } catch (CancelledKeyException ignored) {}
798         } else if (OLD_LOCKING) {
799             final SynchTask task = new SynchTask();
800             queueTask(task);
801             try {
802                 // Prevent selector from sleeping until we're done!
803                 selector.wakeup();
804                 synchronized(key) {
805                     key.interestOps(key.interestOps() | ops);
806                 }
807             } catch (CancelledKeyException ignored) {
808             } finally {
809                 task.done();
810             }
811         } else {
812             try {
813                 synchronized(key) {
814                     key.interestOps(key.interestOps() | ops);
815                 }
816                 if (polling) selector.wakeup();
817             } catch (CancelledKeyException ignored) {
818             }
819         }
820     }
821
822     void clearOps(final SelectionKey key, final int ops) {
823         if (currentThread() == this || ! OLD_LOCKING) {
824             try {
825                 synchronized(key) {
826                     key.interestOps(key.interestOps() & ~ops);
827                 }
828             } catch (CancelledKeyException ignored) {}
829         } else {
830             final SynchTask task = new SynchTask();
831             queueTask(task);
832             try {
833                 // Prevent selector from sleeping until we're done!
834                 selector.wakeup();
835                 synchronized(key) {
836                     key.interestOps(key.interestOps() & ~ops);
837                 }
838             } catch (CancelledKeyException ignored) {
839             } finally {
840                 task.done();
841             }
842         }
843     }
844
845     Selector getSelector() {
846         return selector;
847     }
848
849     public boolean equals(final Object obj) {
850         return obj == this;
851     }
852
853     public int hashCode() {
854         return identityHashCode(this);
855     }
856
857     static final AtomicLong seqGen = new AtomicLong();
858
859     final class TimeKey implements XnioExecutor.Key, Comparable<TimeKey> {
860         private final long deadline;
861         private final long seq = seqGen.incrementAndGet();
862         private final Runnable command;
863
864         TimeKey(final long deadline, final Runnable command) {
865             this.deadline = deadline;
866             this.command = command;
867         }
868
869         public boolean remove() {
870             synchronized (workLock) {
871                 return delayWorkQueue.remove(this);
872             }
873         }
874
875         public int compareTo(final TimeKey o) {
876             int r = Long.signum(deadline - o.deadline);
877             if (r == 0) r = Long.signum(seq - o.seq);
878             return r;
879         }
880     }
881
882     final class SynchTask implements Runnable {
883         volatile boolean done;
884
885         public void run() {
886             while (! done) {
887                 park();
888             }
889         }
890
891         void done() {
892             done = true;
893             unpark(WorkerThread.this);
894         }
895     }
896
897     final class SelectNowTask implements Runnable {
898         final Thread thread = Thread.currentThread();
899         volatile boolean done;
900
901         void doWait() {
902             while (! done) {
903                 park();
904             }
905         }
906
907         public void run() {
908             try {
909                 selector.selectNow();
910             } catch (IOException ignored) {
911             }
912             done = true;
913             unpark(thread);
914         }
915     }
916 }
917