1 /*
2 * Copyright (C) 2014 Square, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package okio;
17
18 import java.io.IOException;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nullable;
22
23 import static okio.Util.checkOffsetAndCount;
24
25 /**
26 * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
27 * implement timeouts where they aren't supported natively, such as to sockets that are blocked on
28 * writing.
29 *
30 * <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
31 * will be invoked by the shared watchdog thread so it should not do any long-running operations.
32 * Otherwise we risk starving other timeouts from being triggered.
33 *
34 * <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. The returned value
35 * will apply the timeout to each operation on the wrapped stream.
36 *
37 * <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
38 * #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
39 * Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
40 */
41 public class AsyncTimeout extends Timeout {
42 /**
43 * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
44 * connections may suffer timeouts even when they're making (slow) progress. Without this, writing
45 * a single 1 MiB buffer may never succeed on a sufficiently slow connection.
46 */
47 private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
48
49 /** Duration for the watchdog thread to be idle before it shuts itself down. */
50 private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
51 private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
52
53 /**
54 * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
55 * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
56 *
57 * <p>Head's 'next' points to the first element of the linked list. The first element is the next
58 * node to time out, or null if the queue is empty. The head is null until the watchdog thread is
59 * started and also after being idle for {@link #IDLE_TIMEOUT_MILLIS}.
60 */
61 static @Nullable AsyncTimeout head;
62
63 /** True if this node is currently in the queue. */
64 private boolean inQueue;
65
66 /** The next node in the linked list. */
67 private @Nullable AsyncTimeout next;
68
69 /** If scheduled, this is the time that the watchdog should time this out. */
70 private long timeoutAt;
71
72 public final void enter() {
73 if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
74 long timeoutNanos = timeoutNanos();
75 boolean hasDeadline = hasDeadline();
76 if (timeoutNanos == 0 && !hasDeadline) {
77 return; // No timeout and no deadline? Don't bother with the queue.
78 }
79 inQueue = true;
80 scheduleTimeout(this, timeoutNanos, hasDeadline);
81 }
82
83 private static synchronized void scheduleTimeout(
84 AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
85 // Start the watchdog thread and create the head node when the first timeout is scheduled.
86 if (head == null) {
87 head = new AsyncTimeout();
88 new Watchdog().start();
89 }
90
91 long now = System.nanoTime();
92 if (timeoutNanos != 0 && hasDeadline) {
93 // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
94 // Math.min() is undefined for absolute values, but meaningful for relative ones.
95 node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
96 } else if (timeoutNanos != 0) {
97 node.timeoutAt = now + timeoutNanos;
98 } else if (hasDeadline) {
99 node.timeoutAt = node.deadlineNanoTime();
100 } else {
101 throw new AssertionError();
102 }
103
104 // Insert the node in sorted order.
105 long remainingNanos = node.remainingNanos(now);
106 for (AsyncTimeout prev = head; true; prev = prev.next) {
107 if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
108 node.next = prev.next;
109 prev.next = node;
110 if (prev == head) {
111 AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
112 }
113 break;
114 }
115 }
116 }
117
118 /** Returns true if the timeout occurred. */
119 public final boolean exit() {
120 if (!inQueue) return false;
121 inQueue = false;
122 return cancelScheduledTimeout(this);
123 }
124
125 /** Returns true if the timeout occurred. */
126 private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
127 // Remove the node from the linked list.
128 for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
129 if (prev.next == node) {
130 prev.next = node.next;
131 node.next = null;
132 return false;
133 }
134 }
135
136 // The node wasn't found in the linked list: it must have timed out!
137 return true;
138 }
139
140 /**
141 * Returns the amount of time left until the time out. This will be negative if the timeout has
142 * elapsed and the timeout should occur immediately.
143 */
144 private long remainingNanos(long now) {
145 return timeoutAt - now;
146 }
147
148 /**
149 * Invoked by the watchdog thread when the time between calls to {@link #enter()} and {@link
150 * #exit()} has exceeded the timeout.
151 */
152 protected void timedOut() {
153 }
154
155 /**
156 * Returns a new sink that delegates to {@code sink}, using this to implement timeouts. This works
157 * best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
158 */
159 public final Sink sink(final Sink sink) {
160 return new Sink() {
161 @Override public void write(Buffer source, long byteCount) throws IOException {
162 checkOffsetAndCount(source.size, 0, byteCount);
163
164 while (byteCount > 0L) {
165 // Count how many bytes to write. This loop guarantees we split on a segment boundary.
166 long toWrite = 0L;
167 for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
168 int segmentSize = s.limit - s.pos;
169 toWrite += segmentSize;
170 if (toWrite >= byteCount) {
171 toWrite = byteCount;
172 break;
173 }
174 }
175
176 // Emit one write. Only this section is subject to the timeout.
177 boolean throwOnTimeout = false;
178 enter();
179 try {
180 sink.write(source, toWrite);
181 byteCount -= toWrite;
182 throwOnTimeout = true;
183 } catch (IOException e) {
184 throw exit(e);
185 } finally {
186 exit(throwOnTimeout);
187 }
188 }
189 }
190
191 @Override public void flush() throws IOException {
192 boolean throwOnTimeout = false;
193 enter();
194 try {
195 sink.flush();
196 throwOnTimeout = true;
197 } catch (IOException e) {
198 throw exit(e);
199 } finally {
200 exit(throwOnTimeout);
201 }
202 }
203
204 @Override public void close() throws IOException {
205 boolean throwOnTimeout = false;
206 enter();
207 try {
208 sink.close();
209 throwOnTimeout = true;
210 } catch (IOException e) {
211 throw exit(e);
212 } finally {
213 exit(throwOnTimeout);
214 }
215 }
216
217 @Override public Timeout timeout() {
218 return AsyncTimeout.this;
219 }
220
221 @Override public String toString() {
222 return "AsyncTimeout.sink(" + sink + ")";
223 }
224 };
225 }
226
227 /**
228 * Returns a new source that delegates to {@code source}, using this to implement timeouts. This
229 * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
230 */
231 public final Source source(final Source source) {
232 return new Source() {
233 @Override public long read(Buffer sink, long byteCount) throws IOException {
234 boolean throwOnTimeout = false;
235 enter();
236 try {
237 long result = source.read(sink, byteCount);
238 throwOnTimeout = true;
239 return result;
240 } catch (IOException e) {
241 throw exit(e);
242 } finally {
243 exit(throwOnTimeout);
244 }
245 }
246
247 @Override public void close() throws IOException {
248 boolean throwOnTimeout = false;
249 enter();
250 try {
251 source.close();
252 throwOnTimeout = true;
253 } catch (IOException e) {
254 throw exit(e);
255 } finally {
256 exit(throwOnTimeout);
257 }
258 }
259
260 @Override public Timeout timeout() {
261 return AsyncTimeout.this;
262 }
263
264 @Override public String toString() {
265 return "AsyncTimeout.source(" + source + ")";
266 }
267 };
268 }
269
270 /**
271 * Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See
272 * {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown.
273 */
274 final void exit(boolean throwOnTimeout) throws IOException {
275 boolean timedOut = exit();
276 if (timedOut && throwOnTimeout) throw newTimeoutException(null);
277 }
278
279 /**
280 * Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout
281 * occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception
282 * returned.
283 */
284 final IOException exit(IOException cause) throws IOException {
285 if (!exit()) return cause;
286 return newTimeoutException(cause);
287 }
288
289 /**
290 * Returns an {@link IOException} to represent a timeout. By default this method returns {@link
291 * java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of the
292 * returned exception.
293 */
294 protected IOException newTimeoutException(@Nullable IOException cause) {
295 InterruptedIOException e = new InterruptedIOException("timeout");
296 if (cause != null) {
297 e.initCause(cause);
298 }
299 return e;
300 }
301
302 private static final class Watchdog extends Thread {
303 Watchdog() {
304 super("Okio Watchdog");
305 setDaemon(true);
306 }
307
308 public void run() {
309 while (true) {
310 try {
311 AsyncTimeout timedOut;
312 synchronized (AsyncTimeout.class) {
313 timedOut = awaitTimeout();
314
315 // Didn't find a node to interrupt. Try again.
316 if (timedOut == null) continue;
317
318 // The queue is completely empty. Let this thread exit and let another watchdog thread
319 // get created on the next call to scheduleTimeout().
320 if (timedOut == head) {
321 head = null;
322 return;
323 }
324 }
325
326 // Close the timed out node.
327 timedOut.timedOut();
328 } catch (InterruptedException ignored) {
329 }
330 }
331 }
332 }
333
334 /**
335 * Removes and returns the node at the head of the list, waiting for it to time out if necessary.
336 * This returns {@link #head} if there was no node at the head of the list when starting, and
337 * there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a
338 * new node was inserted while waiting. Otherwise this returns the node being waited on that has
339 * been removed.
340 */
341 static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
342 // Get the next eligible node.
343 AsyncTimeout node = head.next;
344
345 // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
346 if (node == null) {
347 long startNanos = System.nanoTime();
348 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
349 return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
350 ? head // The idle timeout elapsed.
351 : null; // The situation has changed.
352 }
353
354 long waitNanos = node.remainingNanos(System.nanoTime());
355
356 // The head of the queue hasn't timed out yet. Await that.
357 if (waitNanos > 0) {
358 // Waiting is made complicated by the fact that we work in nanoseconds,
359 // but the API wants (millis, nanos) in two arguments.
360 long waitMillis = waitNanos / 1000000L;
361 waitNanos -= (waitMillis * 1000000L);
362 AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
363 return null;
364 }
365
366 // The head of the queue has timed out. Remove it.
367 head.next = node.next;
368 node.next = null;
369 return node;
370 }
371 }
372