1 package org.influxdb.impl;
2
3 import org.influxdb.InfluxDB;
4 import org.influxdb.InfluxDBException;
5 import org.influxdb.dto.BatchPoints;
6 import org.influxdb.dto.Point;
7
8 import java.util.Collection;
9 import java.util.Iterator;
10 import java.util.LinkedList;
11 import java.util.List;
12 import java.util.ListIterator;
13 import java.util.function.BiConsumer;
14
15
19 class RetryCapableBatchWriter implements BatchWriter {
20
21 private InfluxDB influxDB;
22 private BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
23 private LinkedList<BatchPoints> batchQueue;
24 private int requestActionsLimit;
25 private int retryBufferCapacity;
26 private int usedRetryBufferCapacity;
27
28 RetryCapableBatchWriter(final InfluxDB influxDB, final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
29 final int retryBufferCapacity, final int requestActionsLimit) {
30 this.influxDB = influxDB;
31 this.exceptionHandler = exceptionHandler;
32 batchQueue = new LinkedList<>();
33 this.retryBufferCapacity = retryBufferCapacity;
34 this.requestActionsLimit = requestActionsLimit;
35 }
36
37 private enum WriteResultOutcome { WRITTEN, FAILED_RETRY_POSSIBLE, FAILED_RETRY_IMPOSSIBLE }
38
39 private static final class WriteResult {
40
41 static final WriteResult WRITTEN = new WriteResult(WriteResultOutcome.WRITTEN);
42
43 WriteResultOutcome outcome;
44 Throwable throwable;
45
46 private WriteResult(final WriteResultOutcome outcome) {
47 this.outcome = outcome;
48 }
49
50 private WriteResult(final WriteResultOutcome outcome, final Throwable throwable) {
51 this.outcome = outcome;
52 this.throwable = throwable;
53 }
54
55 private WriteResult(final InfluxDBException e) {
56 this.throwable = e;
57 if (e.isRetryWorth()) {
58 this.outcome = WriteResultOutcome.FAILED_RETRY_POSSIBLE;
59 } else {
60 this.outcome = WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE;
61 }
62 }
63 }
64
65
67 @Override
68 public synchronized void write(final Collection<BatchPoints> collection) {
69
70 ListIterator<BatchPoints> batchQueueIterator = batchQueue.listIterator();
71 while (batchQueueIterator.hasNext()) {
72 BatchPoints entry = batchQueueIterator.next();
73 WriteResult result = tryToWrite(entry);
74 if (result.outcome == WriteResultOutcome.WRITTEN
75 || result.outcome == WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE) {
76 batchQueueIterator.remove();
77 usedRetryBufferCapacity -= entry.getPoints().size();
78
79 if (result.outcome == WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE) {
80 exceptionHandler.accept(entry.getPoints(), result.throwable);
81 }
82 } else {
83
84
85 for (BatchPoints batchPoints : collection) {
86 addToBatchQueue(batchPoints);
87 }
88 return;
89 }
90 }
91
92 Iterator<BatchPoints> collectionIterator = collection.iterator();
93 while (collectionIterator.hasNext()) {
94 BatchPoints batchPoints = collectionIterator.next();
95 WriteResult result = tryToWrite(batchPoints);
96 switch (result.outcome) {
97 case FAILED_RETRY_POSSIBLE:
98 addToBatchQueue(batchPoints);
99 while (collectionIterator.hasNext()) {
100 addToBatchQueue(collectionIterator.next());
101 }
102 break;
103 case FAILED_RETRY_IMPOSSIBLE:
104 exceptionHandler.accept(batchPoints.getPoints(), result.throwable);
105 break;
106 default:
107
108 }
109 }
110 }
111
112
114 @Override
115 public synchronized void close() {
116
117 for (BatchPoints points : batchQueue) {
118 WriteResult result = tryToWrite(points);
119 if (result.outcome != WriteResultOutcome.WRITTEN) {
120 exceptionHandler.accept(points.getPoints(), result.throwable);
121 }
122 }
123 }
124
125 private WriteResult tryToWrite(final BatchPoints batchPoints) {
126 try {
127 influxDB.write(batchPoints);
128 return WriteResult.WRITTEN;
129 } catch (InfluxDBException e) {
130 return new WriteResult(e);
131 } catch (Exception e) {
132 return new WriteResult(WriteResultOutcome.FAILED_RETRY_POSSIBLE, e);
133 }
134 }
135
136 private void evictTooOldFailedWrites() {
137 while (usedRetryBufferCapacity > retryBufferCapacity && batchQueue.size() > 0) {
138 List<Point> points = batchQueue.removeFirst().getPoints();
139 usedRetryBufferCapacity -= points.size();
140 exceptionHandler.accept(points,
141 new InfluxDBException.RetryBufferOverrunException(
142 "Retry buffer overrun, current capacity: " + retryBufferCapacity));
143 }
144 }
145
146 private void addToBatchQueue(final BatchPoints batchPoints) {
147 boolean hasBeenMergedIn = false;
148 if (batchQueue.size() > 0) {
149 BatchPoints last = batchQueue.getLast();
150 if (last.getPoints().size() + batchPoints.getPoints().size() <= requestActionsLimit) {
151 hasBeenMergedIn = last.mergeIn(batchPoints);
152 }
153 }
154 if (!hasBeenMergedIn) {
155 batchQueue.add(batchPoints);
156 }
157
158 usedRetryBufferCapacity += batchPoints.getPoints().size();
159 evictTooOldFailedWrites();
160 }
161 }
162