1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15
16 package software.amazon.awssdk.utils.cache;
17
18 import java.time.Duration;
19 import java.time.Instant;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.function.Supplier;
24 import software.amazon.awssdk.annotations.SdkProtectedApi;
25 import software.amazon.awssdk.utils.SdkAutoCloseable;
26 import software.amazon.awssdk.utils.Validate;
27
28 /**
29  * A wrapper for a {@link Supplier} that applies certain caching rules to the retrieval of its value, including customizable
30  * pre-fetching behaviors for updating values as they get close to expiring so that not all threads have to block to update the
31  * value.
32  *
33  * For example, the {@link OneCallerBlocks} strategy will have a single caller block to update the value, and the
34  * {@link NonBlocking} strategy maintains a thread pool for updating the value asynchronously in the background.
35  *
36  * This should be created using {@link #builder(Supplier)}.
37  */

38 @SdkProtectedApi
39 public final class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
40     /**
41      * Maximum time to wait for a blocking refresh lock before calling refresh again. This is to rate limit how many times we call
42      * refresh. In the ideal case, refresh always occurs in a timely fashion and only one thread actually does the refresh.
43      */

44     private static final Duration BLOCKING_REFRESH_MAX_WAIT = Duration.ofSeconds(5);
45
46     /**
47      * Used as a primitive form of rate limiting for the speed of our refreshes. This will make sure that the backing supplier has
48      * a period of time to update the value when the {@link RefreshResult#staleTime} arrives without getting called by every
49      * thread that initiates a {@link #get()}.
50      */

51     private final Lock refreshLock = new ReentrantLock();
52
53     /**
54      * The strategy we should use for pre-fetching the cached data when the {@link RefreshResult#prefetchTime} arrives. This is
55      * configured when the cache is created via {@link Builder#prefetchStrategy(PrefetchStrategy)}.
56      */

57     private final PrefetchStrategy prefetchStrategy;
58
59     /**
60      * The value currently stored in this cache.
61      */

62     private volatile RefreshResult<T> cachedValue = RefreshResult.builder((T) null)
63                                                                  .staleTime(Instant.MIN)
64                                                                  .prefetchTime(Instant.MIN)
65                                                                  .build();
66
67     /**
68      * The "expensive" to call supplier that is used to refresh the {@link #cachedValue}.
69      */

70     private final Supplier<RefreshResult<T>> valueSupplier;
71
72     private CachedSupplier(Builder<T> builder) {
73         this.valueSupplier = Validate.notNull(builder.supplier, "builder.supplier");
74         this.prefetchStrategy = Validate.notNull(builder.prefetchStrategy, "builder.prefetchStrategy");
75     }
76
77     /**
78      * Retrieve a builder that can be used for creating a {@link CachedSupplier}.
79      *
80      * @param valueSupplier The value supplier that should have its value cached.
81      */

82     public static <T> CachedSupplier.Builder<T> builder(Supplier<RefreshResult<T>> valueSupplier) {
83         return new CachedSupplier.Builder<>(valueSupplier);
84     }
85
86     @Override
87     public T get() {
88         if (cacheIsStale()) {
89             refreshCache();
90         } else if (shouldInitiateCachePrefetch()) {
91             prefetchCache();
92         }
93
94         return this.cachedValue.value();
95     }
96
97     /**
98      * Determines whether the value in this cache is stale, and all threads should block and wait for an updated value.
99      */

100     private boolean cacheIsStale() {
101         return Instant.now().isAfter(cachedValue.staleTime());
102     }
103
104     /**
105      * Determines whether the cached value's prefetch time has passed and we should initiate a pre-fetch on the value using the
106      * configured {@link #prefetchStrategy}.
107      */

108     private boolean shouldInitiateCachePrefetch() {
109         return Instant.now().isAfter(cachedValue.prefetchTime());
110     }
111
112     /**
113      * Initiate a pre-fetch of the data using the configured {@link #prefetchStrategy}.
114      */

115     private void prefetchCache() {
116         prefetchStrategy.prefetch(this::refreshCache);
117     }
118
119     /**
120      * Perform a blocking refresh of the cached value. This will rate limit synchronous refresh calls based on the
121      * {@link #BLOCKING_REFRESH_MAX_WAIT} time. This ensures that when the data needs to be updated, we won't immediately hammer
122      * the underlying value refresher if it can get back to us in a reasonable time.
123      */

124     private void refreshCache() {
125         try {
126             boolean lockAcquired = refreshLock.tryLock(BLOCKING_REFRESH_MAX_WAIT.getSeconds(), TimeUnit.SECONDS);
127
128             try {
129                 // Make sure the value was not refreshed while we waited for the lock.
130                 if (cacheIsStale() || shouldInitiateCachePrefetch()) {
131                     // It wasn't, call the supplier to update it.
132                     cachedValue = valueSupplier.get();
133                 }
134             } finally {
135                 if (lockAcquired) {
136                     refreshLock.unlock();
137                 }
138             }
139         } catch (InterruptedException e) {
140             handleInterruptedException("Interrupted waiting to refresh the value.", e);
141         }
142     }
143
144     private void handleInterruptedException(String message, InterruptedException cause) {
145         Thread.currentThread().interrupt();
146         throw new IllegalStateException(message, cause);
147     }
148
149     /**
150      * Free any resources consumed by the prefetch strategy this supplier is using.
151      */

152     @Override
153     public void close() {
154         prefetchStrategy.close();
155     }
156
157     /**
158      * A Builder for {@link CachedSupplier}, created by {@link #builder(Supplier)}.
159      */

160     public static final class Builder<T> {
161         private final Supplier<RefreshResult<T>> supplier;
162         private PrefetchStrategy prefetchStrategy = new OneCallerBlocks();
163
164         private Builder(Supplier<RefreshResult<T>> supplier) {
165             this.supplier = supplier;
166         }
167
168         /**
169          * Configure the way in which data in the cache should be pre-fetched when the data's {@link RefreshResult#prefetchTime()}
170          * arrives.
171          *
172          * By defaultthis uses the {@link OneCallerBlocks} strategy, which will block a single {@link #get()} caller to update
173          * the value.
174          */

175         public Builder<T> prefetchStrategy(PrefetchStrategy prefetchStrategy) {
176             this.prefetchStrategy = prefetchStrategy;
177             return this;
178         }
179
180         /**
181          * Create a {@link CachedSupplier} using the current configuration of this builder.
182          */

183         public CachedSupplier<T> build() {
184             return new CachedSupplier<>(this);
185         }
186     }
187
188     /**
189      * The way in which the cache should be pre-fetched when the data's {@link RefreshResult#prefetchTime()} arrives.
190      *
191      * @see OneCallerBlocks
192      * @see NonBlocking
193      */

194     @FunctionalInterface
195     public interface PrefetchStrategy extends SdkAutoCloseable {
196         /**
197          * Execute the provided value updater to update the cache. The specific implementation defines how this is invoked.
198          */

199         void prefetch(Runnable valueUpdater);
200
201         /**
202          * Free any resources associated with the strategy. This is invoked when the {@link CachedSupplier#close()} method is
203          * invoked.
204          */

205         default void close() {
206         }
207     }
208 }
209