1 /*
2  * Copyright 2006-2014 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.retry.support;
18
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.List;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import org.springframework.retry.ExhaustedRetryException;
27 import org.springframework.retry.RecoveryCallback;
28 import org.springframework.retry.RetryCallback;
29 import org.springframework.retry.RetryContext;
30 import org.springframework.retry.RetryException;
31 import org.springframework.retry.RetryListener;
32 import org.springframework.retry.RetryOperations;
33 import org.springframework.retry.RetryPolicy;
34 import org.springframework.retry.RetryState;
35 import org.springframework.retry.TerminatedRetryException;
36 import org.springframework.retry.backoff.BackOffContext;
37 import org.springframework.retry.backoff.BackOffInterruptedException;
38 import org.springframework.retry.backoff.BackOffPolicy;
39 import org.springframework.retry.backoff.NoBackOffPolicy;
40 import org.springframework.retry.policy.MapRetryContextCache;
41 import org.springframework.retry.policy.RetryContextCache;
42 import org.springframework.retry.policy.SimpleRetryPolicy;
43
44 /**
45  * Template class that simplifies the execution of operations with retry semantics.
46  * <p>
47  * Retryable operations are encapsulated in implementations of the {@link RetryCallback}
48  * interface and are executed using one of the supplied execute methods.
49  * <p>
50  * By default, an operation is retried if is throws any {@link Exception} or subclass of
51  * {@link Exception}. This behaviour can be changed by using the
52  * {@link #setRetryPolicy(RetryPolicy)} method.
53  * <p>
54  * Also by default, each operation is retried for a maximum of three attempts with no back
55  * off in between. This behaviour can be configured using the
56  * {@link #setRetryPolicy(RetryPolicy)} and {@link #setBackOffPolicy(BackOffPolicy)}
57  * properties. The {@link org.springframework.retry.backoff.BackOffPolicy} controls how
58  * long the pause is between each individual retry attempt.
59  * <p>
60  * This class is thread-safe and suitable for concurrent access when executing operations
61  * and when performing configuration changes. As such, it is possible to change the number
62  * of retries on the fly, as well as the {@link BackOffPolicy} used and no in progress
63  * retryable operations will be affected.
64  *
65  * @author Rob Harrop
66  * @author Dave Syer
67  * @author Gary Russell
68  * @author Artem Bilan
69  * @author Josh Long
70  */

71 public class RetryTemplate implements RetryOperations {
72
73     /**
74      * Retry context attribute name that indicates the context should be considered global
75      * state (never closed). TODO: convert this to a flag in the RetryState.
76      */

77     private static final String GLOBAL_STATE = "state.global";
78
79     protected final Log logger = LogFactory.getLog(getClass());
80
81     private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();
82
83     private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
84
85     private volatile RetryListener[] listeners = new RetryListener[0];
86
87     private RetryContextCache retryContextCache = new MapRetryContextCache();
88
89     private boolean throwLastExceptionOnExhausted;
90
91     /**
92      * @param throwLastExceptionOnExhausted the throwLastExceptionOnExhausted to set
93      */

94     public void setThrowLastExceptionOnExhausted(boolean throwLastExceptionOnExhausted) {
95         this.throwLastExceptionOnExhausted = throwLastExceptionOnExhausted;
96     }
97
98     /**
99      * Public setter for the {@link RetryContextCache}.
100      *
101      * @param retryContextCache the {@link RetryContextCache} to set.
102      */

103     public void setRetryContextCache(RetryContextCache retryContextCache) {
104         this.retryContextCache = retryContextCache;
105     }
106
107     /**
108      * Setter for listeners. The listeners are executed before and after a retry block
109      * (i.e. before and after all the attempts), and on an error (every attempt).
110      *
111      * @param listeners the {@link RetryListener}s
112      * @see RetryListener
113      */

114     public void setListeners(RetryListener[] listeners) {
115         this.listeners = Arrays.asList(listeners)
116                 .toArray(new RetryListener[listeners.length]);
117     }
118
119     /**
120      * Register an additional listener.
121      *
122      * @param listener the {@link RetryListener}
123      * @see #setListeners(RetryListener[])
124      */

125     public void registerListener(RetryListener listener) {
126         List<RetryListener> list = new ArrayList<RetryListener>(
127                 Arrays.asList(this.listeners));
128         list.add(listener);
129         this.listeners = list.toArray(new RetryListener[list.size()]);
130     }
131
132     /**
133      * Setter for {@link BackOffPolicy}.
134      *
135      * @param backOffPolicy the {@link BackOffPolicy}
136      */

137     public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
138         this.backOffPolicy = backOffPolicy;
139     }
140
141     /**
142      * Setter for {@link RetryPolicy}.
143      *
144      * @param retryPolicy the {@link RetryPolicy}
145      */

