1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15
16 package software.amazon.awssdk.core.retry.conditions;
17
18 import static software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting.TOKEN_BUCKET_SIZE;
19
20 import java.util.Optional;
21 import software.amazon.awssdk.annotations.SdkPublicApi;
22 import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
23 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
24 import software.amazon.awssdk.core.internal.capacity.TokenBucket;
25 import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
26 import software.amazon.awssdk.core.retry.RetryMode;
27 import software.amazon.awssdk.core.retry.RetryPolicy;
28 import software.amazon.awssdk.core.retry.RetryPolicyContext;
29 import software.amazon.awssdk.utils.ToString;
30 import software.amazon.awssdk.utils.Validate;
31
32 /**
33  * A {@link RetryCondition} that limits the number of retries made by the SDK using a token bucket algorithm. "Tokens" are
34  * acquired from the bucket whenever {@link #shouldRetry} returns true, and are released to the bucket whenever
35  * {@link #requestSucceeded} or {@link #requestWillNotBeRetried} are invoked.
36  *
37  * <p>
38  * If "tokens" cannot be acquired from the bucket, it means too many requests have failed and the request will not be allowed
39  * to retry until we start to see initial non-retried requests succeed via {@link #requestSucceeded(RetryPolicyContext)}.
40  *
41  * <p>
42  * This prevents the client from holding the calling thread to retry when it's likely that it will fail anyway.
43  *
44  * <p>
45  * This is currently included in the default {@link RetryPolicy#aggregateRetryCondition()}, but can be disabled by setting the
46  * {@link RetryPolicy.Builder#retryCapacityCondition} to null.
47  */

