1 package org.influxdb.impl;
2
3 import com.squareup.moshi.JsonAdapter;
4 import com.squareup.moshi.Moshi;
5 import okhttp3.Headers;
6 import okhttp3.MediaType;
7 import okhttp3.OkHttpClient;
8 import okhttp3.Request;
9 import okhttp3.RequestBody;
10 import okhttp3.ResponseBody;
11 import okhttp3.logging.HttpLoggingInterceptor;
12 import okhttp3.logging.HttpLoggingInterceptor.Level;
13 import okio.BufferedSource;
14 import org.influxdb.BatchOptions;
15 import org.influxdb.InfluxDB;
16 import org.influxdb.InfluxDBException;
17 import org.influxdb.InfluxDBIOException;
18 import org.influxdb.dto.BatchPoints;
19 import org.influxdb.dto.BoundParameterQuery;
20 import org.influxdb.dto.Point;
21 import org.influxdb.dto.Pong;
22 import org.influxdb.dto.Query;
23 import org.influxdb.dto.QueryResult;
24 import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
25 import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
26 import org.influxdb.msgpack.MessagePackConverterFactory;
27 import org.influxdb.msgpack.MessagePackTraverser;
28 import retrofit2.Call;
29 import retrofit2.Callback;
30 import retrofit2.Converter.Factory;
31 import retrofit2.Response;
32 import retrofit2.Retrofit;
33 import retrofit2.converter.moshi.MoshiConverterFactory;
34
35 import java.io.EOFException;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.net.DatagramPacket;
39 import java.net.DatagramSocket;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.SocketException;
43 import java.net.URI;
44 import java.net.URISyntaxException;
45 import java.net.UnknownHostException;
46 import java.nio.charset.StandardCharsets;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.ThreadFactory;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import java.util.concurrent.atomic.LongAdder;
56 import java.util.function.BiConsumer;
57 import java.util.function.Consumer;
58 import java.util.regex.Matcher;
59 import java.util.regex.Pattern;
60
/**
 * Implementation of the InfluxDB API.
 *
 * @author stefan.majer [at] gmail.com
 */

