1 /*
2  * Copyright 2013-2019 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.cloud.aws.messaging.core;
18
19 import com.amazonaws.services.sqs.AmazonSQS;
20 import com.amazonaws.services.sqs.AmazonSQSAsync;
21
22 import org.springframework.cloud.aws.core.env.ResourceIdResolver;
23 import org.springframework.cloud.aws.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate;
24 import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
25 import org.springframework.messaging.Message;
26 import org.springframework.messaging.MessagingException;
27 import org.springframework.messaging.converter.CompositeMessageConverter;
28 import org.springframework.messaging.converter.MessageConverter;
29 import org.springframework.messaging.converter.StringMessageConverter;
30 import org.springframework.messaging.core.DestinationResolver;
31 import org.springframework.messaging.core.DestinationResolvingMessageReceivingOperations;
32
33 /**
34  * <b>IMPORTANT</b>: For the message conversion this class always tries to first use the
35  * {@link StringMessageConverter} as it fits the underlying message channel type. If a
36  * message converter is set through the constructor then it is added to a composite
37  * converter already containing the {@link StringMessageConverter}. If
38  * {@link QueueMessagingTemplate#setMessageConverter(MessageConverter)} is used, then the
39  * {@link CompositeMessageConverter} containing the {@link StringMessageConverter} will
40  * not be used anymore and the {@code String} payloads are also going to be converted with
41  * the set converter.
42  *
43  * @author Agim Emruli
44  * @author Alain Sahli
45  * @since 1.0
46  */

47 public class QueueMessagingTemplate
48         extends AbstractMessageChannelMessagingSendingTemplate<QueueMessageChannel>
49         implements DestinationResolvingMessageReceivingOperations<QueueMessageChannel> {
50
51     private final AmazonSQSAsync amazonSqs;
52
53     public QueueMessagingTemplate(AmazonSQSAsync amazonSqs) {
54         this(amazonSqs, (ResourceIdResolver) nullnull);
55     }
56
57     public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
58             ResourceIdResolver resourceIdResolver) {
59         this(amazonSqs, resourceIdResolver, null);
60     }
61
62     /**
63      * Initializes the messaging template by configuring the resource Id resolver as well
64      * as the message converter. Uses the {@link DynamicQueueUrlDestinationResolver} with
65      * the default configuration to resolve destination names.
66      * @param amazonSqs The {@link AmazonSQS} client, cannot be {@code null}.
67      * @param resourceIdResolver The {@link ResourceIdResolver} to be used for resolving
68      * logical queue names.
69      * @param messageConverter A {@link MessageConverter} that is going to be added to the
70      * composite converter.
71      */

72     public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
73             ResourceIdResolver resourceIdResolver, MessageConverter messageConverter) {
74         this(amazonSqs,
75                 new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver),
76                 messageConverter);
77     }
78
79     /**
80      * Initializes the messaging template by configuring the destination resolver as well
81      * as the message converter. Uses the {@link DynamicQueueUrlDestinationResolver} with
82      * the default configuration to resolve destination names.
83      * @param amazonSqs The {@link AmazonSQS} client, cannot be {@code null}.
84      * @param destinationResolver A destination resolver implementation to resolve queue
85      * names into queue urls. The destination resolver will be wrapped into a
86      * {@link org.springframework.messaging.core.CachingDestinationResolverProxy} to avoid
87      * duplicate queue url resolutions.
88      * @param messageConverter A {@link MessageConverter} that is going to be added to the
89      * composite converter.
90      */

91     public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
92             DestinationResolver<String> destinationResolver,
93             MessageConverter messageConverter) {
94         super(destinationResolver);
95         this.amazonSqs = amazonSqs;
96         initMessageConverter(messageConverter);
97     }
98
99     @Override
100     protected QueueMessageChannel resolveMessageChannel(
101             String physicalResourceIdentifier) {
102         return new QueueMessageChannel(this.amazonSqs, physicalResourceIdentifier);
103     }
104
105     @Override
106     public Message<?> receive() throws MessagingException {
107         return receive(getRequiredDefaultDestination());
108     }
109
110     @Override
111     public Message<?> receive(QueueMessageChannel destination) throws MessagingException {
112         return destination.receive();
113     }
114
115     @Override
116     public <T> T receiveAndConvert(Class<T> targetClass) throws MessagingException {
117         return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
118     }
119
120     @SuppressWarnings("unchecked")
121     @Override
122     public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass)
123             throws MessagingException {
124         Message<?> message = destination.receive();
125         if (message != null) {
126             return (T) getMessageConverter().fromMessage(message, targetClass);
127         }
128         else {
129             return null;
130         }
131     }
132
133     @Override
134     public Message<?> receive(String destinationName) throws MessagingException {
135         return resolveMessageChannelByLogicalName(destinationName).receive();
136     }
137
138     @Override
139     public <T> T receiveAndConvert(String destinationName, Class<T> targetClass)
140             throws MessagingException {
141         QueueMessageChannel channel = resolveMessageChannelByLogicalName(destinationName);
142         return receiveAndConvert(channel, targetClass);
143     }
144
145 }
146