1
18
19 package org.xnio.nio;
20
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedChannelException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.wildfly.common.Assert;
31 import org.xnio.ChannelListener;
32 import org.xnio.ChannelListeners;
33 import org.xnio.Option;
34 import org.xnio.StreamConnection;
35 import org.xnio.XnioExecutor;
36 import org.xnio.XnioIoThread;
37 import org.xnio.channels.AcceptListenerSettable;
38 import org.xnio.channels.AcceptingChannel;
39
40 final class QueuedNioTcpServer2 extends AbstractNioChannel<QueuedNioTcpServer2> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer2> {
41 private final NioTcpServer realServer;
42 private final List<Queue<StreamConnection>> acceptQueues;
43
44 private final Runnable acceptTask = this::acceptTask;
45
46 private volatile ChannelListener<? super QueuedNioTcpServer2> acceptListener;
47
48 QueuedNioTcpServer2(final NioTcpServer realServer) {
49 super(realServer.getWorker());
50 this.realServer = realServer;
51 final NioXnioWorker worker = realServer.getWorker();
52 final int cnt = worker.getIoThreadCount();
53 acceptQueues = new ArrayList<>(cnt);
54 for (int i = 0; i < cnt; i ++) {
55 acceptQueues.add(new LinkedBlockingQueue<>());
56 }
57 realServer.getCloseSetter().set(ignored -> invokeCloseHandler());
58 realServer.getAcceptSetter().set(ignored -> handleReady());
59 }
60
61 public StreamConnection accept() throws IOException {
62 final WorkerThread current = WorkerThread.getCurrent();
63 if (current == null) {
64 return null;
65 }
66 final Queue<StreamConnection> socketChannels = acceptQueues.get(current.getNumber());
67 final StreamConnection connection = socketChannels.poll();
68 if (connection == null) {
69 if (! realServer.isOpen()) {
70 throw new ClosedChannelException();
71 }
72 }
73 return connection;
74 }
75
76 public ChannelListener<? super QueuedNioTcpServer2> getAcceptListener() {
77 return acceptListener;
78 }
79
80 public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer2> listener) {
81 this.acceptListener = listener;
82 }
83
84 public ChannelListener.Setter<QueuedNioTcpServer2> getAcceptSetter() {
85 return new Setter<QueuedNioTcpServer2>(this);
86 }
87
88 public SocketAddress getLocalAddress() {
89 return realServer.getLocalAddress();
90 }
91
92 public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
93 return realServer.getLocalAddress(type);
94 }
95
96 public void suspendAccepts() {
97 realServer.suspendAccepts();
98 }
99
100 public void resumeAccepts() {
101 realServer.resumeAccepts();
102 }
103
104 public boolean isAcceptResumed() {
105 return realServer.isAcceptResumed();
106 }
107
108 public void wakeupAccepts() {
109 realServer.wakeupAccepts();
110 }
111
112 public void awaitAcceptable() {
113 throw Assert.unsupported();
114 }
115
116 public void awaitAcceptable(final long time, final TimeUnit timeUnit) {
117 throw Assert.unsupported();
118 }
119
120 @Deprecated
121 public XnioExecutor getAcceptThread() {
122 return getIoThread();
123 }
124
125 public void close() throws IOException {
126 realServer.close();
127 }
128
129 public boolean isOpen() {
130 return realServer.isOpen();
131 }
132
133 public boolean supportsOption(final Option<?> option) {
134 return realServer.supportsOption(option);
135 }
136
137 public <T> T getOption(final Option<T> option) throws IOException {
138 return realServer.getOption(option);
139 }
140
141 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
142 return realServer.setOption(option, value);
143 }
144
145 void handleReady() {
146 final NioTcpServer realServer = this.realServer;
147 NioSocketStreamConnection connection;
148 try {
149 connection = realServer.accept();
150 } catch (ClosedChannelException e) {
151 return;
152 }
153 XnioIoThread thread;
154 if (connection != null) {
155 int i = 0;
156 final Runnable acceptTask = this.acceptTask;
157 do {
158 thread = connection.getIoThread();
159 acceptQueues.get(thread.getNumber()).add(connection);
160 thread.execute(acceptTask);
161 if (++i == 128) {
162
163 return;
164 }
165 try {
166 connection = realServer.accept();
167 } catch (ClosedChannelException e) {
168 return;
169 }
170 } while (connection != null);
171 }
172 }
173
174 void acceptTask() {
175 final WorkerThread current = WorkerThread.getCurrent();
176 assert current != null;
177 final Queue<StreamConnection> queue = acceptQueues.get(current.getNumber());
178 ChannelListeners.invokeChannelListener(QueuedNioTcpServer2.this, getAcceptListener());
179 if (! queue.isEmpty()) {
180 current.execute(acceptTask);
181 }
182 }
183 }
184