1 package org.influxdb.impl;
2
3 import com.squareup.moshi.JsonAdapter;
4 import com.squareup.moshi.Moshi;
5 import okhttp3.Headers;
6 import okhttp3.MediaType;
7 import okhttp3.OkHttpClient;
8 import okhttp3.Request;
9 import okhttp3.RequestBody;
10 import okhttp3.ResponseBody;
11 import okhttp3.logging.HttpLoggingInterceptor;
12 import okhttp3.logging.HttpLoggingInterceptor.Level;
13 import okio.BufferedSource;
14 import org.influxdb.BatchOptions;
15 import org.influxdb.InfluxDB;
16 import org.influxdb.InfluxDBException;
17 import org.influxdb.InfluxDBIOException;
18 import org.influxdb.dto.BatchPoints;
19 import org.influxdb.dto.BoundParameterQuery;
20 import org.influxdb.dto.Point;
21 import org.influxdb.dto.Pong;
22 import org.influxdb.dto.Query;
23 import org.influxdb.dto.QueryResult;
24 import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
25 import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
26 import org.influxdb.msgpack.MessagePackConverterFactory;
27 import org.influxdb.msgpack.MessagePackTraverser;
28 import retrofit2.Call;
29 import retrofit2.Callback;
30 import retrofit2.Converter.Factory;
31 import retrofit2.Response;
32 import retrofit2.Retrofit;
33 import retrofit2.converter.moshi.MoshiConverterFactory;
34
35 import java.io.EOFException;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.net.DatagramPacket;
39 import java.net.DatagramSocket;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.SocketException;
43 import java.net.URI;
44 import java.net.URISyntaxException;
45 import java.net.UnknownHostException;
46 import java.nio.charset.StandardCharsets;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.ThreadFactory;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import java.util.concurrent.atomic.LongAdder;
56 import java.util.function.BiConsumer;
57 import java.util.function.Consumer;
58 import java.util.regex.Matcher;
59 import java.util.regex.Pattern;
60
61
66 public class InfluxDBImpl implements InfluxDB {
67
68 private static final String APPLICATION_MSGPACK = "application/x-msgpack";
69
70 static final okhttp3.MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");
71
72 private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");
73
74
80 private static final LogLevel LOG_LEVEL = LogLevel.parseLogLevel(System.getProperty(LOG_LEVEL_PROPERTY));
81
82 private final String hostName;
83 private String version;
84 private final Retrofit retrofit;
85 private final OkHttpClient client;
86 private final InfluxDBService influxDBService;
87 private BatchProcessor batchProcessor;
88 private final AtomicBoolean batchEnabled = new AtomicBoolean(false);
89 private final LongAdder writeCount = new LongAdder();
90 private final LongAdder unBatchedCount = new LongAdder();
91 private final LongAdder batchedCount = new LongAdder();
92 private volatile DatagramSocket datagramSocket;
93 private final HttpLoggingInterceptor loggingInterceptor;
94 private final GzipRequestInterceptor gzipRequestInterceptor;
95 private LogLevel logLevel = LogLevel.NONE;
96 private String database;
97 private String retentionPolicy = "autogen";
98 private ConsistencyLevel consistency = ConsistencyLevel.ONE;
99 private final boolean messagePack;
100 private Boolean messagePackSupport;
101 private final ChunkProccesor chunkProccesor;
102
103
118 public InfluxDBImpl(final String url, final String username, final String password,
119 final OkHttpClient.Builder okHttpBuilder, final ResponseFormat responseFormat) {
120 this(url, username, password, okHttpBuilder, new Retrofit.Builder(), responseFormat);
121 }
122
123
140 public InfluxDBImpl(final String url, final String username, final String password,
141 final OkHttpClient.Builder okHttpBuilder, final Retrofit.Builder retrofitBuilder,
142 final ResponseFormat responseFormat) {
143 this.messagePack = ResponseFormat.MSGPACK.equals(responseFormat);
144 this.hostName = parseHost(url);
145
146 this.loggingInterceptor = new HttpLoggingInterceptor();
147 setLogLevel(LOG_LEVEL);
148
149 this.gzipRequestInterceptor = new GzipRequestInterceptor();
150 OkHttpClient.Builder clonedOkHttpBuilder = okHttpBuilder.build().newBuilder()
151 .addInterceptor(loggingInterceptor)
152 .addInterceptor(gzipRequestInterceptor);
153 if (username != null && password != null) {
154 clonedOkHttpBuilder.addInterceptor(new BasicAuthInterceptor(username, password));
155 }
156 Factory converterFactory = null;
157 switch (responseFormat) {
158 case MSGPACK:
159 clonedOkHttpBuilder.addInterceptor(chain -> {
160 Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build();
161 return chain.proceed(request);
162 });
163
164 converterFactory = MessagePackConverterFactory.create();
165 chunkProccesor = new MessagePackChunkProccesor();
166 break;
167 case JSON:
168 default:
169 converterFactory = MoshiConverterFactory.create();
170
171 Moshi moshi = new Moshi.Builder().build();
172 JsonAdapter<QueryResult> adapter = moshi.adapter(QueryResult.class);
173 chunkProccesor = new JSONChunkProccesor(adapter);
174 break;
175 }
176
177 this.client = clonedOkHttpBuilder.build();
178 Retrofit.Builder clonedRetrofitBuilder = retrofitBuilder.baseUrl(url).build().newBuilder();
179 this.retrofit = clonedRetrofitBuilder.client(this.client)
180 .addConverterFactory(converterFactory).build();
181 this.influxDBService = this.retrofit.create(InfluxDBService.class);
182
183 }
184
185 public InfluxDBImpl(final String url, final String username, final String password,
186 final OkHttpClient.Builder client) {
187 this(url, username, password, client, ResponseFormat.JSON);
188
189 }
190
191 InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
192 final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
193 super();
194 this.messagePack = false;
195 this.hostName = parseHost(url);
196
197 this.loggingInterceptor = new HttpLoggingInterceptor();
198 setLogLevel(LOG_LEVEL);
199
200 this.gzipRequestInterceptor = new GzipRequestInterceptor();
201 OkHttpClient.Builder clonedBuilder = client.build().newBuilder()
202 .addInterceptor(loggingInterceptor)
203 .addInterceptor(gzipRequestInterceptor)
204 .addInterceptor(new BasicAuthInterceptor(username, password));
205 this.client = clonedBuilder.build();
206 this.retrofit = new Retrofit.Builder().baseUrl(url)
207 .client(this.client)
208 .addConverterFactory(MoshiConverterFactory.create()).build();
209 this.influxDBService = influxDBService;
210
211 chunkProccesor = new JSONChunkProccesor(adapter);
212 }
213
214 public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
215 final String database, final String retentionPolicy, final ConsistencyLevel consistency) {
216 this(url, username, password, client);
217
218 setConsistency(consistency);
219 setDatabase(database);
220 setRetentionPolicy(retentionPolicy);
221 }
222
223 private String parseHost(final String url) {
224 String hostName;
225 try {
226 URI uri = new URI(url);
227 hostName = uri.getHost();
228 } catch (URISyntaxException e1) {
229 throw new IllegalArgumentException("Unable to parse url: " + url, e1);
230 }
231
232 if (hostName == null) {
233 throw new IllegalArgumentException("Unable to parse url: " + url);
234 }
235
236 try {
237 InetAddress.getByName(hostName);
238 } catch (UnknownHostException e) {
239 throw new InfluxDBIOException(e);
240 }
241 return hostName;
242 }
243
244 @Override
245 public InfluxDB setLogLevel(final LogLevel logLevel) {
246 switch (logLevel) {
247 case NONE:
248 this.loggingInterceptor.setLevel(Level.NONE);
249 break;
250 case BASIC:
251 this.loggingInterceptor.setLevel(Level.BASIC);
252 break;
253 case HEADERS:
254 this.loggingInterceptor.setLevel(Level.HEADERS);
255 break;
256 case FULL:
257 this.loggingInterceptor.setLevel(Level.BODY);
258 break;
259 default:
260 break;
261 }
262 this.logLevel = logLevel;
263 return this;
264 }
265
266
269 @Override
270 public InfluxDB enableGzip() {
271 this.gzipRequestInterceptor.enable();
272 return this;
273 }
274
275
278 @Override
279 public InfluxDB disableGzip() {
280 this.gzipRequestInterceptor.disable();
281 return this;
282 }
283
284
287 @Override
288 public boolean isGzipEnabled() {
289 return this.gzipRequestInterceptor.isEnabled();
290 }
291
292 @Override
293 public InfluxDB enableBatch() {
294 enableBatch(BatchOptions.DEFAULTS);
295 return this;
296 }
297
298 @Override
299 public InfluxDB enableBatch(final BatchOptions batchOptions) {
300
301 if (this.batchEnabled.get()) {
302 throw new IllegalStateException("BatchProcessing is already enabled.");
303 }
304 this.batchProcessor = BatchProcessor
305 .builder(this)
306 .actions(batchOptions.getActions())
307 .exceptionHandler(batchOptions.getExceptionHandler())
308 .interval(batchOptions.getFlushDuration(), batchOptions.getJitterDuration(), TimeUnit.MILLISECONDS)
309 .threadFactory(batchOptions.getThreadFactory())
310 .bufferLimit(batchOptions.getBufferLimit())
311 .consistencyLevel(batchOptions.getConsistency())
312 .precision(batchOptions.getPrecision())
313 .dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion())
314 .droppedActionHandler(batchOptions.getDroppedActionHandler())
315 .build();
316 this.batchEnabled.set(true);
317 return this;
318 }
319
320 @Override
321 public InfluxDB enableBatch(final int actions, final int flushDuration,
322 final TimeUnit flushDurationTimeUnit) {
323 enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
324 return this;
325 }
326
327 @Override
328 public InfluxDB enableBatch(final int actions, final int flushDuration,
329 final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
330 enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { });
331 return this;
332 }
333
334 @Override
335 public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
336 final ThreadFactory threadFactory,
337 final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
338 final ConsistencyLevel consistency) {
339 enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, exceptionHandler)
340 .setConsistency(consistency);
341 return this;
342 }
343
344 @Override
345 public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
346 final ThreadFactory threadFactory,
347 final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
348 enableBatch(actions, flushDuration, 0, flushDurationTimeUnit, threadFactory, exceptionHandler, false, null);
349 return this;
350 }
351
352 private InfluxDB enableBatch(final int actions, final int flushDuration, final int jitterDuration,
353 final TimeUnit durationTimeUnit, final ThreadFactory threadFactory,
354 final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
355 final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
356 if (this.batchEnabled.get()) {
357 throw new IllegalStateException("BatchProcessing is already enabled.");
358 }
359 this.batchProcessor = BatchProcessor
360 .builder(this)
361 .actions(actions)
362 .exceptionHandler(exceptionHandler)
363 .interval(flushDuration, jitterDuration, durationTimeUnit)
364 .threadFactory(threadFactory)
365 .consistencyLevel(consistency)
366 .dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion)
367 .droppedActionHandler(droppedActionHandler)
368 .build();
369 this.batchEnabled.set(true);
370 return this;
371 }
372
373 @Override
374 public void disableBatch() {
375 this.batchEnabled.set(false);
376 if (this.batchProcessor != null) {
377 this.batchProcessor.flushAndShutdown();
378 }
379 }
380
381 @Override
382 public boolean isBatchEnabled() {
383 return this.batchEnabled.get();
384 }
385
386 @Override
387 public Pong ping() {
388 final long started = System.currentTimeMillis();
389 Call<ResponseBody> call = this.influxDBService.ping();
390 try {
391 Response<ResponseBody> response = call.execute();
392 Headers headers = response.headers();
393 String version = "unknown";
394 for (String name : headers.toMultimap().keySet()) {
395 if (null != name && "X-Influxdb-Version".equalsIgnoreCase(name)) {
396 version = headers.get(name);
397 break;
398 }
399 }
400 Pong pong = new Pong();
401 pong.setVersion(version);
402 pong.setResponseTime(System.currentTimeMillis() - started);
403 return pong;
404 } catch (IOException e) {
405 throw new InfluxDBIOException(e);
406 }
407 }
408
409 @Override
410 public String version() {
411 if (version == null) {
412 this.version = ping().getVersion();
413 }
414 return this.version;
415 }
416
417 @Override
418 public void write(final Point point) {
419 write(database, retentionPolicy, point);
420 }
421
422 @Override
423 public void write(final String records) {
424 write(database, retentionPolicy, consistency, records);
425 }
426
427 @Override
428 public void write(final List<String> records) {
429 write(database, retentionPolicy, consistency, records);
430 }
431
432 @Override
433 public void write(final String database, final String retentionPolicy, final Point point) {
434 if (this.batchEnabled.get()) {
435 HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
436 this.batchProcessor.put(batchEntry);
437 } else {
438 BatchPoints batchPoints = BatchPoints.database(database)
439 .retentionPolicy(retentionPolicy).build();
440 batchPoints.point(point);
441 this.write(batchPoints);
442 this.unBatchedCount.increment();
443 }
444 this.writeCount.increment();
445 }
446
447
450 @Override
451 public void write(final int udpPort, final Point point) {
452 if (this.batchEnabled.get()) {
453 UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort);
454 this.batchProcessor.put(batchEntry);
455 } else {
456 this.write(udpPort, point.lineProtocol());
457 this.unBatchedCount.increment();
458 }
459 this.writeCount.increment();
460 }
461
462 @Override
463 public void write(final BatchPoints batchPoints) {
464 this.batchedCount.add(batchPoints.getPoints().size());
465 RequestBody lineProtocol = RequestBody.create(MEDIA_TYPE_STRING, batchPoints.lineProtocol());
466 String db = batchPoints.getDatabase();
467 if (db == null) {
468 db = this.database;
469 }
470 execute(this.influxDBService.writePoints(
471 db,
472 batchPoints.getRetentionPolicy(),
473 TimeUtil.toTimePrecision(batchPoints.getPrecision()),
474 batchPoints.getConsistency().value(),
475 lineProtocol));
476 }
477
478 @Override
479 public void writeWithRetry(final BatchPoints batchPoints) {
480 if (isBatchEnabled()) {
481 batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints));
482 } else {
483 write(batchPoints);
484 }
485 }
486
487 @Override
488 public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
489 final TimeUnit precision, final String records) {
490 execute(this.influxDBService.writePoints(
491 database,
492 retentionPolicy,
493 TimeUtil.toTimePrecision(precision),
494 consistency.value(),
495 RequestBody.create(MEDIA_TYPE_STRING, records)));
496 }
497
498 @Override
499 public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
500 final String records) {
501 write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
502 }
503
504 @Override
505 public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
506 final List<String> records) {
507 write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
508 }
509
510
511 @Override
512 public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
513 final TimeUnit precision, final List<String> records) {
514 write(database, retentionPolicy, consistency, precision, String.join("\n", records));
515 }
516
517
518
521 @Override
522 public void write(final int udpPort, final String records) {
523 initialDatagramSocket();
524 byte[] bytes = records.getBytes(StandardCharsets.UTF_8);
525 try {
526 datagramSocket.send(new DatagramPacket(bytes, bytes.length, new InetSocketAddress(hostName, udpPort)));
527 } catch (IOException e) {
528 throw new InfluxDBIOException(e);
529 }
530 }
531
532 private void initialDatagramSocket() {
533 if (datagramSocket == null) {
534 synchronized (InfluxDBImpl.class) {
535 if (datagramSocket == null) {
536 try {
537 datagramSocket = new DatagramSocket();
538 } catch (SocketException e) {
539 throw new InfluxDBIOException(e);
540 }
541 }
542 }
543 }
544 }
545
546
549 @Override
550 public void write(final int udpPort, final List<String> records) {
551 write(udpPort, String.join("\n", records));
552 }
553
554
557 @Override
558 public QueryResult query(final Query query) {
559 return executeQuery(callQuery(query));
560 }
561
562
565 @Override
566 public void query(final Query query, final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure) {
567 final Call<QueryResult> call = callQuery(query);
568 call.enqueue(new Callback<QueryResult>() {
569 @Override
570 public void onResponse(final Call<QueryResult> call, final Response<QueryResult> response) {
571 if (response.isSuccessful()) {
572 onSuccess.accept(response.body());
573 } else {
574 Throwable t = null;
575 String errorBody = null;
576
577 try {
578 if (response.errorBody() != null) {
579 errorBody = response.errorBody().string();
580 }
581 } catch (IOException e) {
582 t = e;
583 }
584
585 if (t != null) {
586 onFailure.accept(new InfluxDBException(response.message(), t));
587 } else if (errorBody != null) {
588 onFailure.accept(new InfluxDBException(response.message() + " - " + errorBody));
589 } else {
590 onFailure.accept(new InfluxDBException(response.message()));
591 }
592 }
593 }
594
595 @Override
596 public void onFailure(final Call<QueryResult> call, final Throwable throwable) {
597 onFailure.accept(throwable);
598 }
599 });
600 }
601
602
605 @Override
606 public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext) {
607 query(query, chunkSize, onNext, () -> { });
608 }
609
610
613 @Override
614 public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext) {
615 query(query, chunkSize, onNext, () -> { });
616 }
617
618
621 @Override
622 public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext,
623 final Runnable onComplete) {
624 query(query, chunkSize, (cancellable, queryResult) -> onNext.accept(queryResult), onComplete);
625 }
626
627 @Override
628 public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
629 final Runnable onComplete) {
630 query(query, chunkSize, onNext, onComplete, null);
631 }
632
633
636 @Override
637 public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
638 final Runnable onComplete, final Consumer<Throwable> onFailure) {
639 Call<ResponseBody> call;
640 if (query instanceof BoundParameterQuery) {
641 BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
642 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize,
643 boundParameterQuery.getParameterJsonWithUrlEncoded());
644 } else {
645 if (query.requiresPost()) {
646 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize, null);
647 } else {
648 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize);
649 }
650 }
651
652 call.enqueue(new Callback<ResponseBody>() {
653 @Override
654 public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
655
656 Cancellable cancellable = new Cancellable() {
657 @Override
658 public void cancel() {
659 call.cancel();
660 }
661
662 @Override
663 public boolean isCanceled() {
664 return call.isCanceled();
665 }
666 };
667
668 try {
669 if (response.isSuccessful()) {
670 ResponseBody chunkedBody = response.body();
671 chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
672 } else {
673
674 ResponseBody errorBody = response.errorBody();
675 if (errorBody != null) {
676 InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
677 if (onFailure == null) {
678 throw influxDBException;
679 } else {
680 onFailure.accept(influxDBException);
681 }
682 }
683 }
684 } catch (IOException e) {
685 QueryResult queryResult = new QueryResult();
686 queryResult.setError(e.toString());
687 onNext.accept(cancellable, queryResult);
688
689
690 if (onFailure != null) {
691 onFailure.accept(e);
692 }
693 } catch (Exception e) {
694 call.cancel();
695 if (onFailure != null) {
696 onFailure.accept(e);
697 }
698 }
699
700 }
701
702 @Override
703 public void onFailure(final Call<ResponseBody> call, final Throwable t) {
704 if (onFailure == null) {
705 throw new InfluxDBException(t);
706 } else {
707 onFailure.accept(t);
708 }
709 }
710 });
711 }
712
713
716 @Override
717 public QueryResult query(final Query query, final TimeUnit timeUnit) {
718 Call<QueryResult> call;
719 if (query instanceof BoundParameterQuery) {
720 BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
721 call = this.influxDBService.query(getDatabase(query),
722 TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),
723 boundParameterQuery.getParameterJsonWithUrlEncoded());
724 } else {
725 if (query.requiresPost()) {
726 call = this.influxDBService.query(getDatabase(query),
727 TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(), null);
728 } else {
729 call = this.influxDBService.query(getDatabase(query),
730 TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
731 }
732 }
733 return executeQuery(call);
734 }
735
736
739 @Override
740 public void createDatabase(final String name) {
741 Preconditions.checkNonEmptyString(name, "name");
742 String createDatabaseQueryString = String.format("CREATE DATABASE \"%s\"", name);
743 executeQuery(this.influxDBService.postQuery(Query.encode(createDatabaseQueryString)));
744 }
745
746
749 @Override
750 public void deleteDatabase(final String name) {
751 executeQuery(this.influxDBService.postQuery(Query.encode("DROP DATABASE \"" + name + "\"")));
752 }
753
754
757 @Override
758 public List<String> describeDatabases() {
759 QueryResult result = executeQuery(this.influxDBService.postQuery(SHOW_DATABASE_COMMAND_ENCODED));
760
761
762 List<List<Object>> databaseNames = result.getResults().get(0).getSeries().get(0).getValues();
763 List<String> databases = new ArrayList<>();
764 if (databaseNames != null) {
765 for (List<Object> database : databaseNames) {
766 databases.add(database.get(0).toString());
767 }
768 }
769 return databases;
770 }
771
772
775 @Override
776 public boolean databaseExists(final String name) {
777 List<String> databases = this.describeDatabases();
778 for (String databaseName : databases) {
779 if (databaseName.trim().equals(name)) {
780 return true;
781 }
782 }
783 return false;
784 }
785
786
789 private Call<QueryResult> callQuery(final Query query) {
790 Call<QueryResult> call;
791 if (query instanceof BoundParameterQuery) {
792 BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
793 call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(),
794 boundParameterQuery.getParameterJsonWithUrlEncoded());
795 } else {
796 if (query.requiresPost()) {
797 call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded());
798 } else {
799 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded());
800 }
801 }
802 return call;
803 }
804
805 static class ErrorMessage {
806 public String error;
807 }
808
809 private boolean checkMessagePackSupport() {
810 Matcher matcher = Pattern.compile("(\\d+\\.*)+").matcher(version());
811 if (!matcher.find()) {
812 return false;
813 }
814 String s = matcher.group();
815 String[] versionNumbers = s.split("\\.");
816 final int major = Integer.parseInt(versionNumbers[0]);
817 final int minor = Integer.parseInt(versionNumbers[1]);
818 final int fromMinor = 4;
819 return (major >= 2) || ((major == 1) && (minor >= fromMinor));
820 }
821
822 private QueryResult executeQuery(final Call<QueryResult> call) {
823 if (messagePack) {
824 if (messagePackSupport == null) {
825 messagePackSupport = checkMessagePackSupport();
826 }
827
828 if (!messagePackSupport) {
829 throw new UnsupportedOperationException(
830 "MessagePack format is only supported from InfluxDB version 1.4 and later");
831 }
832 }
833 return execute(call);
834 }
835
836 private <T> T execute(final Call<T> call) {
837 try {
838 Response<T> response = call.execute();
839 if (response.isSuccessful()) {
840 return response.body();
841 }
842 try (ResponseBody errorBody = response.errorBody()) {
843 if (messagePack) {
844 throw InfluxDBException.buildExceptionForErrorState(errorBody.byteStream());
845 } else {
846 throw InfluxDBException.buildExceptionForErrorState(errorBody.string());
847 }
848 }
849 } catch (IOException e) {
850 throw new InfluxDBIOException(e);
851 }
852 }
853
854
857 @Override
858 public void flush() {
859 if (!batchEnabled.get()) {
860 throw new IllegalStateException("BatchProcessing is not enabled.");
861 }
862 batchProcessor.flush();
863 }
864
865
868 @Override
869 public void close() {
870 try {
871 this.disableBatch();
872 } finally {
873 if (datagramSocket != null && !datagramSocket.isClosed()) {
874 datagramSocket.close();
875 }
876 }
877 this.client.dispatcher().executorService().shutdown();
878 this.client.connectionPool().evictAll();
879 }
880
881 @Override
882 public InfluxDB setConsistency(final ConsistencyLevel consistency) {
883 this.consistency = consistency;
884 return this;
885 }
886
887 @Override
888 public InfluxDB setDatabase(final String database) {
889 this.database = database;
890 return this;
891 }
892
893 @Override
894 public InfluxDB setRetentionPolicy(final String retentionPolicy) {
895 this.retentionPolicy = retentionPolicy;
896 return this;
897 }
898
899
902 @Override
903 public void createRetentionPolicy(final String rpName, final String database, final String duration,
904 final String shardDuration, final int replicationFactor, final boolean isDefault) {
905 Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
906 Preconditions.checkNonEmptyString(database, "database");
907 Preconditions.checkNonEmptyString(duration, "retentionDuration");
908 Preconditions.checkDuration(duration, "retentionDuration");
909 if (shardDuration != null && !shardDuration.isEmpty()) {
910 Preconditions.checkDuration(shardDuration, "shardDuration");
911 }
912 Preconditions.checkPositiveNumber(replicationFactor, "replicationFactor");
913
914 StringBuilder queryBuilder = new StringBuilder("CREATE RETENTION POLICY \"");
915 queryBuilder.append(rpName)
916 .append("\" ON \"")
917 .append(database)
918 .append("\" DURATION ")
919 .append(duration)
920 .append(" REPLICATION ")
921 .append(replicationFactor);
922 if (shardDuration != null && !shardDuration.isEmpty()) {
923 queryBuilder.append(" SHARD DURATION ");
924 queryBuilder.append(shardDuration);
925 }
926 if (isDefault) {
927 queryBuilder.append(" DEFAULT");
928 }
929 executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));
930 }
931
932
935 @Override
936 public void createRetentionPolicy(final String rpName, final String database, final String duration,
937 final int replicationFactor, final boolean isDefault) {
938 createRetentionPolicy(rpName, database, duration, null, replicationFactor, isDefault);
939 }
940
941
944 @Override
945 public void createRetentionPolicy(final String rpName, final String database, final String duration,
946 final String shardDuration, final int replicationFactor) {
947 createRetentionPolicy(rpName, database, duration, null, replicationFactor, false);
948 }
949
950
955 @Override
956 public void dropRetentionPolicy(final String rpName, final String database) {
957 Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
958 Preconditions.checkNonEmptyString(database, "database");
959 StringBuilder queryBuilder = new StringBuilder("DROP RETENTION POLICY \"");
960 queryBuilder.append(rpName)
961 .append("\" ON \"")
962 .append(database)
963 .append("\"");
964 executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));
965 }
966
967 private String getDatabase(final Query query) {
968 String db = query.getDatabase();
969 if (db == null) {
970 return this.database;
971 }
972 return db;
973 }
974
975 private interface ChunkProccesor {
976 void process(ResponseBody chunkedBody, Cancellable cancellable,
977 BiConsumer<Cancellable, QueryResult> consumer, Runnable onComplete) throws IOException;
978 }
979
980 private class MessagePackChunkProccesor implements ChunkProccesor {
981 @Override
982 public void process(final ResponseBody chunkedBody, final Cancellable cancellable,
983 final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)
984 throws IOException {
985 MessagePackTraverser traverser = new MessagePackTraverser();
986 try (InputStream is = chunkedBody.byteStream()) {
987 for (Iterator<QueryResult> it = traverser.traverse(is).iterator(); it.hasNext() && !cancellable.isCanceled();) {
988 QueryResult result = it.next();
989 consumer.accept(cancellable, result);
990 }
991 }
992 if (!cancellable.isCanceled()) {
993 onComplete.run();
994 }
995 }
996 }
997
998 private class JSONChunkProccesor implements ChunkProccesor {
999 private JsonAdapter<QueryResult> adapter;
1000
1001 public JSONChunkProccesor(final JsonAdapter<QueryResult> adapter) {
1002 this.adapter = adapter;
1003 }
1004
1005 @Override
1006 public void process(final ResponseBody chunkedBody, final Cancellable cancellable,
1007 final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)
1008 throws IOException {
1009 try {
1010 BufferedSource source = chunkedBody.source();
1011 while (!cancellable.isCanceled()) {
1012 QueryResult result = adapter.fromJson(source);
1013 if (result != null) {
1014 consumer.accept(cancellable, result);
1015 }
1016 }
1017 } catch (EOFException e) {
1018 QueryResult queryResult = new QueryResult();
1019 queryResult.setError("DONE");
1020 consumer.accept(cancellable, queryResult);
1021 if (!cancellable.isCanceled()) {
1022 onComplete.run();
1023 }
1024 } finally {
1025 chunkedBody.close();
1026 }
1027 }
1028 }
1029 }
1030