66 public class InfluxDBImpl implements InfluxDB {
67
68   private static final String APPLICATION_MSGPACK = "application/x-msgpack";
69
70   static final okhttp3.MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");
71
72   private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");
73
74   /**
75    * This static constant holds the http logging log level expected in DEBUG mode
76    * It is set by System property {@code org.influxdb.InfluxDB.logLevel}.
77    *
78    * @see org.influxdb.InfluxDB#LOG_LEVEL_PROPERTY
79    */

80   private static final LogLevel LOG_LEVEL = LogLevel.parseLogLevel(System.getProperty(LOG_LEVEL_PROPERTY));
81
82   private final String hostName;
83   private String version;
84   private final Retrofit retrofit;
85   private final OkHttpClient client;
86   private final InfluxDBService influxDBService;
87   private BatchProcessor batchProcessor;
88   private final AtomicBoolean batchEnabled = new AtomicBoolean(false);
89   private final LongAdder writeCount = new LongAdder();
90   private final LongAdder unBatchedCount = new LongAdder();
91   private final LongAdder batchedCount = new LongAdder();
92   private volatile DatagramSocket datagramSocket;
93   private final HttpLoggingInterceptor loggingInterceptor;
94   private final GzipRequestInterceptor gzipRequestInterceptor;
95   private LogLevel logLevel = LogLevel.NONE;
96   private String database;
97   private String retentionPolicy = "autogen";
98   private ConsistencyLevel consistency = ConsistencyLevel.ONE;
99   private final boolean messagePack;
100   private Boolean messagePackSupport;
101   private final ChunkProccesor chunkProccesor;
102
103   /**
104    * Constructs a new {@code InfluxDBImpl}.
105    *
106    * @param url
107    *          The InfluxDB server API URL
108    * @param username
109    *          The InfluxDB user name
110    * @param password
111    *          The InfluxDB user password
112    * @param okHttpBuilder
113    *          The OkHttp Client Builder
114    * @param responseFormat
115    *          The {@code ResponseFormat} to use for response from InfluxDB
116    *          server
117    */

118   public InfluxDBImpl(final String url, final String username, final String password,
119                       final OkHttpClient.Builder okHttpBuilder, final ResponseFormat responseFormat) {
120     this(url, username, password, okHttpBuilder, new Retrofit.Builder(), responseFormat);
121   }
122
123   /**
124    * Constructs a new {@code InfluxDBImpl}.
125    *
126    * @param url
127    *          The InfluxDB server API URL
128    * @param username
129    *          The InfluxDB user name
130    * @param password
131    *          The InfluxDB user password
132    * @param okHttpBuilder
133    *          The OkHttp Client Builder
134    * @param retrofitBuilder
135    *          The Retrofit Builder
136    * @param responseFormat
137    *          The {@code ResponseFormat} to use for response from InfluxDB
138    *          server
139    */

140   public InfluxDBImpl(final String url, final String username, final String password,
141                       final OkHttpClient.Builder okHttpBuilder, final Retrofit.Builder retrofitBuilder,
142                       final ResponseFormat responseFormat) {
143     this.messagePack = ResponseFormat.MSGPACK.equals(responseFormat);
144     this.hostName = parseHost(url);
145
146     this.loggingInterceptor = new HttpLoggingInterceptor();
147     setLogLevel(LOG_LEVEL);
148
149     this.gzipRequestInterceptor = new GzipRequestInterceptor();
150     OkHttpClient.Builder clonedOkHttpBuilder = okHttpBuilder.build().newBuilder()
151             .addInterceptor(loggingInterceptor)
152             .addInterceptor(gzipRequestInterceptor);
153     if (username != null && password != null) {
154       clonedOkHttpBuilder.addInterceptor(new BasicAuthInterceptor(username, password));
155     }
156     Factory converterFactory = null;
157     switch (responseFormat) {
158     case MSGPACK:
159       clonedOkHttpBuilder.addInterceptor(chain -> {
160         Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build();
161         return chain.proceed(request);
162       });
163
164       converterFactory = MessagePackConverterFactory.create();
165       chunkProccesor = new MessagePackChunkProccesor();
166       break;
167     case JSON:
168     default:
169       converterFactory = MoshiConverterFactory.create();
170
171       Moshi moshi = new Moshi.Builder().build();
172       JsonAdapter<QueryResult> adapter = moshi.adapter(QueryResult.class);
173       chunkProccesor = new JSONChunkProccesor(adapter);
174       break;
175     }
176
177     this.client = clonedOkHttpBuilder.build();
178     Retrofit.Builder clonedRetrofitBuilder = retrofitBuilder.baseUrl(url).build().newBuilder();
179     this.retrofit = clonedRetrofitBuilder.client(this.client)
180             .addConverterFactory(converterFactory).build();
181     this.influxDBService = this.retrofit.create(InfluxDBService.class);
182
183   }
184
185   public InfluxDBImpl(final String url, final String username, final String password,
186       final OkHttpClient.Builder client) {
187     this(url, username, password, client, ResponseFormat.JSON);
188
189   }
190
191   InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
192       final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
193     super();
194     this.messagePack = false;
195     this.hostName = parseHost(url);
196
197     this.loggingInterceptor = new HttpLoggingInterceptor();
198     setLogLevel(LOG_LEVEL);
199
200     this.gzipRequestInterceptor = new GzipRequestInterceptor();
201     OkHttpClient.Builder clonedBuilder = client.build().newBuilder()
202             .addInterceptor(loggingInterceptor)
203             .addInterceptor(gzipRequestInterceptor)
204             .addInterceptor(new BasicAuthInterceptor(username, password));
205     this.client = clonedBuilder.build();
206     this.retrofit = new Retrofit.Builder().baseUrl(url)
207         .client(this.client)
208         .addConverterFactory(MoshiConverterFactory.create()).build();
209     this.influxDBService = influxDBService;
210
211     chunkProccesor = new JSONChunkProccesor(adapter);
212   }
213
214   public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
215       final String database, final String retentionPolicy, final ConsistencyLevel consistency) {
216     this(url, username, password, client);
217
218     setConsistency(consistency);
219     setDatabase(database);
220     setRetentionPolicy(retentionPolicy);
221   }
222
223   private String parseHost(final String url) {
224     String hostName;
225     try {
226       URI uri = new URI(url);
227       hostName = uri.getHost();
228     } catch (URISyntaxException e1) {
229       throw new IllegalArgumentException("Unable to parse url: " + url, e1);
230     }
231
232     if (hostName == null) {
233       throw new IllegalArgumentException("Unable to parse url: " + url);
234     }
235
236     try {
237       InetAddress.getByName(hostName);
238     } catch (UnknownHostException e) {
239       throw new InfluxDBIOException(e);
240     }
241     return hostName;
242   }
243
244   @Override
245   public InfluxDB setLogLevel(final LogLevel logLevel) {
246     switch (logLevel) {
247     case NONE:
248       this.loggingInterceptor.setLevel(Level.NONE);
249       break;
250     case BASIC:
251       this.loggingInterceptor.setLevel(Level.BASIC);
252       break;
253     case HEADERS:
254       this.loggingInterceptor.setLevel(Level.HEADERS);
255       break;
256     case FULL:
257       this.loggingInterceptor.setLevel(Level.BODY);
258       break;
259     default:
260       break;
261     }
262     this.logLevel = logLevel;
263     return this;
264   }
265
266   /**
267    * {@inheritDoc}
268    */

