1 package org.influxdb;
2
3 import org.influxdb.dto.Point;
4
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.TimeUnit;
8 import java.util.function.BiConsumer;
9 import java.util.function.Consumer;
10
11 /**
12  * BatchOptions are used to configure batching of individual data point writes
13  * into InfluxDB. See {@link InfluxDB#enableBatch(BatchOptions)}
14  */

15 public final class BatchOptions implements Cloneable {
16
17   // default values here are consistent with Telegraf
18   public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
19   public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
20   public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
21   public static final int DEFAULT_BUFFER_LIMIT = 10000;
22   public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;
23   public static final boolean DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION = false;
24
25
26   /**
27    * Default batch options. This class is immutable, each configuration
28    * is built by taking the DEFAULTS and setting specific configuration
29    * properties.
30    */

31   public static final BatchOptions DEFAULTS = new BatchOptions();
32
33   private int actions = DEFAULT_BATCH_ACTIONS_LIMIT;
34   private int flushDuration = DEFAULT_BATCH_INTERVAL_DURATION;
35   private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION;
36   private int bufferLimit = DEFAULT_BUFFER_LIMIT;
37   private TimeUnit precision = DEFAULT_PRECISION;
38   private boolean dropActionsOnQueueExhaustion = DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION;
39   private Consumer<Point> droppedActionHandler = (point) -> {
40   };
41
42   private ThreadFactory threadFactory = Executors.defaultThreadFactory();
43   BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
44   };
45
46   private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE;
47
48   private BatchOptions() {
49   }
50
51   /**
52    * @param actions the number of actions to collect
53    * @return the BatchOptions instance to be able to use it in a fluent manner.
54    */

55   public BatchOptions actions(final int actions) {
56     BatchOptions clone = getClone();
57     clone.actions = actions;
58     return clone;
59   }
60
61   /**
62    * @param flushDuration the time to wait at most (milliseconds).
63    * @return the BatchOptions instance to be able to use it in a fluent manner.
64    */

65   public BatchOptions flushDuration(final int flushDuration) {
66     BatchOptions clone = getClone();
67     clone.flushDuration = flushDuration;
68     return clone;
69   }
70
71   /**
72    * Jitters the batch flush interval by a random amount. This is primarily to avoid
73    * large write spikes for users running a large number of client instances.
74    * ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s.
75    *
76    * @param jitterDuration (milliseconds)
77    * @return the BatchOptions instance to be able to use it in a fluent manner.
78    */

79   public BatchOptions jitterDuration(final int jitterDuration) {
80     BatchOptions clone = getClone();
81     clone.jitterDuration = jitterDuration;
82     return clone;
83   }
84
85   /**
86    * The client maintains a buffer for failed writes so that the writes will be retried later on. This may
87    * help to overcome temporary network problems or InfluxDB load spikes.
88    * When the buffer is full and new points are written, oldest entries in the buffer are lost.
89    *
90    * To disable this feature set buffer limit to a value smaller than {@link BatchOptions#getActions}
91    *
92    * @param bufferLimit maximum number of points stored in the retry buffer
93    * @return the BatchOptions instance to be able to use it in a fluent manner.
94    */

95   public BatchOptions bufferLimit(final int bufferLimit) {
96     BatchOptions clone = getClone();
97     clone.bufferLimit = bufferLimit;
98     return clone;
99   }
100
101   /**
102    * @param threadFactory a ThreadFactory instance to be used
103    * @return the BatchOptions instance to be able to use it in a fluent manner.
104    */

105   public BatchOptions threadFactory(final ThreadFactory threadFactory) {
106     BatchOptions clone = getClone();
107     clone.threadFactory = threadFactory;
108     return clone;
109   }
110
111   /**
112    * @param exceptionHandler a consumer function to handle asynchronous errors
113    * @return the BatchOptions instance to be able to use it in a fluent manner.
114    */

