1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package okhttp3.internal.http2;
17
18 import java.io.Closeable;
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.net.SocketAddress;
24 import java.util.LinkedHashMap;
25 import java.util.LinkedHashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.SynchronousQueue;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import javax.annotation.Nullable;
38 import okhttp3.Headers;
39 import okhttp3.internal.NamedRunnable;
40 import okhttp3.internal.Util;
41 import okhttp3.internal.platform.Platform;
42 import okio.Buffer;
43 import okio.BufferedSink;
44 import okio.BufferedSource;
45 import okio.ByteString;
46 import okio.Okio;
47
48 import static java.util.concurrent.TimeUnit.MILLISECONDS;
49 import static okhttp3.internal.http2.ErrorCode.REFUSED_STREAM;
50 import static okhttp3.internal.http2.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
51 import static okhttp3.internal.platform.Platform.INFO;
52
53 /**
54  * A socket connection to a remote peer. A connection hosts streams which can send and receive
55  * data.
56  *
57  * <p>Many methods in this API are <strong>synchronous:</strong> the call is completed before the
58  * method returns. This is typical for Java but atypical for HTTP/2. This is motivated by exception
59  * transparency: an IOException that was triggered by a certain caller can be caught and handled by
60  * that caller.
61  */

62 public final class Http2Connection implements Closeable {
63
64   // Internal state of this connection is guarded by 'this'. No blocking
65   // operations may be performed while holding this lock!
66   //
67   // Socket writes are guarded by frameWriter.
68   //
69   // Socket reads are unguarded but are only made by the reader thread.
70   //
71   // Certain operations (like SYN_STREAM) need to synchronize on both the
72   // frameWriter (to do blocking I/O) and this (to create streams). Such
73   // operations must synchronize on 'this' last. This ensures that we never
74   // wait for a blocking operation while holding 'this'.
75
76   static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
77
78   static final int INTERVAL_PING = 1;
79   static final int DEGRADED_PING = 2;
80   static final int AWAIT_PING = 3;
81   static final long DEGRADED_PONG_TIMEOUT_NS = 1_000_000_000L; // 1 second.
82
83   /**
84    * Shared executor to send notifications of incoming streams. This executor requires multiple
85    * threads because listeners are not required to return promptly.
86    */

87   private static final ExecutorService listenerExecutor = new ThreadPoolExecutor(0,
88       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
89       Util.threadFactory("OkHttp Http2Connection"true));
90
91   /** True if this peer initiated the connection. */
92   final boolean client;
93
94   /**
95    * User code to run in response to incoming streams or settings. Calls to this are always invoked
96    * on {@link #listenerExecutor}.
97    */

98   final Listener listener;
99   final Map<Integer, Http2Stream> streams = new LinkedHashMap<>();
100   final String connectionName;
101   int lastGoodStreamId;
102   int nextStreamId;
103   private boolean shutdown;
104
105   /** Asynchronously writes frames to the outgoing socket. */
106   private final ScheduledExecutorService writerExecutor;
107
108   /** Ensures push promise callbacks events are sent in order per stream. */
109   private final ExecutorService pushExecutor;
110
111   /** User code to run in response to push promise events. */
112   final PushObserver pushObserver;
113
114   // Total number of pings send and received of the corresponding types. All guarded by this.
115   private long intervalPingsSent = 0L;
116   private long intervalPongsReceived = 0L;
117   private long degradedPingsSent = 0L;
118   private long degradedPongsReceived = 0L;
119   private long awaitPingsSent = 0L;
120   private long awaitPongsReceived = 0L;
121
122   /** Consider this connection to be unhealthy if a degraded pong isn't received by this time. */
123   private long degradedPongDeadlineNs = 0L;
124
125   /**
126    * The total number of bytes consumed by the application, but not yet acknowledged by sending a
127    * {@code WINDOW_UPDATE} frame on this connection.
128    */

129   // Visible for testing
130   long unacknowledgedBytesRead = 0;
131
132   /**
133    * Count of bytes that can be written on the connection before receiving a window update.
134    */

135   // Visible for testing
136   long bytesLeftInWriteWindow;
137
138   /** Settings we communicate to the peer. */
139   Settings okHttpSettings = new Settings();
140
141   /** Settings we receive from the peer. */
142   // TODO: MWS will need to guard on this setting before attempting to push.
143   final Settings peerSettings = new Settings();
144
145   final Socket socket;
146   final Http2Writer writer;
147
148   // Visible for testing
149   final ReaderRunnable readerRunnable;
150
151   Http2Connection(Builder builder) {
152     pushObserver = builder.pushObserver;
153     client = builder.client;
154     listener = builder.listener;
155     // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
156     nextStreamId = builder.client ? 1 : 2;
157     if (builder.client) {
158       nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
159     }
160
161     // Flow control was designed more for servers, or proxies than edge clients.
162     // If we are a client, set the flow control window to 16MiB.  This avoids
163     // thrashing window updates every 64KiB, yet small enough to avoid blowing
164     // up the heap.
165     if (builder.client) {
166       okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, OKHTTP_CLIENT_WINDOW_SIZE);
167     }
168
169     connectionName = builder.connectionName;
170
171     writerExecutor = new ScheduledThreadPoolExecutor(1,
172         Util.threadFactory(Util.format("OkHttp %s Writer", connectionName), false));
173     if (builder.pingIntervalMillis != 0) {
174       writerExecutor.scheduleAtFixedRate(new IntervalPingRunnable(),
175           builder.pingIntervalMillis, builder.pingIntervalMillis, MILLISECONDS);
176     }
177
178     // Like newSingleThreadExecutor, except lazy creates the thread.
179     pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
180         Util.threadFactory(Util.format("OkHttp %s Push Observer", connectionName), true));
181     peerSettings.set(Settings.INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
182     peerSettings.set(Settings.MAX_FRAME_SIZE, Http2.INITIAL_MAX_FRAME_SIZE);
183     bytesLeftInWriteWindow = peerSettings.getInitialWindowSize();
184     socket = builder.socket;
185     writer = new Http2Writer(builder.sink, client);
186
187     readerRunnable = new ReaderRunnable(new Http2Reader(builder.source, client));
188   }
189
190   /**
191    * Returns the number of {@link Http2Stream#isOpen() open streams} on this connection.
192    */