146     public void setRetryPolicy(RetryPolicy retryPolicy) {
147         this.retryPolicy = retryPolicy;
148     }
149
150     /**
151      * Keep executing the callback until it either succeeds or the policy dictates that we
152      * stop, in which case the most recent exception thrown by the callback will be
153      * rethrown.
154      *
155      * @see RetryOperations#execute(RetryCallback)
156      * @param retryCallback the {@link RetryCallback}
157      *
158      * @throws TerminatedRetryException if the retry has been manually terminated by a
159      * listener.
160      */

161     @Override
162     public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback)
163             throws E {
164         return doExecute(retryCallback, nullnull);
165     }
166
167     /**
168      * Keep executing the callback until it either succeeds or the policy dictates that we
169      * stop, in which case the recovery callback will be executed.
170      *
171      * @see RetryOperations#execute(RetryCallback, RecoveryCallback)
172      * @param retryCallback the {@link RetryCallback}
173      * @param recoveryCallback the {@link RecoveryCallback}
174      * @throws TerminatedRetryException if the retry has been manually terminated by a
175      * listener.
176      */

177     @Override
178     public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
179             RecoveryCallback<T> recoveryCallback) throws E {
180         return doExecute(retryCallback, recoveryCallback, null);
181     }
182
183     /**
184      * Execute the callback once if the policy dictates that we can, re-throwing any
185      * exception encountered so that clients can re-present the same task later.
186      *
187      * @see RetryOperations#execute(RetryCallback, RetryState)
188      * @param retryCallback the {@link RetryCallback}
189      * @param retryState the {@link RetryState}
190      * @throws ExhaustedRetryException if the retry has been exhausted.
191      */

192     @Override
193     public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
194             RetryState retryState) throws E, ExhaustedRetryException {
195         return doExecute(retryCallback, null, retryState);
196     }
197
198     /**
199      * Execute the callback once if the policy dictates that we can, re-throwing any
200      * exception encountered so that clients can re-present the same task later.
201      *
202      * @see RetryOperations#execute(RetryCallback, RetryState)
203      * @param retryCallback the {@link RetryCallback}
204      * @param recoveryCallback the {@link RecoveryCallback}
205      * @param retryState the {@link RetryState}
206      */

207     @Override
208     public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
209             RecoveryCallback<T> recoveryCallback, RetryState retryState)
210             throws E, ExhaustedRetryException {
211         return doExecute(retryCallback, recoveryCallback, retryState);
212     }
213
214     /**
215      * Execute the callback once if the policy dictates that we can, otherwise execute the
216      * recovery callback.
217      * @param recoveryCallback the {@link RecoveryCallback}
218      * @param retryCallback the {@link RetryCallback}
219      * @param state the {@link RetryState}
220      * @param <T> the type of the return value
221      * @param <E> the exception type to throw
222      * @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState)
223      * @throws ExhaustedRetryException if the retry has been exhausted.
224      * @throws E an exception if the retry operation fails
225      * @return T the retried value
226      */