115   public BatchOptions exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
116     BatchOptions clone = getClone();
117     clone.exceptionHandler = exceptionHandler;
118     return clone;
119   }
120
121   /**
122    * @param consistency cluster consistency setting (how many nodes have to store data points
123    *                    to treat a write as a success)
124    * @return the BatchOptions instance to be able to use it in a fluent manner.
125    */

126   public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
127     BatchOptions clone = getClone();
128     clone.consistency = consistency;
129     return clone;
130   }
131
132   /**
133    * Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}.
134    * @param precision sets the precision to use
135    * @return the BatchOptions instance to be able to use it in a fluent manner.
136    */

137   public BatchOptions precision(final TimeUnit precision) {
138     BatchOptions clone = getClone();
139     clone.precision = precision;
140     return clone;
141   }
142
143   /**
144    * Set to define the behaviour when the action queue exhausts. If unspecified, will default to false which means
145    * that the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
146    * true means that the newer actions being written to the queue will be dropped and
147    * {@link BatchOptions#droppedActionHandler} will be called.
148    * @param dropActionsOnQueueExhaustion sets the behavior
149    * @return the BatchOptions instance to be able to use it in a fluent manner.
150    */

151   public BatchOptions dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
152     BatchOptions clone = getClone();
153     clone.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
154     return clone;
155   }
156
157   /**
158    * Handler to handle dropped actions due to queue actions. This is only valid when
159    * {@link BatchOptions#dropActionsOnQueueExhaustion} is set to true.
160    * @param droppedActionHandler to handle action drops on action queue exhaustion.
161    * @return the BatchOptions instance to be able to use it in a fluent manner.
162    */

163   public BatchOptions droppedActionHandler(final Consumer<Point> droppedActionHandler) {
164     BatchOptions clone = getClone();
165     clone.droppedActionHandler = droppedActionHandler;
166     return clone;
167   }
168
169
170   /**
171    * @return actions the number of actions to collect
172    */

173   public int getActions() {
174     return actions;
175   }
176
177   /**
178    * @return flushDuration the time to wait at most (milliseconds).
179    */

180   public int getFlushDuration() {
181     return flushDuration;
182   }
183
184   /**
185    * @return batch flush interval jitter value (milliseconds)
186    */

187   public int getJitterDuration() {
188     return jitterDuration;
189   }
190
191   /**
192    * @return Maximum number of points stored in the retry buffer, see {@link BatchOptions#bufferLimit(int)}
193    */

194   public int getBufferLimit() {
195     return bufferLimit;
196   }
197
198   /**
199    * @return a ThreadFactory instance to be used
200    */

201   public ThreadFactory getThreadFactory() {
202     return threadFactory;
203   }
204
205   /**
206    * @return a consumer function to handle asynchronous errors
207    */

208   public BiConsumer<Iterable<Point>, Throwable> getExceptionHandler() {
209     return exceptionHandler;
210   }
211
212   /**
213    * @return cluster consistency setting (how many nodes have to store data points
214    * to treat a write as a success)
215    */

216   public InfluxDB.ConsistencyLevel getConsistency() {
217     return consistency;
218   }
219
220   /**
221    * @return the time precision
222    */

223   public TimeUnit getPrecision() {
224     return precision;
225   }
226
227
228   /**
229    * @return a boolean determining whether to drop actions on action queue exhaustion.
230    */

231   public boolean isDropActionsOnQueueExhaustion() {
232     return dropActionsOnQueueExhaustion;
233   }
234
235   /**
236    * @return a consumer function to handle actions drops on action queue exhaustion.
237    */

238   public Consumer<Point> getDroppedActionHandler() {
239     return droppedActionHandler;
240   }
241
242   private BatchOptions getClone() {
243     try {
244       return (BatchOptions) this.clone();
245     } catch (CloneNotSupportedException e) {
246       throw new RuntimeException(e);
247     }
248   }
249
250 }
251