193   public synchronized int openStreamCount() {
194     return streams.size();
195   }
196
197   synchronized Http2Stream getStream(int id) {
198     return streams.get(id);
199   }
200
201   synchronized Http2Stream removeStream(int streamId) {
202     Http2Stream stream = streams.remove(streamId);
203     notifyAll(); // The removed stream may be blocked on a connection-wide window update.
204     return stream;
205   }
206
207   public synchronized int maxConcurrentStreams() {
208     return peerSettings.getMaxConcurrentStreams(Integer.MAX_VALUE);
209   }
210
211   synchronized void updateConnectionFlowControl(long read) {
212     unacknowledgedBytesRead += read;
213     if (unacknowledgedBytesRead >= okHttpSettings.getInitialWindowSize() / 2) {
214       writeWindowUpdateLater(0, unacknowledgedBytesRead);
215       unacknowledgedBytesRead = 0;
216     }
217   }
218
219   /**
220    * Returns a new server-initiated stream.
221    *
222    * @param associatedStreamId the stream that triggered the sender to create this stream.
223    * @param out true to create an output stream that we can use to send data to the remote peer.
224    * Corresponds to {@code FLAG_FIN}.
225    */

226   public Http2Stream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
227       throws IOException {
228     if (client) throw new IllegalStateException("Client cannot push requests.");
229     return newStream(associatedStreamId, requestHeaders, out);
230   }
231
232   /**
233    * Returns a new locally-initiated stream.
234    * @param out true to create an output stream that we can use to send data to the remote peer.
235    * Corresponds to {@code FLAG_FIN}.
236    */