48 @SdkPublicApi
49 public class TokenBucketRetryCondition implements RetryCondition {
50     private static final ExecutionAttribute<Capacity> LAST_ACQUIRED_CAPACITY =
51         new ExecutionAttribute<>("TokenBucketRetryCondition.LAST_ACQUIRED_CAPACITY");
52
53     private static final ExecutionAttribute<Integer> RETRY_COUNT_OF_LAST_CAPACITY_ACQUISITION =
54         new ExecutionAttribute<>("TokenBucketRetryCondition.RETRY_COUNT_OF_LAST_CAPACITY_ACQUISITION");
55
56     private final TokenBucket capacity;
57     private final TokenBucketExceptionCostFunction exceptionCostFunction;
58
59     private TokenBucketRetryCondition(Builder builder) {
60         this.capacity = new TokenBucket(Validate.notNull(builder.tokenBucketSize, "tokenBucketSize"));
61         this.exceptionCostFunction = Validate.notNull(builder.exceptionCostFunction, "exceptionCostFunction");
62     }
63
64     /**
65      * Create a condition using the {@link RetryMode#defaultRetryMode()}. This is equivalent to
66      * {@code forRetryMode(RetryMode.defaultRetryMode())}.
67      *
68      * <p>
69      * For more detailed control, see {@link #builder()}.
70      */

71     public static TokenBucketRetryCondition create() {
72         return forRetryMode(RetryMode.defaultRetryMode());
73     }
74
75     /**
76      * Create a condition using the configured {@link RetryMode}. The {@link RetryMode#LEGACY} does not subtract tokens from
77      * the token bucket when throttling exceptions are encountered. The {@link RetryMode#STANDARD} treats throttling and non-
78      * throttling exceptions as the same cost.
79      *
80      * <p>
81      * For more detailed control, see {@link #builder()}.
82      */

83     public static TokenBucketRetryCondition forRetryMode(RetryMode retryMode) {
84         return TokenBucketRetryCondition.builder()
85                                         .tokenBucketSize(TOKEN_BUCKET_SIZE)
86                                         .exceptionCostFunction(SdkDefaultRetrySetting.tokenCostFunction(retryMode))
87                                         .build();
88     }
89
90     /**
91      * Create a builder that allows fine-grained control over the token policy of this condition.
92      */

93     public static Builder builder() {
94         return new Builder();
95     }
96
97     /**
98      * If {@link #shouldRetry(RetryPolicyContext)} returned true for the provided execution, this method returns the
99      * {@link Capacity} consumed by the request.
100      */

101     public static Optional<Capacity> getCapacityForExecution(ExecutionAttributes attributes) {
102         return Optional.ofNullable(attributes.getAttribute(LAST_ACQUIRED_CAPACITY));
103     }
104
105     /**
106      * Retrieve the number of tokens currently available in the token bucket. This is a volatile snapshot of the current value.
107      * See {@link #getCapacityForExecution(ExecutionAttributes)} to see how much capacity was left in the bucket after a specific
108      * execution was considered.
109      */

110     public int tokensAvailable() {
111         return capacity.currentCapacity();
112     }
113
114     @Override
115     public boolean shouldRetry(RetryPolicyContext context) {
116         int costOfFailure = exceptionCostFunction.apply(context.exception());
117         Validate.isTrue(costOfFailure >= 0, "Cost of failure must not be negative, but was " + costOfFailure);
118
119         Optional<Capacity> capacity = this.capacity.tryAcquire(costOfFailure);
120
121         capacity.ifPresent(c -> {
122             context.executionAttributes().putAttribute(LAST_ACQUIRED_CAPACITY, c);
123             context.executionAttributes().putAttribute(RETRY_COUNT_OF_LAST_CAPACITY_ACQUISITION,
124                                                        context.retriesAttempted());
125         });
126
127         return capacity.isPresent();
128     }
129
130     @Override
131     public void requestWillNotBeRetried(RetryPolicyContext context) {
132         Integer lastAcquisitionRetryCount = context.executionAttributes().getAttribute(RETRY_COUNT_OF_LAST_CAPACITY_ACQUISITION);
133
134         if (lastAcquisitionRetryCount != null && context.retriesAttempted() == lastAcquisitionRetryCount) {
135             // We said yes to "should-retry", but something else caused it not to retry
136             Capacity lastAcquiredCapacity = context.executionAttributes().getAttribute(LAST_ACQUIRED_CAPACITY);
137             Validate.validState(lastAcquiredCapacity != null"Last acquired capacity should not be null.");
138             capacity.release(lastAcquiredCapacity.capacityAcquired());
139         }
140     }
141
142     @Override
143     public void requestSucceeded(RetryPolicyContext context) {
144         Capacity lastAcquiredCapacity = context.executionAttributes().getAttribute(LAST_ACQUIRED_CAPACITY);
145
146         if (lastAcquiredCapacity == null || lastAcquiredCapacity.capacityAcquired() == 0) {
147             capacity.release(1);
148         } else {
149             capacity.release(lastAcquiredCapacity.capacityAcquired());
150         }
151     }
152
153     @Override
154     public String toString() {
155         return ToString.builder("TokenBucketRetryCondition")
156                        .add("capacity", capacity.currentCapacity() + "/" + capacity.maxCapacity())
157                        .add("exceptionCostFunction", exceptionCostFunction)
158                        .build();
159     }
160
161     @Override
162     public boolean equals(Object o) {
163         if (this == o) {
164             return true;
165         }
166         if (o == null || getClass() != o.getClass()) {
167             return false;
168         }
169
170         TokenBucketRetryCondition that = (TokenBucketRetryCondition) o;
171
172         if (!capacity.equals(that.capacity)) {
173             return false;
174         }
175         return exceptionCostFunction.equals(that.exceptionCostFunction);
176     }
177
178     @Override
179     public int hashCode() {
180         int result = capacity.hashCode();
181         result = 31 * result + exceptionCostFunction.hashCode();
182         return result;
183     }
184
185     /**
186      * Configure and create a {@link TokenBucketRetryCondition}.
187      */

188     public static final class Builder {
189         private Integer tokenBucketSize;
190         private TokenBucketExceptionCostFunction exceptionCostFunction;
191
192         /**
193          * Create using {@link TokenBucketRetryCondition#builder()}.
194          */

195         private Builder() {
196         }
197
198         /**
199          * Specify the maximum number of tokens in the token bucket. This is also used as the initial value for the number of
200          * tokens in the bucket.
201          */

202         public Builder tokenBucketSize(int tokenBucketSize) {
203             this.tokenBucketSize = tokenBucketSize;
204             return this;
205         }
206
207         /**
208          * Configure a {@link TokenBucketExceptionCostFunction} that is used to calculate the number of tokens that should be
209          * taken out of the bucket for each specific exception. These tokens will be returned in case of successful retries.
210          */

211         public Builder exceptionCostFunction(TokenBucketExceptionCostFunction exceptionCostFunction) {
212             this.exceptionCostFunction = exceptionCostFunction;
213             return this;
214         }
215
216         /**
217          * Build a {@link TokenBucketRetryCondition} using the provided configuration.
218          */

219         public TokenBucketRetryCondition build() {
220             return new TokenBucketRetryCondition(this);
221         }
222     }
223
224     /**
225      * The number of tokens in the token bucket after a specific token acquisition succeeds. This can be retrieved via
226      * {@link #getCapacityForExecution(ExecutionAttributes)}.
227      */

228     public static final class Capacity {
229         private final int capacityAcquired;
230         private final int capacityRemaining;
231
232         private Capacity(Builder builder) {
233             this.capacityAcquired = Validate.notNull(builder.capacityAcquired, "capacityAcquired");
234             this.capacityRemaining = Validate.notNull(builder.capacityRemaining, "capacityRemaining");
235         }
236
237         public static Builder builder() {
238             return new Builder();
239         }
240
241         /**
242          * The number of tokens acquired by the last token acquisition.
243          */

244         public int capacityAcquired() {
245             return capacityAcquired;
246         }
247
248         /**
249          * The number of tokens in the token bucket.
250          */

251         public int capacityRemaining() {
252             return capacityRemaining;
253         }
254
255         public static class Builder {
256             private Integer capacityAcquired;
257             private Integer capacityRemaining;
258
259             private Builder() {
260             }
261
262             public Builder capacityAcquired(Integer capacityAcquired) {
263                 this.capacityAcquired = capacityAcquired;
264                 return this;
265             }
266
267             public Builder capacityRemaining(Integer capacityRemaining) {
268                 this.capacityRemaining = capacityRemaining;
269                 return this;
270             }
271
272             public Capacity build() {
273                 return new Capacity(this);
274             }
275         }
276     }
277 }
278