1 /*
2  * Copyright (C) 2014 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package okio;
17
18 import java.io.IOException;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nullable;
22
23 import static okio.Util.checkOffsetAndCount;
24
25 /**
26  * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
27  * implement timeouts where they aren't supported natively, such as to sockets that are blocked on
28  * writing.
29  *
30  * <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
31  * will be invoked by the shared watchdog thread so it should not do any long-running operations.
32  * Otherwise we risk starving other timeouts from being triggered.
33  *
34  * <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. The returned value
35  * will apply the timeout to each operation on the wrapped stream.
36  *
37  * <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
38  * #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
39  * Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
40  */

41 public class AsyncTimeout extends Timeout {
42   /**
43    * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
44    * connections may suffer timeouts even when they're making (slow) progress. Without this, writing
45    * a single 1 MiB buffer may never succeed on a sufficiently slow connection.
46    */

47   private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
48
49   /** Duration for the watchdog thread to be idle before it shuts itself down. */
50   private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
51   private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
52
53   /**
54    * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
55    * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
56    *
57    * <p>Head's 'next' points to the first element of the linked list. The first element is the next
58    * node to time out, or null if the queue is empty. The head is null until the watchdog thread is
59    * started and also after being idle for {@link #IDLE_TIMEOUT_MILLIS}.
60    */

61   static @Nullable AsyncTimeout head;
62
63   /** True if this node is currently in the queue. */
64   private boolean inQueue;
65
66   /** The next node in the linked list. */
67   private @Nullable AsyncTimeout next;
68
69   /** If scheduled, this is the time that the watchdog should time this out. */
70   private long timeoutAt;
71
72   public final void enter() {
73     if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
74     long timeoutNanos = timeoutNanos();
75     boolean hasDeadline = hasDeadline();
76     if (timeoutNanos == 0 && !hasDeadline) {
77       return// No timeout and no deadline? Don't bother with the queue.
78     }
79     inQueue = true;
80     scheduleTimeout(this, timeoutNanos, hasDeadline);
81   }
82
83   private static synchronized void scheduleTimeout(
84       AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
85     // Start the watchdog thread and create the head node when the first timeout is scheduled.
86     if (head == null) {
87       head = new AsyncTimeout();
88       new Watchdog().start();
89     }
90
91     long now = System.nanoTime();
92     if (timeoutNanos != 0 && hasDeadline) {
93       // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
94       // Math.min() is undefined for absolute values, but meaningful for relative ones.
95       node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
96     } else if (timeoutNanos != 0) {
97       node.timeoutAt = now + timeoutNanos;
98     } else if (hasDeadline) {
99       node.timeoutAt = node.deadlineNanoTime();
100     } else {
101       throw new AssertionError();
102     }
103
104     // Insert the node in sorted order.
105     long remainingNanos = node.remainingNanos(now);
106     for (AsyncTimeout prev = head; true; prev = prev.next) {
107       if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
108         node.next = prev.next;
109         prev.next = node;
110         if (prev == head) {
111           AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
112         }
113         break;
114       }
115     }
116   }
117
118   /** Returns true if the timeout occurred. */
119   public final boolean exit() {
120     if (!inQueue) return false;
121     inQueue = false;
122     return cancelScheduledTimeout(this);
123   }
124
125   /** Returns true if the timeout occurred. */
126   private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
127     // Remove the node from the linked list.
128     for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
129       if (prev.next == node) {
130         prev.next = node.next;
131         node.next = null;
132         return false;
133       }
134     }
135
136     // The node wasn't found in the linked list: it must have timed out!
137     return true;
138   }
139
140   /**
141    * Returns the amount of time left until the time out. This will be negative if the timeout has
142    * elapsed and the timeout should occur immediately.
143    */

144   private long remainingNanos(long now) {
145     return timeoutAt - now;
146   }
147
148   /**
149    * Invoked by the watchdog thread when the time between calls to {@link #enter()} and {@link
150    * #exit()} has exceeded the timeout.
151    */

152   protected void timedOut() {
153   }
154
155   /**
156    * Returns a new sink that delegates to {@code sink}, using this to implement timeouts. This works
157    * best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
158    */

