1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15
16 package software.amazon.awssdk.core.internal.http.pipeline.stages;
17
18 import static software.amazon.awssdk.core.internal.http.timers.TimerUtils.resolveTimeoutInMillis;
19 import static software.amazon.awssdk.core.internal.http.timers.TimerUtils.timeSyncTaskIfNeeded;
20 import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
21
22 import java.time.Duration;
23 import java.util.concurrent.ScheduledExecutorService;
24 import software.amazon.awssdk.annotations.SdkInternalApi;
25 import software.amazon.awssdk.core.Response;
26 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
27 import software.amazon.awssdk.core.client.config.SdkClientOption;
28 import software.amazon.awssdk.core.exception.AbortedException;
29 import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
30 import software.amazon.awssdk.core.exception.SdkInterruptedException;
31 import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
32 import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
33 import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
34 import software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
35 import software.amazon.awssdk.core.internal.http.timers.SyncTimeoutTask;
36 import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
37 import software.amazon.awssdk.http.SdkHttpFullRequest;
38
39 /**
40  * Wrapper around a {@link RequestPipeline} to manage the api call timeout feature.
41  */

42 @SdkInternalApi
43 public final class ApiCallTimeoutTrackingStage<OutputT> implements RequestToResponsePipeline<OutputT> {
44     private final RequestPipeline<SdkHttpFullRequest, Response<OutputT>> wrapped;
45     private final SdkClientConfiguration clientConfig;
46     private final ScheduledExecutorService timeoutExecutor;
47     private final Duration apiCallTimeout;
48
49     public ApiCallTimeoutTrackingStage(HttpClientDependencies dependencies,
50                                        RequestPipeline<SdkHttpFullRequest, Response<OutputT>> wrapped) {
51         this.wrapped = wrapped;
52         this.clientConfig = dependencies.clientConfiguration();
53         this.timeoutExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
54         this.apiCallTimeout = clientConfig.option(SdkClientOption.API_CALL_TIMEOUT);
55     }
56
57     @Override
58     public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
59         try {
60             return executeWithTimer(request, context);
61         } catch (Exception e) {
62             throw translatePipelineException(context, e);
63         }
64     }
65
66     /**
67      * Start and end client execution timer around the execution of the request. It's important
68      * that the client execution task is canceled before the InterruptedException is handled by
69      * {@link ApiCallTimeoutTrackingStage#wrapped#execute(SdkHttpFullRequest)} so the interrupt status
70      * doesn't leak out to the callers code
71      */

72     private Response<OutputT> executeWithTimer(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
73         long timeoutInMillis = resolveTimeoutInMillis(context.requestConfig()::apiCallTimeout, apiCallTimeout);
74
75         TimeoutTracker timeoutTracker = timeSyncTaskIfNeeded(timeoutExecutor, timeoutInMillis, Thread.currentThread());
76
77         Response<OutputT> response;
78         try {
79             context.apiCallTimeoutTracker(timeoutTracker);
80             response = wrapped.execute(request, context);
81         } finally {
82             // Cancel the timeout tracker, guaranteeing that if it hasn't already executed and set this thread's
83             // interrupt flag, it won't do so later. Every code path executed after this line *must* call
84             // timeoutTracker.hasExecuted() and appropriately clear the interrupt flag if it returns true.
85             timeoutTracker.cancel();
86         }
87
88         if (timeoutTracker.hasExecuted()) {
89             // The timeout tracker executed before the call to cancel(), which means it set this thread's interrupt
90             // flag. However, the execute() call returned before we raised an InterruptedException, so just clear the
91             // interrupt flag and return the result we got back.
92             Thread.interrupted();
93         }
94         return response;
95     }
96
97     /**
98      * Take the given exception thrown from the wrapped pipeline and return a more appropriate
99      * timeout related exception based on its type and the the execution status.
100      *
101      * @param context The execution context.
102      * @param e The exception thrown from the inner pipeline.
103      * @return The translated exception.
104      */

105     private Exception translatePipelineException(RequestExecutionContext context, Exception e) {
106         if (e instanceof InterruptedException) {
107             return handleInterruptedException(context, (InterruptedException) e);
108         }
109
110         // Timeout tracker finished and interrupted this thread after wrapped.execute() last checked the interrupt flag,
111         // but before we called timeoutTracker.cancel(). Note that if hasExecuted() returns true, its guaranteed that
112         // the timeout tracker has set the interrupt flag, and if it returns false, it guarantees that it did not and
113         // will never set the interrupt flag.
114         if (context.apiCallTimeoutTracker().hasExecuted()) {
115             // Clear the interrupt flag. Since we already have an exception from the call, which may contain information
116             // that's useful to the caller, just return that instead of an ApiCallTimeoutException.
117             Thread.interrupted();
118         }
119
120         return e;
121     }
122
123     /**
124      * Determine if an interrupted exception is caused by the api call timeout task
125      * interrupting the current thread or some other task interrupting the thread for another
126      * purpose.
127      *
128      * @return {@link ApiCallTimeoutException} if the {@link InterruptedException} was
129      * caused by the {@link SyncTimeoutTask}. Otherwise re-interrupts the current thread
130      * and returns a {@link AbortedException} wrapping an {@link InterruptedException}
131      */

132     private RuntimeException handleInterruptedException(RequestExecutionContext context, InterruptedException e) {
133         if (e instanceof SdkInterruptedException) {
134             ((SdkInterruptedException) e).getResponseStream().ifPresent(r -> invokeSafely(r::close));
135         }
136         if (context.apiCallTimeoutTracker().hasExecuted()) {
137             // Clear the interrupt status
138             Thread.interrupted();
139             return generateApiCallTimeoutException(context);
140         }
141
142         Thread.currentThread().interrupt();
143         return AbortedException.create("Thread was interrupted", e);
144     }
145
146     private ApiCallTimeoutException generateApiCallTimeoutException(RequestExecutionContext context) {
147         return ApiCallTimeoutException.create(
148                 resolveTimeoutInMillis(context.requestConfig()::apiCallTimeout, apiCallTimeout));
149     }
150 }
151