269   @Override
270   public InfluxDB enableGzip() {
271     this.gzipRequestInterceptor.enable();
272     return this;
273   }
274
275   /**
276    * {@inheritDoc}
277    */

278   @Override
279   public InfluxDB disableGzip() {
280     this.gzipRequestInterceptor.disable();
281     return this;
282   }
283
284   /**
285    * {@inheritDoc}
286    */

287   @Override
288   public boolean isGzipEnabled() {
289     return this.gzipRequestInterceptor.isEnabled();
290   }
291
292   @Override
293   public InfluxDB enableBatch() {
294     enableBatch(BatchOptions.DEFAULTS);
295     return this;
296   }
297
298   @Override
299   public InfluxDB enableBatch(final BatchOptions batchOptions) {
300
301     if (this.batchEnabled.get()) {
302       throw new IllegalStateException("BatchProcessing is already enabled.");
303     }
304     this.batchProcessor = BatchProcessor
305             .builder(this)
306             .actions(batchOptions.getActions())
307             .exceptionHandler(batchOptions.getExceptionHandler())
308             .interval(batchOptions.getFlushDuration(), batchOptions.getJitterDuration(), TimeUnit.MILLISECONDS)
309             .threadFactory(batchOptions.getThreadFactory())
310             .bufferLimit(batchOptions.getBufferLimit())
311             .consistencyLevel(batchOptions.getConsistency())
312             .precision(batchOptions.getPrecision())
313             .dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion())
314             .droppedActionHandler(batchOptions.getDroppedActionHandler())
315             .build();
316     this.batchEnabled.set(true);
317     return this;
318   }
319
320   @Override
321   public InfluxDB enableBatch(final int actions, final int flushDuration,
322                               final TimeUnit flushDurationTimeUnit) {
323     enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
324     return this;
325   }
326
327   @Override
328   public InfluxDB enableBatch(final int actions, final int flushDuration,
329                               final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
330     enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { });
331     return this;
332   }
333
334   @Override
335   public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
336                               final ThreadFactory threadFactory,
337                               final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
338                               final ConsistencyLevel consistency) {
339     enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, exceptionHandler)
340         .setConsistency(consistency);
341     return this;
342   }
343
344   @Override
345   public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
346                               final ThreadFactory threadFactory,
347                               final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
348     enableBatch(actions, flushDuration, 0, flushDurationTimeUnit, threadFactory, exceptionHandler, falsenull);
349     return this;
350   }
351
352   private InfluxDB enableBatch(final int actions, final int flushDuration, final int jitterDuration,
353                                final TimeUnit durationTimeUnit, final ThreadFactory threadFactory,
354                                final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
355                                final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
356     if (this.batchEnabled.get()) {
357       throw new IllegalStateException("BatchProcessing is already enabled.");
358     }
359     this.batchProcessor = BatchProcessor
360             .builder(this)
361             .actions(actions)
362             .exceptionHandler(exceptionHandler)
363             .interval(flushDuration, jitterDuration, durationTimeUnit)
364             .threadFactory(threadFactory)
365             .consistencyLevel(consistency)
366             .dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion)
367             .droppedActionHandler(droppedActionHandler)
368             .build();
369     this.batchEnabled.set(true);
370     return this;
371   }
372
373   @Override
374   public void disableBatch() {
375     this.batchEnabled.set(false);
376     if (this.batchProcessor != null) {
377       this.batchProcessor.flushAndShutdown();
378     }
379   }
380
381   @Override
382   public boolean isBatchEnabled() {
383     return this.batchEnabled.get();
384   }
385
386   @Override
387   public Pong ping() {
388     final long started = System.currentTimeMillis();
389     Call<ResponseBody> call = this.influxDBService.ping();
390     try {
391       Response<ResponseBody> response = call.execute();
392       Headers headers = response.headers();
393       String version = "unknown";
394       for (String name : headers.toMultimap().keySet()) {
395         if (null != name && "X-Influxdb-Version".equalsIgnoreCase(name)) {
396           version = headers.get(name);
397           break;
398         }
399       }
400       Pong pong = new Pong();
401       pong.setVersion(version);
402       pong.setResponseTime(System.currentTimeMillis() - started);
403       return pong;
404     } catch (IOException e) {
405       throw new InfluxDBIOException(e);
406     }
407   }
408
409   @Override
410   public String version() {
411     if (version == null) {
412       this.version = ping().getVersion();
413     }
414     return this.version;
415   }
416
417   @Override
418   public void write(final Point point) {
419     write(database, retentionPolicy, point);
420   }
421
422   @Override
423   public void write(final String records) {
424     write(database, retentionPolicy, consistency, records);
425   }
426
427   @Override
428   public void write(final List<String> records) {
429     write(database, retentionPolicy, consistency, records);
430   }
431
432   @Override
433   public void write(final String database, final String retentionPolicy, final Point point) {
434     if (this.batchEnabled.get()) {
435       HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
436       this.batchProcessor.put(batchEntry);
437     } else {
438       BatchPoints batchPoints = BatchPoints.database(database)
439                                            .retentionPolicy(retentionPolicy).build();
440       batchPoints.point(point);
441       this.write(batchPoints);
442       this.unBatchedCount.increment();
443     }
444     this.writeCount.increment();
445   }
446
447   /**
448    * {@inheritDoc}
449    */

