1 package org.influxdb.impl;
2
3 import org.influxdb.InfluxDB;
4 import org.influxdb.InfluxDB.ConsistencyLevel;
5 import org.influxdb.dto.BatchPoints;
6 import org.influxdb.dto.Point;
7
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Map.Entry;
14 import java.util.Objects;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ThreadFactory;
20 import java.util.concurrent.TimeUnit;
21 import java.util.function.BiConsumer;
22 import java.util.function.Consumer;
23 import java.util.function.Supplier;
24 import java.util.logging.Level;
25 import java.util.logging.Logger;
26
27
34 public final class BatchProcessor {
35
36 private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
37 protected final BlockingQueue<AbstractBatchEntry> queue;
38 private final ScheduledExecutorService scheduler;
39 private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
40 final InfluxDB influxDB;
41 final int actions;
42 private final TimeUnit flushIntervalUnit;
43 private final int flushInterval;
44 private final ConsistencyLevel consistencyLevel;
45 private final int jitterInterval;
46 private final TimeUnit precision;
47 private final BatchWriter batchWriter;
48 private boolean dropActionsOnQueueExhaustion;
49 Consumer<Point> droppedActionHandler;
50 Supplier<Double> randomSupplier;
51
52
55 public static final class Builder {
56 private final InfluxDB influxDB;
57 private ThreadFactory threadFactory = Executors.defaultThreadFactory();
58 private int actions;
59 private TimeUnit flushIntervalUnit;
60 private int flushInterval;
61 private int jitterInterval;
62
63
64 private int bufferLimit = 0;
65 private TimeUnit precision;
66
67 private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
68 private ConsistencyLevel consistencyLevel;
69
70 private boolean dropActionsOnQueueExhaustion;
71 private Consumer<Point> droppedActionsHandler;
72
77 public Builder threadFactory(final ThreadFactory threadFactory) {
78 this.threadFactory = threadFactory;
79 return this;
80 }
81
82
86 public Builder(final InfluxDB influxDB) {
87 this.influxDB = influxDB;
88 }
89
90
97 public Builder actions(final int maxActions) {
98 this.actions = maxActions;
99 return this;
100 }
101
102
112 public Builder interval(final int interval, final TimeUnit unit) {
113 this.flushInterval = interval;
114 this.flushIntervalUnit = unit;
115 return this;
116 }
117
118
130 public Builder interval(final int flushInterval, final int jitterInterval, final TimeUnit unit) {
131 this.flushInterval = flushInterval;
132 this.jitterInterval = jitterInterval;
133 this.flushIntervalUnit = unit;
134 return this;
135 }
136
137
144 public Builder bufferLimit(final int bufferLimit) {
145 this.bufferLimit = bufferLimit;
146 return this;
147 }
148
149
157 public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> handler) {
158 this.exceptionHandler = handler;
159 return this;
160 }
161
162
173 public Builder dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
174 this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
175 return this;
176 }
177
178
186 public Builder droppedActionHandler(final Consumer<Point> handler) {
187 this.droppedActionsHandler = handler;
188 return this;
189 }
190
191
192
193
201 public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
202 this.consistencyLevel = consistencyLevel;
203 return this;
204 }
205
206
214 public Builder precision(final TimeUnit precision) {
215 this.precision = precision;
216 return this;
217 }
218
219
224 public BatchProcessor build() {
225 Objects.requireNonNull(this.influxDB, "influxDB");
226 Preconditions.checkPositiveNumber(this.actions, "actions");
227 Preconditions.checkPositiveNumber(this.flushInterval, "flushInterval");
228 Preconditions.checkNotNegativeNumber(jitterInterval, "jitterInterval");
229 Preconditions.checkNotNegativeNumber(bufferLimit, "bufferLimit");
230 Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");
231 Objects.requireNonNull(this.threadFactory, "threadFactory");
232 Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");
233 BatchWriter batchWriter;
234 if (this.bufferLimit > this.actions) {
235 batchWriter = new RetryCapableBatchWriter(this.influxDB, this.exceptionHandler, this.bufferLimit, this.actions);
236 } else {
237 batchWriter = new OneShotBatchWriter(this.influxDB);
238 }
239 return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,
240 this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,
241 this.precision, this.dropActionsOnQueueExhaustion, this.droppedActionsHandler);
242 }
243 }
244
245 abstract static class AbstractBatchEntry {
246 private final Point point;
247
248 public AbstractBatchEntry(final Point point) {
249 this.point = point;
250 }
251
252 public Point getPoint() {
253 return this.point;
254 }
255 }
256
257 static class HttpBatchEntry extends AbstractBatchEntry {
258 private final String db;
259 private final String rp;
260
261 public HttpBatchEntry(final Point point, final String db, final String rp) {
262 super(point);
263 this.db = db;
264 this.rp = rp;
265 }
266
267 public String getDb() {
268 return this.db;
269 }
270
271 public String getRp() {
272 return this.rp;
273 }
274 }
275
276 static class UdpBatchEntry extends AbstractBatchEntry {
277 private final int udpPort;
278
279 public UdpBatchEntry(final Point point, final int udpPort) {
280 super(point);
281 this.udpPort = udpPort;
282 }
283
284 public int getUdpPort() {
285 return this.udpPort;
286 }
287 }
288
289
296 public static Builder builder(final InfluxDB influxDB) {
297 return new Builder(influxDB);
298 }
299
/**
 * Creates the processor, allocates its queue and schedules the first flush on a new single-threaded
 * scheduler. After every flush the task schedules itself again with a freshly computed delay.
 *
 * @param influxDB the client used for UDP writes
 * @param batchWriter the writer that receives the assembled HTTP batches
 * @param threadFactory the factory that creates the scheduler thread
 * @param actions the queue size that triggers an extra flush; it is also the queue capacity when it is
 *     greater than 1 and less than {@link Integer#MAX_VALUE}, otherwise the queue gets the default capacity
 * @param flushIntervalUnit the unit of {@code flushInterval} and {@code jitterInterval}
 * @param flushInterval the base delay between two scheduled flushes
 * @param jitterInterval the value scaled by the random supplier and added to the base delay
 * @param exceptionHandler the callback invoked when a write throws
 * @param consistencyLevel the consistency level applied to HTTP batches
 * @param precision the precision applied to HTTP batches
 * @param dropActionsOnQueueExhaustion whether {@code put(...)} drops points instead of blocking
 * @param droppedActionHandler the callback that receives dropped points
 */
