1 /*
2 * Copyright 2013-2019 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.cloud.aws.messaging.config;
18
19 import com.amazonaws.services.sqs.AmazonSQS;
20 import com.amazonaws.services.sqs.AmazonSQSAsync;
21
22 import org.springframework.cloud.aws.core.env.ResourceIdResolver;
23 import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
24 import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
25 import org.springframework.core.task.AsyncTaskExecutor;
26 import org.springframework.core.task.TaskExecutor;
27 import org.springframework.messaging.core.DestinationResolver;
28 import org.springframework.util.Assert;
29
30 /**
31 * @author Alain Sahli
32 * @since 1.0
33 */
34 public class SimpleMessageListenerContainerFactory {
35
36 private AsyncTaskExecutor taskExecutor;
37
38 private Integer maxNumberOfMessages;
39
40 private Integer visibilityTimeout;
41
42 private Integer waitTimeOut;
43
44 private boolean autoStartup = true;
45
46 private AmazonSQSAsync amazonSqs;
47
48 private QueueMessageHandler queueMessageHandler;
49
50 private ResourceIdResolver resourceIdResolver;
51
52 private DestinationResolver<String> destinationResolver;
53
54 private Long backOffTime;
55
56 /**
57 * Configures the {@link TaskExecutor} which is used to poll messages and execute them
58 * by calling the handler methods. If no {@link TaskExecutor} is set, a default one is
59 * created.
60 * @param taskExecutor The {@link TaskExecutor} used by the container
61 * @see SimpleMessageListenerContainer#createDefaultTaskExecutor()
62 */
63 public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
64 this.taskExecutor = taskExecutor;
65 }
66
67 /**
68 * Configure the maximum number of messages that should be retrieved during one poll
69 * to the Amazon SQS system. This number must be a positive, non-zero number that has
70 * a maximum number of 10. Values higher then 10 are currently not supported by the
71 * queueing system.
72 * @param maxNumberOfMessages the maximum number of messages (between 1-10)
73 */
74 public void setMaxNumberOfMessages(Integer maxNumberOfMessages) {
75 this.maxNumberOfMessages = maxNumberOfMessages;
76 }
77
78 /**
79 * Configures the duration (in seconds) that the received messages are hidden from
80 * subsequent poll requests after being retrieved from the system.
81 * @param visibilityTimeout the visibility timeout in seconds
82 */
83 public void setVisibilityTimeout(Integer visibilityTimeout) {
84 this.visibilityTimeout = visibilityTimeout;
85 }
86
87 /**
88 * Configures the wait timeout that the poll request will wait for new message to
89 * arrive if the are currently no messages on the queue. Higher values will reduce
90 * poll request to the system significantly. The value should be between 1 and 20. For
91 * more information read the <a href=
92 * "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html">documentation</a>.
93 * @param waitTimeOut - the wait time out in seconds
94 */
95 public void setWaitTimeOut(Integer waitTimeOut) {
96 this.waitTimeOut = waitTimeOut;
97 }
98
99 /**
100 * Configures if this container should be automatically started. The default value is
101 * true.
102 * @param autoStartup - false if the container will be manually started
103 */
104 public void setAutoStartup(boolean autoStartup) {
105 this.autoStartup = autoStartup;
106 }
107
108 public AmazonSQS getAmazonSqs() {
109 return this.amazonSqs;
110 }
111
112 /**
113 * Sets the {@link AmazonSQSAsync} that is going to be used by the container to
114 * interact with the messaging (SQS) API.
115 * @param amazonSqs The {@link AmazonSQSAsync}, must not be {@code null}.
116 */
117 public void setAmazonSqs(AmazonSQSAsync amazonSqs) {
118 Assert.notNull(amazonSqs, "amazonSqs must not be null");
119 this.amazonSqs = amazonSqs;
120 }
121
122 public QueueMessageHandler getQueueMessageHandler() {
123 return this.queueMessageHandler;
124 }
125
126 /**
127 * Configures the {@link QueueMessageHandler} that must be used to handle incoming
128 * messages.
129 * <p>
130 * <b>NOTE</b>: It is rather unlikely that the {@link QueueMessageHandler} must be
131 * configured with this setter. Consider using the {@link QueueMessageHandlerFactory}
132 * to configure the {@link QueueMessageHandler} before using this setter.
133 * </p>
134 * @param messageHandler the {@link QueueMessageHandler} that must be used by the
135 * container, must not be {@code null}.
136 * @see QueueMessageHandlerFactory
137 */
138 public void setQueueMessageHandler(QueueMessageHandler messageHandler) {
139 Assert.notNull(messageHandler, "messageHandler must not be null");
140 this.queueMessageHandler = messageHandler;
141 }
142
143 public ResourceIdResolver getResourceIdResolver() {
144 return this.resourceIdResolver;
145 }
146
147 /**
148 * This value must be set if no destination resolver has been set.
149 * @param resourceIdResolver the resourceIdResolver to use for resolving logical to
150 * physical ids in a CloudFormation environment. Must not be null.
151 */
152 public void setResourceIdResolver(ResourceIdResolver resourceIdResolver) {
153 this.resourceIdResolver = resourceIdResolver;
154 }
155
156 /**
157 * Configures the destination resolver used to retrieve the queue url based on the
158 * destination name configured for this instance. <br>
159 * This setter can be used when a custom configured {@link DestinationResolver} must
160 * be provided. (For example if one want to have the
161 * {@link org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver}
162 * with the auto creation of queues set to {@code true}.
163 * @param destinationResolver another or customized {@link DestinationResolver}
164 */
165 public void setDestinationResolver(DestinationResolver<String> destinationResolver) {
166 this.destinationResolver = destinationResolver;
167 }
168
169 /**
170 * @return The number of milliseconds the polling thread must wait before trying to
171 * recover when an error occurs (e.g. connection timeout)
172 */
173 public Long getBackOffTime() {
174 return this.backOffTime;
175 }
176
177 /**
178 * The number of milliseconds the polling thread must wait before trying to recover
179 * when an error occurs (e.g. connection timeout). Default value is 10000
180 * milliseconds.
181 * @param backOffTime in milliseconds
182 */
183 public void setBackOffTime(Long backOffTime) {
184 this.backOffTime = backOffTime;
185 }
186
187 public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
188 Assert.notNull(this.amazonSqs, "amazonSqs must not be null");
189
190 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
191 simpleMessageListenerContainer.setAmazonSqs(this.amazonSqs);
192 simpleMessageListenerContainer.setAutoStartup(this.autoStartup);
193
194 if (this.taskExecutor != null) {
195 simpleMessageListenerContainer.setTaskExecutor(this.taskExecutor);
196 }
197 if (this.maxNumberOfMessages != null) {
198 simpleMessageListenerContainer
199 .setMaxNumberOfMessages(this.maxNumberOfMessages);
200 }
201 if (this.visibilityTimeout != null) {
202 simpleMessageListenerContainer.setVisibilityTimeout(this.visibilityTimeout);
203 }
204 if (this.waitTimeOut != null) {
205 simpleMessageListenerContainer.setWaitTimeOut(this.waitTimeOut);
206 }
207 if (this.resourceIdResolver != null) {
208 simpleMessageListenerContainer.setResourceIdResolver(this.resourceIdResolver);
209 }
210 if (this.destinationResolver != null) {
211 simpleMessageListenerContainer
212 .setDestinationResolver(this.destinationResolver);
213 }
214 if (this.backOffTime != null) {
215 simpleMessageListenerContainer.setBackOffTime(this.backOffTime);
216 }
217
218 return simpleMessageListenerContainer;
219 }
220
221 }
222