237   public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
238     return newStream(0, requestHeaders, out);
239   }
240
241   private Http2Stream newStream(
242       int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
243     boolean outFinished = !out;
244     boolean inFinished = false;
245     boolean flushHeaders;
246     Http2Stream stream;
247     int streamId;
248
249     synchronized (writer) {
250       synchronized (this) {
251         if (nextStreamId > Integer.MAX_VALUE / 2) {
252           shutdown(REFUSED_STREAM);
253         }
254         if (shutdown) {
255           throw new ConnectionShutdownException();
256         }
257         streamId = nextStreamId;
258         nextStreamId += 2;
259         stream = new Http2Stream(streamId, this, outFinished, inFinished, null);
260         flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
261         if (stream.isOpen()) {
262           streams.put(streamId, stream);
263         }
264       }
265       if (associatedStreamId == 0) {
266         writer.headers(outFinished, streamId, requestHeaders);
267       } else if (client) {
268         throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
269       } else { // HTTP/2 has a PUSH_PROMISE frame.
270         writer.pushPromise(associatedStreamId, streamId, requestHeaders);
271       }
272     }
273
274     if (flushHeaders) {
275       writer.flush();
276     }
277
278     return stream;
279   }
280
281   void writeHeaders(int streamId, boolean outFinished, List<Header> alternating)
282       throws IOException {
283     writer.headers(outFinished, streamId, alternating);
284   }
285
286   /**
287    * Callers of this method are not thread safe, and sometimes on application threads. Most often,
288    * this method will be called to send a buffer worth of data to the peer.
289    *
290    * <p>Writes are subject to the write window of the stream and the connection. Until there is a
291    * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
292    * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
293    * write window will block.
294    *
295    * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
296    * use case for zero {@code byteCount} is closing a flushed output stream.
297    */

298   public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
299       throws IOException {
300     if (byteCount == 0) { // Empty data frames are not flow-controlled.
301       writer.data(outFinished, streamId, buffer, 0);
302       return;
303     }
304
305     while (byteCount > 0) {
306       int toWrite;
307       synchronized (Http2Connection.this) {
308         try {
309           while (bytesLeftInWriteWindow <= 0) {
310             // Before blocking, confirm that the stream we're writing is still open. It's possible
311             // that the stream has since been closed (such as if this write timed out.)
312             if (!streams.containsKey(streamId)) {
313               throw new IOException("stream closed");
314             }
315             Http2Connection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
316           }
317         } catch (InterruptedException e) {
318           Thread.currentThread().interrupt(); // Retain interrupted status.
319           throw new InterruptedIOException();
320         }
321
322         toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
323         toWrite = Math.min(toWrite, writer.maxDataLength());
324         bytesLeftInWriteWindow -= toWrite;
325       }
326
327       byteCount -= toWrite;
328       writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
329     }
330   }
331
332   void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
333     try {
334       writerExecutor.execute(new NamedRunnable("OkHttp %s stream %d", connectionName, streamId) {
335         @Override public void execute() {
336           try {
337             writeSynReset(streamId, errorCode);
338           } catch (IOException e) {
339             failConnection(e);
340           }
341         }
342       });
343     } catch (RejectedExecutionException ignored) {
344       // This connection has been closed.
345     }
346   }
347
348   void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
349     writer.rstStream(streamId, statusCode);
350   }
351
352   void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
353     try {
354       writerExecutor.execute(
355           new NamedRunnable("OkHttp Window Update %s stream %d", connectionName, streamId) {
356             @Override public void execute() {
357               try {
358                 writer.windowUpdate(streamId, unacknowledgedBytesRead);
359               } catch (IOException e) {
360                 failConnection(e);
361               }
362             }
363           });
364     } catch (RejectedExecutionException ignored) {
365       // This connection has been closed.
366     }
367   }
368
369   final class PingRunnable extends NamedRunnable {
370     final boolean reply;
371     final int payload1;
372     final int payload2;
373
374     PingRunnable(boolean reply, int payload1, int payload2) {
375       super("OkHttp %s ping %08x%08x", connectionName, payload1, payload2);
376       this.reply = reply;
377       this.payload1 = payload1;
378       this.payload2 = payload2;
379     }
380
381     @Override public void execute() {
382       writePing(reply, payload1, payload2);
383     }
384   }
385
386   final class IntervalPingRunnable extends NamedRunnable {
387     IntervalPingRunnable() {
388       super("OkHttp %s ping", connectionName);
389     }
390
391     @Override public void execute() {
392       boolean failDueToMissingPong;
393       synchronized (Http2Connection.this) {
394         if (intervalPongsReceived < intervalPingsSent) {
395           failDueToMissingPong = true;
396         } else {
397           intervalPingsSent++;
398           failDueToMissingPong = false;
399         }
400       }
401       if (failDueToMissingPong) {
402         failConnection(null);
403       } else {
404         writePing(false, INTERVAL_PING, 0);
405       }
406     }
407   }
408
409   void writePing(boolean reply, int payload1, int payload2) {
410     try {
411       writer.ping(reply, payload1, payload2);
412     } catch (IOException e) {
413       failConnection(e);
414     }
415   }
416
417   /** For testing: sends a ping and waits for a pong. */
418   void writePingAndAwaitPong() throws InterruptedException {
419     writePing();
420     awaitPong();
421   }
422
423   /** For testing: sends a ping to be awaited with {@link #awaitPong}. */
424   void writePing() {
425     synchronized (this) {
426       awaitPingsSent++;
427     }
428     writePing(false, AWAIT_PING, 0x4f4b6f6b /* "OKok" */);
429   }
430
431   /** For testing: awaits a pong. */
432   synchronized void awaitPong() throws InterruptedException {
433     while (awaitPongsReceived < awaitPingsSent) {
434       wait();
435     }
436   }
437
438   public void flush() throws IOException {
439     writer.flush();
440   }
441
442   /**
443    * Degrades this connection such that new streams can neither be created locally, nor accepted
444    * from the remote peer. Existing streams are not impacted. This is intended to permit an endpoint
445    * to gracefully stop accepting new requests without harming previously established streams.
446    */