BatchProcessor(final InfluxDB influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,
    final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,
    final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
    final ConsistencyLevel consistencyLevel, final TimeUnit precision,
    final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
  this.influxDB = influxDB;
  this.batchWriter = batchWriter;
  this.actions = actions;
  this.flushIntervalUnit = flushIntervalUnit;
  this.flushInterval = flushInterval;
  this.jitterInterval = jitterInterval;
  this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
  this.exceptionHandler = exceptionHandler;
  this.consistencyLevel = consistencyLevel;
  this.precision = precision;
  this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
  this.droppedActionHandler = droppedActionHandler;
  if (actions > 1 && actions < Integer.MAX_VALUE) {
    this.queue = new LinkedBlockingQueue<>(actions);
  } else {
    this.queue = new LinkedBlockingQueue<>();
  }
  this.randomSupplier = Math::random;

  final Runnable flushRunnable = new Runnable() {
    @Override
    public void run() {
      try {
        write();
      } finally {
        // Re-arm in finally: write() can still propagate (e.g. when the exception handler itself
        // throws), and without this the recurring flush would stop silently.
        BatchProcessor.this.scheduler.schedule(this, nextFlushDelay(), BatchProcessor.this.flushIntervalUnit);
      }
    }
  };

  // Scheduled last so that every field the task reads has been assigned.
  this.scheduler.schedule(flushRunnable, nextFlushDelay(), this.flushIntervalUnit);
}

