1 /*
2  * Copyright 2013-2019 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.cloud.aws.messaging.listener;
18
19 import java.lang.reflect.Method;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.Comparator;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.stream.Collectors;
28
29 import org.springframework.beans.factory.config.BeanExpressionContext;
30 import org.springframework.beans.factory.config.BeanExpressionResolver;
31 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
32 import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
33 import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver;
34 import org.springframework.cloud.aws.messaging.listener.support.VisibilityHandlerMethodArgumentResolver;
35 import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
36 import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver;
37 import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter;
38 import org.springframework.context.ConfigurableApplicationContext;
39 import org.springframework.core.annotation.AnnotationUtils;
40 import org.springframework.messaging.Message;
41 import org.springframework.messaging.MessagingException;
42 import org.springframework.messaging.converter.CompositeMessageConverter;
43 import org.springframework.messaging.converter.MessageConverter;
44 import org.springframework.messaging.converter.SimpleMessageConverter;
45 import org.springframework.messaging.converter.StringMessageConverter;
46 import org.springframework.messaging.handler.HandlerMethod;
47 import org.springframework.messaging.handler.annotation.MessageMapping;
48 import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver;
49 import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
50 import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
51 import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
52 import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
53 import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
54 import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler;
55 import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
56 import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
57 import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
58 import org.springframework.util.ClassUtils;
59 import org.springframework.util.comparator.ComparableComparator;
60 import org.springframework.validation.Errors;
61 import org.springframework.validation.Validator;
62
63 /**
64  * @author Agim Emruli
65  * @author Alain Sahli
66  * @author Maciej Walkowiak
67  * @since 1.0
68  */