450   @Override
451   public void write(final int udpPort, final Point point) {
452     if (this.batchEnabled.get()) {
453       UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort);
454       this.batchProcessor.put(batchEntry);
455     } else {
456       this.write(udpPort, point.lineProtocol());
457       this.unBatchedCount.increment();
458     }
459     this.writeCount.increment();
460   }
461
462   @Override
463   public void write(final BatchPoints batchPoints) {
464     this.batchedCount.add(batchPoints.getPoints().size());
465     RequestBody lineProtocol = RequestBody.create(MEDIA_TYPE_STRING, batchPoints.lineProtocol());
466     String db = batchPoints.getDatabase();
467     if (db == null) {
468         db = this.database;
469     }
470     execute(this.influxDBService.writePoints(
471         db,
472         batchPoints.getRetentionPolicy(),
473         TimeUtil.toTimePrecision(batchPoints.getPrecision()),
474         batchPoints.getConsistency().value(),
475         lineProtocol));
476   }
477
478   @Override
479   public void writeWithRetry(final BatchPoints batchPoints) {
480     if (isBatchEnabled()) {
481       batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints));
482     } else {
483       write(batchPoints);
484     }
485   }
486
487   @Override
488   public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
489           final TimeUnit precision, final String records) {
490     execute(this.influxDBService.writePoints(
491         database,
492         retentionPolicy,
493         TimeUtil.toTimePrecision(precision),
494         consistency.value(),
495         RequestBody.create(MEDIA_TYPE_STRING, records)));
496   }
497
498   @Override
499   public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
500       final String records) {
501     write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
502   }
503
504   @Override
505   public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
506       final List<String> records) {
507     write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
508   }
509
510
511   @Override
512   public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
513           final TimeUnit precision, final List<String> records) {
514     write(database, retentionPolicy, consistency, precision, String.join("\n", records));
515   }
516
517
518   /**
519    * {@inheritDoc}
520    */

