1 package org.influxdb;
2
3 import org.influxdb.dto.Point;
4
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.TimeUnit;
8 import java.util.function.BiConsumer;
9 import java.util.function.Consumer;
10
11 /**
12 * BatchOptions are used to configure batching of individual data point writes
13 * into InfluxDB. See {@link InfluxDB#enableBatch(BatchOptions)}
14 */
15 public final class BatchOptions implements Cloneable {
16
17 // default values here are consistent with Telegraf
18 public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
19 public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
20 public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
21 public static final int DEFAULT_BUFFER_LIMIT = 10000;
22 public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;
23 public static final boolean DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION = false;
24
25
26 /**
27 * Default batch options. This class is immutable, each configuration
28 * is built by taking the DEFAULTS and setting specific configuration
29 * properties.
30 */
31 public static final BatchOptions DEFAULTS = new BatchOptions();
32
33 private int actions = DEFAULT_BATCH_ACTIONS_LIMIT;
34 private int flushDuration = DEFAULT_BATCH_INTERVAL_DURATION;
35 private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION;
36 private int bufferLimit = DEFAULT_BUFFER_LIMIT;
37 private TimeUnit precision = DEFAULT_PRECISION;
38 private boolean dropActionsOnQueueExhaustion = DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION;
39 private Consumer<Point> droppedActionHandler = (point) -> {
40 };
41
42 private ThreadFactory threadFactory = Executors.defaultThreadFactory();
43 BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
44 };
45
46 private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE;
47
48 private BatchOptions() {
49 }
50
51 /**
52 * @param actions the number of actions to collect
53 * @return the BatchOptions instance to be able to use it in a fluent manner.
54 */
55 public BatchOptions actions(final int actions) {
56 BatchOptions clone = getClone();
57 clone.actions = actions;
58 return clone;
59 }
60
61 /**
62 * @param flushDuration the time to wait at most (milliseconds).
63 * @return the BatchOptions instance to be able to use it in a fluent manner.
64 */
65 public BatchOptions flushDuration(final int flushDuration) {
66 BatchOptions clone = getClone();
67 clone.flushDuration = flushDuration;
68 return clone;
69 }
70
71 /**
72 * Jitters the batch flush interval by a random amount. This is primarily to avoid
73 * large write spikes for users running a large number of client instances.
74 * ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s.
75 *
76 * @param jitterDuration (milliseconds)
77 * @return the BatchOptions instance to be able to use it in a fluent manner.
78 */
79 public BatchOptions jitterDuration(final int jitterDuration) {
80 BatchOptions clone = getClone();
81 clone.jitterDuration = jitterDuration;
82 return clone;
83 }
84
85 /**
86 * The client maintains a buffer for failed writes so that the writes will be retried later on. This may
87 * help to overcome temporary network problems or InfluxDB load spikes.
88 * When the buffer is full and new points are written, oldest entries in the buffer are lost.
89 *
90 * To disable this feature set buffer limit to a value smaller than {@link BatchOptions#getActions}
91 *
92 * @param bufferLimit maximum number of points stored in the retry buffer
93 * @return the BatchOptions instance to be able to use it in a fluent manner.
94 */
95 public BatchOptions bufferLimit(final int bufferLimit) {
96 BatchOptions clone = getClone();
97 clone.bufferLimit = bufferLimit;
98 return clone;
99 }
100
101 /**
102 * @param threadFactory a ThreadFactory instance to be used
103 * @return the BatchOptions instance to be able to use it in a fluent manner.
104 */
105 public BatchOptions threadFactory(final ThreadFactory threadFactory) {
106 BatchOptions clone = getClone();
107 clone.threadFactory = threadFactory;
108 return clone;
109 }
110
111 /**
112 * @param exceptionHandler a consumer function to handle asynchronous errors
113 * @return the BatchOptions instance to be able to use it in a fluent manner.
114 */
115 public BatchOptions exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
116 BatchOptions clone = getClone();
117 clone.exceptionHandler = exceptionHandler;
118 return clone;
119 }
120
121 /**
122 * @param consistency cluster consistency setting (how many nodes have to store data points
123 * to treat a write as a success)
124 * @return the BatchOptions instance to be able to use it in a fluent manner.
125 */
126 public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
127 BatchOptions clone = getClone();
128 clone.consistency = consistency;
129 return clone;
130 }
131
132 /**
133 * Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}.
134 * @param precision sets the precision to use
135 * @return the BatchOptions instance to be able to use it in a fluent manner.
136 */
137 public BatchOptions precision(final TimeUnit precision) {
138 BatchOptions clone = getClone();
139 clone.precision = precision;
140 return clone;
141 }
142
143 /**
144 * Set to define the behaviour when the action queue exhausts. If unspecified, will default to false which means
145 * that the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
146 * true means that the newer actions being written to the queue will be dropped and
147 * {@link BatchOptions#droppedActionHandler} will be called.
148 * @param dropActionsOnQueueExhaustion sets the behavior
149 * @return the BatchOptions instance to be able to use it in a fluent manner.
150 */
151 public BatchOptions dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
152 BatchOptions clone = getClone();
153 clone.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
154 return clone;
155 }
156
157 /**
158 * Handler to handle dropped actions due to queue actions. This is only valid when
159 * {@link BatchOptions#dropActionsOnQueueExhaustion} is set to true.
160 * @param droppedActionHandler to handle action drops on action queue exhaustion.
161 * @return the BatchOptions instance to be able to use it in a fluent manner.
162 */
163 public BatchOptions droppedActionHandler(final Consumer<Point> droppedActionHandler) {
164 BatchOptions clone = getClone();
165 clone.droppedActionHandler = droppedActionHandler;
166 return clone;
167 }
168
169
170 /**
171 * @return actions the number of actions to collect
172 */
173 public int getActions() {
174 return actions;
175 }
176
177 /**
178 * @return flushDuration the time to wait at most (milliseconds).
179 */
180 public int getFlushDuration() {
181 return flushDuration;
182 }
183
184 /**
185 * @return batch flush interval jitter value (milliseconds)
186 */
187 public int getJitterDuration() {
188 return jitterDuration;
189 }
190
191 /**
192 * @return Maximum number of points stored in the retry buffer, see {@link BatchOptions#bufferLimit(int)}
193 */
194 public int getBufferLimit() {
195 return bufferLimit;
196 }
197
198 /**
199 * @return a ThreadFactory instance to be used
200 */
201 public ThreadFactory getThreadFactory() {
202 return threadFactory;
203 }
204
205 /**
206 * @return a consumer function to handle asynchronous errors
207 */
208 public BiConsumer<Iterable<Point>, Throwable> getExceptionHandler() {
209 return exceptionHandler;
210 }
211
212 /**
213 * @return cluster consistency setting (how many nodes have to store data points
214 * to treat a write as a success)
215 */
216 public InfluxDB.ConsistencyLevel getConsistency() {
217 return consistency;
218 }
219
220 /**
221 * @return the time precision
222 */
223 public TimeUnit getPrecision() {
224 return precision;
225 }
226
227
228 /**
229 * @return a boolean determining whether to drop actions on action queue exhaustion.
230 */
231 public boolean isDropActionsOnQueueExhaustion() {
232 return dropActionsOnQueueExhaustion;
233 }
234
235 /**
236 * @return a consumer function to handle actions drops on action queue exhaustion.
237 */
238 public Consumer<Point> getDroppedActionHandler() {
239 return droppedActionHandler;
240 }
241
242 private BatchOptions getClone() {
243 try {
244 return (BatchOptions) this.clone();
245 } catch (CloneNotSupportedException e) {
246 throw new RuntimeException(e);
247 }
248 }
249
250 }
251