227     protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
228             RecoveryCallback<T> recoveryCallback, RetryState state)
229             throws E, ExhaustedRetryException {
230
231         RetryPolicy retryPolicy = this.retryPolicy;
232         BackOffPolicy backOffPolicy = this.backOffPolicy;
233
234         // Allow the retry policy to initialise itself...
235         RetryContext context = open(retryPolicy, state);
236         if (this.logger.isTraceEnabled()) {
237             this.logger.trace("RetryContext retrieved: " + context);
238         }
239
240         // Make sure the context is available globally for clients who need
241         // it...
242         RetrySynchronizationManager.register(context);
243
244         Throwable lastException = null;
245
246         boolean exhausted = false;
247         try {
248
249             // Give clients a chance to enhance the context...
250             boolean running = doOpenInterceptors(retryCallback, context);
251
252             if (!running) {
253                 throw new TerminatedRetryException(
254                         "Retry terminated abnormally by interceptor before first attempt");
255             }
256
257             // Get or Start the backoff context...
258             BackOffContext backOffContext = null;
259             Object resource = context.getAttribute("backOffContext");
260
261             if (resource instanceof BackOffContext) {
262                 backOffContext = (BackOffContext) resource;
263             }
264
265             if (backOffContext == null) {
266                 backOffContext = backOffPolicy.start(context);
267                 if (backOffContext != null) {
268                     context.setAttribute("backOffContext", backOffContext);
269                 }
270             }
271
272             /*
273              * We allow the whole loop to be skipped if the policy or context already
274              * forbid the first try. This is used in the case of external retry to allow a
275              * recovery in handleRetryExhausted without the callback processing (which
276              * would throw an exception).
277              */

278             while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
279
280                 try {
281                     if (this.logger.isDebugEnabled()) {
282                         this.logger.debug("Retry: count=" + context.getRetryCount());
283                     }
284                     // Reset the last exception, so if we are successful
285                     // the close interceptors will not think we failed...
286                     lastException = null;
287                     return retryCallback.doWithRetry(context);
288                 }
289                 catch (Throwable e) {
290
291                     lastException = e;
292
293                     try {
294                         registerThrowable(retryPolicy, state, context, e);
295                     }
296                     catch (Exception ex) {
297                         throw new TerminatedRetryException("Could not register throwable",
298                                 ex);
299                     }
300                     finally {
301                         doOnErrorInterceptors(retryCallback, context, e);
302                     }
303
304                     if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
305                         try {
306                             backOffPolicy.backOff(backOffContext);
307                         }
308                         catch (BackOffInterruptedException ex) {
309                             lastException = e;
310                             // back off was prevented by another thread - fail the retry
311                             if (this.logger.isDebugEnabled()) {
312                                 this.logger
313                                         .debug("Abort retry because interrupted: count="
314                                                 + context.getRetryCount());
315                             }
316                             throw ex;
317                         }
318                     }
319
320                     if (this.logger.isDebugEnabled()) {
321                         this.logger.debug(
322                                 "Checking for rethrow: count=" + context.getRetryCount());
323                     }
324
325                     if (shouldRethrow(retryPolicy, context, state)) {
326                         if (this.logger.isDebugEnabled()) {
327                             this.logger.debug("Rethrow in retry for policy: count="
328                                     + context.getRetryCount());
329                         }
330                         throw RetryTemplate.<E>wrapIfNecessary(e);
331                     }
332
333                 }
334
335                 /*
336                  * A stateful attempt that can retry may rethrow the exception before now,
337                  * but if we get this far in a stateful retry there's a reason for it,
338                  * like a circuit breaker or a rollback classifier.
339                  */

340                 if (state != null && context.hasAttribute(GLOBAL_STATE)) {
341                     break;
342                 }
343             }
344
345             if (state == null && this.logger.isDebugEnabled()) {
346                 this.logger.debug(
347                         "Retry failed last attempt: count=" + context.getRetryCount());
348             }
349
350             exhausted = true;
351             return handleRetryExhausted(recoveryCallback, context, state);
352
353         }
354         catch (Throwable e) {
355             throw RetryTemplate.<E>wrapIfNecessary(e);
356         }
357         finally {
358             close(retryPolicy, context, state, lastException == null || exhausted);
359             doCloseInterceptors(retryCallback, context, lastException);
360             RetrySynchronizationManager.clear();
361         }
362
363     }
364
365     /**
366      * Decide whether to proceed with the ongoing retry attempt. This method is called
367      * before the {@link RetryCallback} is executed, but after the backoff and open
368      * interceptors.
369      *
370      * @param retryPolicy the policy to apply
371      * @param context the current retry context
372      * @return true if we can continue with the attempt
373      */

