1 /*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16 package software.amazon.awssdk.utils.cache;
17
18 import java.time.Duration;
19 import java.time.Instant;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.function.Supplier;
24 import software.amazon.awssdk.annotations.SdkProtectedApi;
25 import software.amazon.awssdk.utils.SdkAutoCloseable;
26 import software.amazon.awssdk.utils.Validate;
27
28 /**
29 * A wrapper for a {@link Supplier} that applies certain caching rules to the retrieval of its value, including customizable
30 * pre-fetching behaviors for updating values as they get close to expiring so that not all threads have to block to update the
31 * value.
32 *
33 * For example, the {@link OneCallerBlocks} strategy will have a single caller block to update the value, and the
34 * {@link NonBlocking} strategy maintains a thread pool for updating the value asynchronously in the background.
35 *
36 * This should be created using {@link #builder(Supplier)}.
37 */
38 @SdkProtectedApi
39 public final class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
40 /**
41 * Maximum time to wait for a blocking refresh lock before calling refresh again. This is to rate limit how many times we call
42 * refresh. In the ideal case, refresh always occurs in a timely fashion and only one thread actually does the refresh.
43 */
44 private static final Duration BLOCKING_REFRESH_MAX_WAIT = Duration.ofSeconds(5);
45
46 /**
47 * Used as a primitive form of rate limiting for the speed of our refreshes. This will make sure that the backing supplier has
48 * a period of time to update the value when the {@link RefreshResult#staleTime} arrives without getting called by every
49 * thread that initiates a {@link #get()}.
50 */
51 private final Lock refreshLock = new ReentrantLock();
52
53 /**
54 * The strategy we should use for pre-fetching the cached data when the {@link RefreshResult#prefetchTime} arrives. This is
55 * configured when the cache is created via {@link Builder#prefetchStrategy(PrefetchStrategy)}.
56 */
57 private final PrefetchStrategy prefetchStrategy;
58
59 /**
60 * The value currently stored in this cache.
61 */
62 private volatile RefreshResult<T> cachedValue = RefreshResult.builder((T) null)
63 .staleTime(Instant.MIN)
64 .prefetchTime(Instant.MIN)
65 .build();
66
67 /**
68 * The "expensive" to call supplier that is used to refresh the {@link #cachedValue}.
69 */
70 private final Supplier<RefreshResult<T>> valueSupplier;
71
72 private CachedSupplier(Builder<T> builder) {
73 this.valueSupplier = Validate.notNull(builder.supplier, "builder.supplier");
74 this.prefetchStrategy = Validate.notNull(builder.prefetchStrategy, "builder.prefetchStrategy");
75 }
76
77 /**
78 * Retrieve a builder that can be used for creating a {@link CachedSupplier}.
79 *
80 * @param valueSupplier The value supplier that should have its value cached.
81 */
82 public static <T> CachedSupplier.Builder<T> builder(Supplier<RefreshResult<T>> valueSupplier) {
83 return new CachedSupplier.Builder<>(valueSupplier);
84 }
85
86 @Override
87 public T get() {
88 if (cacheIsStale()) {
89 refreshCache();
90 } else if (shouldInitiateCachePrefetch()) {
91 prefetchCache();
92 }
93
94 return this.cachedValue.value();
95 }
96
97 /**
98 * Determines whether the value in this cache is stale, and all threads should block and wait for an updated value.
99 */
100 private boolean cacheIsStale() {
101 return Instant.now().isAfter(cachedValue.staleTime());
102 }
103
104 /**
105 * Determines whether the cached value's prefetch time has passed and we should initiate a pre-fetch on the value using the
106 * configured {@link #prefetchStrategy}.
107 */
108 private boolean shouldInitiateCachePrefetch() {
109 return Instant.now().isAfter(cachedValue.prefetchTime());
110 }
111
112 /**
113 * Initiate a pre-fetch of the data using the configured {@link #prefetchStrategy}.
114 */
115 private void prefetchCache() {
116 prefetchStrategy.prefetch(this::refreshCache);
117 }
118
119 /**
120 * Perform a blocking refresh of the cached value. This will rate limit synchronous refresh calls based on the
121 * {@link #BLOCKING_REFRESH_MAX_WAIT} time. This ensures that when the data needs to be updated, we won't immediately hammer
122 * the underlying value refresher if it can get back to us in a reasonable time.
123 */
124 private void refreshCache() {
125 try {
126 boolean lockAcquired = refreshLock.tryLock(BLOCKING_REFRESH_MAX_WAIT.getSeconds(), TimeUnit.SECONDS);
127
128 try {
129 // Make sure the value was not refreshed while we waited for the lock.
130 if (cacheIsStale() || shouldInitiateCachePrefetch()) {
131 // It wasn't, call the supplier to update it.
132 cachedValue = valueSupplier.get();
133 }
134 } finally {
135 if (lockAcquired) {
136 refreshLock.unlock();
137 }
138 }
139 } catch (InterruptedException e) {
140 handleInterruptedException("Interrupted waiting to refresh the value.", e);
141 }
142 }
143
144 private void handleInterruptedException(String message, InterruptedException cause) {
145 Thread.currentThread().interrupt();
146 throw new IllegalStateException(message, cause);
147 }
148
149 /**
150 * Free any resources consumed by the prefetch strategy this supplier is using.
151 */
152 @Override
153 public void close() {
154 prefetchStrategy.close();
155 }
156
157 /**
158 * A Builder for {@link CachedSupplier}, created by {@link #builder(Supplier)}.
159 */
160 public static final class Builder<T> {
161 private final Supplier<RefreshResult<T>> supplier;
162 private PrefetchStrategy prefetchStrategy = new OneCallerBlocks();
163
164 private Builder(Supplier<RefreshResult<T>> supplier) {
165 this.supplier = supplier;
166 }
167
168 /**
169 * Configure the way in which data in the cache should be pre-fetched when the data's {@link RefreshResult#prefetchTime()}
170 * arrives.
171 *
172 * By default, this uses the {@link OneCallerBlocks} strategy, which will block a single {@link #get()} caller to update
173 * the value.
174 */
175 public Builder<T> prefetchStrategy(PrefetchStrategy prefetchStrategy) {
176 this.prefetchStrategy = prefetchStrategy;
177 return this;
178 }
179
180 /**
181 * Create a {@link CachedSupplier} using the current configuration of this builder.
182 */
183 public CachedSupplier<T> build() {
184 return new CachedSupplier<>(this);
185 }
186 }
187
188 /**
189 * The way in which the cache should be pre-fetched when the data's {@link RefreshResult#prefetchTime()} arrives.
190 *
191 * @see OneCallerBlocks
192 * @see NonBlocking
193 */
194 @FunctionalInterface
195 public interface PrefetchStrategy extends SdkAutoCloseable {
196 /**
197 * Execute the provided value updater to update the cache. The specific implementation defines how this is invoked.
198 */
199 void prefetch(Runnable valueUpdater);
200
201 /**
202 * Free any resources associated with the strategy. This is invoked when the {@link CachedSupplier#close()} method is
203 * invoked.
204 */
205 default void close() {
206 }
207 }
208 }
209