447   public void shutdown(ErrorCode statusCode) throws IOException {
448     synchronized (writer) {
449       int lastGoodStreamId;
450       synchronized (this) {
451         if (shutdown) {
452           return;
453         }
454         shutdown = true;
455         lastGoodStreamId = this.lastGoodStreamId;
456       }
457       // TODO: propagate exception message into debugData.
458       // TODO: configure a timeout on the reader so that it doesn’t block forever.
459       writer.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
460     }
461   }
462
463   /**
464    * Closes this connection. This cancels all open streams and unanswered pings. It closes the
465    * underlying input and output streams and shuts down internal executor services.
466    */

467   @Override public void close() {
468     close(ErrorCode.NO_ERROR, ErrorCode.CANCEL, null);
469   }
470
471   void close(ErrorCode connectionCode, ErrorCode streamCode, @Nullable IOException cause) {
472     assert (!Thread.holdsLock(this));
473     try {
474       shutdown(connectionCode);
475     } catch (IOException ignored) {
476     }
477
478     Http2Stream[] streamsToClose = null;
479     synchronized (this) {
480       if (!streams.isEmpty()) {
481         streamsToClose = streams.values().toArray(new Http2Stream[streams.size()]);
482         streams.clear();
483       }
484     }
485
486     if (streamsToClose != null) {
487       for (Http2Stream stream : streamsToClose) {
488         try {
489           stream.close(streamCode, cause);
490         } catch (IOException ignored) {
491         }
492       }
493     }
494
495     // Close the writer to release its resources (such as deflaters).
496     try {
497       writer.close();
498     } catch (IOException ignored) {
499     }
500
501     // Close the socket to break out the reader thread, which will clean up after itself.
502     try {
503       socket.close();
504     } catch (IOException ignored) {
505     }
506
507     // Release the threads.
508     writerExecutor.shutdown();
509     pushExecutor.shutdown();
510   }
511
512   private void failConnection(@Nullable IOException e) {
513     close(ErrorCode.PROTOCOL_ERROR, ErrorCode.PROTOCOL_ERROR, e);
514   }
515
516   /**
517    * Sends any initial frames and starts reading frames from the remote peer. This should be called
518    * after {@link Builder#build} for all new connections.
519    */

