1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */

17 package okhttp3.internal.connection;
18
19 import java.io.IOException;
20 import java.lang.ref.Reference;
21 import java.net.Proxy;
22 import java.util.ArrayDeque;
23 import java.util.ArrayList;
24 import java.util.Deque;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import javax.annotation.Nullable;
32 import okhttp3.Address;
33 import okhttp3.Route;
34 import okhttp3.internal.Util;
35 import okhttp3.internal.connection.Transmitter.TransmitterReference;
36 import okhttp3.internal.platform.Platform;
37
38 import static okhttp3.internal.Util.closeQuietly;
39
40 public final class RealConnectionPool {
41   /**
42    * Background threads are used to cleanup expired connections. There will be at most a single
43    * thread running per connection pool. The thread pool executor permits the pool itself to be
44    * garbage collected.
45    */

46   private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
47       Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
48       new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool"true));
49
50   /** The maximum number of idle connections for each address. */
51   private final int maxIdleConnections;
52   private final long keepAliveDurationNs;
53   private final Runnable cleanupRunnable = () -> {
54     while (true) {
55       long waitNanos = cleanup(System.nanoTime());
56       if (waitNanos == -1) return;
57       if (waitNanos > 0) {
58         long waitMillis = waitNanos / 1000000L;
59         waitNanos -= (waitMillis * 1000000L);
60         synchronized (RealConnectionPool.this) {
61           try {
62             RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
63           } catch (InterruptedException ignored) {
64           }
65         }
66       }
67     }
68   };
69
70   private final Deque<RealConnection> connections = new ArrayDeque<>();
71   final RouteDatabase routeDatabase = new RouteDatabase();
72   boolean cleanupRunning;
73
74   public RealConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
75     this.maxIdleConnections = maxIdleConnections;
76     this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
77
78     // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
79     if (keepAliveDuration <= 0) {
80       throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
81     }
82   }
83
84   public synchronized int idleConnectionCount() {
85     int total = 0;
86     for (RealConnection connection : connections) {
87       if (connection.transmitters.isEmpty()) total++;
88     }
89     return total;
90   }
91
92   public synchronized int connectionCount() {
93     return connections.size();
94   }
95
96   /**
97    * Attempts to acquire a recycled connection to {@code address} for {@code transmitter}. Returns
98    * true if a connection was acquired.
99    *
100    * <p>If {@code routes} is non-null these are the resolved routes (ie. IP addresses) for the
101    * connection. This is used to coalesce related domains to the same HTTP/2 connection, such as
102    * {@code square.com} and {@code square.ca}.
103    */

104   boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
105       @Nullable List<Route> routes, boolean requireMultiplexed) {
106     assert (Thread.holdsLock(this));
107     for (RealConnection connection : connections) {
108       if (requireMultiplexed && !connection.isMultiplexed()) continue;
109       if (!connection.isEligible(address, routes)) continue;
110       transmitter.acquireConnectionNoEvents(connection);
111       return true;
112     }
113     return false;
114   }
115
116   void put(RealConnection connection) {
117     assert (Thread.holdsLock(this));
118     if (!cleanupRunning) {
119       cleanupRunning = true;
120       executor.execute(cleanupRunnable);
121     }
122     connections.add(connection);
123   }
124
125   /**
126    * Notify this pool that {@code connection} has become idle. Returns true if the connection has
127    * been removed from the pool and should be closed.
128    */

129   boolean connectionBecameIdle(RealConnection connection) {
130     assert (Thread.holdsLock(this));
131     if (connection.noNewExchanges || maxIdleConnections == 0) {
132       connections.remove(connection);
133       return true;
134     } else {
135       notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
136       return false;
137     }
138   }
139
140   public void evictAll() {
141     List<RealConnection> evictedConnections = new ArrayList<>();
142     synchronized (this) {
143       for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
144         RealConnection connection = i.next();
145         if (connection.transmitters.isEmpty()) {
146           connection.noNewExchanges = true;
147           evictedConnections.add(connection);
148           i.remove();
149         }
150       }
151     }
152
153     for (RealConnection connection : evictedConnections) {
154       closeQuietly(connection.socket());
155     }
156   }
157
158   /**
159    * Performs maintenance on this pool, evicting the connection that has been idle the longest if
160    * either it has exceeded the keep alive limit or the idle connections limit.
161    *
162    * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
163    * -1 if no further cleanups are required.
164    */

