1 package org.influxdb.impl;
2
3 import org.influxdb.InfluxDB;
4 import org.influxdb.InfluxDB.ConsistencyLevel;
5 import org.influxdb.dto.BatchPoints;
6 import org.influxdb.dto.Point;
7
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Map.Entry;
14 import java.util.Objects;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ThreadFactory;
20 import java.util.concurrent.TimeUnit;
21 import java.util.function.BiConsumer;
22 import java.util.function.Consumer;
23 import java.util.function.Supplier;
24 import java.util.logging.Level;
25 import java.util.logging.Logger;
26
27 /**
 * A BatchProcessor can be attached to an InfluxDB instance to collect single point writes and
 * aggregate them into BatchPoints for better write performance.
30  *
31  * @author stefan.majer [at] gmail.com
32  *
33  */

34 public final class BatchProcessor {
35
36   private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
37   protected final BlockingQueue<AbstractBatchEntry> queue;
38   private final ScheduledExecutorService scheduler;
39   private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
40   final InfluxDB influxDB;
41   final int actions;
42   private final TimeUnit flushIntervalUnit;
43   private final int flushInterval;
44   private final ConsistencyLevel consistencyLevel;
45   private final int jitterInterval;
46   private final TimeUnit precision;
47   private final BatchWriter batchWriter;
48   private boolean dropActionsOnQueueExhaustion;
49   Consumer<Point> droppedActionHandler;
50   Supplier<Double> randomSupplier;
51
52   /**
53    * The Builder to create a BatchProcessor instance.
54    */

55   public static final class Builder {
56     private final InfluxDB influxDB;
57     private ThreadFactory threadFactory = Executors.defaultThreadFactory();
58     private int actions;
59     private TimeUnit flushIntervalUnit;
60     private int flushInterval;
61     private int jitterInterval;
62     // this is a default value if the InfluxDb.enableBatch(BatchOptions) IS NOT used
63     // the reason is backward compatibility
64     private int bufferLimit = 0;
65     private TimeUnit precision;
66
67     private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
68     private ConsistencyLevel consistencyLevel;
69
70     private boolean dropActionsOnQueueExhaustion;
71     private Consumer<Point> droppedActionsHandler;
72     /**
73      * @param threadFactory
74      *            is optional.
75      * @return this Builder to use it fluent
76      */

77     public Builder threadFactory(final ThreadFactory threadFactory) {
78       this.threadFactory = threadFactory;
79       return this;
80     }
81
82     /**
83      * @param influxDB
84      *            is mandatory.
85      */

86     public Builder(final InfluxDB influxDB) {
87       this.influxDB = influxDB;
88     }
89
90     /**
91      * The number of actions after which a batchwrite must be performed.
92      *
93      * @param maxActions
94      *            number of Points written after which a write must happen.
95      * @return this Builder to use it fluent
96      */

97     public Builder actions(final int maxActions) {
98       this.actions = maxActions;
99       return this;
100     }
101
102     /**
103      * The interval at which at least should issued a write.
104      *
105      * @param interval
106      *            the interval
107      * @param unit
108      *            the TimeUnit of the interval
109      *
110      * @return this Builder to use it fluent
111      */

112     public Builder interval(final int interval, final TimeUnit unit) {
113       this.flushInterval = interval;
114       this.flushIntervalUnit = unit;
115       return this;
116     }
117
118     /**
119      * The interval at which at least should issued a write.
120      *
121      * @param flushInterval
122      *            the flush interval
123      * @param jitterInterval
124      *            the flush jitter interval
125      * @param unit
126      *            the TimeUnit of the interval
127      *
128      * @return this Builder to use it fluent
129      */

130     public Builder interval(final int flushInterval, final int jitterInterval, final TimeUnit unit) {
131       this.flushInterval = flushInterval;
132       this.jitterInterval = jitterInterval;
133       this.flushIntervalUnit = unit;
134       return this;
135     }
136
137     /**
138      * A buffer for failed writes so that the writes will be retried later on. When the buffer is full and
139      * new points are written, oldest entries in the buffer are lost.
140      *
141      * @param bufferLimit maximum number of points stored in the buffer
142      * @return this Builder to use it fluent
143      */

144     public Builder bufferLimit(final int bufferLimit) {
145       this.bufferLimit = bufferLimit;
146       return this;
147     }
148
149     /**
150      * A callback to be used when an error occurs during a batchwrite.
151      *
152      * @param handler
153      *            the handler
154      *
155      * @return this Builder to use it fluent
156      */

157     public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> handler) {
158       this.exceptionHandler = handler;
159       return this;
160     }
161
162     /**
163      * To define the behaviour when the action queue exhausts. If unspecified, will default to false which means that
164      * the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
165      * true means that the newer actions being written to the queue will dropped and
166      * {@link BatchProcessor#droppedActionHandler} will be called.
167      *
168      * @param dropActionsOnQueueExhaustion
169      *            the dropActionsOnQueueExhaustion
170      *
171      * @return this Builder to use it fluent
172      */

173     public Builder dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
174       this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
175       return this;
176     }
177
178     /**
179      * A callback to be used when an actions are dropped on action queue exhaustion.
180      *
181      * @param handler
182      *            the handler
183      *
184      * @return this Builder to use it fluent
185      */

