1
18
19 package org.xnio.nio;
20
21 import java.nio.channels.SelectionKey;
22 import java.util.concurrent.TimeUnit;
23
24 import org.xnio.ChannelListeners;
25
26 import static java.lang.Math.min;
27 import static java.lang.Math.max;
28 import static java.lang.Thread.currentThread;
29
30 import org.jboss.logging.Logger;
31
32 import static org.xnio.IoUtils.safeClose;
33 import static org.xnio.nio.Log.tcpServerConnectionLimitLog;
34 import static org.xnio.nio.Log.tcpServerLog;
35
36
39 final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
40
41 private static final String FQCN = NioTcpServerHandle.class.getName();
42 private final Runnable freeTask;
43 private final NioTcpServer server;
44 private int count;
45 private int low;
46 private int high;
47 private int tokenCount = -1;
48 private boolean stopped;
49 private boolean backOff;
50 private int backOffTime = 0;
51
52 NioTcpServerHandle(final NioTcpServer server, final SelectionKey key, final WorkerThread thread, final int low, final int high) {
53 super(thread, key);
54 this.server = server;
55 this.low = low;
56 this.high = high;
57 freeTask = new Runnable() {
58 public void run() {
59 freeConnection();
60 }
61 };
62 }
63
64 void handleReady(final int ops) {
65 ChannelListeners.invokeChannelListener(server, server.getAcceptListener());
66 }
67
68 void forceTermination() {
69 safeClose(server);
70 }
71
72 void terminated() {
73 server.invokeCloseHandler();
74 }
75
76 Runnable getFreeTask() {
77 return freeTask;
78 }
79
80 void resume() {
81 final WorkerThread thread = getWorkerThread();
82 if (thread == currentThread()) {
83 if (! stopped && ! backOff && server.resumed) super.resume(SelectionKey.OP_ACCEPT);
84 } else {
85 thread.execute(new Runnable() {
86 public void run() {
87 resume();
88 }
89 });
90 }
91 }
92
93 void suspend() {
94 final WorkerThread thread = getWorkerThread();
95 if (thread == currentThread()) {
96 if (stopped || backOff || ! server.resumed) super.suspend(SelectionKey.OP_ACCEPT);
97 } else {
98 thread.execute(new Runnable() {
99 public void run() {
100 suspend();
101 }
102 });
103 }
104 }
105
106 public void channelClosed() {
107 final WorkerThread thread = getWorkerThread();
108 if (thread == currentThread()) {
109 freeConnection();
110 } else {
111 thread.execute(freeTask);
112 }
113 }
114
115 void freeConnection() {
116 assert currentThread() == getWorkerThread();
117 if (count-- <= low && tokenCount != 0 && stopped) {
118 tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null,
119 "Connection freed, resuming accept connections");
120 stopped = false;
121 if (server.resumed) {
122
123 backOff = false;
124 super.resume(SelectionKey.OP_ACCEPT);
125 }
126 }
127 }
128
129 void setTokenCount(final int newCount) {
130 WorkerThread workerThread = getWorkerThread();
131 if (workerThread == currentThread()) {
132 if (tokenCount == 0) {
133 tokenCount = newCount;
134 if (count <= low && stopped) {
135 stopped = false;
136 if (server.resumed && ! backOff) {
137 super.resume(SelectionKey.OP_ACCEPT);
138 }
139 }
140 return;
141 }
142 workerThread = workerThread.getNextThread();
143 }
144 setThreadNewCount(workerThread, newCount);
145 }
146
147
150 void startBackOff() {
151 backOff = true;
152 backOffTime = max(250, min(30_000, backOffTime << 2));
153 suspend();
154 getWorkerThread().executeAfter(this::endBackOff, backOffTime, TimeUnit.MILLISECONDS);
155 }
156
157
160 void endBackOff() {
161 backOff = false;
162 resume();
163 }
164
165
168 void resetBackOff() {
169 backOffTime = 0;
170 }
171
172 private void setThreadNewCount(final WorkerThread workerThread, final int newCount) {
173 final int number = workerThread.getNumber();
174 workerThread.execute(new Runnable() {
175 public void run() {
176 server.getHandle(number).setTokenCount(newCount);
177 }
178 });
179 }
180
181 void initializeTokenCount(final int newCount) {
182 WorkerThread workerThread = getWorkerThread();
183 final int number = workerThread.getNumber();
184 if (workerThread == currentThread()) {
185 tokenCount = newCount;
186 if (newCount == 0) {
187 stopped = true;
188 super.suspend(SelectionKey.OP_ACCEPT);
189 }
190 } else {
191 workerThread.execute(new Runnable() {
192 public void run() {
193 server.getHandle(number).initializeTokenCount(newCount);
194 }
195 });
196 }
197 }
198
199 boolean getConnection() {
200 assert currentThread() == getWorkerThread();
201 if (stopped || backOff) {
202 tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null, "Refusing accepting request (temporarily stopped: %s, backed off: %s)", stopped, backOff);
203 return false;
204 }
205 if (tokenCount != -1 && --tokenCount == 0) {
206 setThreadNewCount(getWorkerThread().getNextThread(), server.getTokenConnectionCount());
207 }
208 if (++count >= high || tokenCount == 0) {
209 if (tcpServerLog.isDebugEnabled() && count >= high)
210 tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null,
211 "Total open connections reach high water limit (%s) by this new accepting request. Temporarily stopping accept connections",
212 high);
213 stopped = true;
214 super.suspend(SelectionKey.OP_ACCEPT);
215 }
216 return true;
217 }
218
219 public void executeSetTask(final int high, final int low) {
220 final WorkerThread thread = getWorkerThread();
221 if (thread == currentThread()) {
222 this.high = high;
223 this.low = low;
224 if (count >= high && ! stopped) {
225 stopped = true;
226 suspend();
227 } else if (count <= low && stopped) {
228 stopped = false;
229 if (server.resumed && ! backOff) resume();
230 }
231 } else {
232 thread.execute(new Runnable() {
233 public void run() {
234 executeSetTask(high, low);
235 }
236 });
237 }
238 }
239
240 int getConnectionCount() {
241 assert currentThread() == getWorkerThread();
242 return count;
243 }
244
245 int getBackOffTime() {
246 return backOffTime;
247 }
248 }
249