374     protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
375         return retryPolicy.canRetry(context);
376     }
377
378     /**
379      * Clean up the cache if necessary and close the context provided (if the flag
380      * indicates that processing was successful).
381      *
382      * @param retryPolicy the {@link RetryPolicy}
383      * @param context the {@link RetryContext}
384      * @param state the {@link RetryState}
385      * @param succeeded whether the close succeeded
386      */

387     protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state,
388             boolean succeeded) {
389         if (state != null) {
390             if (succeeded) {
391                 if (!context.hasAttribute(GLOBAL_STATE)) {
392                     this.retryContextCache.remove(state.getKey());
393                 }
394                 retryPolicy.close(context);
395                 context.setAttribute(RetryContext.CLOSED, true);
396             }
397         }
398         else {
399             retryPolicy.close(context);
400             context.setAttribute(RetryContext.CLOSED, true);
401         }
402     }
403
404     protected void registerThrowable(RetryPolicy retryPolicy, RetryState state,
405             RetryContext context, Throwable e) {
406         retryPolicy.registerThrowable(context, e);
407         registerContext(context, state);
408     }
409
410     private void registerContext(RetryContext context, RetryState state) {
411         if (state != null) {
412             Object key = state.getKey();
413             if (key != null) {
414                 if (context.getRetryCount() > 1
415                         && !this.retryContextCache.containsKey(key)) {
416                     throw new RetryException(
417                             "Inconsistent state for failed item key: cache key has changed. "
418                                     + "Consider whether equals() or hashCode() for the key might be inconsistent, "
419                                     + "or if you need to supply a better key");
420                 }
421                 this.retryContextCache.put(key, context);
422             }
423         }
424     }
425
426     /**
427      * Delegate to the {@link RetryPolicy} having checked in the cache for an existing
428      * value if the state is not null.
429      *
430      * @param state a {@link RetryState}
431      * @param retryPolicy a {@link RetryPolicy} to delegate the context creation
432      *
433      * @return a retry context, either a new one or the one used last time the same state
434      * was encountered
435      */

436     protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
437
438         if (state == null) {
439             return doOpenInternal(retryPolicy);
440         }
441
442         Object key = state.getKey();
443         if (state.isForceRefresh()) {
444             return doOpenInternal(retryPolicy, state);
445         }
446
447         // If there is no cache hit we can avoid the possible expense of the
448         // cache re-hydration.
449         if (!this.retryContextCache.containsKey(key)) {
450             // The cache is only used if there is a failure.
451             return doOpenInternal(retryPolicy, state);
452         }
453
454         RetryContext context = this.retryContextCache.get(key);
455         if (context == null) {
456             if (this.retryContextCache.containsKey(key)) {
457                 throw new RetryException(
458                         "Inconsistent state for failed item: no history found. "
459                                 + "Consider whether equals() or hashCode() for the item might be inconsistent, "
460                                 + "or if you need to supply a better ItemKeyGenerator");
461             }
462             // The cache could have been expired in between calls to
463             // containsKey(), so we have to live with this:
464             return doOpenInternal(retryPolicy, state);
465         }
466
467         // Start with a clean slate for state that others may be inspecting
468         context.removeAttribute(RetryContext.CLOSED);
469         context.removeAttribute(RetryContext.EXHAUSTED);
470         context.removeAttribute(RetryContext.RECOVERED);
471         return context;
472
473     }
474
475     private RetryContext doOpenInternal(RetryPolicy retryPolicy, RetryState state) {
476         RetryContext context = retryPolicy.open(RetrySynchronizationManager.getContext());
477         if (state != null) {
478             context.setAttribute(RetryContext.STATE_KEY, state.getKey());
479         }
480         if (context.hasAttribute(GLOBAL_STATE)) {
481             registerContext(context, state);
482         }
483         return context;
484     }
485
486     private RetryContext doOpenInternal(RetryPolicy retryPolicy) {
487         return doOpenInternal(retryPolicy, null);
488     }
489
490     /**
491      * Actions to take after final attempt has failed. If there is state clean up the
492      * cache. If there is a recovery callback, execute that and return its result.
493      * Otherwise throw an exception.
494      *
495      * @param recoveryCallback the callback for recovery (might be null)
496      * @param context the current retry context
497      * @param state the {@link RetryState}
498      * @param <T> the type to classify
499      * @throws Exception if the callback does, and if there is no callback and the state
500      * is null then the last exception from the context
501      * @throws ExhaustedRetryException if the state is not null and there is no recovery
502      * callback
503      * @return T the payload to return
504      */

