1
17 package okhttp3.internal.connection;
18
19 import java.io.IOException;
20 import java.lang.ref.Reference;
21 import java.net.Proxy;
22 import java.util.ArrayDeque;
23 import java.util.ArrayList;
24 import java.util.Deque;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import javax.annotation.Nullable;
32 import okhttp3.Address;
33 import okhttp3.Route;
34 import okhttp3.internal.Util;
35 import okhttp3.internal.connection.Transmitter.TransmitterReference;
36 import okhttp3.internal.platform.Platform;
37
38 import static okhttp3.internal.Util.closeQuietly;
39
40 public final class RealConnectionPool {
41
46 private static final Executor executor = new ThreadPoolExecutor(0 ,
47 Integer.MAX_VALUE , 60L , TimeUnit.SECONDS,
48 new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
49
50
51 private final int maxIdleConnections;
52 private final long keepAliveDurationNs;
53 private final Runnable cleanupRunnable = () -> {
54 while (true) {
55 long waitNanos = cleanup(System.nanoTime());
56 if (waitNanos == -1) return;
57 if (waitNanos > 0) {
58 long waitMillis = waitNanos / 1000000L;
59 waitNanos -= (waitMillis * 1000000L);
60 synchronized (RealConnectionPool.this) {
61 try {
62 RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
63 } catch (InterruptedException ignored) {
64 }
65 }
66 }
67 }
68 };
69
70 private final Deque<RealConnection> connections = new ArrayDeque<>();
71 final RouteDatabase routeDatabase = new RouteDatabase();
72 boolean cleanupRunning;
73
74 public RealConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
75 this.maxIdleConnections = maxIdleConnections;
76 this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
77
78
79 if (keepAliveDuration <= 0) {
80 throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
81 }
82 }
83
84 public synchronized int idleConnectionCount() {
85 int total = 0;
86 for (RealConnection connection : connections) {
87 if (connection.transmitters.isEmpty()) total++;
88 }
89 return total;
90 }
91
92 public synchronized int connectionCount() {
93 return connections.size();
94 }
95
96
104 boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
105 @Nullable List<Route> routes, boolean requireMultiplexed) {
106 assert (Thread.holdsLock(this));
107 for (RealConnection connection : connections) {
108 if (requireMultiplexed && !connection.isMultiplexed()) continue;
109 if (!connection.isEligible(address, routes)) continue;
110 transmitter.acquireConnectionNoEvents(connection);
111 return true;
112 }
113 return false;
114 }
115
116 void put(RealConnection connection) {
117 assert (Thread.holdsLock(this));
118 if (!cleanupRunning) {
119 cleanupRunning = true;
120 executor.execute(cleanupRunnable);
121 }
122 connections.add(connection);
123 }
124
125
129 boolean connectionBecameIdle(RealConnection connection) {
130 assert (Thread.holdsLock(this));
131 if (connection.noNewExchanges || maxIdleConnections == 0) {
132 connections.remove(connection);
133 return true;
134 } else {
135 notifyAll();
136 return false;
137 }
138 }
139
140 public void evictAll() {
141 List<RealConnection> evictedConnections = new ArrayList<>();
142 synchronized (this) {
143 for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
144 RealConnection connection = i.next();
145 if (connection.transmitters.isEmpty()) {
146 connection.noNewExchanges = true;
147 evictedConnections.add(connection);
148 i.remove();
149 }
150 }
151 }
152
153 for (RealConnection connection : evictedConnections) {
154 closeQuietly(connection.socket());
155 }
156 }
157
158
165 long cleanup(long now) {
166 int inUseConnectionCount = 0;
167 int idleConnectionCount = 0;
168 RealConnection longestIdleConnection = null;
169 long longestIdleDurationNs = Long.MIN_VALUE;
170
171
172 synchronized (this) {
173 for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
174 RealConnection connection = i.next();
175
176
177 if (pruneAndGetAllocationCount(connection, now) > 0) {
178 inUseConnectionCount++;
179 continue;
180 }
181
182 idleConnectionCount++;
183
184
185 long idleDurationNs = now - connection.idleAtNanos;
186 if (idleDurationNs > longestIdleDurationNs) {
187 longestIdleDurationNs = idleDurationNs;
188 longestIdleConnection = connection;
189 }
190 }
191
192 if (longestIdleDurationNs >= this.keepAliveDurationNs
193 || idleConnectionCount > this.maxIdleConnections) {
194
195
196 connections.remove(longestIdleConnection);
197 } else if (idleConnectionCount > 0) {
198
199 return keepAliveDurationNs - longestIdleDurationNs;
200 } else if (inUseConnectionCount > 0) {
201
202 return keepAliveDurationNs;
203 } else {
204
205 cleanupRunning = false;
206 return -1;
207 }
208 }
209
210 closeQuietly(longestIdleConnection.socket());
211
212
213 return 0;
214 }
215
216
222 private int pruneAndGetAllocationCount(RealConnection connection, long now) {
223 List<Reference<Transmitter>> references = connection.transmitters;
224 for (int i = 0; i < references.size(); ) {
225 Reference<Transmitter> reference = references.get(i);
226
227 if (reference.get() != null) {
228 i++;
229 continue;
230 }
231
232
233 TransmitterReference transmitterRef = (TransmitterReference) reference;
234 String message = "A connection to " + connection.route().address().url()
235 + " was leaked. Did you forget to close a response body?";
236 Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
237
238 references.remove(i);
239 connection.noNewExchanges = true;
240
241
242 if (references.isEmpty()) {
243 connection.idleAtNanos = now - keepAliveDurationNs;
244 return 0;
245 }
246 }
247
248 return references.size();
249 }
250
251
252 public void connectFailed(Route failedRoute, IOException failure) {
253
254 if (failedRoute.proxy().type() != Proxy.Type.DIRECT) {
255 Address address = failedRoute.address();
256 address.proxySelector().connectFailed(
257 address.url().uri(), failedRoute.proxy().address(), failure);
258 }
259
260 routeDatabase.failed(failedRoute);
261 }
262 }
263