1
16
17 package org.springframework.cloud.aws.messaging.core;
18
19 import com.amazonaws.services.sqs.AmazonSQS;
20 import com.amazonaws.services.sqs.AmazonSQSAsync;
21
22 import org.springframework.cloud.aws.core.env.ResourceIdResolver;
23 import org.springframework.cloud.aws.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate;
24 import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
25 import org.springframework.messaging.Message;
26 import org.springframework.messaging.MessagingException;
27 import org.springframework.messaging.converter.CompositeMessageConverter;
28 import org.springframework.messaging.converter.MessageConverter;
29 import org.springframework.messaging.converter.StringMessageConverter;
30 import org.springframework.messaging.core.DestinationResolver;
31 import org.springframework.messaging.core.DestinationResolvingMessageReceivingOperations;
32
33
47 public class QueueMessagingTemplate
48 extends AbstractMessageChannelMessagingSendingTemplate<QueueMessageChannel>
49 implements DestinationResolvingMessageReceivingOperations<QueueMessageChannel> {
50
51 private final AmazonSQSAsync amazonSqs;
52
53 public QueueMessagingTemplate(AmazonSQSAsync amazonSqs) {
54 this(amazonSqs, (ResourceIdResolver) null, null);
55 }
56
57 public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
58 ResourceIdResolver resourceIdResolver) {
59 this(amazonSqs, resourceIdResolver, null);
60 }
61
62
72 public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
73 ResourceIdResolver resourceIdResolver, MessageConverter messageConverter) {
74 this(amazonSqs,
75 new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver),
76 messageConverter);
77 }
78
79
91 public QueueMessagingTemplate(AmazonSQSAsync amazonSqs,
92 DestinationResolver<String> destinationResolver,
93 MessageConverter messageConverter) {
94 super(destinationResolver);
95 this.amazonSqs = amazonSqs;
96 initMessageConverter(messageConverter);
97 }
98
99 @Override
100 protected QueueMessageChannel resolveMessageChannel(
101 String physicalResourceIdentifier) {
102 return new QueueMessageChannel(this.amazonSqs, physicalResourceIdentifier);
103 }
104
105 @Override
106 public Message<?> receive() throws MessagingException {
107 return receive(getRequiredDefaultDestination());
108 }
109
110 @Override
111 public Message<?> receive(QueueMessageChannel destination) throws MessagingException {
112 return destination.receive();
113 }
114
115 @Override
116 public <T> T receiveAndConvert(Class<T> targetClass) throws MessagingException {
117 return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
118 }
119
120 @SuppressWarnings("unchecked")
121 @Override
122 public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass)
123 throws MessagingException {
124 Message<?> message = destination.receive();
125 if (message != null) {
126 return (T) getMessageConverter().fromMessage(message, targetClass);
127 }
128 else {
129 return null;
130 }
131 }
132
133 @Override
134 public Message<?> receive(String destinationName) throws MessagingException {
135 return resolveMessageChannelByLogicalName(destinationName).receive();
136 }
137
138 @Override
139 public <T> T receiveAndConvert(String destinationName, Class<T> targetClass)
140 throws MessagingException {
141 QueueMessageChannel channel = resolveMessageChannelByLogicalName(destinationName);
142 return receiveAndConvert(channel, targetClass);
143 }
144
145 }
146