1 /*
2 * Copyright 2006-2014 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.retry.support;
18
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.List;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import org.springframework.retry.ExhaustedRetryException;
27 import org.springframework.retry.RecoveryCallback;
28 import org.springframework.retry.RetryCallback;
29 import org.springframework.retry.RetryContext;
30 import org.springframework.retry.RetryException;
31 import org.springframework.retry.RetryListener;
32 import org.springframework.retry.RetryOperations;
33 import org.springframework.retry.RetryPolicy;
34 import org.springframework.retry.RetryState;
35 import org.springframework.retry.TerminatedRetryException;
36 import org.springframework.retry.backoff.BackOffContext;
37 import org.springframework.retry.backoff.BackOffInterruptedException;
38 import org.springframework.retry.backoff.BackOffPolicy;
39 import org.springframework.retry.backoff.NoBackOffPolicy;
40 import org.springframework.retry.policy.MapRetryContextCache;
41 import org.springframework.retry.policy.RetryContextCache;
42 import org.springframework.retry.policy.SimpleRetryPolicy;
43
44 /**
45 * Template class that simplifies the execution of operations with retry semantics.
46 * <p>
47 * Retryable operations are encapsulated in implementations of the {@link RetryCallback}
48 * interface and are executed using one of the supplied execute methods.
49 * <p>
 * By default, an operation is retried if it throws any {@link Exception} or subclass of
51 * {@link Exception}. This behaviour can be changed by using the
52 * {@link #setRetryPolicy(RetryPolicy)} method.
53 * <p>
54 * Also by default, each operation is retried for a maximum of three attempts with no back
55 * off in between. This behaviour can be configured using the
56 * {@link #setRetryPolicy(RetryPolicy)} and {@link #setBackOffPolicy(BackOffPolicy)}
57 * properties. The {@link org.springframework.retry.backoff.BackOffPolicy} controls how
58 * long the pause is between each individual retry attempt.
59 * <p>
60 * This class is thread-safe and suitable for concurrent access when executing operations
61 * and when performing configuration changes. As such, it is possible to change the number
62 * of retries on the fly, as well as the {@link BackOffPolicy} used and no in progress
63 * retryable operations will be affected.
64 *
65 * @author Rob Harrop
66 * @author Dave Syer
67 * @author Gary Russell
68 * @author Artem Bilan
69 * @author Josh Long
70 */
71 public class RetryTemplate implements RetryOperations {
72
73 /**
74 * Retry context attribute name that indicates the context should be considered global
75 * state (never closed). TODO: convert this to a flag in the RetryState.
76 */
77 private static final String GLOBAL_STATE = "state.global";
78
79 protected final Log logger = LogFactory.getLog(getClass());
80
81 private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();
82
83 private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
84
85 private volatile RetryListener[] listeners = new RetryListener[0];
86
87 private RetryContextCache retryContextCache = new MapRetryContextCache();
88
89 private boolean throwLastExceptionOnExhausted;
90
91 /**
92 * @param throwLastExceptionOnExhausted the throwLastExceptionOnExhausted to set
93 */
94 public void setThrowLastExceptionOnExhausted(boolean throwLastExceptionOnExhausted) {
95 this.throwLastExceptionOnExhausted = throwLastExceptionOnExhausted;
96 }
97
98 /**
99 * Public setter for the {@link RetryContextCache}.
100 *
101 * @param retryContextCache the {@link RetryContextCache} to set.
102 */
103 public void setRetryContextCache(RetryContextCache retryContextCache) {
104 this.retryContextCache = retryContextCache;
105 }
106
107 /**
108 * Setter for listeners. The listeners are executed before and after a retry block
109 * (i.e. before and after all the attempts), and on an error (every attempt).
110 *
111 * @param listeners the {@link RetryListener}s
112 * @see RetryListener
113 */
114 public void setListeners(RetryListener[] listeners) {
115 this.listeners = Arrays.asList(listeners)
116 .toArray(new RetryListener[listeners.length]);
117 }
118
119 /**
120 * Register an additional listener.
121 *
122 * @param listener the {@link RetryListener}
123 * @see #setListeners(RetryListener[])
124 */
125 public void registerListener(RetryListener listener) {
126 List<RetryListener> list = new ArrayList<RetryListener>(
127 Arrays.asList(this.listeners));
128 list.add(listener);
129 this.listeners = list.toArray(new RetryListener[list.size()]);
130 }
131
132 /**
133 * Setter for {@link BackOffPolicy}.
134 *
135 * @param backOffPolicy the {@link BackOffPolicy}
136 */
137 public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
138 this.backOffPolicy = backOffPolicy;
139 }
140
141 /**
142 * Setter for {@link RetryPolicy}.
143 *
144 * @param retryPolicy the {@link RetryPolicy}
145 */
146 public void setRetryPolicy(RetryPolicy retryPolicy) {
147 this.retryPolicy = retryPolicy;
148 }
149
150 /**
151 * Keep executing the callback until it either succeeds or the policy dictates that we
152 * stop, in which case the most recent exception thrown by the callback will be
153 * rethrown.
154 *
155 * @see RetryOperations#execute(RetryCallback)
156 * @param retryCallback the {@link RetryCallback}
157 *
158 * @throws TerminatedRetryException if the retry has been manually terminated by a
159 * listener.
160 */
161 @Override
162 public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback)
163 throws E {
164 return doExecute(retryCallback, null, null);
165 }
166
167 /**
168 * Keep executing the callback until it either succeeds or the policy dictates that we
169 * stop, in which case the recovery callback will be executed.
170 *
171 * @see RetryOperations#execute(RetryCallback, RecoveryCallback)
172 * @param retryCallback the {@link RetryCallback}
173 * @param recoveryCallback the {@link RecoveryCallback}
174 * @throws TerminatedRetryException if the retry has been manually terminated by a
175 * listener.
176 */
177 @Override
178 public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
179 RecoveryCallback<T> recoveryCallback) throws E {
180 return doExecute(retryCallback, recoveryCallback, null);
181 }
182
183 /**
184 * Execute the callback once if the policy dictates that we can, re-throwing any
185 * exception encountered so that clients can re-present the same task later.
186 *
187 * @see RetryOperations#execute(RetryCallback, RetryState)
188 * @param retryCallback the {@link RetryCallback}
189 * @param retryState the {@link RetryState}
190 * @throws ExhaustedRetryException if the retry has been exhausted.
191 */
192 @Override
193 public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
194 RetryState retryState) throws E, ExhaustedRetryException {
195 return doExecute(retryCallback, null, retryState);
196 }
197
198 /**
199 * Execute the callback once if the policy dictates that we can, re-throwing any
200 * exception encountered so that clients can re-present the same task later.
201 *
202 * @see RetryOperations#execute(RetryCallback, RetryState)
203 * @param retryCallback the {@link RetryCallback}
204 * @param recoveryCallback the {@link RecoveryCallback}
205 * @param retryState the {@link RetryState}
206 */
207 @Override
208 public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
209 RecoveryCallback<T> recoveryCallback, RetryState retryState)
210 throws E, ExhaustedRetryException {
211 return doExecute(retryCallback, recoveryCallback, retryState);
212 }
213
214 /**
215 * Execute the callback once if the policy dictates that we can, otherwise execute the
216 * recovery callback.
217 * @param recoveryCallback the {@link RecoveryCallback}
218 * @param retryCallback the {@link RetryCallback}
219 * @param state the {@link RetryState}
220 * @param <T> the type of the return value
221 * @param <E> the exception type to throw
222 * @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState)
223 * @throws ExhaustedRetryException if the retry has been exhausted.
224 * @throws E an exception if the retry operation fails
225 * @return T the retried value
226 */
227 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
228 RecoveryCallback<T> recoveryCallback, RetryState state)
229 throws E, ExhaustedRetryException {
230
231 RetryPolicy retryPolicy = this.retryPolicy;
232 BackOffPolicy backOffPolicy = this.backOffPolicy;
233
234 // Allow the retry policy to initialise itself...
235 RetryContext context = open(retryPolicy, state);
236 if (this.logger.isTraceEnabled()) {
237 this.logger.trace("RetryContext retrieved: " + context);
238 }
239
240 // Make sure the context is available globally for clients who need
241 // it...
242 RetrySynchronizationManager.register(context);
243
244 Throwable lastException = null;
245
246 boolean exhausted = false;
247 try {
248
249 // Give clients a chance to enhance the context...
250 boolean running = doOpenInterceptors(retryCallback, context);
251
252 if (!running) {
253 throw new TerminatedRetryException(
254 "Retry terminated abnormally by interceptor before first attempt");
255 }
256
257 // Get or Start the backoff context...
258 BackOffContext backOffContext = null;
259 Object resource = context.getAttribute("backOffContext");
260
261 if (resource instanceof BackOffContext) {
262 backOffContext = (BackOffContext) resource;
263 }
264
265 if (backOffContext == null) {
266 backOffContext = backOffPolicy.start(context);
267 if (backOffContext != null) {
268 context.setAttribute("backOffContext", backOffContext);
269 }
270 }
271
272 /*
273 * We allow the whole loop to be skipped if the policy or context already
274 * forbid the first try. This is used in the case of external retry to allow a
275 * recovery in handleRetryExhausted without the callback processing (which
276 * would throw an exception).
277 */
278 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
279
280 try {
281 if (this.logger.isDebugEnabled()) {
282 this.logger.debug("Retry: count=" + context.getRetryCount());
283 }
284 // Reset the last exception, so if we are successful
285 // the close interceptors will not think we failed...
286 lastException = null;
287 return retryCallback.doWithRetry(context);
288 }
289 catch (Throwable e) {
290
291 lastException = e;
292
293 try {
294 registerThrowable(retryPolicy, state, context, e);
295 }
296 catch (Exception ex) {
297 throw new TerminatedRetryException("Could not register throwable",
298 ex);
299 }
300 finally {
301 doOnErrorInterceptors(retryCallback, context, e);
302 }
303
304 if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
305 try {
306 backOffPolicy.backOff(backOffContext);
307 }
308 catch (BackOffInterruptedException ex) {
309 lastException = e;
310 // back off was prevented by another thread - fail the retry
311 if (this.logger.isDebugEnabled()) {
312 this.logger
313 .debug("Abort retry because interrupted: count="
314 + context.getRetryCount());
315 }
316 throw ex;
317 }
318 }
319
320 if (this.logger.isDebugEnabled()) {
321 this.logger.debug(
322 "Checking for rethrow: count=" + context.getRetryCount());
323 }
324
325 if (shouldRethrow(retryPolicy, context, state)) {
326 if (this.logger.isDebugEnabled()) {
327 this.logger.debug("Rethrow in retry for policy: count="
328 + context.getRetryCount());
329 }
330 throw RetryTemplate.<E>wrapIfNecessary(e);
331 }
332
333 }
334
335 /*
336 * A stateful attempt that can retry may rethrow the exception before now,
337 * but if we get this far in a stateful retry there's a reason for it,
338 * like a circuit breaker or a rollback classifier.
339 */
340 if (state != null && context.hasAttribute(GLOBAL_STATE)) {
341 break;
342 }
343 }
344
345 if (state == null && this.logger.isDebugEnabled()) {
346 this.logger.debug(
347 "Retry failed last attempt: count=" + context.getRetryCount());
348 }
349
350 exhausted = true;
351 return handleRetryExhausted(recoveryCallback, context, state);
352
353 }
354 catch (Throwable e) {
355 throw RetryTemplate.<E>wrapIfNecessary(e);
356 }
357 finally {
358 close(retryPolicy, context, state, lastException == null || exhausted);
359 doCloseInterceptors(retryCallback, context, lastException);
360 RetrySynchronizationManager.clear();
361 }
362
363 }
364
365 /**
366 * Decide whether to proceed with the ongoing retry attempt. This method is called
367 * before the {@link RetryCallback} is executed, but after the backoff and open
368 * interceptors.
369 *
370 * @param retryPolicy the policy to apply
371 * @param context the current retry context
372 * @return true if we can continue with the attempt
373 */
374 protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
375 return retryPolicy.canRetry(context);
376 }
377
378 /**
379 * Clean up the cache if necessary and close the context provided (if the flag
380 * indicates that processing was successful).
381 *
382 * @param retryPolicy the {@link RetryPolicy}
383 * @param context the {@link RetryContext}
384 * @param state the {@link RetryState}
385 * @param succeeded whether the close succeeded
386 */
387 protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state,
388 boolean succeeded) {
389 if (state != null) {
390 if (succeeded) {
391 if (!context.hasAttribute(GLOBAL_STATE)) {
392 this.retryContextCache.remove(state.getKey());
393 }
394 retryPolicy.close(context);
395 context.setAttribute(RetryContext.CLOSED, true);
396 }
397 }
398 else {
399 retryPolicy.close(context);
400 context.setAttribute(RetryContext.CLOSED, true);
401 }
402 }
403
404 protected void registerThrowable(RetryPolicy retryPolicy, RetryState state,
405 RetryContext context, Throwable e) {
406 retryPolicy.registerThrowable(context, e);
407 registerContext(context, state);
408 }
409
410 private void registerContext(RetryContext context, RetryState state) {
411 if (state != null) {
412 Object key = state.getKey();
413 if (key != null) {
414 if (context.getRetryCount() > 1
415 && !this.retryContextCache.containsKey(key)) {
416 throw new RetryException(
417 "Inconsistent state for failed item key: cache key has changed. "
418 + "Consider whether equals() or hashCode() for the key might be inconsistent, "
419 + "or if you need to supply a better key");
420 }
421 this.retryContextCache.put(key, context);
422 }
423 }
424 }
425
426 /**
427 * Delegate to the {@link RetryPolicy} having checked in the cache for an existing
428 * value if the state is not null.
429 *
430 * @param state a {@link RetryState}
431 * @param retryPolicy a {@link RetryPolicy} to delegate the context creation
432 *
433 * @return a retry context, either a new one or the one used last time the same state
434 * was encountered
435 */
436 protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
437
438 if (state == null) {
439 return doOpenInternal(retryPolicy);
440 }
441
442 Object key = state.getKey();
443 if (state.isForceRefresh()) {
444 return doOpenInternal(retryPolicy, state);
445 }
446
447 // If there is no cache hit we can avoid the possible expense of the
448 // cache re-hydration.
449 if (!this.retryContextCache.containsKey(key)) {
450 // The cache is only used if there is a failure.
451 return doOpenInternal(retryPolicy, state);
452 }
453
454 RetryContext context = this.retryContextCache.get(key);
455 if (context == null) {
456 if (this.retryContextCache.containsKey(key)) {
457 throw new RetryException(
458 "Inconsistent state for failed item: no history found. "
459 + "Consider whether equals() or hashCode() for the item might be inconsistent, "
460 + "or if you need to supply a better ItemKeyGenerator");
461 }
462 // The cache could have been expired in between calls to
463 // containsKey(), so we have to live with this:
464 return doOpenInternal(retryPolicy, state);
465 }
466
467 // Start with a clean slate for state that others may be inspecting
468 context.removeAttribute(RetryContext.CLOSED);
469 context.removeAttribute(RetryContext.EXHAUSTED);
470 context.removeAttribute(RetryContext.RECOVERED);
471 return context;
472
473 }
474
475 private RetryContext doOpenInternal(RetryPolicy retryPolicy, RetryState state) {
476 RetryContext context = retryPolicy.open(RetrySynchronizationManager.getContext());
477 if (state != null) {
478 context.setAttribute(RetryContext.STATE_KEY, state.getKey());
479 }
480 if (context.hasAttribute(GLOBAL_STATE)) {
481 registerContext(context, state);
482 }
483 return context;
484 }
485
486 private RetryContext doOpenInternal(RetryPolicy retryPolicy) {
487 return doOpenInternal(retryPolicy, null);
488 }
489
490 /**
491 * Actions to take after final attempt has failed. If there is state clean up the
492 * cache. If there is a recovery callback, execute that and return its result.
493 * Otherwise throw an exception.
494 *
495 * @param recoveryCallback the callback for recovery (might be null)
496 * @param context the current retry context
497 * @param state the {@link RetryState}
498 * @param <T> the type to classify
499 * @throws Exception if the callback does, and if there is no callback and the state
500 * is null then the last exception from the context
501 * @throws ExhaustedRetryException if the state is not null and there is no recovery
502 * callback
503 * @return T the payload to return
504 */
505 protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback,
506 RetryContext context, RetryState state) throws Throwable {
507 context.setAttribute(RetryContext.EXHAUSTED, true);
508 if (state != null && !context.hasAttribute(GLOBAL_STATE)) {
509 this.retryContextCache.remove(state.getKey());
510 }
511 if (recoveryCallback != null) {
512 T recovered = recoveryCallback.recover(context);
513 context.setAttribute(RetryContext.RECOVERED, true);
514 return recovered;
515 }
516 if (state != null) {
517 this.logger
518 .debug("Retry exhausted after last attempt with no recovery path.");
519 rethrow(context, "Retry exhausted after last attempt with no recovery path");
520 }
521 throw wrapIfNecessary(context.getLastThrowable());
522 }
523
524 protected <E extends Throwable> void rethrow(RetryContext context, String message)
525 throws E {
526 if (this.throwLastExceptionOnExhausted) {
527 @SuppressWarnings("unchecked")
528 E rethrow = (E) context.getLastThrowable();
529 throw rethrow;
530 }
531 else {
532 throw new ExhaustedRetryException(message, context.getLastThrowable());
533 }
534 }
535
536 /**
537 * Extension point for subclasses to decide on behaviour after catching an exception
538 * in a {@link RetryCallback}. Normal stateless behaviour is not to rethrow, and if
539 * there is state we rethrow.
540 *
541 * @param retryPolicy the retry policy
542 * @param context the current context
543 * @param state the current retryState
544 * @return true if the state is not null but subclasses might choose otherwise
545 */
546 protected boolean shouldRethrow(RetryPolicy retryPolicy, RetryContext context,
547 RetryState state) {
548 return state != null && state.rollbackFor(context.getLastThrowable());
549 }
550
551 private <T, E extends Throwable> boolean doOpenInterceptors(
552 RetryCallback<T, E> callback, RetryContext context) {
553
554 boolean result = true;
555
556 for (RetryListener listener : this.listeners) {
557 result = result && listener.open(context, callback);
558 }
559
560 return result;
561
562 }
563
564 private <T, E extends Throwable> void doCloseInterceptors(
565 RetryCallback<T, E> callback, RetryContext context, Throwable lastException) {
566 for (int i = this.listeners.length; i-- > 0;) {
567 this.listeners[i].close(context, callback, lastException);
568 }
569 }
570
571 private <T, E extends Throwable> void doOnErrorInterceptors(
572 RetryCallback<T, E> callback, RetryContext context, Throwable throwable) {
573 for (int i = this.listeners.length; i-- > 0;) {
574 this.listeners[i].onError(context, callback, throwable);
575 }
576 }
577
578 /**
579 * Re-throws the original throwable if it is an Exception, and wraps non-exceptions
580 * into {@link RetryException}.
581 */
582 private static <E extends Throwable> E wrapIfNecessary(Throwable throwable)
583 throws RetryException {
584 if (throwable instanceof Error) {
585 throw (Error) throwable;
586 }
587 else if (throwable instanceof Exception) {
588 @SuppressWarnings("unchecked")
589 E rethrow = (E) throwable;
590 return rethrow;
591 }
592 else {
593 throw new RetryException("Exception in retry", throwable);
594 }
595 }
596
597 }
598