1 /*
2  * Copyright (C) 2019 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package okhttp3.internal.connection;
17
18 import java.io.IOException;
19 import java.io.InterruptedIOException;
20 import java.lang.ref.Reference;
21 import java.lang.ref.WeakReference;
22 import java.net.Socket;
23 import javax.annotation.Nullable;
24 import javax.net.ssl.HostnameVerifier;
25 import javax.net.ssl.SSLSocketFactory;
26 import okhttp3.Address;
27 import okhttp3.Call;
28 import okhttp3.CertificatePinner;
29 import okhttp3.Connection;
30 import okhttp3.EventListener;
31 import okhttp3.HttpUrl;
32 import okhttp3.Interceptor;
33 import okhttp3.OkHttpClient;
34 import okhttp3.Request;
35 import okhttp3.internal.Internal;
36 import okhttp3.internal.http.ExchangeCodec;
37 import okhttp3.internal.platform.Platform;
38 import okio.AsyncTimeout;
39 import okio.Timeout;
40
41 import static java.util.concurrent.TimeUnit.MILLISECONDS;
42 import static okhttp3.internal.Util.closeQuietly;
43 import static okhttp3.internal.Util.sameConnection;
44
45 /**
46  * Bridge between OkHttp's application and network layers. This class exposes high-level application
47  * layer primitives: connections, requests, responses, and streams.
48  *
49  * <p>This class supports {@linkplain #cancel asynchronous canceling}. This is intended to have the
50  * smallest blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream
51  * but not the other streams sharing its connection. But if the TLS handshake is still in progress
52  * then canceling may break the entire connection.
53  */

54 public final class Transmitter {
55   private final OkHttpClient client;
56   private final RealConnectionPool connectionPool;
57   private final Call call;
58   private final EventListener eventListener;
59   private final AsyncTimeout timeout = new AsyncTimeout() {
60     @Override protected void timedOut() {
61       cancel();
62     }
63   };
64
65   private @Nullable Object callStackTrace;
66
67   private Request request;
68   private ExchangeFinder exchangeFinder;
69
70   // Guarded by connectionPool.
71   public RealConnection connection;
72   private @Nullable Exchange exchange;
73   private boolean exchangeRequestDone;
74   private boolean exchangeResponseDone;
75   private boolean canceled;
76   private boolean timeoutEarlyExit;
77   private boolean noMoreExchanges;
78
79   public Transmitter(OkHttpClient client, Call call) {
80     this.client = client;
81     this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
82     this.call = call;
83     this.eventListener = client.eventListenerFactory().create(call);
84     this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
85   }
86
87   public Timeout timeout() {
88     return timeout;
89   }
90
91   public void timeoutEnter() {
92     timeout.enter();
93   }
94
95   /**
96    * Stops applying the timeout before the call is entirely complete. This is used for WebSockets
97    * and duplex calls where the timeout only applies to the initial setup.
98    */

99   public void timeoutEarlyExit() {
100     if (timeoutEarlyExit) throw new IllegalStateException();
101     timeoutEarlyExit = true;
102     timeout.exit();
103   }
104
105   private @Nullable IOException timeoutExit(@Nullable IOException cause) {
106     if (timeoutEarlyExit) return cause;
107     if (!timeout.exit()) return cause;
108
109     InterruptedIOException e = new InterruptedIOException("timeout");
110     if (cause != null) e.initCause(cause);
111
112     return e;
113   }
114
115   public void callStart() {
116     this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
117     eventListener.callStart(call);
118   }
119
120   /**
121    * Prepare to create a stream to carry {@code request}. This prefers to use the existing
122    * connection if it exists.
123    */

124   public void prepareToConnect(Request request) {
125     if (this.request != null) {
126       if (sameConnection(this.request.url(), request.url()) && exchangeFinder.hasRouteToTry()) {
127         return// Already ready.
128       }
129       if (exchange != nullthrow new IllegalStateException();
130
131       if (exchangeFinder != null) {
132         maybeReleaseConnection(nulltrue);
133         exchangeFinder = null;
134       }
135     }
136
137     this.request = request;
138     this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
139         call, eventListener);
140   }
141
142   private Address createAddress(HttpUrl url) {
143     SSLSocketFactory sslSocketFactory = null;
144     HostnameVerifier hostnameVerifier = null;
145     CertificatePinner certificatePinner = null;
146     if (url.isHttps()) {
147       sslSocketFactory = client.sslSocketFactory();
148       hostnameVerifier = client.hostnameVerifier();
149       certificatePinner = client.certificatePinner();
150     }
151
152     return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
153         sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
154         client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
155   }
156
157   /** Returns a new exchange to carry a new request and response. */
158   Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
159     synchronized (connectionPool) {
160       if (noMoreExchanges) {
161         throw new IllegalStateException("released");
162       }
163       if (exchange != null) {
164         throw new IllegalStateException("cannot make a new request because the previous response "
165             + "is still open: please call response.close()");
166       }
167     }
168
169     ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
170     Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
171
172     synchronized (connectionPool) {
173       this.exchange = result;
174       this.exchangeRequestDone = false;
175       this.exchangeResponseDone = false;
176       return result;
177     }
178   }
179
180   void acquireConnectionNoEvents(RealConnection connection) {
181     assert (Thread.holdsLock(connectionPool));
182
183     if (this.connection != nullthrow new IllegalStateException();
184     this.connection = connection;
185     connection.transmitters.add(new TransmitterReference(this, callStackTrace));
186   }
187
188   /**
189    * Remove the transmitter from the connection's list of allocations. Returns a socket that the
190    * caller should close.
191    */

192   @Nullable Socket releaseConnectionNoEvents() {
193     assert (Thread.holdsLock(connectionPool));
194
195     int index = -1;
196     for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
197       Reference<Transmitter> reference = this.connection.transmitters.get(i);
198       if (reference.get() == this) {
199         index = i;
200         break;
201       }
202     }
203
204     if (index == -1) throw new IllegalStateException();
205
206     RealConnection released = this.connection;
207     released.transmitters.remove(index);
208     this.connection = null;
209
210     if (released.transmitters.isEmpty()) {
211       released.idleAtNanos = System.nanoTime();
212       if (connectionPool.connectionBecameIdle(released)) {
213         return released.socket();
214       }
215     }
216
217     return null;
218   }
219
220   public void exchangeDoneDueToException() {
221     synchronized (connectionPool) {
222       if (noMoreExchanges) throw new IllegalStateException();
223       exchange = null;
224     }
225   }
226
227   /**
228    * Releases resources held with the request or response of {@code exchange}. This should be called
229    * when the request completes normally or when it fails due to an exception, in which case {@code
230    * e} should be non-null.
231    *
232    * <p>If the exchange was canceled or timed out, this will wrap {@code e} in an exception that
233    * provides that additional context. Otherwise {@code e} is returned as-is.
234    */

