1
16 package okhttp3.internal.http2;
17
18 import java.io.Closeable;
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.net.SocketAddress;
24 import java.util.LinkedHashMap;
25 import java.util.LinkedHashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.SynchronousQueue;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import javax.annotation.Nullable;
38 import okhttp3.Headers;
39 import okhttp3.internal.NamedRunnable;
40 import okhttp3.internal.Util;
41 import okhttp3.internal.platform.Platform;
42 import okio.Buffer;
43 import okio.BufferedSink;
44 import okio.BufferedSource;
45 import okio.ByteString;
46 import okio.Okio;
47
48 import static java.util.concurrent.TimeUnit.MILLISECONDS;
49 import static okhttp3.internal.http2.ErrorCode.REFUSED_STREAM;
50 import static okhttp3.internal.http2.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
51 import static okhttp3.internal.platform.Platform.INFO;
52
53
62 public final class Http2Connection implements Closeable {
63
64
65
66
67
68
69
70
71
72
73
74
75
76 static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
77
78 static final int INTERVAL_PING = 1;
79 static final int DEGRADED_PING = 2;
80 static final int AWAIT_PING = 3;
81 static final long DEGRADED_PONG_TIMEOUT_NS = 1_000_000_000L;
82
83
87 private static final ExecutorService listenerExecutor = new ThreadPoolExecutor(0,
88 Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
89 Util.threadFactory("OkHttp Http2Connection", true));
90
91
92 final boolean client;
93
94
98 final Listener listener;
99 final Map<Integer, Http2Stream> streams = new LinkedHashMap<>();
100 final String connectionName;
101 int lastGoodStreamId;
102 int nextStreamId;
103 private boolean shutdown;
104
105
106 private final ScheduledExecutorService writerExecutor;
107
108
109 private final ExecutorService pushExecutor;
110
111
112 final PushObserver pushObserver;
113
114
115 private long intervalPingsSent = 0L;
116 private long intervalPongsReceived = 0L;
117 private long degradedPingsSent = 0L;
118 private long degradedPongsReceived = 0L;
119 private long awaitPingsSent = 0L;
120 private long awaitPongsReceived = 0L;
121
122
123 private long degradedPongDeadlineNs = 0L;
124
125
129
130 long unacknowledgedBytesRead = 0;
131
132
135
136 long bytesLeftInWriteWindow;
137
138
139 Settings okHttpSettings = new Settings();
140
141
142
143 final Settings peerSettings = new Settings();
144
145 final Socket socket;
146 final Http2Writer writer;
147
148
149 final ReaderRunnable readerRunnable;
150
151 Http2Connection(Builder builder) {
152 pushObserver = builder.pushObserver;
153 client = builder.client;
154 listener = builder.listener;
155
156 nextStreamId = builder.client ? 1 : 2;
157 if (builder.client) {
158 nextStreamId += 2;
159 }
160
161
162
163
164
165 if (builder.client) {
166 okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, OKHTTP_CLIENT_WINDOW_SIZE);
167 }
168
169 connectionName = builder.connectionName;
170
171 writerExecutor = new ScheduledThreadPoolExecutor(1,
172 Util.threadFactory(Util.format("OkHttp %s Writer", connectionName), false));
173 if (builder.pingIntervalMillis != 0) {
174 writerExecutor.scheduleAtFixedRate(new IntervalPingRunnable(),
175 builder.pingIntervalMillis, builder.pingIntervalMillis, MILLISECONDS);
176 }
177
178
179 pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
180 Util.threadFactory(Util.format("OkHttp %s Push Observer", connectionName), true));
181 peerSettings.set(Settings.INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
182 peerSettings.set(Settings.MAX_FRAME_SIZE, Http2.INITIAL_MAX_FRAME_SIZE);
183 bytesLeftInWriteWindow = peerSettings.getInitialWindowSize();
184 socket = builder.socket;
185 writer = new Http2Writer(builder.sink, client);
186
187 readerRunnable = new ReaderRunnable(new Http2Reader(builder.source, client));
188 }
189
190
193 public synchronized int openStreamCount() {
194 return streams.size();
195 }
196
197 synchronized Http2Stream getStream(int id) {
198 return streams.get(id);
199 }
200
201 synchronized Http2Stream removeStream(int streamId) {
202 Http2Stream stream = streams.remove(streamId);
203 notifyAll();
204 return stream;
205 }
206
207 public synchronized int maxConcurrentStreams() {
208 return peerSettings.getMaxConcurrentStreams(Integer.MAX_VALUE);
209 }
210
211 synchronized void updateConnectionFlowControl(long read) {
212 unacknowledgedBytesRead += read;
213 if (unacknowledgedBytesRead >= okHttpSettings.getInitialWindowSize() / 2) {
214 writeWindowUpdateLater(0, unacknowledgedBytesRead);
215 unacknowledgedBytesRead = 0;
216 }
217 }
218
219
226 public Http2Stream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
227 throws IOException {
228 if (client) throw new IllegalStateException("Client cannot push requests.");
229 return newStream(associatedStreamId, requestHeaders, out);
230 }
231
232
237 public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
238 return newStream(0, requestHeaders, out);
239 }
240
241 private Http2Stream newStream(
242 int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
243 boolean outFinished = !out;
244 boolean inFinished = false;
245 boolean flushHeaders;
246 Http2Stream stream;
247 int streamId;
248
249 synchronized (writer) {
250 synchronized (this) {
251 if (nextStreamId > Integer.MAX_VALUE / 2) {
252 shutdown(REFUSED_STREAM);
253 }
254 if (shutdown) {
255 throw new ConnectionShutdownException();
256 }
257 streamId = nextStreamId;
258 nextStreamId += 2;
259 stream = new Http2Stream(streamId, this, outFinished, inFinished, null);
260 flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
261 if (stream.isOpen()) {
262 streams.put(streamId, stream);
263 }
264 }
265 if (associatedStreamId == 0) {
266 writer.headers(outFinished, streamId, requestHeaders);
267 } else if (client) {
268 throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
269 } else {
270 writer.pushPromise(associatedStreamId, streamId, requestHeaders);
271 }
272 }
273
274 if (flushHeaders) {
275 writer.flush();
276 }
277
278 return stream;
279 }
280
281 void writeHeaders(int streamId, boolean outFinished, List<Header> alternating)
282 throws IOException {
283 writer.headers(outFinished, streamId, alternating);
284 }
285
286
298 public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
299 throws IOException {
300 if (byteCount == 0) {
301 writer.data(outFinished, streamId, buffer, 0);
302 return;
303 }
304
305 while (byteCount > 0) {
306 int toWrite;
307 synchronized (Http2Connection.this) {
308 try {
309 while (bytesLeftInWriteWindow <= 0) {
310
311
312 if (!streams.containsKey(streamId)) {
313 throw new IOException("stream closed");
314 }
315 Http2Connection.this.wait();
316 }
317 } catch (InterruptedException e) {
318 Thread.currentThread().interrupt();
319 throw new InterruptedIOException();
320 }
321
322 toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
323 toWrite = Math.min(toWrite, writer.maxDataLength());
324 bytesLeftInWriteWindow -= toWrite;
325 }
326
327 byteCount -= toWrite;
328 writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
329 }
330 }
331
332 void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
333 try {
334 writerExecutor.execute(new NamedRunnable("OkHttp %s stream %d", connectionName, streamId) {
335 @Override public void execute() {
336 try {
337 writeSynReset(streamId, errorCode);
338 } catch (IOException e) {
339 failConnection(e);
340 }
341 }
342 });
343 } catch (RejectedExecutionException ignored) {
344
345 }
346 }
347
348 void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
349 writer.rstStream(streamId, statusCode);
350 }
351
352 void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
353 try {
354 writerExecutor.execute(
355 new NamedRunnable("OkHttp Window Update %s stream %d", connectionName, streamId) {
356 @Override public void execute() {
357 try {
358 writer.windowUpdate(streamId, unacknowledgedBytesRead);
359 } catch (IOException e) {
360 failConnection(e);
361 }
362 }
363 });
364 } catch (RejectedExecutionException ignored) {
365
366 }
367 }
368
369 final class PingRunnable extends NamedRunnable {
370 final boolean reply;
371 final int payload1;
372 final int payload2;
373
374 PingRunnable(boolean reply, int payload1, int payload2) {
375 super("OkHttp %s ping %08x%08x", connectionName, payload1, payload2);
376 this.reply = reply;
377 this.payload1 = payload1;
378 this.payload2 = payload2;
379 }
380
381 @Override public void execute() {
382 writePing(reply, payload1, payload2);
383 }
384 }
385
386 final class IntervalPingRunnable extends NamedRunnable {
387 IntervalPingRunnable() {
388 super("OkHttp %s ping", connectionName);
389 }
390
391 @Override public void execute() {
392 boolean failDueToMissingPong;
393 synchronized (Http2Connection.this) {
394 if (intervalPongsReceived < intervalPingsSent) {
395 failDueToMissingPong = true;
396 } else {
397 intervalPingsSent++;
398 failDueToMissingPong = false;
399 }
400 }
401 if (failDueToMissingPong) {
402 failConnection(null);
403 } else {
404 writePing(false, INTERVAL_PING, 0);
405 }
406 }
407 }
408
409 void writePing(boolean reply, int payload1, int payload2) {
410 try {
411 writer.ping(reply, payload1, payload2);
412 } catch (IOException e) {
413 failConnection(e);
414 }
415 }
416
417
418 void writePingAndAwaitPong() throws InterruptedException {
419 writePing();
420 awaitPong();
421 }
422
423
424 void writePing() {
425 synchronized (this) {
426 awaitPingsSent++;
427 }
428 writePing(false, AWAIT_PING, 0x4f4b6f6b );
429 }
430
431
432 synchronized void awaitPong() throws InterruptedException {
433 while (awaitPongsReceived < awaitPingsSent) {
434 wait();
435 }
436 }
437
438 public void flush() throws IOException {
439 writer.flush();
440 }
441
442
447 public void shutdown(ErrorCode statusCode) throws IOException {
448 synchronized (writer) {
449 int lastGoodStreamId;
450 synchronized (this) {
451 if (shutdown) {
452 return;
453 }
454 shutdown = true;
455 lastGoodStreamId = this.lastGoodStreamId;
456 }
457
458
459 writer.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
460 }
461 }
462
463
467 @Override public void close() {
468 close(ErrorCode.NO_ERROR, ErrorCode.CANCEL, null);
469 }
470
471 void close(ErrorCode connectionCode, ErrorCode streamCode, @Nullable IOException cause) {
472 assert (!Thread.holdsLock(this));
473 try {
474 shutdown(connectionCode);
475 } catch (IOException ignored) {
476 }
477
478 Http2Stream[] streamsToClose = null;
479 synchronized (this) {
480 if (!streams.isEmpty()) {
481 streamsToClose = streams.values().toArray(new Http2Stream[streams.size()]);
482 streams.clear();
483 }
484 }
485
486 if (streamsToClose != null) {
487 for (Http2Stream stream : streamsToClose) {
488 try {
489 stream.close(streamCode, cause);
490 } catch (IOException ignored) {
491 }
492 }
493 }
494
495
496 try {
497 writer.close();
498 } catch (IOException ignored) {
499 }
500
501
502 try {
503 socket.close();
504 } catch (IOException ignored) {
505 }
506
507
508 writerExecutor.shutdown();
509 pushExecutor.shutdown();
510 }
511
512 private void failConnection(@Nullable IOException e) {
513 close(ErrorCode.PROTOCOL_ERROR, ErrorCode.PROTOCOL_ERROR, e);
514 }
515
516
520 public void start() throws IOException {
521 start(true);
522 }
523
524
528 void start(boolean sendConnectionPreface) throws IOException {
529 if (sendConnectionPreface) {
530 writer.connectionPreface();
531 writer.settings(okHttpSettings);
532 int windowSize = okHttpSettings.getInitialWindowSize();
533 if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
534 writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
535 }
536 }
537 new Thread(readerRunnable).start();
538 }
539
540
541 public void setSettings(Settings settings) throws IOException {
542 synchronized (writer) {
543 synchronized (this) {
544 if (shutdown) {
545 throw new ConnectionShutdownException();
546 }
547 okHttpSettings.merge(settings);
548 }
549 writer.settings(settings);
550 }
551 }
552
553 public synchronized boolean isHealthy(long nowNs) {
554 if (shutdown) return false;
555
556
557 if (degradedPongsReceived < degradedPingsSent && nowNs >= degradedPongDeadlineNs) return false;
558
559 return true;
560 }
561
562
577 void sendDegradedPingLater() {
578 synchronized (this) {
579 if (degradedPongsReceived < degradedPingsSent) return;
580 degradedPingsSent++;
581 degradedPongDeadlineNs = System.nanoTime() + DEGRADED_PONG_TIMEOUT_NS;
582 }
583 try {
584 writerExecutor.execute(new NamedRunnable("OkHttp %s ping", connectionName) {
585 @Override public void execute() {
586 writePing(false, DEGRADED_PING, 0);
587 }
588 });
589 } catch (RejectedExecutionException ignored) {
590
591 }
592 }
593
594 public static class Builder {
595 Socket socket;
596 String connectionName;
597 BufferedSource source;
598 BufferedSink sink;
599 Listener listener = Listener.REFUSE_INCOMING_STREAMS;
600 PushObserver pushObserver = PushObserver.CANCEL;
601 boolean client;
602 int pingIntervalMillis;
603
604
608 public Builder(boolean client) {
609 this.client = client;
610 }
611
612 public Builder socket(Socket socket) throws IOException {
613 SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
614 String connectionName = remoteSocketAddress instanceof InetSocketAddress
615 ? ((InetSocketAddress) remoteSocketAddress).getHostName()
616 : remoteSocketAddress.toString();
617 return socket(socket, connectionName,
618 Okio.buffer(Okio.source(socket)), Okio.buffer(Okio.sink(socket)));
619 }
620
621 public Builder socket(
622 Socket socket, String connectionName, BufferedSource source, BufferedSink sink) {
623 this.socket = socket;
624 this.connectionName = connectionName;
625 this.source = source;
626 this.sink = sink;
627 return this;
628 }
629
630 public Builder listener(Listener listener) {
631 this.listener = listener;
632 return this;
633 }
634
635 public Builder pushObserver(PushObserver pushObserver) {
636 this.pushObserver = pushObserver;
637 return this;
638 }
639
640 public Builder pingIntervalMillis(int pingIntervalMillis) {
641 this.pingIntervalMillis = pingIntervalMillis;
642 return this;
643 }
644
645 public Http2Connection build() {
646 return new Http2Connection(this);
647 }
648 }
649
650
654 class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
/** Source of the incoming frames that this handler dispatches. */
final Http2Reader reader;