521   @Override
522   public void write(final int udpPort, final String records) {
523     initialDatagramSocket();
524     byte[] bytes = records.getBytes(StandardCharsets.UTF_8);
525     try {
526         datagramSocket.send(new DatagramPacket(bytes, bytes.length, new InetSocketAddress(hostName, udpPort)));
527     } catch (IOException e) {
528         throw new InfluxDBIOException(e);
529     }
530   }
531
532   private void initialDatagramSocket() {
533     if (datagramSocket == null) {
534         synchronized (InfluxDBImpl.class) {
535             if (datagramSocket == null) {
536                 try {
537                     datagramSocket = new DatagramSocket();
538                 } catch (SocketException e) {
539                     throw new InfluxDBIOException(e);
540                 }
541             }
542         }
543     }
544 }
545
546   /**
547    * {@inheritDoc}
548    */

549   @Override
550   public void write(final int udpPort, final List<String> records) {
551     write(udpPort, String.join("\n", records));
552   }
553
554   /**
555    * {@inheritDoc}
556    */

557   @Override
558   public QueryResult query(final Query query) {
559     return executeQuery(callQuery(query));
560   }
561
562   /**
563    * {@inheritDoc}
564    */

565   @Override
566   public void query(final Query query, final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure) {
567     final Call<QueryResult> call = callQuery(query);
568     call.enqueue(new Callback<QueryResult>() {
569       @Override
570       public void onResponse(final Call<QueryResult> call, final Response<QueryResult> response) {
571         if (response.isSuccessful()) {
572           onSuccess.accept(response.body());
573         } else {
574           Throwable t = null;
575           String errorBody = null;
576
577           try {
578             if (response.errorBody() != null) {
579               errorBody = response.errorBody().string();
580             }
581           } catch (IOException e) {
582             t = e;
583           }
584
585           if (t != null) {
586             onFailure.accept(new InfluxDBException(response.message(), t));
587           } else if (errorBody != null) {
588             onFailure.accept(new InfluxDBException(response.message() + " - " + errorBody));
589           } else {
590             onFailure.accept(new InfluxDBException(response.message()));
591           }
592         }
593       }
594
595       @Override
596       public void onFailure(final Call<QueryResult> call, final Throwable throwable) {
597         onFailure.accept(throwable);
598       }
599     });
600   }
601
602   /**
603    * {@inheritDoc}
604    */

605   @Override
606   public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext) {
607     query(query, chunkSize, onNext, () -> { });
608   }
609
610   /**
611    * {@inheritDoc}
612    */

613   @Override
614   public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext) {
615     query(query, chunkSize, onNext, () -> { });
616   }
617
618   /**
619    * {@inheritDoc}
620    */

621   @Override
622   public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext,
623                     final Runnable onComplete) {
624     query(query, chunkSize, (cancellable, queryResult) -> onNext.accept(queryResult), onComplete);
625   }
626
627   @Override
628   public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
629                     final Runnable onComplete) {
630     query(query, chunkSize, onNext, onComplete, null);
631   }
632
633   /**
634    * {@inheritDoc}
635    */