/** Returns the flush interval plus the jitter interval scaled by the next value of the random supplier. */
private int nextFlushDelay() {
  return this.flushInterval + (int) (this.randomSupplier.get() * this.jitterInterval);
}
340
341 void write() {
342 List<Point> currentBatch = null;
343 try {
344 if (this.queue.isEmpty()) {
345 BatchProcessor.this.batchWriter.write(Collections.emptyList());
346 return;
347 }
348
349 Map<String, BatchPoints> batchKeyToBatchPoints = new HashMap<>();
350
351 Map<Integer, List<String>> udpPortToBatchPoints = new HashMap<>();
352 List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
353 this.queue.drainTo(batchEntries);
354 currentBatch = new ArrayList<>(batchEntries.size());
355
356 for (AbstractBatchEntry batchEntry : batchEntries) {
357 Point point = batchEntry.getPoint();
358 currentBatch.add(point);
359 if (batchEntry instanceof HttpBatchEntry) {
360 HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
361 String dbName = httpBatchEntry.getDb();
362 String rp = httpBatchEntry.getRp();
363 String batchKey = dbName + "_" + rp;
364 if (!batchKeyToBatchPoints.containsKey(batchKey)) {
365 BatchPoints batchPoints = BatchPoints.database(dbName)
366 .retentionPolicy(rp).consistency(getConsistencyLevel())
367 .precision(getPrecision()).build();
368 batchKeyToBatchPoints.put(batchKey, batchPoints);
369 }
370 batchKeyToBatchPoints.get(batchKey).point(point);
371 } else if (batchEntry instanceof UdpBatchEntry) {
372 UdpBatchEntry udpBatchEntry = UdpBatchEntry.class.cast(batchEntry);
373 int udpPort = udpBatchEntry.getUdpPort();
374 if (!udpPortToBatchPoints.containsKey(udpPort)) {
375 List<String> batchPoints = new ArrayList<String>();
376 udpPortToBatchPoints.put(udpPort, batchPoints);
377 }
378 udpPortToBatchPoints.get(udpPort).add(point.lineProtocol());
379 }
380 }
381
382 BatchProcessor.this.batchWriter.write(batchKeyToBatchPoints.values());
383
384 for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {
385 for (String lineprotocolStr : entry.getValue()) {
386 BatchProcessor.this.influxDB.write(entry.getKey(), lineprotocolStr);
387 }
388 }
389 } catch (Throwable t) {
390
391 exceptionHandler.accept(currentBatch, t);
392 LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
393 }
394 }
395
396
402 void put(final AbstractBatchEntry batchEntry) {
403 try {
404 if (this.dropActionsOnQueueExhaustion) {
405 if (!this.queue.offer(batchEntry)) {
406 this.droppedActionHandler.accept(batchEntry.getPoint());
407 return;
408 }
409 } else {
410 this.queue.put(batchEntry);
411 }
412 } catch (InterruptedException e) {
413 throw new RuntimeException(e);
414 }
415 if (this.queue.size() >= this.actions) {
416 this.scheduler.submit(new Runnable() {
417 @Override
418 public void run() {
419 write();
420 }
421 });
422 }
423 }
424
425
430 void flushAndShutdown() {
431 this.write();
432 this.scheduler.shutdown();
433 this.batchWriter.close();
434 }
435
436
439 void flush() {
440 this.write();
441 }
442
443 public ConsistencyLevel getConsistencyLevel() {
444 return consistencyLevel;
445 }
446
447 public TimeUnit getPrecision() {
448 return precision;
449 }
450
451 BatchWriter getBatchWriter() {
452 return batchWriter;
453 }
454
455 public boolean isDropActionsOnQueueExhaustion() {
456 return dropActionsOnQueueExhaustion;
457 }
458
459 public Consumer<Point> getDroppedActionHandler() {
460 return droppedActionHandler;
461 }
462 }
463