/** Creates the reader task; the task is named after the enclosing connection. */
ReaderRunnable(Http2Reader reader) {
  super("OkHttp %s", connectionName);
  this.reader = reader;
}
661
662 @Override protected void execute() {
663 ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
664 ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
665 IOException errorException = null;
666 try {
667 reader.readConnectionPreface(this);
668 while (reader.nextFrame(false, this)) {
669 }
670 connectionErrorCode = ErrorCode.NO_ERROR;
671 streamErrorCode = ErrorCode.CANCEL;
672 } catch (IOException e) {
673 errorException = e;
674 connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
675 streamErrorCode = ErrorCode.PROTOCOL_ERROR;
676 } finally {
677 close(connectionErrorCode, streamErrorCode, errorException);
678 Util.closeQuietly(reader);
679 }
680 }
681
682 @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
683 throws IOException {
684 if (pushedStream(streamId)) {
685 pushDataLater(streamId, source, length, inFinished);
686 return;
687 }
688 Http2Stream dataStream = getStream(streamId);
689 if (dataStream == null) {
690 writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
691 updateConnectionFlowControl(length);
692 source.skip(length);
693 return;
694 }
695 dataStream.receiveData(source, length);
696 if (inFinished) {
697 dataStream.receiveHeaders(Util.EMPTY_HEADERS, true);
698 }
699 }
700
701 @Override public void headers(boolean inFinished, int streamId, int associatedStreamId,
702 List<Header> headerBlock) {
703 if (pushedStream(streamId)) {
704 pushHeadersLater(streamId, headerBlock, inFinished);
705 return;
706 }
707 Http2Stream stream;
708 synchronized (Http2Connection.this) {
709 stream = getStream(streamId);
710
711 if (stream == null) {
712
713 if (shutdown) return;
714
715
716 if (streamId <= lastGoodStreamId) return;
717
718
719 if (streamId % 2 == nextStreamId % 2) return;
720
721
722 Headers headers = Util.toHeaders(headerBlock);
723 final Http2Stream newStream = new Http2Stream(streamId, Http2Connection.this,
724 false, inFinished, headers);
725 lastGoodStreamId = streamId;
726 streams.put(streamId, newStream);
727 listenerExecutor.execute(new NamedRunnable(
728 "OkHttp %s stream %d", connectionName, streamId) {
729 @Override public void execute() {
730 try {
731 listener.onStream(newStream);
732 } catch (IOException e) {
733 Platform.get().log(
734 INFO, "Http2Connection.Listener failure for " + connectionName, e);
735 try {
736 newStream.close(ErrorCode.PROTOCOL_ERROR, e);
737 } catch (IOException ignored) {
738 }
739 }
740 }
741 });
742 return;
743 }
744 }
745
746
747 stream.receiveHeaders(Util.toHeaders(headerBlock), inFinished);
748 }
749
750 @Override public void rstStream(int streamId, ErrorCode errorCode) {
751 if (pushedStream(streamId)) {
752 pushResetLater(streamId, errorCode);
753 return;
754 }
755 Http2Stream rstStream = removeStream(streamId);
756 if (rstStream != null) {
757 rstStream.receiveRstStream(errorCode);
758 }
759 }
760
761 @Override public void settings(boolean clearPrevious, Settings settings) {
762 try {
763 writerExecutor.execute(new NamedRunnable("OkHttp %s ACK Settings", connectionName) {
764 @Override public void execute() {
765 applyAndAckSettings(clearPrevious, settings);
766 }
767 });
768 } catch (RejectedExecutionException ignored) {
769
770 }
771 }
772
773 void applyAndAckSettings(boolean clearPrevious, Settings settings) {
774 long delta = 0;
775 Http2Stream[] streamsToNotify = null;
776 synchronized (writer) {
777 synchronized (Http2Connection.this) {
778 int priorWriteWindowSize = peerSettings.getInitialWindowSize();
779 if (clearPrevious) peerSettings.clear();
780 peerSettings.merge(settings);
781 int peerInitialWindowSize = peerSettings.getInitialWindowSize();
782 if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
783 delta = peerInitialWindowSize - priorWriteWindowSize;
784 streamsToNotify = !streams.isEmpty()
785 ? streams.values().toArray(new Http2Stream[streams.size()])
786 : null;
787 }
788 }
789 try {
790 writer.applyAndAckSettings(peerSettings);
791 } catch (IOException e) {
792 failConnection(e);
793 }
794 }
795 if (streamsToNotify != null) {
796 for (Http2Stream stream : streamsToNotify) {
797 synchronized (stream) {
798 stream.addBytesToWriteWindow(delta);
799 }
800 }
801 }
802 listenerExecutor.execute(new NamedRunnable("OkHttp %s settings", connectionName) {
803 @Override public void execute() {
804 listener.onSettings(Http2Connection.this);
805 }
806 });
807 }
808
809 @Override public void ackSettings() {
810
811 }
812
813 @Override public void ping(boolean reply, int payload1, int payload2) {
814 if (reply) {
815 synchronized (Http2Connection.this) {
816 if (payload1 == INTERVAL_PING) {
817 intervalPongsReceived++;
818 } else if (payload1 == DEGRADED_PING) {
819 degradedPongsReceived++;
820 } else if (payload1 == AWAIT_PING) {
821 awaitPongsReceived++;
822 Http2Connection.this.notifyAll();
823 }
824 }
825 } else {
826 try {
827
828 writerExecutor.execute(new PingRunnable(true, payload1, payload2));
829 } catch (RejectedExecutionException ignored) {
830
831 }
832 }
833 }
834
835 @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
836 if (debugData.size() > 0) {
837 }
838
839
840 Http2Stream[] streamsCopy;
841 synchronized (Http2Connection.this) {
842 streamsCopy = streams.values().toArray(new Http2Stream[streams.size()]);
843 shutdown = true;
844 }
845
846
847 for (Http2Stream http2Stream : streamsCopy) {
848 if (http2Stream.getId() > lastGoodStreamId && http2Stream.isLocallyInitiated()) {
849 http2Stream.receiveRstStream(REFUSED_STREAM);
850 removeStream(http2Stream.getId());
851 }
852 }
853 }
854
855 @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
856 if (streamId == 0) {
857 synchronized (Http2Connection.this) {
858 bytesLeftInWriteWindow += windowSizeIncrement;
859 Http2Connection.this.notifyAll();
860 }
861 } else {
862 Http2Stream stream = getStream(streamId);
863 if (stream != null) {
864 synchronized (stream) {
865 stream.addBytesToWriteWindow(windowSizeIncrement);
866 }
867 }
868 }
869 }
870
871 @Override public void priority(int streamId, int streamDependency, int weight,
872 boolean exclusive) {
873
874 }
875
876 @Override
877 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
878 pushRequestLater(promisedStreamId, requestHeaders);
879 }
880
881 @Override public void alternateService(int streamId, String origin, ByteString protocol,
882 String host, int port, long maxAge) {
883
884 }
885 }
886
887
888 boolean pushedStream(int streamId) {
889 return streamId != 0 && (streamId & 1) == 0;
890 }
891
892
/**
 * Ids of pushed streams that have been promised and not yet finished, cancelled or reset.
 * Read and written while holding this connection's monitor.
 */
