/*
 * Copyright 2012-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 * http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package com.amazonaws.services.sqs.buffered;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.amazonaws.AmazonClientException;

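/**
 * Configuration for the queue buffers used by {@link AmazonSQSBufferedAsyncClient}: batching of
 * outbound calls, prefetching of received messages, long polling, and the related limits. Every
 * setter has a fluent {@code with*} counterpart, so a configuration can be built and handed to
 * the buffered client in a single expression.
 * <p>
 * The snippet below is an illustrative sketch only; the way the underlying SQS client is built
 * and wrapped is an assumption of the example, not something prescribed by this class:
 *
 * <pre>{@code
 * QueueBufferConfig config = new QueueBufferConfig()
 *         .withMaxBatchOpenMs(200)            // hold send batches open for up to 200 ms
 *         .withMaxInflightReceiveBatches(10)  // up to 10 concurrent receive batches
 *         .withLongPoll(true);                // use long polling for receive calls
 *
 * AmazonSQSAsync sqs = new AmazonSQSBufferedAsyncClient(
 *         AmazonSQSAsyncClientBuilder.defaultClient(), config);
 * }</pre>
 */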
public class QueueBufferConfig {

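    /** 10 entries, the maximum the SQS service accepts in a single batch request. */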
    public static final int MAX_BATCH_SIZE_DEFAULT = 10;

    /** The maximum number of entries in a batch command. */
    private int maxBatchSize;

    /** The largest message or batch payload the SQS service accepts: 256 KiB. */
    public static final long SERVICE_MAX_BATCH_SIZE_BYTES = 256 * 1024;

    /**
     * The maximum time (milliseconds) a send batch is held open for additional outbound requests.
     * The longer this timeout, the longer messages wait for other messages to be added to the
     * batch. Increasing this timeout reduces the number of calls made and increases throughput,
     * but also increases average message latency.
     */
    private long maxBatchOpenMs;

    /** 200 milliseconds */
    public static final long MAX_BATCH_OPEN_MS_DEFAULT = 200;

    /**
     * If true, even synchronous calls to delete messages will be made using background
     * asynchronous batches. The client will return results indicating that the messages were
     * deleted successfully even if the background calls eventually fail; the actual errors will
     * be logged instead. This can be beneficial for decreasing message acknowledgement latency at
     * the cost of potential duplicate messages (which can be produced by SQS itself anyway).
     */
    private boolean deleteInBackground = false;

    /**
     * Whether receive calls should use long polling.
     */
    private boolean longPoll;

    /** true */
    private static final boolean LONG_POLL_DEFAULT = true;

    /**
     * The maximum number of concurrent batches for each type of outbound request. The greater the
     * number, the greater the throughput that can be achieved (at the expense of consuming more
     * threads).
     */
    private int maxInflightOutboundBatches;

    /** 5 batches */
    public static final int MAX_INFLIGHT_OUTBOUND_BATCHES_DEFAULT = 5;

    /**
     * The maximum number of concurrent receive message batches. The greater this number, the
     * faster the queue buffer pulls messages from the SQS servers (at the expense of consuming
     * more threads).
     */
    private int maxInflightReceiveBatches;

    /** 10 batches */
    public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10;

    /**
     * If more than this number of completed receive batches are waiting in the buffer, querying
     * for new messages will stop. The larger this number, the more messages the queue buffer will
     * pre-fetch and keep in the buffer on the client side, and the faster receive requests will
     * be satisfied. The visibility timeout of a pre-fetched message starts at the point of
     * pre-fetch, which means that while the message is in the local buffer it is unavailable for
     * other clients to process, and when this client retrieves it, part of the visibility timeout
     * may have already expired. The number of messages prefetched will not exceed
     * maxBatchSize * maxDoneReceiveBatches.
     */
    private int maxDoneReceiveBatches;

    /** 10 batches */
    public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10;

    /**
     * Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes.
     */
    private long maxBatchSizeBytes;

    /** 256 KiB */
    public static final long MAX_BATCH_SIZE_BYTES_DEFAULT = SERVICE_MAX_BATCH_SIZE_BYTES;

    /**
     * Custom visibility timeout to use when retrieving messages from SQS. If set to a value
     * greater than zero, this timeout will override the default visibility timeout set on the SQS
     * queue. Set it to -1 to use the default visibility timeout of the queue. A visibility
     * timeout of 0 seconds is not supported.
     */
    private int visibilityTimeoutSeconds;

    /** -1, which means use the visibility timeout of the queue */
    public static final int VISIBILITY_TIMEOUT_SECONDS_DEFAULT = -1;

    /**
     * Specifies the amount of time, in seconds, the receive call will block on the server waiting
     * for messages to arrive if the queue is empty when the receive call is first made. This
     * setting has no effect if long polling is disabled.
     */
    private int longPollWaitTimeoutSeconds;

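    /** 20 seconds */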
    public static final int LONGPOLL_WAIT_TIMEOUT_SECONDS_DEFAULT = 20;

    /**
     * Configures the minimum wait time for incoming receive message requests. Without a non-zero
     * minimum wait time, threads can easily waste CPU time busy-waiting against empty local
     * buffers. Avoid setting this to 0 unless you are confident threads will do useful work
     * between calls to receive messages.
     * <p>
     * This applies both to requests that explicitly set WaitTimeSeconds and to those that inherit
     * the ReceiveMessageWaitTimeSeconds queue attribute.
     */
    private int minReceiveWaitTimeMs = MIN_RECEIVE_WAIT_TIME_MS_DEFAULT;

    /** 50 ms, which is in the ballpark of typical latency when contacting a remote service such as SQS */
    public static final int MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = 50;

    /**
     * Specifies the message attributes receive calls will request. Only receive message requests
     * that request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require message attributes
     * will not be fulfilled from buffers.
     */
    private List<String> receiveMessageAttributeNames = RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT;

    public static final List<String> RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList();

    /**
     * Specifies the attributes receive calls will request. Only receive message requests that
     * request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require attributes will
     * not be fulfilled from buffers.
     */
    private List<String> receiveAttributeNames = RECEIVE_ATTRIBUTE_NAMES_DEFAULT;

    public static final List<String> RECEIVE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList();

    /**
     * If set, prefetching will be scaled with the number of in-flight incoming receive requests
     * made to the client. The advantage of this is reducing the number of outgoing requests made
     * to SQS when incoming requests are reduced; in particular, if all incoming requests stop, no
     * further requests to SQS will be made. The disadvantage is increased latency when incoming
     * requests first start occurring.
     */
    private boolean adaptivePrefetching = ADAPTIVE_PREFETCHING_DEFAULT;

    public static final boolean ADAPTIVE_PREFETCHING_DEFAULT = false;

    /**
     * Option to configure flushOnShutdown. Enabling this option will flush the pending requests
     * in the {@link SendQueueBuffer} during shutdown.
     * <p>
     * The default value is false, which means flushOnShutdown is disabled.
     * </p>
     */
    private boolean flushOnShutdown = false;

    public QueueBufferConfig(long maxBatchOpenMs, int maxInflightOutboundBatches, int maxInflightReceiveBatches,
            int maxDoneReceiveBatches, boolean paramLongPoll, long maxBatchSizeBytes, int visibilityTimeout,
            int longPollTimeout, int maxBatch) {
        this.maxBatchOpenMs = maxBatchOpenMs;
        this.maxInflightOutboundBatches = maxInflightOutboundBatches;
        this.maxInflightReceiveBatches = maxInflightReceiveBatches;
        this.maxDoneReceiveBatches = maxDoneReceiveBatches;
        this.longPoll = paramLongPoll;
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.visibilityTimeoutSeconds = visibilityTimeout;
        this.longPollWaitTimeoutSeconds = longPollTimeout;
        this.maxBatchSize = maxBatch;
    }

    public QueueBufferConfig() {
        this(MAX_BATCH_OPEN_MS_DEFAULT, MAX_INFLIGHT_OUTBOUND_BATCHES_DEFAULT, MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT,
                MAX_DONE_RECEIVE_BATCHES_DEFAULT, LONG_POLL_DEFAULT, MAX_BATCH_SIZE_BYTES_DEFAULT,
                VISIBILITY_TIMEOUT_SECONDS_DEFAULT, LONGPOLL_WAIT_TIMEOUT_SECONDS_DEFAULT, MAX_BATCH_SIZE_DEFAULT);
    }

    /** Copy constructor. */
    public QueueBufferConfig(QueueBufferConfig other) {
        longPoll = other.longPoll;
        longPollWaitTimeoutSeconds = other.longPollWaitTimeoutSeconds;
        minReceiveWaitTimeMs = other.minReceiveWaitTimeMs;
        maxBatchOpenMs = other.maxBatchOpenMs;
        maxBatchSize = other.maxBatchSize;
        maxBatchSizeBytes = other.maxBatchSizeBytes;
        maxDoneReceiveBatches = other.maxDoneReceiveBatches;
        maxInflightOutboundBatches = other.maxInflightOutboundBatches;
        maxInflightReceiveBatches = other.maxInflightReceiveBatches;
        visibilityTimeoutSeconds = other.visibilityTimeoutSeconds;
        flushOnShutdown = other.flushOnShutdown;
        receiveAttributeNames = other.receiveAttributeNames;
        receiveMessageAttributeNames = other.receiveMessageAttributeNames;
        adaptivePrefetching = other.adaptivePrefetching;
        deleteInBackground = other.deleteInBackground;
    }

    @Override
    public String toString() {
        return "QueueBufferConfig [maxBatchSize=" + maxBatchSize + ", maxBatchOpenMs=" + maxBatchOpenMs + ", longPoll="
                + longPoll + ", maxInflightOutboundBatches=" + maxInflightOutboundBatches
                + ", maxInflightReceiveBatches=" + maxInflightReceiveBatches + ", maxDoneReceiveBatches="
                + maxDoneReceiveBatches + ", maxBatchSizeBytes=" + maxBatchSizeBytes + ", visibilityTimeoutSeconds="
                + visibilityTimeoutSeconds + ", longPollWaitTimeoutSeconds=" + longPollWaitTimeoutSeconds + "]";
    }

    /**
     * The maximum time (milliseconds) a send batch is held open for additional outbound requests.
     * The longer this timeout, the longer messages wait for other messages to be added to the
     * batch. Increasing this timeout reduces the number of calls made and increases throughput,
     * but also increases average message latency.
     */
    public long getMaxBatchOpenMs() {
        return maxBatchOpenMs;
    }

    /**
     * The maximum time (milliseconds) a send batch is held open for additional outbound requests.
     * The longer this timeout, the longer messages wait for other messages to be added to the
     * batch. Increasing this timeout reduces the number of calls made and increases throughput,
     * but also increases average message latency.
     */
    public void setMaxBatchOpenMs(long maxBatchOpenMs) {
        this.maxBatchOpenMs = maxBatchOpenMs;
    }

    /**
     * The maximum time (milliseconds) a send batch is held open for additional outbound requests.
     * The longer this timeout, the longer messages wait for other messages to be added to the
     * batch. Increasing this timeout reduces the number of calls made and increases throughput,
     * but also increases average message latency.
     */
    public QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs) {
        setMaxBatchOpenMs(maxBatchOpenMs);
        return this;
    }

    /**
     * If set, even synchronous calls to delete messages will be made using background
     * asynchronous batches. The client will return results indicating that the messages were
     * deleted successfully even if the background calls eventually fail; the actual errors will
     * be logged instead. This can be beneficial for decreasing message acknowledgement latency at
     * the cost of potential duplicate messages (which can be produced by SQS itself anyway).
     */
    public boolean isDeleteInBackground() {
        return deleteInBackground;
    }

    /**
     * If set, even synchronous calls to delete messages will be made using background
     * asynchronous batches. The client will return results indicating that the messages were
     * deleted successfully even if the background calls eventually fail; the actual errors will
     * be logged instead. This can be beneficial for decreasing message acknowledgement latency at
     * the cost of potential duplicate messages (which can be produced by SQS itself anyway).
     */
    public void setDeleteInBackground(boolean deleteInBackground) {
        this.deleteInBackground = deleteInBackground;
    }

    /**
     * If set, even synchronous calls to delete messages will be made using background
     * asynchronous batches. The client will return results indicating that the messages were
     * deleted successfully even if the background calls eventually fail; the actual errors will
     * be logged instead. This can be beneficial for decreasing message acknowledgement latency at
     * the cost of potential duplicate messages (which can be produced by SQS itself anyway).
     */
    public QueueBufferConfig withDeleteInBackground(boolean deleteInBackground) {
        setDeleteInBackground(deleteInBackground);
        return this;
    }

    /**
     * @return true if the queue buffer will use long polling while retrieving messages from the
     *         SQS server, false otherwise.
     */
    public boolean isLongPoll() {
        return longPoll;
    }

    /**
     * Specify "true" for receive requests to use long polling.
     */
    public void setLongPoll(boolean longPoll) {
        this.longPoll = longPoll;
    }

    /**
     * Specify "true" for receive requests to use long polling.
     */
    public QueueBufferConfig withLongPoll(boolean longPoll) {
        setLongPoll(longPoll);
        return this;
    }

    /**
     * The maximum number of concurrent batches for each type of outbound request. The greater the
     * number, the greater the throughput that can be achieved (at the expense of consuming more
     * threads).
     */
    public int getMaxInflightOutboundBatches() {
        return maxInflightOutboundBatches;
    }

    /**
     * The maximum number of concurrent batches for each type of outbound request. The greater the
     * number, the greater the throughput that can be achieved (at the expense of consuming more
     * threads).
     */
    public void setMaxInflightOutboundBatches(int maxInflightOutboundBatches) {
        this.maxInflightOutboundBatches = maxInflightOutboundBatches;
    }

    /**
     * The maximum number of concurrent batches for each type of outbound request. The greater the
     * number, the greater the throughput that can be achieved (at the expense of consuming more
     * threads).
     */
    public QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches) {
        setMaxInflightOutboundBatches(maxInflightOutboundBatches);
        return this;
    }

    /**
     * The maximum number of concurrent receive message batches. The greater this number, the
     * faster the queue buffer pulls messages from the SQS servers (at the expense of consuming
     * more threads).
     */
    public int getMaxInflightReceiveBatches() {
        return maxInflightReceiveBatches;
    }

    /**
     * The maximum number of concurrent receive message batches. The greater this number, the
     * faster the queue buffer pulls messages from the SQS servers (at the expense of consuming
     * more threads).
     */
    public void setMaxInflightReceiveBatches(int maxInflightReceiveBatches) {
        this.maxInflightReceiveBatches = maxInflightReceiveBatches;
    }

    /**
     * The maximum number of concurrent receive message batches. The greater this number, the
     * faster the queue buffer pulls messages from the SQS servers (at the expense of consuming
     * more threads).
     */
    public QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches) {
        setMaxInflightReceiveBatches(maxInflightReceiveBatches);
        return this;
    }

    /**
     * If more than this number of completed receive batches are waiting in the buffer, querying
     * for new messages will stop. The larger this number, the more messages the queue buffer will
     * pre-fetch and keep in the buffer on the client side, and the faster receive requests will
     * be satisfied. The visibility timeout of a pre-fetched message starts at the point of
     * pre-fetch, which means that while the message is in the local buffer it is unavailable for
     * other clients to process, and when this client retrieves it, part of the visibility timeout
     * may have already expired. The number of messages prefetched will not exceed
     * 10 * maxDoneReceiveBatches, as there can be a maximum of 10 messages per batch.
     */
    public int getMaxDoneReceiveBatches() {
        return maxDoneReceiveBatches;
    }

    /**
     * If more than this number of completed receive batches are waiting in the buffer, querying
     * for new messages will stop. The larger this number, the more messages the queue buffer will
     * pre-fetch and keep in the buffer on the client side, and the faster receive requests will
     * be satisfied. The visibility timeout of a pre-fetched message starts at the point of
     * pre-fetch, which means that while the message is in the local buffer it is unavailable for
     * other clients to process, and when this client retrieves it, part of the visibility timeout
     * may have already expired. The number of messages prefetched will not exceed
     * maxBatchSize * maxDoneReceiveBatches.
     */
    public void setMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
        this.maxDoneReceiveBatches = maxDoneReceiveBatches;
    }

    /**
     * If more than this number of completed receive batches are waiting in the buffer, querying
     * for new messages will stop. The larger this number, the more messages the queue buffer will
     * pre-fetch and keep in the buffer on the client side, and the faster receive requests will
     * be satisfied. The visibility timeout of a pre-fetched message starts at the point of
     * pre-fetch, which means that while the message is in the local buffer it is unavailable for
     * other clients to process, and when this client retrieves it, part of the visibility timeout
     * may have already expired. The number of messages prefetched will not exceed
     * maxBatchSize * maxDoneReceiveBatches.
     */
    public QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
        setMaxDoneReceiveBatches(maxDoneReceiveBatches);
        return this;
    }

    /**
     * Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
     * is also enforced on the server, and if this client submits a request of a size larger than
     * the server can support, the server will reject the request.
     */
    public long getMaxBatchSizeBytes() {
        return maxBatchSizeBytes;
    }

    /**
     * Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
     * is also enforced on the server, and if this client submits a request of a size larger than
     * the server can support, the server will reject the request.
     *
     * @throws IllegalArgumentException
     *             if the size being set is greater than the service allowed size for the message
     *             body.
     */
    public void setMaxBatchSizeBytes(long maxBatchSizeBytes) {
        if (maxBatchSizeBytes > SERVICE_MAX_BATCH_SIZE_BYTES) {
            throw new IllegalArgumentException(
                    "Maximum size of the message cannot be greater than the allowed limit of "
                            + SERVICE_MAX_BATCH_SIZE_BYTES + " bytes");
        }
        this.maxBatchSizeBytes = maxBatchSizeBytes;
    }

    /**
     * Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
     * is also enforced on the server, and if this client submits a request of a size larger than
     * the server can support, the server will reject the request.
     *
     * @throws IllegalArgumentException
     *             if the size being set is greater than the service allowed size for the message
     *             body.
     */
    public QueueBufferConfig withMaxBatchSizeBytes(long maxBatchSizeBytes) {
        setMaxBatchSizeBytes(maxBatchSizeBytes);
        return this;
    }

    /**
     * Custom visibility timeout to use when retrieving messages from SQS. If set to a value
     * greater than zero, this timeout will override the default visibility timeout set on the SQS
     * queue. Set it to -1 to use the default visibility timeout of the queue. A visibility
     * timeout of 0 seconds is not supported.
     */
    public int getVisibilityTimeoutSeconds() {
        return visibilityTimeoutSeconds;
    }

    /**
     * Custom visibility timeout to use when retrieving messages from SQS. If set to a value
     * greater than zero, this timeout will override the default visibility timeout set on the SQS
     * queue. Set it to -1 to use the default visibility timeout of the queue. A visibility
     * timeout of 0 seconds is not supported.
     */
    public void setVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
        this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }

    /**
     * Custom visibility timeout to use when retrieving messages from SQS. If set to a value
     * greater than zero, this timeout will override the default visibility timeout set on the SQS
     * queue. Set it to -1 to use the default visibility timeout of the queue. A visibility
     * timeout of 0 seconds is not supported.
     */
    public QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
        setVisibilityTimeoutSeconds(visibilityTimeoutSeconds);
        return this;
    }

    /**
     * Specifies the amount of time, in seconds, the receive call will block on the server waiting
     * for messages to arrive if the queue is empty when the receive call is first made. This
     * setting has no effect if long polling is disabled.
     */
    public void setLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
        this.longPollWaitTimeoutSeconds = longPollWaitTimeoutSeconds;
    }

    /**
     * Specifies the amount of time, in seconds, the receive call will block on the server waiting
     * for messages to arrive if the queue is empty when the receive call is first made. This
     * setting has no effect if long polling is disabled.
     */
    public int getLongPollWaitTimeoutSeconds() {
        return longPollWaitTimeoutSeconds;
    }

    /**
     * Specifies the amount of time, in seconds, the receive call will block on the server waiting
     * for messages to arrive if the queue is empty when the receive call is first made. This
     * setting has no effect if long polling is disabled.
     */
    public QueueBufferConfig withLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
        setLongPollWaitTimeoutSeconds(longPollWaitTimeoutSeconds);
        return this;
    }

    /**
     * Configures the minimum wait time for incoming receive message requests. Without a non-zero
     * minimum wait time, threads can easily waste CPU time busy-waiting against empty local
     * buffers. Avoid setting this to 0 unless you are confident threads will do useful work
     * between calls to receive messages.
     * <p>
     * This applies both to requests that explicitly set WaitTimeSeconds and to those that inherit
     * the ReceiveMessageWaitTimeSeconds queue attribute.
     */
    public int getMinReceiveWaitTimeMs() {
        return minReceiveWaitTimeMs;
    }

    /**
     * Configures the minimum wait time for incoming receive message requests. Without a non-zero
     * minimum wait time, threads can easily waste CPU time busy-waiting against empty local
     * buffers. Avoid setting this to 0 unless you are confident threads will do useful work
     * between calls to receive messages.
     * <p>
     * This applies both to requests that explicitly set WaitTimeSeconds and to those that inherit
     * the ReceiveMessageWaitTimeSeconds queue attribute.
     */
    public void setMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
        this.minReceiveWaitTimeMs = minReceiveWaitTimeMs;
    }

    /**
     * Configures the minimum wait time for incoming receive message requests. Without a non-zero
     * minimum wait time, threads can easily waste CPU time busy-waiting against empty local
     * buffers. Avoid setting this to 0 unless you are confident threads will do useful work
     * between calls to receive messages.
     * <p>
     * This applies both to requests that explicitly set WaitTimeSeconds and to those that inherit
     * the ReceiveMessageWaitTimeSeconds queue attribute.
     */
    public QueueBufferConfig withMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
        setMinReceiveWaitTimeMs(minReceiveWaitTimeMs);
        return this;
    }

    /**
     * Specifies the maximum number of entries the buffered client will put in a single batch
     * request.
     */
    public int getMaxBatchSize() {
        return maxBatchSize;
    }

    /**
     * Specifies the maximum number of entries the buffered client will put in a single batch
     * request.
     */
    public void setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Specifies the maximum number of entries the buffered client will put in a single batch
     * request.
     */
    public QueueBufferConfig withMaxBatchSize(int maxBatchSize) {
        setMaxBatchSize(maxBatchSize);
        return this;
    }

    /**
     * Specifies the attributes receive calls will request. Only receive message requests that
     * request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require attributes will
     * not be fulfilled from buffers.
     */
    public List<String> getReceiveAttributeNames() {
        return receiveAttributeNames;
    }

    /**
     * Specifies the attributes receive calls will request. Only receive message requests that
     * request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require attributes will
     * not be fulfilled from buffers.
     */
    public void setReceiveAttributeNames(List<String> receiveAttributeNames) {
        if (receiveAttributeNames == null) {
            this.receiveAttributeNames = Collections.emptyList();
        } else {
            this.receiveAttributeNames = Collections.unmodifiableList(new ArrayList<String>(receiveAttributeNames));
        }
    }

    /**
     * Specifies the attributes receive calls will request. Only receive message requests that
     * request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require attributes will
     * not be fulfilled from buffers.
     */
    public QueueBufferConfig withReceiveAttributeNames(List<String> receiveAttributes) {
        setReceiveAttributeNames(receiveAttributes);
        return this;
    }

    /**
     * Specifies the message attributes receive calls will request. Only receive message requests
     * that request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require message attributes
     * will not be fulfilled from buffers.
     */
    public List<String> getReceiveMessageAttributeNames() {
        return receiveMessageAttributeNames;
    }

    /**
     * Specifies the message attributes receive calls will request. Only receive message requests
     * that request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require message attributes
     * will not be fulfilled from buffers.
     */
    public void setReceiveMessageAttributeNames(List<String> receiveMessageAttributeNames) {
        if (receiveMessageAttributeNames == null) {
            this.receiveMessageAttributeNames = Collections.emptyList();
        } else {
            this.receiveMessageAttributeNames =
                    Collections.unmodifiableList(new ArrayList<String>(receiveMessageAttributeNames));
        }
    }

    /**
     * Specifies the message attributes receive calls will request. Only receive message requests
     * that request the same set of attributes will be satisfied from the receive buffers.
     * <p>
     * The default value is an empty list, so any receive requests that require message attributes
     * will not be fulfilled from buffers.
     */
    public QueueBufferConfig withReceiveMessageAttributeNames(List<String> receiveMessageAttributes) {
        setReceiveMessageAttributeNames(receiveMessageAttributes);
        return this;
    }

    /**
     * If set, prefetching will be scaled with the number of in-flight incoming receive requests
     * made to the client. The advantage of this is reducing the number of outgoing requests made
     * to SQS when incoming requests are reduced; in particular, if all incoming requests stop, no
     * further requests to SQS will be made. The disadvantage is increased latency when incoming
     * requests first start occurring.
     */
    public void setAdaptivePrefetching(boolean adaptivePrefetching) {
        this.adaptivePrefetching = adaptivePrefetching;
    }

    /**
     * If set, prefetching will be scaled with the number of in-flight incoming receive requests
     * made to the client. The advantage of this is reducing the number of outgoing requests made
     * to SQS when incoming requests are reduced; in particular, if all incoming requests stop, no
     * further requests to SQS will be made. The disadvantage is increased latency when incoming
     * requests first start occurring.
     */
    public boolean isAdapativePrefetching() {
        return adaptivePrefetching;
    }

    /**
     * If set, prefetching will be scaled with the number of in-flight incoming receive requests
     * made to the client. The advantage of this is reducing the number of outgoing requests made
     * to SQS when incoming requests are reduced; in particular, if all incoming requests stop, no
     * further requests to SQS will be made. The disadvantage is increased latency when incoming
     * requests first start occurring.
     */
    public QueueBufferConfig withAdapativePrefetching(boolean adaptivePrefetching) {
        setAdaptivePrefetching(adaptivePrefetching);
        return this;
    }

    /**
     * Returns the flushOnShutdown value. The default value is false, which means flushOnShutdown
     * is disabled.
     * <p>
     * Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during
     * shutdown.
     *
     * @return true if flushOnShutdown is enabled, otherwise false.
     */
    public boolean isFlushOnShutdown() {
        return flushOnShutdown;
    }

    /**
     * Sets the flushOnShutdown option. The default value is false, which means flushOnShutdown
     * is disabled.
     * <p>
     * Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during
     * shutdown.
     *
     * @param flushOnShutdown boolean value to configure flushOnShutdown.
     */
    public void setFlushOnShutdown(boolean flushOnShutdown) {
        this.flushOnShutdown = flushOnShutdown;
    }

    /**
     * Sets the flushOnShutdown option. The default value is false, which means flushOnShutdown
     * is disabled.
     * <p>
     * Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during
     * shutdown.
     *
     * @param flushOnShutdown boolean value to configure flushOnShutdown.
     * @return This object for method chaining.
     */
    public QueueBufferConfig withFlushOnShutdown(boolean flushOnShutdown) {
        setFlushOnShutdown(flushOnShutdown);
        return this;
    }

    /**
     * Checks the config for validity. If the config is deemed to be invalid, an informative
     * exception is thrown.
     *
     * @throws AmazonClientException
     *             with a message explaining why the config was invalid
     */
    void validate() {
        if (visibilityTimeoutSeconds == 0) {
            throw new AmazonClientException("Visibility timeout value may not be equal to zero");
        }
    }

}