1 /*
2  * Copyright (C) 2013 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package okhttp3;
17
18 import java.util.ArrayDeque;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.Deque;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.Nullable;
29 import okhttp3.RealCall.AsyncCall;
30 import okhttp3.internal.Util;
31
32 /**
33  * Policy on when async requests are executed.
34  *
35  * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
36  * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
37  * of calls concurrently.
38  */

39 public final class Dispatcher {
40   private int maxRequests = 64;
41   private int maxRequestsPerHost = 5;
42   private @Nullable Runnable idleCallback;
43
44   /** Executes calls. Created lazily. */
45   private @Nullable ExecutorService executorService;
46
47   /** Ready async calls in the order they'll be run. */
48   private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
49
50   /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
51   private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
52
53   /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
54   private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
55
56   public Dispatcher(ExecutorService executorService) {
57     this.executorService = executorService;
58   }
59
60   public Dispatcher() {
61   }
62
63   public synchronized ExecutorService executorService() {
64     if (executorService == null) {
65       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
66           new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher"false));
67     }
68     return executorService;
69   }
70
71   /**
72    * Set the maximum number of requests to execute concurrently. Above this requests queue in
73    * memory, waiting for the running calls to complete.
74    *
75    * <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
76    * will remain in flight.
77    */

78   public void setMaxRequests(int maxRequests) {
79     if (maxRequests < 1) {
80       throw new IllegalArgumentException("max < 1: " + maxRequests);
81     }
82     synchronized (this) {
83       this.maxRequests = maxRequests;
84     }
85     promoteAndExecute();
86   }
87
88   public synchronized int getMaxRequests() {
89     return maxRequests;
90   }
91
92   /**
93    * Set the maximum number of requests for each host to execute concurrently. This limits requests
94    * by the URL's host name. Note that concurrent requests to a single IP address may still exceed
95    * this limit: multiple hostnames may share an IP address or be routed through the same HTTP
96    * proxy.
97    *
98    * <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those
99    * requests will remain in flight.
100    *
101    * <p>WebSocket connections to hosts <b>do not</b> count against this limit.
102    */

103   public void setMaxRequestsPerHost(int maxRequestsPerHost) {
104     if (maxRequestsPerHost < 1) {
105       throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
106     }
107     synchronized (this) {
108       this.maxRequestsPerHost = maxRequestsPerHost;
109     }
110     promoteAndExecute();
111   }
112
113   public synchronized int getMaxRequestsPerHost() {
114     return maxRequestsPerHost;
115   }
116
117   /**
118    * Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
119    * calls returns to zero).
120    *
121    * <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
122    * on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
123    * {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
124    * {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
125    * returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
126    * means that if you are doing synchronous calls the network layer will not truly be idle until
127    * every returned {@link Response} has been closed.
128    */

129   public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
130     this.idleCallback = idleCallback;
131   }
132
133   void enqueue(AsyncCall call) {
134     synchronized (this) {
135       readyAsyncCalls.add(call);
136
137       // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
138       // the same host.
139       if (!call.get().forWebSocket) {
140         AsyncCall existingCall = findExistingCallWithHost(call.host());
141         if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
142       }
143     }
144     promoteAndExecute();
145   }
146
147   @Nullable private AsyncCall findExistingCallWithHost(String host) {
148     for (AsyncCall existingCall : runningAsyncCalls) {
149       if (existingCall.host().equals(host)) return existingCall;
150     }
151     for (AsyncCall existingCall : readyAsyncCalls) {
152       if (existingCall.host().equals(host)) return existingCall;
153     }
154     return null;
155   }
156
157   /**
158    * Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain
159    * Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}.
160    */

161   public synchronized void cancelAll() {
162     for (AsyncCall call : readyAsyncCalls) {
163       call.get().cancel();
164     }
165
166     for (AsyncCall call : runningAsyncCalls) {
167       call.get().cancel();
168     }
169
170     for (RealCall call : runningSyncCalls) {
171       call.cancel();
172     }
173   }
174
175   /**
176    * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
177    * them on the executor service. Must not be called with synchronization because executing calls
178    * can call into user code.
179    *
180    * @return true if the dispatcher is currently running calls.
181    */

182   private boolean promoteAndExecute() {
183     assert (!Thread.holdsLock(this));
184
185     List<AsyncCall> executableCalls = new ArrayList<>();
186     boolean isRunning;
187     synchronized (this) {
188       for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
189         AsyncCall asyncCall = i.next();
190
191         if (runningAsyncCalls.size() >= maxRequests) break// Max capacity.
192         if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue// Host max capacity.
193
194         i.remove();
195         asyncCall.callsPerHost().incrementAndGet();
196         executableCalls.add(asyncCall);
197         runningAsyncCalls.add(asyncCall);
198       }
199       isRunning = runningCallsCount() > 0;
200     }
201
202     for (int i = 0, size = executableCalls.size(); i < size; i++) {
203       AsyncCall asyncCall = executableCalls.get(i);
204       asyncCall.executeOn(executorService());
205     }
206
207     return isRunning;
208   }
209
210   /** Used by {@code Call#execute} to signal it is in-flight. */
211   synchronized void executed(RealCall call) {
212     runningSyncCalls.add(call);
213   }
214
215   /** Used by {@code AsyncCall#run} to signal completion. */
216   void finished(AsyncCall call) {
217     call.callsPerHost().decrementAndGet();
218     finished(runningAsyncCalls, call);
219   }
220
221   /** Used by {@code Call#execute} to signal completion. */
222   void finished(RealCall call) {
223     finished(runningSyncCalls, call);
224   }
225
226   private <T> void finished(Deque<T> calls, T call) {
227     Runnable idleCallback;
228     synchronized (this) {
229       if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
230       idleCallback = this.idleCallback;
231     }
232
233     boolean isRunning = promoteAndExecute();
234
235     if (!isRunning && idleCallback != null) {
236       idleCallback.run();
237     }
238   }
239
240   /** Returns a snapshot of the calls currently awaiting execution. */
241   public synchronized List<Call> queuedCalls() {
242     List<Call> result = new ArrayList<>();
243     for (AsyncCall asyncCall : readyAsyncCalls) {
244       result.add(asyncCall.get());
245     }
246     return Collections.unmodifiableList(result);
247   }
248
249   /** Returns a snapshot of the calls currently being executed. */
250   public synchronized List<Call> runningCalls() {
251     List<Call> result = new ArrayList<>();
252     result.addAll(runningSyncCalls);
253     for (AsyncCall asyncCall : runningAsyncCalls) {
254       result.add(asyncCall.get());
255     }
256     return Collections.unmodifiableList(result);
257   }
258
259   public synchronized int queuedCallsCount() {
260     return readyAsyncCalls.size();
261   }
262
263   public synchronized int runningCallsCount() {
264     return runningAsyncCalls.size() + runningSyncCalls.size();
265   }
266 }
267