1
16 package okhttp3;
17
18 import java.util.ArrayDeque;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.Deque;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.Nullable;
29 import okhttp3.RealCall.AsyncCall;
30 import okhttp3.internal.Util;
31
32
39 public final class Dispatcher {
40 private int maxRequests = 64;
41 private int maxRequestsPerHost = 5;
42 private @Nullable Runnable idleCallback;
43
44
45 private @Nullable ExecutorService executorService;
46
47
48 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
49
50
51 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
52
53
54 private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
55
56 public Dispatcher(ExecutorService executorService) {
57 this.executorService = executorService;
58 }
59
60 public Dispatcher() {
61 }
62
63 public synchronized ExecutorService executorService() {
64 if (executorService == null) {
65 executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
66 new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
67 }
68 return executorService;
69 }
70
71
78 public void setMaxRequests(int maxRequests) {
79 if (maxRequests < 1) {
80 throw new IllegalArgumentException("max < 1: " + maxRequests);
81 }
82 synchronized (this) {
83 this.maxRequests = maxRequests;
84 }
85 promoteAndExecute();
86 }
87
88 public synchronized int getMaxRequests() {
89 return maxRequests;
90 }
91
92
103 public void setMaxRequestsPerHost(int maxRequestsPerHost) {
104 if (maxRequestsPerHost < 1) {
105 throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
106 }
107 synchronized (this) {
108 this.maxRequestsPerHost = maxRequestsPerHost;
109 }
110 promoteAndExecute();
111 }
112
113 public synchronized int getMaxRequestsPerHost() {
114 return maxRequestsPerHost;
115 }
116
117
129 public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
130 this.idleCallback = idleCallback;
131 }
132
133 void enqueue(AsyncCall call) {
134 synchronized (this) {
135 readyAsyncCalls.add(call);
136
137
138
139 if (!call.get().forWebSocket) {
140 AsyncCall existingCall = findExistingCallWithHost(call.host());
141 if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
142 }
143 }
144 promoteAndExecute();
145 }
146
147 @Nullable private AsyncCall findExistingCallWithHost(String host) {
148 for (AsyncCall existingCall : runningAsyncCalls) {
149 if (existingCall.host().equals(host)) return existingCall;
150 }
151 for (AsyncCall existingCall : readyAsyncCalls) {
152 if (existingCall.host().equals(host)) return existingCall;
153 }
154 return null;
155 }
156
157
161 public synchronized void cancelAll() {
162 for (AsyncCall call : readyAsyncCalls) {
163 call.get().cancel();
164 }
165
166 for (AsyncCall call : runningAsyncCalls) {
167 call.get().cancel();
168 }
169
170 for (RealCall call : runningSyncCalls) {
171 call.cancel();
172 }
173 }
174
175
182 private boolean promoteAndExecute() {
183 assert (!Thread.holdsLock(this));
184
185 List<AsyncCall> executableCalls = new ArrayList<>();
186 boolean isRunning;
187 synchronized (this) {
188 for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
189 AsyncCall asyncCall = i.next();
190
191 if (runningAsyncCalls.size() >= maxRequests) break;
192 if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue;
193
194 i.remove();
195 asyncCall.callsPerHost().incrementAndGet();
196 executableCalls.add(asyncCall);
197 runningAsyncCalls.add(asyncCall);
198 }
199 isRunning = runningCallsCount() > 0;
200 }
201
202 for (int i = 0, size = executableCalls.size(); i < size; i++) {
203 AsyncCall asyncCall = executableCalls.get(i);
204 asyncCall.executeOn(executorService());
205 }
206
207 return isRunning;
208 }
209
210
211 synchronized void executed(RealCall call) {
212 runningSyncCalls.add(call);
213 }
214
215
216 void finished(AsyncCall call) {
217 call.callsPerHost().decrementAndGet();
218 finished(runningAsyncCalls, call);
219 }
220
221
222 void finished(RealCall call) {
223 finished(runningSyncCalls, call);
224 }
225
226 private <T> void finished(Deque<T> calls, T call) {
227 Runnable idleCallback;
228 synchronized (this) {
229 if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
230 idleCallback = this.idleCallback;
231 }
232
233 boolean isRunning = promoteAndExecute();
234
235 if (!isRunning && idleCallback != null) {
236 idleCallback.run();
237 }
238 }
239
240
241 public synchronized List<Call> queuedCalls() {
242 List<Call> result = new ArrayList<>();
243 for (AsyncCall asyncCall : readyAsyncCalls) {
244 result.add(asyncCall.get());
245 }
246 return Collections.unmodifiableList(result);
247 }
248
249
250 public synchronized List<Call> runningCalls() {
251 List<Call> result = new ArrayList<>();
252 result.addAll(runningSyncCalls);
253 for (AsyncCall asyncCall : runningAsyncCalls) {
254 result.add(asyncCall.get());
255 }
256 return Collections.unmodifiableList(result);
257 }
258
259 public synchronized int queuedCallsCount() {
260 return readyAsyncCalls.size();
261 }
262
263 public synchronized int runningCallsCount() {
264 return runningAsyncCalls.size() + runningSyncCalls.size();
265 }
266 }
267