520   public void start() throws IOException {
521     start(true);
522   }
523
524   /**
525    * @param sendConnectionPreface true to send connection preface frames. This should always be true
526    *     except for in tests that don't check for a connection preface.
527    */

528   void start(boolean sendConnectionPreface) throws IOException {
529     if (sendConnectionPreface) {
530       writer.connectionPreface();
531       writer.settings(okHttpSettings);
532       int windowSize = okHttpSettings.getInitialWindowSize();
533       if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
534         writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
535       }
536     }
537     new Thread(readerRunnable).start(); // Not a daemon thread.
538   }
539
540   /** Merges {@code settings} into this peer's settings and sends them to the remote peer. */
541   public void setSettings(Settings settings) throws IOException {
542     synchronized (writer) {
543       synchronized (this) {
544         if (shutdown) {
545           throw new ConnectionShutdownException();
546         }
547         okHttpSettings.merge(settings);
548       }
549       writer.settings(settings);
550     }
551   }
552
553   public synchronized boolean isHealthy(long nowNs) {
554     if (shutdown) return false;
555
556     // A degraded pong is overdue.
557     if (degradedPongsReceived < degradedPingsSent && nowNs >= degradedPongDeadlineNs) return false;
558
559     return true;
560   }
561
562   /**
563    * HTTP/2 can have both stream timeouts (due to a problem with a single stream) and connection
564    * timeouts (due to a problem with the transport). When a stream times out we don't know whether
565    * the problem impacts just one stream or the entire connection.
566    *
567    * <p>To differentiate the two cases we ping the server when a stream times out. If the overall
568    * connection is fine the ping will receive a pong; otherwise it won't.
569    *
570    * <p>The deadline to respond to this ping attempts to limit the cost of being wrong. If it is too
571    * long, streams created while we await the pong will reuse broken connections and inevitably
572    * fail. If it is too short, slow connections will be marked as failed and extra TCP and TLS
573    * handshakes will be required.
574    *
575    * <p>The deadline is currently hardcoded. We may make this configurable in the future!
576    */

577   void sendDegradedPingLater() {
578     synchronized (this) {
579       if (degradedPongsReceived < degradedPingsSent) return// Already awaiting a degraded pong.
580       degradedPingsSent++;
581       degradedPongDeadlineNs = System.nanoTime() + DEGRADED_PONG_TIMEOUT_NS;
582     }
583     try {
584       writerExecutor.execute(new NamedRunnable("OkHttp %s ping", connectionName) {
585         @Override public void execute() {
586           writePing(false, DEGRADED_PING, 0);
587         }
588       });
589     } catch (RejectedExecutionException ignored) {
590       // This connection has been closed.
591     }
592   }
593
594   public static class Builder {
595     Socket socket;
596     String connectionName;
597     BufferedSource source;
598     BufferedSink sink;
599     Listener listener = Listener.REFUSE_INCOMING_STREAMS;
600     PushObserver pushObserver = PushObserver.CANCEL;
601     boolean client;
602     int pingIntervalMillis;
603
604     /**
605      * @param client true if this peer initiated the connection; false if this peer accepted the
606      * connection.
607      */

608     public Builder(boolean client) {
609       this.client = client;
610     }
611
612     public Builder socket(Socket socket) throws IOException {
613       SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
614       String connectionName = remoteSocketAddress instanceof InetSocketAddress
615           ? ((InetSocketAddress) remoteSocketAddress).getHostName()
616           : remoteSocketAddress.toString();
617       return socket(socket, connectionName,
618           Okio.buffer(Okio.source(socket)), Okio.buffer(Okio.sink(socket)));
619     }
620
621     public Builder socket(
622         Socket socket, String connectionName, BufferedSource source, BufferedSink sink) {
623       this.socket = socket;
624       this.connectionName = connectionName;
625       this.source = source;
626       this.sink = sink;
627       return this;
628     }
629
630     public Builder listener(Listener listener) {
631       this.listener = listener;
632       return this;
633     }
634
635     public Builder pushObserver(PushObserver pushObserver) {
636       this.pushObserver = pushObserver;
637       return this;
638     }
639
640     public Builder pingIntervalMillis(int pingIntervalMillis) {
641       this.pingIntervalMillis = pingIntervalMillis;
642       return this;
643     }
644
645     public Http2Connection build() {
646       return new Http2Connection(this);
647     }
648   }
649
650   /**
651    * Methods in this class must not lock FrameWriter.  If a method needs to write a frame, create an
652    * async task to do so.
653    */

