1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2019 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio.nio;
20
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedChannelException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.wildfly.common.Assert;
31 import org.xnio.ChannelListener;
32 import org.xnio.ChannelListeners;
33 import org.xnio.Option;
34 import org.xnio.StreamConnection;
35 import org.xnio.XnioExecutor;
36 import org.xnio.XnioIoThread;
37 import org.xnio.channels.AcceptListenerSettable;
38 import org.xnio.channels.AcceptingChannel;
39
40 final class QueuedNioTcpServer2 extends AbstractNioChannel<QueuedNioTcpServer2> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer2> {
41     private final NioTcpServer realServer;
42     private final List<Queue<StreamConnection>> acceptQueues;
43
44     private final Runnable acceptTask = this::acceptTask;
45
46     private volatile ChannelListener<? super QueuedNioTcpServer2> acceptListener;
47
48     QueuedNioTcpServer2(final NioTcpServer realServer) {
49         super(realServer.getWorker());
50         this.realServer = realServer;
51         final NioXnioWorker worker = realServer.getWorker();
52         final int cnt = worker.getIoThreadCount();
53         acceptQueues = new ArrayList<>(cnt);
54         for (int i = 0; i < cnt; i ++) {
55             acceptQueues.add(new LinkedBlockingQueue<>());
56         }
57         realServer.getCloseSetter().set(ignored -> invokeCloseHandler());
58         realServer.getAcceptSetter().set(ignored -> handleReady());
59     }
60
61     public StreamConnection accept() throws IOException {
62         final WorkerThread current = WorkerThread.getCurrent();
63         if (current == null) {
64             return null;
65         }
66         final Queue<StreamConnection> socketChannels = acceptQueues.get(current.getNumber());
67         final StreamConnection connection = socketChannels.poll();
68         if (connection == null) {
69             if (! realServer.isOpen()) {
70                 throw new ClosedChannelException();
71             }
72         }
73         return connection;
74     }
75
76     public ChannelListener<? super QueuedNioTcpServer2> getAcceptListener() {
77         return acceptListener;
78     }
79
80     public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer2> listener) {
81         this.acceptListener = listener;
82     }
83
84     public ChannelListener.Setter<QueuedNioTcpServer2> getAcceptSetter() {
85         return new Setter<QueuedNioTcpServer2>(this);
86     }
87
88     public SocketAddress getLocalAddress() {
89         return realServer.getLocalAddress();
90     }
91
92     public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
93         return realServer.getLocalAddress(type);
94     }
95
96     public void suspendAccepts() {
97         realServer.suspendAccepts();
98     }
99
100     public void resumeAccepts() {
101         realServer.resumeAccepts();
102     }
103
104     public boolean isAcceptResumed() {
105         return realServer.isAcceptResumed();
106     }
107
108     public void wakeupAccepts() {
109         realServer.wakeupAccepts();
110     }
111
112     public void awaitAcceptable() {
113         throw Assert.unsupported();
114     }
115
116     public void awaitAcceptable(final long time, final TimeUnit timeUnit) {
117         throw Assert.unsupported();
118     }
119
120     @Deprecated
121     public XnioExecutor getAcceptThread() {
122         return getIoThread();
123     }
124
125     public void close() throws IOException {
126         realServer.close();
127     }
128
129     public boolean isOpen() {
130         return realServer.isOpen();
131     }
132
133     public boolean supportsOption(final Option<?> option) {
134         return realServer.supportsOption(option);
135     }
136
137     public <T> T getOption(final Option<T> option) throws IOException {
138         return realServer.getOption(option);
139     }
140
141     public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
142         return realServer.setOption(option, value);
143     }
144
145     void handleReady() {
146         final NioTcpServer realServer = this.realServer;
147         NioSocketStreamConnection connection;
148         try {
149             connection = realServer.accept();
150         } catch (ClosedChannelException e) {
151             return;
152         }
153         XnioIoThread thread;
154         if (connection != null) {
155             int i = 0;
156             final Runnable acceptTask = this.acceptTask;
157             do {
158                 thread = connection.getIoThread();
159                 acceptQueues.get(thread.getNumber()).add(connection);
160                 thread.execute(acceptTask);
161                 if (++i == 128) {
162                     // prevent starvation of other acceptors
163                     return;
164                 }
165                 try {
166                     connection = realServer.accept();
167                 } catch (ClosedChannelException e) {
168                     return;
169                 }
170             } while (connection != null);
171         }
172     }
173
174     void acceptTask() {
175         final WorkerThread current = WorkerThread.getCurrent();
176         assert current != null;
177         final Queue<StreamConnection> queue = acceptQueues.get(current.getNumber());
178         ChannelListeners.invokeChannelListener(QueuedNioTcpServer2.this, getAcceptListener());
179         if (! queue.isEmpty()) {
180             current.execute(acceptTask);
181         }
182     }
183 }
184