1 package org.influxdb.impl;
2
3 import org.influxdb.InfluxDB;
4 import org.influxdb.InfluxDBException;
5 import org.influxdb.dto.BatchPoints;
6 import org.influxdb.dto.Point;
7
8 import java.util.Collection;
9 import java.util.Iterator;
10 import java.util.LinkedList;
11 import java.util.List;
12 import java.util.ListIterator;
13 import java.util.function.BiConsumer;
14
15 /**
16  * Batch writer that tries to retry a write if it failed previously and
17  * the reason of the failure is not permanent.
18  */

19 class RetryCapableBatchWriter implements BatchWriter {
20
21   private InfluxDB influxDB;
22   private BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
23   private LinkedList<BatchPoints> batchQueue;
24   private int requestActionsLimit;
25   private int retryBufferCapacity;
26   private int usedRetryBufferCapacity;
27
28   RetryCapableBatchWriter(final InfluxDB influxDB, final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
29                           final int retryBufferCapacity, final int requestActionsLimit) {
30     this.influxDB = influxDB;
31     this.exceptionHandler = exceptionHandler;
32     batchQueue = new LinkedList<>();
33     this.retryBufferCapacity = retryBufferCapacity;
34     this.requestActionsLimit = requestActionsLimit;
35   }
36
37   private enum WriteResultOutcome { WRITTEN, FAILED_RETRY_POSSIBLE, FAILED_RETRY_IMPOSSIBLE }
38
39   private static final class WriteResult {
40
41     static final WriteResult WRITTEN = new WriteResult(WriteResultOutcome.WRITTEN);
42
43     WriteResultOutcome outcome;
44     Throwable throwable;
45
46     private WriteResult(final WriteResultOutcome outcome) {
47       this.outcome = outcome;
48     }
49
50     private WriteResult(final WriteResultOutcome outcome, final Throwable throwable) {
51       this.outcome = outcome;
52       this.throwable = throwable;
53     }
54
55     private WriteResult(final InfluxDBException e) {
56       this.throwable = e;
57       if (e.isRetryWorth()) {
58         this.outcome = WriteResultOutcome.FAILED_RETRY_POSSIBLE;
59       } else {
60         this.outcome = WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE;
61       }
62     }
63   }
64
65   /* This method is synchronized to avoid parallel execution when the user invokes flush/close
66    * of the client in the middle of scheduled write execution (buffer flush / action limit overrun) */

67   @Override
68   public synchronized void write(final Collection<BatchPoints> collection) {
69     // empty the cached data first
70     ListIterator<BatchPoints> batchQueueIterator = batchQueue.listIterator();
71     while (batchQueueIterator.hasNext()) {
72       BatchPoints entry = batchQueueIterator.next();
73       WriteResult result = tryToWrite(entry);
74       if (result.outcome == WriteResultOutcome.WRITTEN
75               || result.outcome == WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE) {
76         batchQueueIterator.remove();
77         usedRetryBufferCapacity -= entry.getPoints().size();
78         // we are throwing out data, notify the client
79         if (result.outcome == WriteResultOutcome.FAILED_RETRY_IMPOSSIBLE) {
80           exceptionHandler.accept(entry.getPoints(), result.throwable);
81         }
82       } else {
83         // we cannot send more data otherwise we would write them in different
84         // order than in which were submitted
85         for (BatchPoints batchPoints : collection) {
86           addToBatchQueue(batchPoints);
87         }
88         return;
89       }
90     }
91     // write the last given batch last so that duplicate data points get overwritten correctly
92     Iterator<BatchPoints> collectionIterator = collection.iterator();
93     while (collectionIterator.hasNext()) {
94       BatchPoints batchPoints = collectionIterator.next();
95       WriteResult result = tryToWrite(batchPoints);
96       switch (result.outcome) {
97         case FAILED_RETRY_POSSIBLE:
98           addToBatchQueue(batchPoints);
99           while (collectionIterator.hasNext()) {
100             addToBatchQueue(collectionIterator.next());
101           }
102           break;
103         case FAILED_RETRY_IMPOSSIBLE:
104           exceptionHandler.accept(batchPoints.getPoints(), result.throwable);
105           break;
106         default:
107
108       }
109     }
110   }
111
112   /* This method is synchronized to avoid parallel execution when the BatchProcessor scheduler
113    * has been shutdown but there are jobs still being executed (using RetryCapableBatchWriter.write).*/

114   @Override
115   public synchronized void close() {
116     // try to write everything queued / buffered
117     for (BatchPoints points : batchQueue) {
118       WriteResult result = tryToWrite(points);
119       if (result.outcome != WriteResultOutcome.WRITTEN) {
120         exceptionHandler.accept(points.getPoints(), result.throwable);
121       }
122     }
123   }
124
125   private WriteResult tryToWrite(final BatchPoints batchPoints) {
126     try {
127       influxDB.write(batchPoints);
128       return WriteResult.WRITTEN;
129     } catch (InfluxDBException e) {
130       return new WriteResult(e);
131     } catch (Exception e) {
132       return new WriteResult(WriteResultOutcome.FAILED_RETRY_POSSIBLE, e);
133     }
134   }
135
136   private void evictTooOldFailedWrites() {
137     while (usedRetryBufferCapacity > retryBufferCapacity && batchQueue.size() > 0) {
138       List<Point> points = batchQueue.removeFirst().getPoints();
139       usedRetryBufferCapacity -= points.size();
140       exceptionHandler.accept(points,
141               new InfluxDBException.RetryBufferOverrunException(
142                       "Retry buffer overrun, current capacity: " + retryBufferCapacity));
143     }
144   }
145
146   private void addToBatchQueue(final BatchPoints batchPoints) {
147     boolean hasBeenMergedIn = false;
148     if (batchQueue.size() > 0) {
149       BatchPoints last = batchQueue.getLast();
150       if (last.getPoints().size() + batchPoints.getPoints().size() <= requestActionsLimit) {
151         hasBeenMergedIn = last.mergeIn(batchPoints);
152       }
153     }
154     if (!hasBeenMergedIn) {
155         batchQueue.add(batchPoints);
156     }
157     // recalculate local counter and evict old batches on merge as well
158     usedRetryBufferCapacity += batchPoints.getPoints().size();
159     evictTooOldFailedWrites();
160   }
161 }
162