654   class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
655     final Http2Reader reader;
656
657     ReaderRunnable(Http2Reader reader) {
658       super("OkHttp %s", connectionName);
659       this.reader = reader;
660     }
661
662     @Override protected void execute() {
663       ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
664       ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
665       IOException errorException = null;
666       try {
667         reader.readConnectionPreface(this);
668         while (reader.nextFrame(falsethis)) {
669         }
670         connectionErrorCode = ErrorCode.NO_ERROR;
671         streamErrorCode = ErrorCode.CANCEL;
672       } catch (IOException e) {
673         errorException = e;
674         connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
675         streamErrorCode = ErrorCode.PROTOCOL_ERROR;
676       } finally {
677         close(connectionErrorCode, streamErrorCode, errorException);
678         Util.closeQuietly(reader);
679       }
680     }
681
682     @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
683         throws IOException {
684       if (pushedStream(streamId)) {
685         pushDataLater(streamId, source, length, inFinished);
686         return;
687       }
688       Http2Stream dataStream = getStream(streamId);
689       if (dataStream == null) {
690         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
691         updateConnectionFlowControl(length);
692         source.skip(length);
693         return;
694       }
695       dataStream.receiveData(source, length);
696       if (inFinished) {
697         dataStream.receiveHeaders(Util.EMPTY_HEADERS, true);
698       }
699     }
700
701     @Override public void headers(boolean inFinished, int streamId, int associatedStreamId,
702         List<Header> headerBlock) {
703       if (pushedStream(streamId)) {
704         pushHeadersLater(streamId, headerBlock, inFinished);
705         return;
706       }
707       Http2Stream stream;
708       synchronized (Http2Connection.this) {
709         stream = getStream(streamId);
710
711         if (stream == null) {
712           // If we're shutdown, don't bother with this stream.
713           if (shutdown) return;
714
715           // If the stream ID is less than the last created ID, assume it's already closed.
716           if (streamId <= lastGoodStreamId) return;
717
718           // If the stream ID is in the client's namespace, assume it's already closed.
719           if (streamId % 2 == nextStreamId % 2) return;
720
721           // Create a stream.
722           Headers headers = Util.toHeaders(headerBlock);
723           final Http2Stream newStream = new Http2Stream(streamId, Http2Connection.this,
724               false, inFinished, headers);
725           lastGoodStreamId = streamId;
726           streams.put(streamId, newStream);
727           listenerExecutor.execute(new NamedRunnable(
728               "OkHttp %s stream %d", connectionName, streamId) {
729             @Override public void execute() {
730               try {
731                 listener.onStream(newStream);
732               } catch (IOException e) {
733                 Platform.get().log(
734                     INFO, "Http2Connection.Listener failure for " + connectionName, e);
735                 try {
736                   newStream.close(ErrorCode.PROTOCOL_ERROR, e);
737                 } catch (IOException ignored) {
738                 }
739               }
740             }
741           });
742           return;
743         }
744       }
745
746       // Update an existing stream.
747       stream.receiveHeaders(Util.toHeaders(headerBlock), inFinished);
748     }
749
750     @Override public void rstStream(int streamId, ErrorCode errorCode) {
751       if (pushedStream(streamId)) {
752         pushResetLater(streamId, errorCode);
753         return;
754       }
755       Http2Stream rstStream = removeStream(streamId);
756       if (rstStream != null) {
757         rstStream.receiveRstStream(errorCode);
758       }
759     }
760
761     @Override public void settings(boolean clearPrevious, Settings settings) {
762       try {
763         writerExecutor.execute(new NamedRunnable("OkHttp %s ACK Settings", connectionName) {
764           @Override public void execute() {
765             applyAndAckSettings(clearPrevious, settings);
766           }
767         });
768       } catch (RejectedExecutionException ignored) {
769         // This connection has been closed.
770       }
771     }
772
773     void applyAndAckSettings(boolean clearPrevious, Settings settings) {
774       long delta = 0;
775       Http2Stream[] streamsToNotify = null;
776       synchronized (writer) {
777         synchronized (Http2Connection.this) {
778           int priorWriteWindowSize = peerSettings.getInitialWindowSize();
779           if (clearPrevious) peerSettings.clear();
780           peerSettings.merge(settings);
781           int peerInitialWindowSize = peerSettings.getInitialWindowSize();
782           if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
783             delta = peerInitialWindowSize - priorWriteWindowSize;
784             streamsToNotify = !streams.isEmpty()
785                 ? streams.values().toArray(new Http2Stream[streams.size()])
786                 : null;
787           }
788         }
789         try {
790           writer.applyAndAckSettings(peerSettings);
791         } catch (IOException e) {
792           failConnection(e);
793         }
794       }
795       if (streamsToNotify != null) {
796         for (Http2Stream stream : streamsToNotify) {
797           synchronized (stream) {
798             stream.addBytesToWriteWindow(delta);
799           }
800         }
801       }
802       listenerExecutor.execute(new NamedRunnable("OkHttp %s settings", connectionName) {
803         @Override public void execute() {
804           listener.onSettings(Http2Connection.this);
805         }
806       });
807     }
808
809     @Override public void ackSettings() {
810       // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
811     }
812
813     @Override public void ping(boolean reply, int payload1, int payload2) {
814       if (reply) {
815         synchronized (Http2Connection.this) {
816           if (payload1 == INTERVAL_PING) {
817             intervalPongsReceived++;
818           } else if (payload1 == DEGRADED_PING) {
819             degradedPongsReceived++;
820           } else if (payload1 == AWAIT_PING) {
821             awaitPongsReceived++;
822             Http2Connection.this.notifyAll();
823           }
824         }
825       } else {
826         try {
827           // Send a reply to a client ping if this is a server and vice versa.
828           writerExecutor.execute(new PingRunnable(true, payload1, payload2));
829         } catch (RejectedExecutionException ignored) {
830           // This connection has been closed.
831         }
832       }
833     }
834
835     @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
836       if (debugData.size() > 0) { // TODO: log the debugData
837       }
838
839       // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
840       Http2Stream[] streamsCopy;
841       synchronized (Http2Connection.this) {
842         streamsCopy = streams.values().toArray(new Http2Stream[streams.size()]);
843         shutdown = true;
844       }
845
846       // Fail all streams created after the last good stream ID.
847       for (Http2Stream http2Stream : streamsCopy) {
848         if (http2Stream.getId() > lastGoodStreamId && http2Stream.isLocallyInitiated()) {
849           http2Stream.receiveRstStream(REFUSED_STREAM);
850           removeStream(http2Stream.getId());
851         }
852       }
853     }
854
855     @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
856       if (streamId == 0) {
857         synchronized (Http2Connection.this) {
858           bytesLeftInWriteWindow += windowSizeIncrement;
859           Http2Connection.this.notifyAll();
860         }
861       } else {
862         Http2Stream stream = getStream(streamId);
863         if (stream != null) {
864           synchronized (stream) {
865             stream.addBytesToWriteWindow(windowSizeIncrement);
866           }
867         }
868       }
869     }
870
871     @Override public void priority(int streamId, int streamDependency, int weight,
872         boolean exclusive) {
873       // TODO: honor priority.
874     }
875
876     @Override
877     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
878       pushRequestLater(promisedStreamId, requestHeaders);
879     }
880
881     @Override public void alternateService(int streamId, String origin, ByteString protocol,
882         String host, int port, long maxAge) {
883       // TODO: register alternate service.
884     }
885   }
886
887   /** Even, positive numbered streams are pushed streams in HTTP/2. */
888   boolean pushedStream(int streamId) {
889     return streamId != 0 && (streamId & 1) == 0;
890   }
891
892   // Guarded by this.
893   final Set<Integer> currentPushRequests = new LinkedHashSet<>();
894
895   void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
896     synchronized (this) {
897       if (currentPushRequests.contains(streamId)) {
898         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
899         return;
900       }
901       currentPushRequests.add(streamId);
902     }
903     try {
904       pushExecutorExecute(new NamedRunnable(
905           "OkHttp %s Push Request[%s]", connectionName, streamId) {
906         @Override public void execute() {
907           boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
908           try {
909             if (cancel) {
910               writer.rstStream(streamId, ErrorCode.CANCEL);
911               synchronized (Http2Connection.this) {
912                 currentPushRequests.remove(streamId);
913               }
914             }
915           } catch (IOException ignored) {
916           }
917         }
918       });
919     } catch (RejectedExecutionException ignored) {
920       // This connection has been closed.
921     }
922   }
923
924   void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
925       final boolean inFinished) {
926     try {
927       pushExecutorExecute(new NamedRunnable(
928           "OkHttp %s Push Headers[%s]", connectionName, streamId) {
929         @Override public void execute() {
930           boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
931           try {
932             if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL);
933             if (cancel || inFinished) {
934               synchronized (Http2Connection.this) {
935                 currentPushRequests.remove(streamId);
936               }
937             }
938           } catch (IOException ignored) {
939           }
940         }
941       });
942     } catch (RejectedExecutionException ignored) {
943       // This connection has been closed.
944     }
945   }
946
947   /**
948    * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
949    * process the data.  This avoids corrupting the stream.
950    */