165   long cleanup(long now) {
166     int inUseConnectionCount = 0;
167     int idleConnectionCount = 0;
168     RealConnection longestIdleConnection = null;
169     long longestIdleDurationNs = Long.MIN_VALUE;
170
171     // Find either a connection to evict, or the time that the next eviction is due.
172     synchronized (this) {
173       for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
174         RealConnection connection = i.next();
175
176         // If the connection is in use, keep searching.
177         if (pruneAndGetAllocationCount(connection, now) > 0) {
178           inUseConnectionCount++;
179           continue;
180         }
181
182         idleConnectionCount++;
183
184         // If the connection is ready to be evicted, we're done.
185         long idleDurationNs = now - connection.idleAtNanos;
186         if (idleDurationNs > longestIdleDurationNs) {
187           longestIdleDurationNs = idleDurationNs;
188           longestIdleConnection = connection;
189         }
190       }
191
192       if (longestIdleDurationNs >= this.keepAliveDurationNs
193           || idleConnectionCount > this.maxIdleConnections) {
194         // We've found a connection to evict. Remove it from the list, then close it below (outside
195         // of the synchronized block).
196         connections.remove(longestIdleConnection);
197       } else if (idleConnectionCount > 0) {
198         // A connection will be ready to evict soon.
199         return keepAliveDurationNs - longestIdleDurationNs;
200       } else if (inUseConnectionCount > 0) {
201         // All connections are in use. It'll be at least the keep alive duration 'til we run again.
202         return keepAliveDurationNs;
203       } else {
204         // No connections, idle or in use.
205         cleanupRunning = false;
206         return -1;
207       }
208     }
209
210     closeQuietly(longestIdleConnection.socket());
211
212     // Cleanup again immediately.
213     return 0;
214   }
215
216   /**
217    * Prunes any leaked transmitters and then returns the number of remaining live transmitters on
218    * {@code connection}. Transmitters are leaked if the connection is tracking them but the
219    * application code has abandoned them. Leak detection is imprecise and relies on garbage
220    * collection.
221    */

222   private int pruneAndGetAllocationCount(RealConnection connection, long now) {
223     List<Reference<Transmitter>> references = connection.transmitters;
224     for (int i = 0; i < references.size(); ) {
225       Reference<Transmitter> reference = references.get(i);
226
227       if (reference.get() != null) {
228         i++;
229         continue;
230       }
231
232       // We've discovered a leaked transmitter. This is an application bug.
233       TransmitterReference transmitterRef = (TransmitterReference) reference;
234       String message = "A connection to " + connection.route().address().url()
235           + " was leaked. Did you forget to close a response body?";
236       Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
237
238       references.remove(i);
239       connection.noNewExchanges = true;
240
241       // If this was the last allocation, the connection is eligible for immediate eviction.
242       if (references.isEmpty()) {
243         connection.idleAtNanos = now - keepAliveDurationNs;
244         return 0;
245       }
246     }
247
248     return references.size();
249   }
250
251   /** Track a bad route in the route database. Other routes will be attempted first. */
252   public void connectFailed(Route failedRoute, IOException failure) {
253     // Tell the proxy selector when we fail to connect on a fresh connection.
254     if (failedRoute.proxy().type() != Proxy.Type.DIRECT) {
255       Address address = failedRoute.address();
256       address.proxySelector().connectFailed(
257           address.url().uri(), failedRoute.proxy().address(), failure);
258     }
259
260     routeDatabase.failed(failedRoute);
261   }
262 }
263