636   @Override
637   public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
638                     final Runnable onComplete, final Consumer<Throwable> onFailure) {
639     Call<ResponseBody> call;
640     if (query instanceof BoundParameterQuery) {
641       BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
642       call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize,
643           boundParameterQuery.getParameterJsonWithUrlEncoded());
644     } else {
645       if (query.requiresPost()) {
646         call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize, null);
647       } else {
648         call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize);
649       }
650     }
651
652     call.enqueue(new Callback<ResponseBody>() {
653       @Override
654       public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
655
656         Cancellable cancellable = new Cancellable() {
657           @Override
658           public void cancel() {
659             call.cancel();
660           }
661
662           @Override
663           public boolean isCanceled() {
664             return call.isCanceled();
665           }
666         };
667
668         try {
669           if (response.isSuccessful()) {
670             ResponseBody chunkedBody = response.body();
671             chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
672           } else {
673             // REVIEW: must be handled consistently with IOException.
674             ResponseBody errorBody = response.errorBody();
675             if (errorBody != null) {
676               InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
677               if (onFailure == null) {
678                 throw influxDBException;
679               } else {
680                 onFailure.accept(influxDBException);
681               }
682             }
683           }
684         } catch (IOException e) {
685           QueryResult queryResult = new QueryResult();
686           queryResult.setError(e.toString());
687           onNext.accept(cancellable, queryResult);
688           //passing null onFailure consumer is here for backward compatibility
689           //where the empty queryResult containing error is propagating into onNext consumer
690           if (onFailure != null) {
691             onFailure.accept(e);
692           }
693         } catch (Exception e) {
694           call.cancel();
695           if (onFailure != null) {
696             onFailure.accept(e);
697           }
698         }
699
700       }
701
702       @Override
703       public void onFailure(final Call<ResponseBody> call, final Throwable t) {
704         if (onFailure == null) {
705           throw new InfluxDBException(t);
706         } else {
707           onFailure.accept(t);
708         }
709       }
710     });
711   }
712
713   /**
714    * {@inheritDoc}
715    */

716   @Override
717   public QueryResult query(final Query query, final TimeUnit timeUnit) {
718     Call<QueryResult> call;
719     if (query instanceof BoundParameterQuery) {
720         BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
721         call = this.influxDBService.query(getDatabase(query),
722                 TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),
723                 boundParameterQuery.getParameterJsonWithUrlEncoded());
724     } else {
725         if (query.requiresPost()) {
726           call = this.influxDBService.query(getDatabase(query),
727                   TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(), null);
728         } else {
729           call = this.influxDBService.query(getDatabase(query),
730                   TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
731         }
732     }
733     return executeQuery(call);
734   }
735
736   /**
737    * {@inheritDoc}
738    */

739   @Override
740   public void createDatabase(final String name) {
741     Preconditions.checkNonEmptyString(name, "name");
742     String createDatabaseQueryString = String.format("CREATE DATABASE \"%s\"", name);
743     executeQuery(this.influxDBService.postQuery(Query.encode(createDatabaseQueryString)));
744   }
745
746   /**
747    * {@inheritDoc}
748    */

749   @Override
750   public void deleteDatabase(final String name) {
751     executeQuery(this.influxDBService.postQuery(Query.encode("DROP DATABASE \"" + name + "\"")));
752   }
753
754   /**
755    * {@inheritDoc}
756    */

757   @Override
758   public List<String> describeDatabases() {
759     QueryResult result = executeQuery(this.influxDBService.postQuery(SHOW_DATABASE_COMMAND_ENCODED));
760     // {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["mydb"]]}]}]}
761     // Series [name=databases, columns=[name], values=[[mydb], [unittest_1433605300968]]]
762     List<List<Object>> databaseNames = result.getResults().get(0).getSeries().get(0).getValues();
763     List<String> databases = new ArrayList<>();
764     if (databaseNames != null) {
765       for (List<Object> database : databaseNames) {
766         databases.add(database.get(0).toString());
767       }
768     }
769     return databases;
770   }
771
772   /**
773    * {@inheritDoc}
774    */