159   public final Sink sink(final Sink sink) {
160     return new Sink() {
161       @Override public void write(Buffer source, long byteCount) throws IOException {
162         checkOffsetAndCount(source.size, 0, byteCount);
163
164         while (byteCount > 0L) {
165           // Count how many bytes to write. This loop guarantees we split on a segment boundary.
166           long toWrite = 0L;
167           for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
168             int segmentSize = s.limit - s.pos;
169             toWrite += segmentSize;
170             if (toWrite >= byteCount) {
171               toWrite = byteCount;
172               break;
173             }
174           }
175
176           // Emit one write. Only this section is subject to the timeout.
177           boolean throwOnTimeout = false;
178           enter();
179           try {
180             sink.write(source, toWrite);
181             byteCount -= toWrite;
182             throwOnTimeout = true;
183           } catch (IOException e) {
184             throw exit(e);
185           } finally {
186             exit(throwOnTimeout);
187           }
188         }
189       }
190
191       @Override public void flush() throws IOException {
192         boolean throwOnTimeout = false;
193         enter();
194         try {
195           sink.flush();
196           throwOnTimeout = true;
197         } catch (IOException e) {
198           throw exit(e);
199         } finally {
200           exit(throwOnTimeout);
201         }
202       }
203
204       @Override public void close() throws IOException {
205         boolean throwOnTimeout = false;
206         enter();
207         try {
208           sink.close();
209           throwOnTimeout = true;
210         } catch (IOException e) {
211           throw exit(e);
212         } finally {
213           exit(throwOnTimeout);
214         }
215       }
216
217       @Override public Timeout timeout() {
218         return AsyncTimeout.this;
219       }
220
221       @Override public String toString() {
222         return "AsyncTimeout.sink(" + sink + ")";
223       }
224     };
225   }
226
227   /**
228    * Returns a new source that delegates to {@code source}, using this to implement timeouts. This
229    * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
230    */

231   public final Source source(final Source source) {
232     return new Source() {
233       @Override public long read(Buffer sink, long byteCount) throws IOException {
234         boolean throwOnTimeout = false;
235         enter();
236         try {
237           long result = source.read(sink, byteCount);
238           throwOnTimeout = true;
239           return result;
240         } catch (IOException e) {
241           throw exit(e);
242         } finally {
243           exit(throwOnTimeout);
244         }
245       }
246
247       @Override public void close() throws IOException {
248         boolean throwOnTimeout = false;
249         enter();
250         try {
251           source.close();
252           throwOnTimeout = true;
253         } catch (IOException e) {
254           throw exit(e);
255         } finally {
256           exit(throwOnTimeout);
257         }
258       }
259
260       @Override public Timeout timeout() {
261         return AsyncTimeout.this;
262       }
263
264       @Override public String toString() {
265         return "AsyncTimeout.source(" + source + ")";
266       }
267     };
268   }
269
270   /**
271    * Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See
272    * {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown.
273    */

274   final void exit(boolean throwOnTimeout) throws IOException {
275     boolean timedOut = exit();
276     if (timedOut && throwOnTimeout) throw newTimeoutException(null);
277   }
278
279   /**
280    * Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout
281    * occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception
282    * returned.
283    */

284   final IOException exit(IOException cause) throws IOException {
285     if (!exit()) return cause;
286     return newTimeoutException(cause);
287   }
288
289   /**
290    * Returns an {@link IOException} to represent a timeout. By default this method returns {@link
291    * java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of the
292    * returned exception.
293    */

294   protected IOException newTimeoutException(@Nullable IOException cause) {
295     InterruptedIOException e = new InterruptedIOException("timeout");
296     if (cause != null) {
297       e.initCause(cause);
298     }
299     return e;
300   }
301
302   private static final class Watchdog extends Thread {
303     Watchdog() {
304       super("Okio Watchdog");
305       setDaemon(true);
306     }
307
308     public void run() {
309       while (true) {
310         try {
311           AsyncTimeout timedOut;
312           synchronized (AsyncTimeout.class) {
313             timedOut = awaitTimeout();
314
315             // Didn't find a node to interrupt. Try again.
316             if (timedOut == nullcontinue;
317
318             // The queue is completely empty. Let this thread exit and let another watchdog thread
319             // get created on the next call to scheduleTimeout().
320             if (timedOut == head) {
321               head = null;
322               return;
323             }
324           }
325
326           // Close the timed out node.
327           timedOut.timedOut();
328         } catch (InterruptedException ignored) {
329         }
330       }
331     }
332   }
333
334   /**
335    * Removes and returns the node at the head of the list, waiting for it to time out if necessary.
336    * This returns {@link #head} if there was no node at the head of the list when starting, and
337    * there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a
338    * new node was inserted while waiting. Otherwise this returns the node being waited on that has
339    * been removed.
340    */

341   static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
342     // Get the next eligible node.
343     AsyncTimeout node = head.next;
344
345     // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
346     if (node == null) {
347       long startNanos = System.nanoTime();
348       AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
349       return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
350           ? head  // The idle timeout elapsed.
351           : null// The situation has changed.
352     }
353
354     long waitNanos = node.remainingNanos(System.nanoTime());
355
356     // The head of the queue hasn't timed out yet. Await that.
357     if (waitNanos > 0) {
358       // Waiting is made complicated by the fact that we work in nanoseconds,
359       // but the API wants (millis, nanos) in two arguments.
360       long waitMillis = waitNanos / 1000000L;
361       waitNanos -= (waitMillis * 1000000L);
362       AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
363       return null;
364     }
365
366     // The head of the queue has timed out. Remove it.
367     head.next = node.next;
368     node.next = null;
369     return node;
370   }
371 }
372