186     public Builder droppedActionHandler(final Consumer<Point> handler) {
187       this.droppedActionsHandler = handler;
188       return this;
189     }
190
191
192
193     /**
194      * Consistency level for batch write.
195      *
196      * @param consistencyLevel
197      *            the consistencyLevel
198      *
199      * @return this Builder to use it fluent
200      */

201     public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
202         this.consistencyLevel = consistencyLevel;
203         return this;
204     }
205
206     /**
207      * Set the time precision to use for the batch.
208      *
209      * @param precision
210      *            the precision
211      *
212      * @return this Builder to use it fluent
213      */

214     public Builder precision(final TimeUnit precision) {
215         this.precision = precision;
216         return this;
217     }
218
219     /**
220      * Create the BatchProcessor.
221      *
222      * @return the BatchProcessor instance.
223      */

224     public BatchProcessor build() {
225       Objects.requireNonNull(this.influxDB, "influxDB");
226       Preconditions.checkPositiveNumber(this.actions, "actions");
227       Preconditions.checkPositiveNumber(this.flushInterval, "flushInterval");
228       Preconditions.checkNotNegativeNumber(jitterInterval, "jitterInterval");
229       Preconditions.checkNotNegativeNumber(bufferLimit, "bufferLimit");
230       Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");
231       Objects.requireNonNull(this.threadFactory, "threadFactory");
232       Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");
233       BatchWriter batchWriter;
234       if (this.bufferLimit > this.actions) {
235         batchWriter = new RetryCapableBatchWriter(this.influxDB, this.exceptionHandler, this.bufferLimit, this.actions);
236       } else {
237         batchWriter = new OneShotBatchWriter(this.influxDB);
238       }
239       return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,
240                                 this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,
241                                 this.precision, this.dropActionsOnQueueExhaustion, this.droppedActionsHandler);
242     }
243   }
244
245   abstract static class AbstractBatchEntry {
246       private final Point point;
247
248       public AbstractBatchEntry(final Point point) {
249         this.point = point;
250       }
251
252       public Point getPoint() {
253         return this.point;
254       }
255   }
256
257   static class HttpBatchEntry extends AbstractBatchEntry {
258     private final String db;
259     private final String rp;
260
261     public HttpBatchEntry(final Point point, final String db, final String rp) {
262       super(point);
263       this.db = db;
264       this.rp = rp;
265     }
266
267     public String getDb() {
268       return this.db;
269     }
270
271     public String getRp() {
272       return this.rp;
273     }
274   }
275
276   static class UdpBatchEntry extends AbstractBatchEntry {
277       private final int udpPort;
278
279       public UdpBatchEntry(final Point point, final int udpPort) {
280         super(point);
281         this.udpPort = udpPort;
282       }
283
284       public int getUdpPort() {
285         return this.udpPort;
286       }
287   }
288
289   /**
290    * Static method to create the Builder for this BatchProcessor.
291    *
292    * @param influxDB
293    *            the influxdb database handle.
294    * @return the Builder to create the BatchProcessor.
295    */

