1
16
17 package org.springframework.cloud.aws.messaging.listener;
18
19 import java.lang.reflect.Method;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.Comparator;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.stream.Collectors;
28
29 import org.springframework.beans.factory.config.BeanExpressionContext;
30 import org.springframework.beans.factory.config.BeanExpressionResolver;
31 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
32 import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
33 import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver;
34 import org.springframework.cloud.aws.messaging.listener.support.VisibilityHandlerMethodArgumentResolver;
35 import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
36 import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver;
37 import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter;
38 import org.springframework.context.ConfigurableApplicationContext;
39 import org.springframework.core.annotation.AnnotationUtils;
40 import org.springframework.messaging.Message;
41 import org.springframework.messaging.MessagingException;
42 import org.springframework.messaging.converter.CompositeMessageConverter;
43 import org.springframework.messaging.converter.MessageConverter;
44 import org.springframework.messaging.converter.SimpleMessageConverter;
45 import org.springframework.messaging.converter.StringMessageConverter;
46 import org.springframework.messaging.handler.HandlerMethod;
47 import org.springframework.messaging.handler.annotation.MessageMapping;
48 import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver;
49 import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
50 import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
51 import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
52 import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
53 import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
54 import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler;
55 import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
56 import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
57 import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
58 import org.springframework.util.ClassUtils;
59 import org.springframework.util.comparator.ComparableComparator;
60 import org.springframework.validation.Errors;
61 import org.springframework.validation.Validator;
62
63
69 public class QueueMessageHandler
70 extends AbstractMethodMessageHandler<QueueMessageHandler.MappingInformation> {
71
72 static final String LOGICAL_RESOURCE_ID = "LogicalResourceId";
73 static final String ACKNOWLEDGMENT = "Acknowledgment";
74 static final String VISIBILITY = "Visibility";
75
76 private final List<MessageConverter> messageConverters;
77
78 public QueueMessageHandler(List<MessageConverter> messageConverters) {
79 this.messageConverters = messageConverters;
80 }
81
82 public QueueMessageHandler() {
83 this.messageConverters = Collections.emptyList();
84 }
85
86 private static String[] wrapInStringArray(Object valueToWrap) {
87 return new String[] { valueToWrap.toString() };
88 }
89
90 @Override
91 protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers() {
92 List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>(
93 getCustomArgumentResolvers());
94
95 resolvers.add(new HeaderMethodArgumentResolver(null, null));
96 resolvers.add(new HeadersMethodArgumentResolver());
97
98 resolvers.add(new NotificationSubjectArgumentResolver());
99 resolvers.add(new AcknowledgmentHandlerMethodArgumentResolver(ACKNOWLEDGMENT));
100 resolvers.add(new VisibilityHandlerMethodArgumentResolver(VISIBILITY));
101
102 CompositeMessageConverter compositeMessageConverter = createPayloadArgumentCompositeConverter();
103 resolvers.add(new NotificationMessageArgumentResolver(compositeMessageConverter));
104 resolvers.add(new MessageMethodArgumentResolver(
105 this.messageConverters.isEmpty() ? new StringMessageConverter()
106 : new CompositeMessageConverter(this.messageConverters)));
107 resolvers.add(new PayloadArgumentResolver(compositeMessageConverter,
108 new NoOpValidator()));
109
110 return resolvers;
111 }
112
113 @Override
114 protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
115
116 return new ArrayList<>(this.getCustomReturnValueHandlers());
117 }
118
119 @Override
120 protected boolean isHandler(Class<?> beanType) {
121 return true;
122 }
123
124 @Override
125 protected MappingInformation getMappingForMethod(Method method,
126 Class<?> handlerType) {
127 SqsListener sqsListenerAnnotation = AnnotationUtils.findAnnotation(method,
128 SqsListener.class);
129 if (sqsListenerAnnotation != null && sqsListenerAnnotation.value().length > 0) {
130 if (sqsListenerAnnotation.deletionPolicy() == SqsMessageDeletionPolicy.NEVER
131 && hasNoAcknowledgmentParameter(method.getParameterTypes())) {
132 this.logger.warn("Listener method '" + method.getName() + "' in type '"
133 + method.getDeclaringClass().getName()
134 + "' has deletion policy 'NEVER' but does not have a parameter of type Acknowledgment.");
135 }
136 return new MappingInformation(
137 resolveDestinationNames(sqsListenerAnnotation.value()),
138 sqsListenerAnnotation.deletionPolicy());
139 }
140
141 MessageMapping messageMappingAnnotation = AnnotationUtils.findAnnotation(method,
142 MessageMapping.class);
143 if (messageMappingAnnotation != null
144 && messageMappingAnnotation.value().length > 0) {
145 return new MappingInformation(
146 resolveDestinationNames(messageMappingAnnotation.value()),
147 SqsMessageDeletionPolicy.ALWAYS);
148 }
149
150 return null;
151 }
152
153 private boolean hasNoAcknowledgmentParameter(Class<?>[] parameterTypes) {
154 for (Class<?> parameterType : parameterTypes) {
155 if (ClassUtils.isAssignable(Acknowledgment.class, parameterType)) {
156 return false;
157 }
158 }
159
160 return true;
161 }
162
163 private Set<String> resolveDestinationNames(String[] destinationNames) {
164 Set<String> result = new HashSet<>(destinationNames.length);
165
166 for (String destinationName : destinationNames) {
167 result.addAll(Arrays.asList(resolveName(destinationName)));
168 }
169
170 return result;
171 }
172
173 private String[] resolveName(String name) {
174 if (!(getApplicationContext() instanceof ConfigurableApplicationContext)) {
175 return wrapInStringArray(name);
176 }
177
178 ConfigurableApplicationContext applicationContext = (ConfigurableApplicationContext) getApplicationContext();
179 ConfigurableBeanFactory configurableBeanFactory = applicationContext
180 .getBeanFactory();
181
182 String placeholdersResolved = configurableBeanFactory.resolveEmbeddedValue(name);
183 BeanExpressionResolver exprResolver = configurableBeanFactory
184 .getBeanExpressionResolver();
185 if (exprResolver == null) {
186 return wrapInStringArray(name);
187 }
188 Object result = exprResolver.evaluate(placeholdersResolved,
189 new BeanExpressionContext(configurableBeanFactory, null));
190 if (result instanceof String[]) {
191 return (String[]) result;
192 }
193 else if (result != null) {
194 return wrapInStringArray(result);
195 }
196 else {
197 return wrapInStringArray(name);
198 }
199 }
200
201 @Override
202 protected Set<String> getDirectLookupDestinations(MappingInformation mapping) {
203 return mapping.getLogicalResourceIds();
204 }
205
206 @Override
207 protected String getDestination(Message<?> message) {
208 return message.getHeaders().get(LOGICAL_RESOURCE_ID).toString();
209 }
210
211 @Override
212 protected MappingInformation getMatchingMapping(MappingInformation mapping,
213 Message<?> message) {
214 if (mapping.getLogicalResourceIds().contains(getDestination(message))) {
215 return mapping;
216 }
217 else {
218 return null;
219 }
220 }
221
222 @Override
223 protected Comparator<MappingInformation> getMappingComparator(Message<?> message) {
224 return new ComparableComparator<>();
225 }
226
227 @Override
228 protected AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(
229 Class<?> beanType) {
230 return new AnnotationExceptionHandlerMethodResolver(beanType);
231 }
232
233 @Override
234 protected void handleNoMatch(Set<MappingInformation> ts, String lookupDestination,
235 Message<?> message) {
236 this.logger.warn("No match found");
237 }
238
239 @Override
240 protected void processHandlerMethodException(HandlerMethod handlerMethod,
241 Exception ex, Message<?> message) {
242 InvocableHandlerMethod exceptionHandlerMethod = getExceptionHandlerMethod(
243 handlerMethod, ex);
244 if (exceptionHandlerMethod != null) {
245 super.processHandlerMethodException(handlerMethod, ex, message);
246 }
247 else {
248 this.logger.error("An exception occurred while invoking the handler method",
249 ex);
250 }
251 throw new MessagingException(
252 "An exception occurred while invoking the handler method", ex);
253 }
254
255 private CompositeMessageConverter createPayloadArgumentCompositeConverter() {
256 List<MessageConverter> payloadArgumentConverters = new ArrayList<>(
257 this.messageConverters);
258
259 ObjectMessageConverter objectMessageConverter = new ObjectMessageConverter();
260 objectMessageConverter.setStrictContentTypeMatch(true);
261 payloadArgumentConverters.add(objectMessageConverter);
262
263 payloadArgumentConverters.add(new SimpleMessageConverter());
264
265 return new CompositeMessageConverter(payloadArgumentConverters);
266 }
267
268 @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
269 protected static class MappingInformation implements Comparable<MappingInformation> {
270
271 private final Set<String> logicalResourceIds;
272
273 private final SqsMessageDeletionPolicy deletionPolicy;
274
275 public MappingInformation(Set<String> logicalResourceIds,
276 SqsMessageDeletionPolicy deletionPolicy) {
277 this.logicalResourceIds = Collections.unmodifiableSet(logicalResourceIds);
278 this.deletionPolicy = deletionPolicy;
279 }
280
281 public Set<String> getLogicalResourceIds() {
282 return this.logicalResourceIds;
283 }
284
285 public SqsMessageDeletionPolicy getDeletionPolicy() {
286 return this.deletionPolicy;
287 }
288
289 @SuppressWarnings("NullableProblems")
290 @Override
291 public int compareTo(MappingInformation o) {
292 return 0;
293 }
294
295 @Override
296 public String toString() {
297 return logicalResourceIds.stream().collect(Collectors.joining(", "));
298 }
299
300 }
301
302 private static final class NoOpValidator implements Validator {
303
304 @Override
305 public boolean supports(Class<?> clazz) {
306 return false;
307 }
308
309 @Override
310 public void validate(Object target, Errors errors) {
311 }
312
313 }
314
315 }
316