69 public class QueueMessageHandler
70         extends AbstractMethodMessageHandler<QueueMessageHandler.MappingInformation> {
71
72     static final String LOGICAL_RESOURCE_ID = "LogicalResourceId";
73     static final String ACKNOWLEDGMENT = "Acknowledgment";
74     static final String VISIBILITY = "Visibility";
75
76     private final List<MessageConverter> messageConverters;
77
78     public QueueMessageHandler(List<MessageConverter> messageConverters) {
79         this.messageConverters = messageConverters;
80     }
81
82     public QueueMessageHandler() {
83         this.messageConverters = Collections.emptyList();
84     }
85
86     private static String[] wrapInStringArray(Object valueToWrap) {
87         return new String[] { valueToWrap.toString() };
88     }
89
90     @Override
91     protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers() {
92         List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>(
93                 getCustomArgumentResolvers());
94
95         resolvers.add(new HeaderMethodArgumentResolver(nullnull));
96         resolvers.add(new HeadersMethodArgumentResolver());
97
98         resolvers.add(new NotificationSubjectArgumentResolver());
99         resolvers.add(new AcknowledgmentHandlerMethodArgumentResolver(ACKNOWLEDGMENT));
100         resolvers.add(new VisibilityHandlerMethodArgumentResolver(VISIBILITY));
101
102         CompositeMessageConverter compositeMessageConverter = createPayloadArgumentCompositeConverter();
103         resolvers.add(new NotificationMessageArgumentResolver(compositeMessageConverter));
104         resolvers.add(new MessageMethodArgumentResolver(
105                 this.messageConverters.isEmpty() ? new StringMessageConverter()
106                         : new CompositeMessageConverter(this.messageConverters)));
107         resolvers.add(new PayloadArgumentResolver(compositeMessageConverter,
108                 new NoOpValidator()));
109
110         return resolvers;
111     }
112
113     @Override
114     protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
115
116         return new ArrayList<>(this.getCustomReturnValueHandlers());
117     }
118
119     @Override
120     protected boolean isHandler(Class<?> beanType) {
121         return true;
122     }
123
124     @Override
125     protected MappingInformation getMappingForMethod(Method method,
126             Class<?> handlerType) {
127         SqsListener sqsListenerAnnotation = AnnotationUtils.findAnnotation(method,
128                 SqsListener.class);
129         if (sqsListenerAnnotation != null && sqsListenerAnnotation.value().length > 0) {
130             if (sqsListenerAnnotation.deletionPolicy() == SqsMessageDeletionPolicy.NEVER
131                     && hasNoAcknowledgmentParameter(method.getParameterTypes())) {
132                 this.logger.warn("Listener method '" + method.getName() + "' in type '"
133                         + method.getDeclaringClass().getName()
134                         + "' has deletion policy 'NEVER' but does not have a parameter of type Acknowledgment.");
135             }
136             return new MappingInformation(
137                     resolveDestinationNames(sqsListenerAnnotation.value()),
138                     sqsListenerAnnotation.deletionPolicy());
139         }
140
141         MessageMapping messageMappingAnnotation = AnnotationUtils.findAnnotation(method,
142                 MessageMapping.class);
143         if (messageMappingAnnotation != null
144                 && messageMappingAnnotation.value().length > 0) {
145             return new MappingInformation(
146                     resolveDestinationNames(messageMappingAnnotation.value()),
147                     SqsMessageDeletionPolicy.ALWAYS);
148         }
149
150         return null;
151     }
152
153     private boolean hasNoAcknowledgmentParameter(Class<?>[] parameterTypes) {
154         for (Class<?> parameterType : parameterTypes) {
155             if (ClassUtils.isAssignable(Acknowledgment.class, parameterType)) {
156                 return false;
157             }
158         }
159
160         return true;
161     }
162
163     private Set<String> resolveDestinationNames(String[] destinationNames) {
164         Set<String> result = new HashSet<>(destinationNames.length);
165
166         for (String destinationName : destinationNames) {
167             result.addAll(Arrays.asList(resolveName(destinationName)));
168         }
169
170         return result;
171     }
172
173     private String[] resolveName(String name) {
174         if (!(getApplicationContext() instanceof ConfigurableApplicationContext)) {
175             return wrapInStringArray(name);
176         }
177
178         ConfigurableApplicationContext applicationContext = (ConfigurableApplicationContext) getApplicationContext();
179         ConfigurableBeanFactory configurableBeanFactory = applicationContext
180                 .getBeanFactory();
181
182         String placeholdersResolved = configurableBeanFactory.resolveEmbeddedValue(name);
183         BeanExpressionResolver exprResolver = configurableBeanFactory
184                 .getBeanExpressionResolver();
185         if (exprResolver == null) {
186             return wrapInStringArray(name);
187         }
188         Object result = exprResolver.evaluate(placeholdersResolved,
189                 new BeanExpressionContext(configurableBeanFactory, null));
190         if (result instanceof String[]) {
191             return (String[]) result;
192         }
193         else if (result != null) {
194             return wrapInStringArray(result);
195         }
196         else {
197             return wrapInStringArray(name);
198         }
199     }
200
201     @Override
202     protected Set<String> getDirectLookupDestinations(MappingInformation mapping) {
203         return mapping.getLogicalResourceIds();
204     }
205
206     @Override
207     protected String getDestination(Message<?> message) {
208         return message.getHeaders().get(LOGICAL_RESOURCE_ID).toString();
209     }
210
211     @Override
212     protected MappingInformation getMatchingMapping(MappingInformation mapping,
213             Message<?> message) {
214         if (mapping.getLogicalResourceIds().contains(getDestination(message))) {
215             return mapping;
216         }
217         else {
218             return null;
219         }
220     }
221
222     @Override
223     protected Comparator<MappingInformation> getMappingComparator(Message<?> message) {
224         return new ComparableComparator<>();
225     }
226
227     @Override
228     protected AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(
229             Class<?> beanType) {
230         return new AnnotationExceptionHandlerMethodResolver(beanType);
231     }
232
233     @Override
234     protected void handleNoMatch(Set<MappingInformation> ts, String lookupDestination,
235             Message<?> message) {
236         this.logger.warn("No match found");
237     }
238
239     @Override
240     protected void processHandlerMethodException(HandlerMethod handlerMethod,
241             Exception ex, Message<?> message) {
242         InvocableHandlerMethod exceptionHandlerMethod = getExceptionHandlerMethod(
243                 handlerMethod, ex);
244         if (exceptionHandlerMethod != null) {
245             super.processHandlerMethodException(handlerMethod, ex, message);
246         }
247         else {
248             this.logger.error("An exception occurred while invoking the handler method",
249                     ex);
250         }
251         throw new MessagingException(
252                 "An exception occurred while invoking the handler method", ex);
253     }
254
255     private CompositeMessageConverter createPayloadArgumentCompositeConverter() {
256         List<MessageConverter> payloadArgumentConverters = new ArrayList<>(
257                 this.messageConverters);
258
259         ObjectMessageConverter objectMessageConverter = new ObjectMessageConverter();
260         objectMessageConverter.setStrictContentTypeMatch(true);
261         payloadArgumentConverters.add(objectMessageConverter);
262
263         payloadArgumentConverters.add(new SimpleMessageConverter());
264
265         return new CompositeMessageConverter(payloadArgumentConverters);
266     }
267
268     @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
269     protected static class MappingInformation implements Comparable<MappingInformation> {
270
271         private final Set<String> logicalResourceIds;
272
273         private final SqsMessageDeletionPolicy deletionPolicy;
274
275         public MappingInformation(Set<String> logicalResourceIds,
276                 SqsMessageDeletionPolicy deletionPolicy) {
277             this.logicalResourceIds = Collections.unmodifiableSet(logicalResourceIds);
278             this.deletionPolicy = deletionPolicy;
279         }
280
281         public Set<String> getLogicalResourceIds() {
282             return this.logicalResourceIds;
283         }
284
285         public SqsMessageDeletionPolicy getDeletionPolicy() {
286             return this.deletionPolicy;
287         }
288
289         @SuppressWarnings("NullableProblems")
290         @Override
291         public int compareTo(MappingInformation o) {
292             return 0;
293         }
294
295         @Override
296         public String toString() {
297             return logicalResourceIds.stream().collect(Collectors.joining(", "));
298         }
299
300     }
301
302     private static final class NoOpValidator implements Validator {
303
304         @Override
305         public boolean supports(Class<?> clazz) {
306             return false;
307         }
308
309         @Override
310         public void validate(Object target, Errors errors) {
311         }
312
313     }
314
315 }
316