1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15
16 package software.amazon.awssdk.core.internal.http.pipeline.stages;
17
18 import static software.amazon.awssdk.core.internal.http.timers.TimerUtils.resolveTimeoutInMillis;
19 import static software.amazon.awssdk.core.internal.http.timers.TimerUtils.timeSyncTaskIfNeeded;
20 import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
21
22 import java.time.Duration;
23 import java.util.concurrent.ScheduledExecutorService;
24 import software.amazon.awssdk.annotations.SdkInternalApi;
25 import software.amazon.awssdk.core.Response;
26 import software.amazon.awssdk.core.client.config.SdkClientOption;
27 import software.amazon.awssdk.core.exception.AbortedException;
28 import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
29 import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
30 import software.amazon.awssdk.core.exception.SdkInterruptedException;
31 import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
32 import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
33 import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
34 import software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
35 import software.amazon.awssdk.core.internal.http.timers.SyncTimeoutTask;
36 import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
37 import software.amazon.awssdk.http.SdkHttpFullRequest;
38
39 /**
40  * Wrapper around a {@link RequestPipeline} to manage the api call attempt timeout feature.
41  */

42 @SdkInternalApi
43 public final class ApiCallAttemptTimeoutTrackingStage<OutputT> implements RequestToResponsePipeline<OutputT> {
44
45     private final RequestPipeline<SdkHttpFullRequest, Response<OutputT>> wrapped;
46     private final Duration apiCallAttemptTimeout;
47     private final ScheduledExecutorService timeoutExecutor;
48
49     public ApiCallAttemptTimeoutTrackingStage(HttpClientDependencies dependencies,
50                                               RequestPipeline<SdkHttpFullRequest,
51                                               Response<OutputT>> wrapped) {
52         this.wrapped = wrapped;
53         this.timeoutExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
54         this.apiCallAttemptTimeout = dependencies.clientConfiguration().option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT);
55     }
56
57     /**
58      * Start and end api call attempt timer around the execution of the api call attempt. It's important
59      * that the client execution task is canceled before the InterruptedException is handled by
60      * {@link ApiCallTimeoutTrackingStage#wrapped#execute(SdkHttpFullRequest)} so the interrupt status doesn't leak out to the
61      * callers code.
62      */

63     @Override
64     public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
65         try {
66             long timeoutInMillis = resolveTimeoutInMillis(context.requestConfig()::apiCallAttemptTimeout, apiCallAttemptTimeout);
67
68             TimeoutTracker timeoutTracker = timeSyncTaskIfNeeded(timeoutExecutor, timeoutInMillis, Thread.currentThread());
69
70             Response<OutputT> response;
71             try {
72                 context.apiCallAttemptTimeoutTracker(timeoutTracker);
73                 response = wrapped.execute(request, context);
74             } finally {
75                 // Cancel the timeout tracker, guaranteeing that if it hasn't already executed and set this thread's
76                 // interrupt flag, it won't do so later. Every code path executed after this line *must* call
77                 // timeoutTracker.hasExecuted() and appropriately clear the interrupt flag if it returns true.
78                 timeoutTracker.cancel();
79             }
80
81             if (timeoutTracker.hasExecuted()) {
82                 // The timeout tracker executed before the call to cancel(), which means it set this thread's interrupt
83                 // flag. However, the execute() call returned before we raised an InterruptedException, so just clear
84                 // the interrupt flag and return the result we got back.
85                 Thread.interrupted();
86             }
87             return response;
88
89         } catch (Exception e) {
90             throw translatePipelineException(context, e);
91         }
92     }
93
94
95
96     /**
97      * Take the given exception thrown from the wrapped pipeline and return a more appropriate
98      * timeout related exception based on its type and the the execution status.
99      *
100      * @param context The execution context.
101      * @param e The exception thrown from the inner pipeline.
102      * @return The translated exception.
103      */

104     private Exception translatePipelineException(RequestExecutionContext context, Exception e) {
105         if (e instanceof InterruptedException) {
106             return handleInterruptedException(context, (InterruptedException) e);
107         }
108
109         // Timeout tracker finished and interrupted this thread after wrapped.execute() last checked the interrupt flag,
110         // but before we called timeoutTracker.cancel(). Note that if hasExecuted() returns true, its guaranteed that
111         // the timeout tracker has set the interrupt flag, and if it returns false, it guarantees that it did not and
112         // will never set the interrupt flag.
113         if (context.apiCallAttemptTimeoutTracker().hasExecuted()) {
114             // Clear the interrupt flag. Since we already have an exception from the call, which may contain information
115             // that's useful to the caller, just return that instead of an ApiCallTimeoutException.
116             Thread.interrupted();
117         }
118
119         return e;
120     }
121
122     /**
123      * Determine if an interrupted exception is caused by the api call timeout task
124      * interrupting the current thread or some other task interrupting the thread for another
125      * purpose.
126      *
127      * @return {@link ApiCallTimeoutException} if the {@link InterruptedException} was
128      * caused by the {@link SyncTimeoutTask}. Otherwise re-interrupts the current thread
129      * and returns a {@link AbortedException} wrapping an {@link InterruptedException}
130      */

131     private RuntimeException handleInterruptedException(RequestExecutionContext context, InterruptedException e) {
132         if (e instanceof SdkInterruptedException) {
133             ((SdkInterruptedException) e).getResponseStream().ifPresent(r -> invokeSafely(r::close));
134         }
135         if (context.apiCallAttemptTimeoutTracker().hasExecuted()) {
136             // Clear the interrupt status
137             Thread.interrupted();
138             return generateApiCallAttemptTimeoutException(context);
139         }
140
141         Thread.currentThread().interrupt();
142         return AbortedException.create("Thread was interrupted", e);
143     }
144
145     private ApiCallAttemptTimeoutException generateApiCallAttemptTimeoutException(RequestExecutionContext context) {
146         return ApiCallAttemptTimeoutException.create(
147                 resolveTimeoutInMillis(context.requestConfig()::apiCallAttemptTimeout, apiCallAttemptTimeout));
148     }
149 }
150