775   @Override
776   public boolean databaseExists(final String name) {
777     List<String> databases = this.describeDatabases();
778     for (String databaseName : databases) {
779       if (databaseName.trim().equals(name)) {
780         return true;
781       }
782     }
783     return false;
784   }
785
786   /**
787    * Calls the influxDBService for the query.
788    */

789   private Call<QueryResult> callQuery(final Query query) {
790     Call<QueryResult> call;
791     if (query instanceof BoundParameterQuery) {
792         BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
793         call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(),
794                 boundParameterQuery.getParameterJsonWithUrlEncoded());
795     } else {
796         if (query.requiresPost()) {
797           call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded());
798         } else {
799           call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded());
800         }
801     }
802     return call;
803   }
804
805   static class ErrorMessage {
806     public String error;
807   }
808
809   private boolean checkMessagePackSupport() {
810     Matcher matcher = Pattern.compile("(\\d+\\.*)+").matcher(version());
811     if (!matcher.find()) {
812       return false;
813     }
814     String s = matcher.group();
815     String[] versionNumbers = s.split("\\.");
816     final int major = Integer.parseInt(versionNumbers[0]);
817     final int minor = Integer.parseInt(versionNumbers[1]);
818     final int fromMinor = 4;
819     return (major >= 2) || ((major == 1) && (minor >= fromMinor));
820   }
821
822   private QueryResult executeQuery(final Call<QueryResult> call) {
823     if (messagePack) {
824       if (messagePackSupport == null) {
825         messagePackSupport = checkMessagePackSupport();
826       }
827
828       if (!messagePackSupport) {
829         throw new UnsupportedOperationException(
830             "MessagePack format is only supported from InfluxDB version 1.4 and later");
831       }
832     }
833     return execute(call);
834   }
835
836   private <T> T execute(final Call<T> call) {
837     try {
838       Response<T> response = call.execute();
839       if (response.isSuccessful()) {
840         return response.body();
841       }
842       try (ResponseBody errorBody = response.errorBody()) {
843         if (messagePack) {
844           throw InfluxDBException.buildExceptionForErrorState(errorBody.byteStream());
845         } else {
846           throw InfluxDBException.buildExceptionForErrorState(errorBody.string());
847         }
848       }
849     } catch (IOException e) {
850       throw new InfluxDBIOException(e);
851     }
852   }
853
854   /**
855    * {@inheritDoc}
856    */

857   @Override
858   public void flush() {
859     if (!batchEnabled.get()) {
860       throw new IllegalStateException("BatchProcessing is not enabled.");
861     }
862     batchProcessor.flush();
863   }
864
865   /**
866    * {@inheritDoc}
867    */

868   @Override
869   public void close() {
870     try {
871         this.disableBatch();
872     } finally {
873         if (datagramSocket != null && !datagramSocket.isClosed()) {
874             datagramSocket.close();
875         }
876     }
877     this.client.dispatcher().executorService().shutdown();
878     this.client.connectionPool().evictAll();
879   }
880
881   @Override
882   public InfluxDB setConsistency(final ConsistencyLevel consistency) {
883     this.consistency = consistency;
884     return this;
885   }
886
887   @Override
888   public InfluxDB setDatabase(final String database) {
889     this.database = database;
890     return this;
891   }
892
893   @Override
894   public InfluxDB setRetentionPolicy(final String retentionPolicy) {
895     this.retentionPolicy = retentionPolicy;
896     return this;
897   }
898
899   /**
900    * {@inheritDoc}
901    */