235   @Nullable IOException exchangeMessageDone(
236       Exchange exchange, boolean requestDone, boolean responseDone, @Nullable IOException e) {
237     boolean exchangeDone = false;
238     synchronized (connectionPool) {
239       if (exchange != this.exchange) {
240         return e; // This exchange was detached violently!
241       }
242       boolean changed = false;
243       if (requestDone) {
244         if (!exchangeRequestDone) changed = true;
245         this.exchangeRequestDone = true;
246       }
247       if (responseDone) {
248         if (!exchangeResponseDone) changed = true;
249         this.exchangeResponseDone = true;
250       }
251       if (exchangeRequestDone && exchangeResponseDone && changed) {
252         exchangeDone = true;
253         this.exchange.connection().successCount++;
254         this.exchange = null;
255       }
256     }
257     if (exchangeDone) {
258       e = maybeReleaseConnection(e, false);
259     }
260     return e;
261   }
262
263   public @Nullable IOException noMoreExchanges(@Nullable IOException e) {
264     synchronized (connectionPool) {
265       noMoreExchanges = true;
266     }
267     return maybeReleaseConnection(e, false);
268   }
269
270   /**
271    * Release the connection if it is no longer needed. This is called after each exchange completes
272    * and after the call signals that no more exchanges are expected.
273    *
274    * <p>If the transmitter was canceled or timed out, this will wrap {@code e} in an exception that
275    * provides that additional context. Otherwise {@code e} is returned as-is.
276    *
277    * @param force true to release the connection even if more exchanges are expected for the call.
278    */

279   private @Nullable IOException maybeReleaseConnection(@Nullable IOException e, boolean force) {
280     Socket socket;
281     Connection releasedConnection;
282     boolean callEnd;
283     synchronized (connectionPool) {
284       if (force && exchange != null) {
285         throw new IllegalStateException("cannot release connection while it is in use");
286       }
287       releasedConnection = this.connection;
288       socket = this.connection != null && exchange == null && (force || noMoreExchanges)
289           ? releaseConnectionNoEvents()
290           : null;
291       if (this.connection != null) releasedConnection = null;
292       callEnd = noMoreExchanges && exchange == null;
293     }
294     closeQuietly(socket);
295
296     if (releasedConnection != null) {
297       eventListener.connectionReleased(call, releasedConnection);
298     }
299
300     if (callEnd) {
301       boolean callFailed = (e != null);
302       e = timeoutExit(e);
303       if (callFailed) {
304         eventListener.callFailed(call, e);
305       } else {
306         eventListener.callEnd(call);
307       }
308     }
309     return e;
310   }
311
312   public boolean canRetry() {
313     return exchangeFinder.hasStreamFailure() && exchangeFinder.hasRouteToTry();
314   }
315
316   public boolean hasExchange() {
317     synchronized (connectionPool) {
318       return exchange != null;
319     }
320   }
321
322   /**
323    * Immediately closes the socket connection if it's currently held. Use this to interrupt an
324    * in-flight request from any thread. It's the caller's responsibility to close the request body
325    * and response body streams; otherwise resources may be leaked.
326    *
327    * <p>This method is safe to be called concurrently, but provides limited guarantees. If a
328    * transport layer connection has been established (such as a HTTP/2 stream) that is terminated.
329    * Otherwise if a socket connection is being established, that is terminated.
330    */

331   public void cancel() {
332     Exchange exchangeToCancel;
333     RealConnection connectionToCancel;
334     synchronized (connectionPool) {
335       canceled = true;
336       exchangeToCancel = exchange;
337       connectionToCancel = exchangeFinder != null && exchangeFinder.connectingConnection() != null
338           ? exchangeFinder.connectingConnection()
339           : connection;
340     }
341     if (exchangeToCancel != null) {
342       exchangeToCancel.cancel();
343     } else if (connectionToCancel != null) {
344       connectionToCancel.cancel();
345     }
346   }
347
348   public boolean isCanceled() {
349     synchronized (connectionPool) {
350       return canceled;
351     }
352   }
353
354   static final class TransmitterReference extends WeakReference<Transmitter> {
355     /**
356      * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
357      * identifying the origin of connection leaks.
358      */

359     final Object callStackTrace;
360
361     TransmitterReference(Transmitter referent, Object callStackTrace) {
362       super(referent);
363       this.callStackTrace = callStackTrace;
364     }
365   }
366 }
367