296   public static Builder builder(final InfluxDB influxDB) {
297     return new Builder(influxDB);
298   }
299
300   BatchProcessor(final InfluxDB influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,
301                  final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,
302                  final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
303                  final ConsistencyLevel consistencyLevel, final TimeUnit precision,
304                  final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
305     super();
306     this.influxDB = influxDB;
307     this.batchWriter = batchWriter;
308     this.actions = actions;
309     this.flushIntervalUnit = flushIntervalUnit;
310     this.flushInterval = flushInterval;
311     this.jitterInterval = jitterInterval;
312     this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
313     this.exceptionHandler = exceptionHandler;
314     this.consistencyLevel = consistencyLevel;
315     this.precision = precision;
316     this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
317     this.droppedActionHandler = droppedActionHandler;
318     if (actions > 1 && actions < Integer.MAX_VALUE) {
319         this.queue = new LinkedBlockingQueue<>(actions);
320     } else {
321         this.queue = new LinkedBlockingQueue<>();
322     }
323     this.randomSupplier = Math::random;
324
325     Runnable flushRunnable = new Runnable() {
326       @Override
327       public void run() {
328         // write doesn't throw any exceptions
329         write();
330         int jitterInterval = (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval);
331         BatchProcessor.this.scheduler.schedule(this,
332                 BatchProcessor.this.flushInterval + jitterInterval, BatchProcessor.this.flushIntervalUnit);
333       }
334     };
335     // Flush at specified Rate
336     this.scheduler.schedule(flushRunnable,
337             this.flushInterval + (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval),
338             this.flushIntervalUnit);
339   }
340
341   void write() {
342     List<Point> currentBatch = null;
343     try {
344       if (this.queue.isEmpty()) {
345         BatchProcessor.this.batchWriter.write(Collections.emptyList());
346         return;
347       }
348       //for batch on HTTP.
349       Map<String, BatchPoints> batchKeyToBatchPoints = new HashMap<>();
350       //for batch on UDP.
351       Map<Integer, List<String>> udpPortToBatchPoints = new HashMap<>();
352       List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
353       this.queue.drainTo(batchEntries);
354       currentBatch = new ArrayList<>(batchEntries.size());
355
356       for (AbstractBatchEntry batchEntry : batchEntries) {
357         Point point = batchEntry.getPoint();
358         currentBatch.add(point);
359         if (batchEntry instanceof HttpBatchEntry) {
360             HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
361             String dbName = httpBatchEntry.getDb();
362             String rp = httpBatchEntry.getRp();
363             String batchKey = dbName + "_" + rp;
364             if (!batchKeyToBatchPoints.containsKey(batchKey)) {
365               BatchPoints batchPoints = BatchPoints.database(dbName)
366                                                    .retentionPolicy(rp).consistency(getConsistencyLevel())
367                                                    .precision(getPrecision()).build();
368               batchKeyToBatchPoints.put(batchKey, batchPoints);
369             }
370             batchKeyToBatchPoints.get(batchKey).point(point);
371         } else if (batchEntry instanceof UdpBatchEntry) {
372             UdpBatchEntry udpBatchEntry = UdpBatchEntry.class.cast(batchEntry);
373             int udpPort = udpBatchEntry.getUdpPort();
374             if (!udpPortToBatchPoints.containsKey(udpPort)) {
375               List<String> batchPoints = new ArrayList<String>();
376               udpPortToBatchPoints.put(udpPort, batchPoints);
377             }
378             udpPortToBatchPoints.get(udpPort).add(point.lineProtocol());
379         }
380       }
381
382       BatchProcessor.this.batchWriter.write(batchKeyToBatchPoints.values());
383
384       for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {
385           for (String lineprotocolStr : entry.getValue()) {
386               BatchProcessor.this.influxDB.write(entry.getKey(), lineprotocolStr);
387           }
388       }
389     } catch (Throwable t) {
390       // any exception wouldn't stop the scheduler
391       exceptionHandler.accept(currentBatch, t);
392       LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
393     }
394   }
395
396   /**
397    * Put a single BatchEntry to the cache for later processing.
398    *
399    * @param batchEntry
400    *            the batchEntry to write to the cache.
401    */

402   void put(final AbstractBatchEntry batchEntry) {
403     try {
404         if (this.dropActionsOnQueueExhaustion) {
405           if (!this.queue.offer(batchEntry)) {
406             this.droppedActionHandler.accept(batchEntry.getPoint());
407             return;
408           }
409         } else {
410           this.queue.put(batchEntry);
411         }
412     } catch (InterruptedException e) {
413         throw new RuntimeException(e);
414     }
415     if (this.queue.size() >= this.actions) {
416       this.scheduler.submit(new Runnable() {
417         @Override
418         public void run() {
419           write();
420         }
421       });
422     }
423   }
424
425   /**
426    * Flush the current open writes to influxdb and end stop the reaper thread. This should only be
427    * called if no batch processing is needed anymore.
428    *
429    */

430   void flushAndShutdown() {
431     this.write();
432     this.scheduler.shutdown();
433     this.batchWriter.close();
434   }
435
436   /**
437    * Flush the current open writes to InfluxDB. This will block until all pending points are written.
438    */

439   void flush() {
440     this.write();
441   }
442
443   public ConsistencyLevel getConsistencyLevel() {
444     return consistencyLevel;
445   }
446
447   public TimeUnit getPrecision() {
448     return precision;
449   }
450
451   BatchWriter getBatchWriter() {
452     return batchWriter;
453   }
454
455   public boolean isDropActionsOnQueueExhaustion() {
456     return dropActionsOnQueueExhaustion;
457   }
458
459   public Consumer<Point> getDroppedActionHandler() {
460     return droppedActionHandler;
461   }
462 }
463