902   @Override
903   public void createRetentionPolicy(final String rpName, final String database, final String duration,
904                                     final String shardDuration, final int replicationFactor, final boolean isDefault) {
905     Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
906     Preconditions.checkNonEmptyString(database, "database");
907     Preconditions.checkNonEmptyString(duration, "retentionDuration");
908     Preconditions.checkDuration(duration, "retentionDuration");
909     if (shardDuration != null && !shardDuration.isEmpty()) {
910       Preconditions.checkDuration(shardDuration, "shardDuration");
911     }
912     Preconditions.checkPositiveNumber(replicationFactor, "replicationFactor");
913
914     StringBuilder queryBuilder = new StringBuilder("CREATE RETENTION POLICY \"");
915     queryBuilder.append(rpName)
916         .append("\" ON \"")
917         .append(database)
918         .append("\" DURATION ")
919         .append(duration)
920         .append(" REPLICATION ")
921         .append(replicationFactor);
922     if (shardDuration != null && !shardDuration.isEmpty()) {
923       queryBuilder.append(" SHARD DURATION ");
924       queryBuilder.append(shardDuration);
925     }
926     if (isDefault) {
927       queryBuilder.append(" DEFAULT");
928     }
929     executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));
930   }
931
932   /**
933    * {@inheritDoc}
934    */

935   @Override
936   public void createRetentionPolicy(final String rpName, final String database, final String duration,
937                                     final int replicationFactor, final boolean isDefault) {
938     createRetentionPolicy(rpName, database, duration, null, replicationFactor, isDefault);
939   }
940
941   /**
942    * {@inheritDoc}
943    */

944   @Override
945   public void createRetentionPolicy(final String rpName, final String database, final String duration,
946                                     final String shardDuration, final int replicationFactor) {
947     createRetentionPolicy(rpName, database, duration, null, replicationFactor, false);
948   }
949
950   /**
951    * {@inheritDoc}
952    * @param rpName the name of the retentionPolicy
953    * @param database the name of the database
954    */

955   @Override
956   public void dropRetentionPolicy(final String rpName, final String database) {
957     Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
958     Preconditions.checkNonEmptyString(database, "database");
959     StringBuilder queryBuilder = new StringBuilder("DROP RETENTION POLICY \"");
960     queryBuilder.append(rpName)
961         .append("\" ON \"")
962         .append(database)
963         .append("\"");
964     executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));
965   }
966
967   private String getDatabase(final Query query) {
968     String db = query.getDatabase();
969     if (db == null) {
970       return this.database;
971     }
972     return db;
973   }
974
975   private interface ChunkProccesor {
976     void process(ResponseBody chunkedBody, Cancellable cancellable,
977                  BiConsumer<Cancellable, QueryResult> consumer, Runnable onComplete) throws IOException;
978   }
979
980   private class MessagePackChunkProccesor implements ChunkProccesor {
981     @Override
982     public void process(final ResponseBody chunkedBody, final Cancellable cancellable,
983                         final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)
984             throws IOException {
985       MessagePackTraverser traverser = new MessagePackTraverser();
986       try (InputStream is = chunkedBody.byteStream()) {
987         for (Iterator<QueryResult> it = traverser.traverse(is).iterator(); it.hasNext() && !cancellable.isCanceled();) {
988           QueryResult result = it.next();
989           consumer.accept(cancellable, result);
990         }
991       }
992       if (!cancellable.isCanceled()) {
993         onComplete.run();
994       }
995     }
996   }
997
998   private class JSONChunkProccesor implements ChunkProccesor {
999     private JsonAdapter<QueryResult> adapter;
1000
1001     public JSONChunkProccesor(final JsonAdapter<QueryResult> adapter) {
1002       this.adapter = adapter;
1003     }
1004
1005     @Override
1006     public void process(final ResponseBody chunkedBody, final Cancellable cancellable,
1007                         final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)
1008             throws IOException {
1009       try {
1010         BufferedSource source = chunkedBody.source();
1011         while (!cancellable.isCanceled()) {
1012           QueryResult result = adapter.fromJson(source);
1013           if (result != null) {
1014             consumer.accept(cancellable, result);
1015           }
1016         }
1017       } catch (EOFException e) {
1018         QueryResult queryResult = new QueryResult();
1019         queryResult.setError("DONE");
1020         consumer.accept(cancellable, queryResult);
1021         if (!cancellable.isCanceled()) {
1022           onComplete.run();
1023         }
1024       } finally {
1025         chunkedBody.close();
1026       }
1027     }
1028   }
1029 }
1030