505     protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback,
506             RetryContext context, RetryState state) throws Throwable {
507         context.setAttribute(RetryContext.EXHAUSTED, true);
508         if (state != null && !context.hasAttribute(GLOBAL_STATE)) {
509             this.retryContextCache.remove(state.getKey());
510         }
511         if (recoveryCallback != null) {
512             T recovered = recoveryCallback.recover(context);
513             context.setAttribute(RetryContext.RECOVERED, true);
514             return recovered;
515         }
516         if (state != null) {
517             this.logger
518                     .debug("Retry exhausted after last attempt with no recovery path.");
519             rethrow(context, "Retry exhausted after last attempt with no recovery path");
520         }
521         throw wrapIfNecessary(context.getLastThrowable());
522     }
523
524     protected <E extends Throwable> void rethrow(RetryContext context, String message)
525             throws E {
526         if (this.throwLastExceptionOnExhausted) {
527             @SuppressWarnings("unchecked")
528             E rethrow = (E) context.getLastThrowable();
529             throw rethrow;
530         }
531         else {
532             throw new ExhaustedRetryException(message, context.getLastThrowable());
533         }
534     }
535
536     /**
537      * Extension point for subclasses to decide on behaviour after catching an exception
538      * in a {@link RetryCallback}. Normal stateless behaviour is not to rethrow, and if
539      * there is state we rethrow.
540      *
541      * @param retryPolicy the retry policy
542      * @param context the current context
543      * @param state the current retryState
544      * @return true if the state is not null but subclasses might choose otherwise
545      */

546     protected boolean shouldRethrow(RetryPolicy retryPolicy, RetryContext context,
547             RetryState state) {
548         return state != null && state.rollbackFor(context.getLastThrowable());
549     }
550
551     private <T, E extends Throwable> boolean doOpenInterceptors(
552             RetryCallback<T, E> callback, RetryContext context) {
553
554         boolean result = true;
555
556         for (RetryListener listener : this.listeners) {
557             result = result && listener.open(context, callback);
558         }
559
560         return result;
561
562     }
563
564     private <T, E extends Throwable> void doCloseInterceptors(
565             RetryCallback<T, E> callback, RetryContext context, Throwable lastException) {
566         for (int i = this.listeners.length; i-- > 0;) {
567             this.listeners[i].close(context, callback, lastException);
568         }
569     }
570
571     private <T, E extends Throwable> void doOnErrorInterceptors(
572             RetryCallback<T, E> callback, RetryContext context, Throwable throwable) {
573         for (int i = this.listeners.length; i-- > 0;) {
574             this.listeners[i].onError(context, callback, throwable);
575         }
576     }
577
578     /**
579      * Re-throws the original throwable if it is an Exception, and wraps non-exceptions
580      * into {@link RetryException}.
581      */

582     private static <E extends Throwable> E wrapIfNecessary(Throwable throwable)
583             throws RetryException {
584         if (throwable instanceof Error) {
585             throw (Error) throwable;
586         }
587         else if (throwable instanceof Exception) {
588             @SuppressWarnings("unchecked")
589             E rethrow = (E) throwable;
590             return rethrow;
591         }
592         else {
593             throw new RetryException("Exception in retry", throwable);
594         }
595     }
596
597 }
598