final Set<Integer> currentPushRequests = new LinkedHashSet<>();
894
895 void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
896 synchronized (this) {
897 if (currentPushRequests.contains(streamId)) {
898 writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
899 return;
900 }
901 currentPushRequests.add(streamId);
902 }
903 try {
904 pushExecutorExecute(new NamedRunnable(
905 "OkHttp %s Push Request[%s]", connectionName, streamId) {
906 @Override public void execute() {
907 boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
908 try {
909 if (cancel) {
910 writer.rstStream(streamId, ErrorCode.CANCEL);
911 synchronized (Http2Connection.this) {
912 currentPushRequests.remove(streamId);
913 }
914 }
915 } catch (IOException ignored) {
916 }
917 }
918 });
919 } catch (RejectedExecutionException ignored) {
920
921 }
922 }
923
924 void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
925 final boolean inFinished) {
926 try {
927 pushExecutorExecute(new NamedRunnable(
928 "OkHttp %s Push Headers[%s]", connectionName, streamId) {
929 @Override public void execute() {
930 boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
931 try {
932 if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL);
933 if (cancel || inFinished) {
934 synchronized (Http2Connection.this) {
935 currentPushRequests.remove(streamId);
936 }
937 }
938 } catch (IOException ignored) {
939 }
940 }
941 });
942 } catch (RejectedExecutionException ignored) {
943
944 }
945 }
946
947
951 void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
952 final boolean inFinished) throws IOException {
953 final Buffer buffer = new Buffer();
954 source.require(byteCount);
955 source.read(buffer, byteCount);
956 if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
957 pushExecutorExecute(new NamedRunnable("OkHttp %s Push Data[%s]", connectionName, streamId) {
958 @Override public void execute() {
959 try {
960 boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
961 if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL);
962 if (cancel || inFinished) {
963 synchronized (Http2Connection.this) {
964 currentPushRequests.remove(streamId);
965 }
966 }
967 } catch (IOException ignored) {
968 }
969 }
970 });
971 }
972
973 void pushResetLater(final int streamId, final ErrorCode errorCode) {
974 pushExecutorExecute(new NamedRunnable("OkHttp %s Push Reset[%s]", connectionName, streamId) {
975 @Override public void execute() {
976 pushObserver.onReset(streamId, errorCode);
977 synchronized (Http2Connection.this) {
978 currentPushRequests.remove(streamId);
979 }
980 }
981 });
982 }
983
984 private synchronized void pushExecutorExecute(NamedRunnable namedRunnable) {
985 if (!shutdown) {
986 pushExecutor.execute(namedRunnable);
987 }
988 }
989
990
991 public abstract static class Listener {
992 public static final Listener REFUSE_INCOMING_STREAMS = new Listener() {
993 @Override public void onStream(Http2Stream stream) throws IOException {
994 stream.close(REFUSED_STREAM, null);
995 }
996 };
997
998
1003 public abstract void onStream(Http2Stream stream) throws IOException;
1004
1005
1013 public void onSettings(Http2Connection connection) {
1014 }
1015 }
1016 }
1017