1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Copyright 2013 Red Hat, Inc. and/or its affiliates.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio.nio;
20
21 import java.nio.channels.SelectionKey;
22 import java.util.concurrent.TimeUnit;
23
24 import org.xnio.ChannelListeners;
25
26 import static java.lang.Math.min;
27 import static java.lang.Math.max;
28 import static java.lang.Thread.currentThread;
29
30 import org.jboss.logging.Logger;
31
32 import static org.xnio.IoUtils.safeClose;
33 import static org.xnio.nio.Log.tcpServerConnectionLimitLog;
34 import static org.xnio.nio.Log.tcpServerLog;
35
36 /**
37 * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
38 */

39 final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
40
41     private static final String FQCN = NioTcpServerHandle.class.getName();
42     private final Runnable freeTask;
43     private final NioTcpServer server;
44     private int count;
45     private int low;
46     private int high;
47     private int tokenCount = -1;
48     private boolean stopped;
49     private boolean backOff;
50     private int backOffTime = 0;
51
52     NioTcpServerHandle(final NioTcpServer server, final SelectionKey key, final WorkerThread thread, final int low, final int high) {
53         super(thread, key);
54         this.server = server;
55         this.low = low;
56         this.high = high;
57         freeTask = new Runnable() {
58             public void run() {
59                 freeConnection();
60             }
61         };
62     }
63
64     void handleReady(final int ops) {
65         ChannelListeners.invokeChannelListener(server, server.getAcceptListener());
66     }
67
68     void forceTermination() {
69         safeClose(server);
70     }
71
72     void terminated() {
73         server.invokeCloseHandler();
74     }
75
76     Runnable getFreeTask() {
77         return freeTask;
78     }
79
80     void resume() {
81         final WorkerThread thread = getWorkerThread();
82         if (thread == currentThread()) {
83             if (! stopped && ! backOff && server.resumed) super.resume(SelectionKey.OP_ACCEPT);
84         } else {
85             thread.execute(new Runnable() {
86                 public void run() {
87                     resume();
88                 }
89             });
90         }
91     }
92
93     void suspend() {
94         final WorkerThread thread = getWorkerThread();
95         if (thread == currentThread()) {
96             if (stopped || backOff || ! server.resumed) super.suspend(SelectionKey.OP_ACCEPT);
97         } else {
98             thread.execute(new Runnable() {
99                 public void run() {
100                     suspend();
101                 }
102             });
103         }
104     }
105
106     public void channelClosed() {
107         final WorkerThread thread = getWorkerThread();
108         if (thread == currentThread()) {
109             freeConnection();
110         } else {
111             thread.execute(freeTask);
112         }
113     }
114
115     void freeConnection() {
116         assert currentThread() == getWorkerThread();
117         if (count-- <= low && tokenCount != 0 && stopped) {
118             tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null,
119                     "Connection freed, resuming accept connections");
120             stopped = false;
121             if (server.resumed) {
122                 // end backoff optimistically
123                 backOff = false;
124                 super.resume(SelectionKey.OP_ACCEPT);
125             }
126         }
127     }
128
129     void setTokenCount(final int newCount) {
130         WorkerThread workerThread = getWorkerThread();
131         if (workerThread == currentThread()) {
132             if (tokenCount == 0) {
133                 tokenCount = newCount;
134                 if (count <= low && stopped) {
135                     stopped = false;
136                     if (server.resumed && ! backOff) {
137                         super.resume(SelectionKey.OP_ACCEPT);
138                     }
139                 }
140                 return;
141             }
142             workerThread = workerThread.getNextThread();
143         }
144         setThreadNewCount(workerThread, newCount);
145     }
146
147     /**
148      * Start back-off, when an accept produces an exception.
149      */

150     void startBackOff() {
151         backOff = true;
152         backOffTime = max(250, min(30_000, backOffTime << 2));
153         suspend();
154         getWorkerThread().executeAfter(this::endBackOff, backOffTime, TimeUnit.MILLISECONDS);
155     }
156
157     /**
158      * End back-off, when an accept may be retried.
159      */

160     void endBackOff() {
161         backOff = false;
162         resume();
163     }
164
165     /**
166      * Reset back-off, when an accept has succeeded.
167      */

168     void resetBackOff() {
169         backOffTime = 0;
170     }
171
172     private void setThreadNewCount(final WorkerThread workerThread, final int newCount) {
173         final int number = workerThread.getNumber();
174         workerThread.execute(new Runnable() {
175             public void run() {
176                 server.getHandle(number).setTokenCount(newCount);
177             }
178         });
179     }
180
181     void initializeTokenCount(final int newCount) {
182         WorkerThread workerThread = getWorkerThread();
183         final int number = workerThread.getNumber();
184         if (workerThread == currentThread()) {
185             tokenCount = newCount;
186             if (newCount == 0) {
187                 stopped = true;
188                 super.suspend(SelectionKey.OP_ACCEPT);
189             }
190         } else {
191             workerThread.execute(new Runnable() {
192                 public void run() {
193                     server.getHandle(number).initializeTokenCount(newCount);
194                 }
195             });
196         }
197     }
198
199     boolean getConnection() {
200         assert currentThread() == getWorkerThread();
201         if (stopped || backOff) {
202             tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null"Refusing accepting request (temporarily stopped: %s, backed off: %s)", stopped, backOff);
203             return false;
204         }
205         if (tokenCount != -1 && --tokenCount == 0) {
206             setThreadNewCount(getWorkerThread().getNextThread(), server.getTokenConnectionCount());
207         }
208         if (++count >= high || tokenCount == 0) {
209             if (tcpServerLog.isDebugEnabled() && count >= high)
210                 tcpServerConnectionLimitLog.logf(FQCN, Logger.Level.DEBUG, null,
211                             "Total open connections reach high water limit (%s) by this new accepting request. Temporarily stopping accept connections",
212                             high);
213             stopped = true;
214             super.suspend(SelectionKey.OP_ACCEPT);
215         }
216         return true;
217     }
218
219     public void executeSetTask(final int high, final int low) {
220         final WorkerThread thread = getWorkerThread();
221         if (thread == currentThread()) {
222             this.high = high;
223             this.low = low;
224             if (count >= high && ! stopped) {
225                 stopped = true;
226                 suspend();
227             } else if (count <= low && stopped) {
228                 stopped = false;
229                 if (server.resumed && ! backOff) resume();
230             }
231         } else {
232             thread.execute(new Runnable() {
233                 public void run() {
234                     executeSetTask(high, low);
235                 }
236             });
237         }
238     }
239
240     int getConnectionCount() {
241         assert currentThread() == getWorkerThread();
242         return count;
243     }
244
245     int getBackOffTime() {
246         return backOffTime;
247     }
248 }
249