951   void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
952       final boolean inFinished) throws IOException {
953     final Buffer buffer = new Buffer();
954     source.require(byteCount); // Eagerly read the frame before firing client thread.
955     source.read(buffer, byteCount);
956     if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
957     pushExecutorExecute(new NamedRunnable("OkHttp %s Push Data[%s]", connectionName, streamId) {
958       @Override public void execute() {
959         try {
960           boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
961           if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL);
962           if (cancel || inFinished) {
963             synchronized (Http2Connection.this) {
964               currentPushRequests.remove(streamId);
965             }
966           }
967         } catch (IOException ignored) {
968         }
969       }
970     });
971   }
972
973   void pushResetLater(final int streamId, final ErrorCode errorCode) {
974     pushExecutorExecute(new NamedRunnable("OkHttp %s Push Reset[%s]", connectionName, streamId) {
975       @Override public void execute() {
976         pushObserver.onReset(streamId, errorCode);
977         synchronized (Http2Connection.this) {
978           currentPushRequests.remove(streamId);
979         }
980       }
981     });
982   }
983
984   private synchronized void pushExecutorExecute(NamedRunnable namedRunnable) {
985     if (!shutdown) {
986       pushExecutor.execute(namedRunnable);
987     }
988   }
989
990   /** Listener of streams and settings initiated by the peer. */
991   public abstract static class Listener {
992     public static final Listener REFUSE_INCOMING_STREAMS = new Listener() {
993       @Override public void onStream(Http2Stream stream) throws IOException {
994         stream.close(REFUSED_STREAM, null);
995       }
996     };
997
998     /**
999      * Handle a new stream from this connection's peer. Implementations should respond by either
1000      * {@linkplain Http2Stream#writeHeaders replying to the stream} or {@linkplain
1001      * Http2Stream#close closing it}. This response does not need to be synchronous.
1002      */

1003     public abstract void onStream(Http2Stream stream) throws IOException;
1004
1005     /**
1006      * Notification that the connection's peer's settings may have changed. Implementations should
1007      * take appropriate action to handle the updated settings.
1008      *
1009      * <p>It is the implementation's responsibility to handle concurrent calls to this method. A
1010      * remote peer that sends multiple settings frames will trigger multiple calls to this method,
1011      * and those calls are not necessarily serialized.
1012      */

1013     public void onSettings(Http2Connection connection) {
1014     }
1015   }
1016 }
1017