1
18
19 package org.xnio;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.DatagramSocket;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.ReadableByteChannel;
29 import java.nio.channels.Selector;
30 import java.nio.channels.Channel;
31 import java.nio.channels.WritableByteChannel;
32 import java.util.Random;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ThreadLocalRandom;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.zip.ZipFile;
42 import org.xnio.channels.SuspendableReadChannel;
43
44 import java.util.logging.Handler;
45
46 import static org.xnio._private.Messages.closeMsg;
47 import static org.xnio._private.Messages.msg;
48
49
54 public final class IoUtils {
55
56 private static final Executor NULL_EXECUTOR = new Executor() {
57 private final String string = String.format("null executor <%s>", Integer.toHexString(hashCode()));
58
59 public void execute(final Runnable command) {
60
61 }
62
63 public String toString() {
64 return string;
65 }
66 };
67 private static final Executor DIRECT_EXECUTOR = new Executor() {
68 private final String string = String.format("direct executor <%s>", Integer.toHexString(hashCode()));
69
70 public void execute(final Runnable command) {
71 command.run();
72 }
73
74 public String toString() {
75 return string;
76 }
77 };
78 private static final Closeable NULL_CLOSEABLE = new Closeable() {
79 private final String string = String.format("null closeable <%s>", Integer.toHexString(hashCode()));
80 public void close() throws IOException {
81
82 }
83
84 public String toString() {
85 return string;
86 }
87 };
88 private static final Cancellable NULL_CANCELLABLE = new Cancellable() {
89 public Cancellable cancel() {
90 return this;
91 }
92 };
93 @SuppressWarnings("rawtypes")
94 private static final IoUtils.ResultNotifier RESULT_NOTIFIER = new IoUtils.ResultNotifier();
95
/** Private so the class cannot be instantiated; every member of this class is static. */
private IoUtils() {
}
97
98
103 public static Executor directExecutor() {
104 return DIRECT_EXECUTOR;
105 }
106
107
112 public static Executor nullExecutor() {
113 return NULL_EXECUTOR;
114 }
115
116
122 public static Closeable nullCloseable() {
123 return NULL_CLOSEABLE;
124 }
125
126
131 public static void safeClose(final AutoCloseable resource) {
132 try {
133 if (resource != null) {
134 closeMsg.closingResource(resource);
135 resource.close();
136 }
137 } catch (ClosedChannelException ignored) {
138 } catch (Throwable t) {
139 closeMsg.resourceCloseFailed(t, resource);
140 }
141 }
142
143
148 public static void safeClose(final Closeable resource) {
149 try {
150 if (resource != null) {
151 closeMsg.closingResource(resource);
152 resource.close();
153 }
154 } catch (ClosedChannelException ignored) {
155 msg.tracef("safeClose, ignoring ClosedChannelException exception");
156 } catch (Throwable t) {
157 closeMsg.resourceCloseFailed(t, resource);
158 }
159 }
160
161
166 public static void safeClose(final Closeable... resources) {
167 for (Closeable resource : resources) {
168 safeClose(resource);
169 }
170 }
171
172
177 public static void safeClose(final Socket resource) {
178 try {
179 if (resource != null) {
180 closeMsg.closingResource(resource);
181 resource.close();
182 }
183 } catch (ClosedChannelException ignored) {
184 } catch (Throwable t) {
185 closeMsg.resourceCloseFailed(t, resource);
186 }
187 }
188
189
194 public static void safeClose(final DatagramSocket resource) {
195 try {
196 if (resource != null) {
197 closeMsg.closingResource(resource);
198 resource.close();
199 }
200 } catch (Throwable t) {
201 closeMsg.resourceCloseFailed(t, resource);
202 }
203 }
204
205
210 public static void safeClose(final Selector resource) {
211 try {
212 if (resource != null) {
213 closeMsg.closingResource(resource);
214 resource.close();
215 }
216 } catch (ClosedChannelException ignored) {
217 } catch (Throwable t) {
218 closeMsg.resourceCloseFailed(t, resource);
219 }
220 }
221
222
227 public static void safeClose(final ServerSocket resource) {
228 try {
229 if (resource != null) {
230 closeMsg.closingResource(resource);
231 resource.close();
232 }
233 } catch (ClosedChannelException ignored) {
234 } catch (Throwable t) {
235 closeMsg.resourceCloseFailed(t, resource);
236 }
237 }
238
239
244 public static void safeClose(final ZipFile resource) {
245 try {
246 if (resource != null) {
247 closeMsg.closingResource(resource);
248 resource.close();
249 }
250 } catch (Throwable t) {
251 closeMsg.resourceCloseFailed(t, resource);
252 }
253 }
254
255
260 public static void safeClose(final Handler resource) {
261 try {
262 if (resource != null) {
263 closeMsg.closingResource(resource);
264 resource.close();
265 }
266 } catch (Throwable t) {
267 closeMsg.resourceCloseFailed(t, resource);
268 }
269 }
270
271
277 public static void safeClose(final IoFuture<? extends Closeable> futureResource) {
278 if (futureResource != null) {
279 futureResource.cancel().addNotifier(closingNotifier(), null);
280 }
281 }
282
283 private static final IoFuture.Notifier<Object, Closeable> ATTACHMENT_CLOSING_NOTIFIER = new IoFuture.Notifier<Object, Closeable>() {
284 public void notify(final IoFuture<?> future, final Closeable attachment) {
285 IoUtils.safeClose(attachment);
286 }
287 };
288
289 private static final IoFuture.Notifier<Closeable, Void> CLOSING_NOTIFIER = new IoFuture.HandlingNotifier<Closeable, Void>() {
290 public void handleDone(final Closeable result, final Void attachment) {
291 IoUtils.safeClose(result);
292 }
293 };
294
295
300 public static IoFuture.Notifier<Object, Closeable> attachmentClosingNotifier() {
301 return ATTACHMENT_CLOSING_NOTIFIER;
302 }
303
304
309 public static IoFuture.Notifier<Closeable, Void> closingNotifier() {
310 return CLOSING_NOTIFIER;
311 }
312
313
320 public static <T> IoFuture.Notifier<T, Void> runnableNotifier(final Runnable runnable) {
321 return new IoFuture.Notifier<T, Void>() {
322 public void notify(final IoFuture<? extends T> future, final Void attachment) {
323 runnable.run();
324 }
325 };
326 }
327
328
335 @SuppressWarnings({ "unchecked" })
336 public static <T> IoFuture.Notifier<T, Result<T>> resultNotifier() {
337 return RESULT_NOTIFIER;
338 }
339
340
346 @SuppressWarnings({ "unchecked" })
347 public static <T extends Channel> IoFuture.Notifier<T, ChannelListener<? super T>> channelListenerNotifier() {
348 return CHANNEL_LISTENER_NOTIFIER;
349 }
350
351 @SuppressWarnings("rawtypes")
352 private static final IoFuture.Notifier CHANNEL_LISTENER_NOTIFIER = new IoFuture.HandlingNotifier<Channel, ChannelListener<? super Channel>>() {
353 @SuppressWarnings({ "unchecked" })
354 public void handleDone(final Channel channel, final ChannelListener channelListener) {
355 channelListener.handleEvent(channel);
356 }
357 };
358
359
365 public static <T> Future<T> getFuture(final IoFuture<T> ioFuture) {
366 return new Future<T>() {
367 public boolean cancel(final boolean mayInterruptIfRunning) {
368 ioFuture.cancel();
369 return ioFuture.await() == IoFuture.Status.CANCELLED;
370 }
371
372 public boolean isCancelled() {
373 return ioFuture.getStatus() == IoFuture.Status.CANCELLED;
374 }
375
376 public boolean isDone() {
377 return ioFuture.getStatus() == IoFuture.Status.DONE;
378 }
379
380 public T get() throws InterruptedException, ExecutionException {
381 try {
382 return ioFuture.getInterruptibly();
383 } catch (IOException e) {
384 throw new ExecutionException(e);
385 }
386 }
387
388 public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
389 try {
390 if (ioFuture.awaitInterruptibly(timeout, unit) == IoFuture.Status.WAITING) {
391 throw msg.opTimedOut();
392 }
393 return ioFuture.getInterruptibly();
394 } catch (IOException e) {
395 throw new ExecutionException(e);
396 }
397 }
398
399 public String toString() {
400 return String.format("java.util.concurrent.Future wrapper <%s> for %s", Integer.toHexString(hashCode()), ioFuture);
401 }
402 };
403 }
404
405 private static final IoFuture.Notifier<Object, CountDownLatch> COUNT_DOWN_NOTIFIER = new IoFuture.Notifier<Object, CountDownLatch>() {
406 public void notify(final IoFuture<?> future, final CountDownLatch latch) {
407 latch.countDown();
408 }
409 };
410
411
416 public static void awaitAll(IoFuture<?>... futures) {
417 final int len = futures.length;
418 final CountDownLatch cdl = new CountDownLatch(len);
419 for (IoFuture<?> future : futures) {
420 future.addNotifier(COUNT_DOWN_NOTIFIER, cdl);
421 }
422 boolean intr = false;
423 try {
424 while (cdl.getCount() > 0L) {
425 try {
426 cdl.await();
427 } catch (InterruptedException e) {
428 intr = true;
429 }
430 }
431 } finally {
432 if (intr) {
433 Thread.currentThread().interrupt();
434 }
435 }
436 }
437
438
444 public static void awaitAllInterruptibly(IoFuture<?>... futures) throws InterruptedException {
445 final int len = futures.length;
446 final CountDownLatch cdl = new CountDownLatch(len);
447 for (IoFuture<?> future : futures) {
448 future.addNotifier(COUNT_DOWN_NOTIFIER, cdl);
449 }
450 cdl.await();
451 }
452
453
462 public static <I, O> IoFuture<? extends O> cast(final IoFuture<I> parent, final Class<O> type) {
463 return new CastingIoFuture<O, I>(parent, type);
464 }
465
466
471 public static void safeShutdownReads(final SuspendableReadChannel channel) {
472 if (channel != null) {
473 try {
474 channel.shutdownReads();
475 } catch (IOException e) {
476 closeMsg.resourceReadShutdownFailed(null, null);
477 }
478 }
479 }
480
481
496 public static long transfer(final ReadableByteChannel source, final long count, final ByteBuffer throughBuffer, final WritableByteChannel sink) throws IOException {
497 long res;
498 long total = 0L;
499 throughBuffer.limit(0);
500 while (total < count) {
501 throughBuffer.compact();
502 try {
503 if (count - total < (long) throughBuffer.remaining()) {
504 throughBuffer.limit((int) (count - total));
505 }
506 res = source.read(throughBuffer);
507 if (res <= 0) {
508 return total == 0L ? res : total;
509 }
510 } finally {
511 throughBuffer.flip();
512 }
513 res = sink.write(throughBuffer);
514 if (res == 0) {
515 return total;
516 }
517 total += res;
518 }
519 return total;
520 }
521
522
523
524 private static class CastingIoFuture<O, I> implements IoFuture<O> {
525
526 private final IoFuture<I> parent;
527 private final Class<O> type;
528
529 private CastingIoFuture(final IoFuture<I> parent, final Class<O> type) {
530 this.parent = parent;
531 this.type = type;
532 }
533
534 public IoFuture<O> cancel() {
535 parent.cancel();
536 return this;
537 }
538
539 public Status getStatus() {
540 return parent.getStatus();
541 }
542
543 public Status await() {
544 return parent.await();
545 }
546
547 public Status await(final long time, final TimeUnit timeUnit) {
548 return parent.await(time, timeUnit);
549 }
550
551 public Status awaitInterruptibly() throws InterruptedException {
552 return parent.awaitInterruptibly();
553 }
554
555 public Status awaitInterruptibly(final long time, final TimeUnit timeUnit) throws InterruptedException {
556 return parent.awaitInterruptibly(time, timeUnit);
557 }
558
559 public O get() throws IOException, CancellationException {
560 return type.cast(parent.get());
561 }
562
563 public O getInterruptibly() throws IOException, InterruptedException, CancellationException {
564 return type.cast(parent.getInterruptibly());
565 }
566
567 public IOException getException() throws IllegalStateException {
568 return parent.getException();
569 }
570
571 public <A> IoFuture<O> addNotifier(final Notifier<? super O, A> notifier, final A attachment) {
572 parent.addNotifier(new Notifier<I, A>() {
573 public void notify(final IoFuture<? extends I> future, final A attachment) {
574 notifier.notify(CastingIoFuture.this, attachment);
575 }
576 }, attachment);
577 return this;
578 }
579 }
580
581
587 @SuppressWarnings({ "unchecked" })
588 public static <T> IoFuture.Notifier<T, FutureResult<T>> getManagerNotifier() {
589 return MANAGER_NOTIFIER;
590 }
591
592 @SuppressWarnings("rawtypes")
593 private static final ManagerNotifier MANAGER_NOTIFIER = new ManagerNotifier();
594
595 private static class ManagerNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, FutureResult<T>> {
596 public void handleCancelled(final FutureResult<T> manager) {
597 manager.setCancelled();
598 }
599
600 public void handleFailed(final IOException exception, final FutureResult<T> manager) {
601 manager.setException(exception);
602 }
603
604 public void handleDone(final T result, final FutureResult<T> manager) {
605 manager.setResult(result);
606 }
607 }
608
609
618 public static <T extends Channel> ChannelSource<T> getRetryingChannelSource(final ChannelSource<T> delegate, final int maxTries) throws IllegalArgumentException {
619 if (maxTries < 1) {
620 throw msg.minRange("maxTries", 1);
621 }
622 return new RetryingChannelSource<T>(maxTries, delegate);
623 }
624
625 private static class RetryingNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, Result<T>> {
626
627 private volatile int remaining;
628 private final int maxTries;
629 private final Result<T> result;
630 private final ChannelSource<T> delegate;
631 private final ChannelListener<? super T> openListener;
632
633 RetryingNotifier(final int maxTries, final Result<T> result, final ChannelSource<T> delegate, final ChannelListener<? super T> openListener) {
634 this.maxTries = maxTries;
635 this.result = result;
636 this.delegate = delegate;
637 this.openListener = openListener;
638 remaining = maxTries;
639 }
640
641 public void handleFailed(final IOException exception, final Result<T> attachment) {
642 if (remaining-- == 0) {
643 result.setException(new IOException("Failed to create channel after " + maxTries + " tries", exception));
644 return;
645 }
646 tryOne(attachment);
647 }
648
649 public void handleCancelled(final Result<T> attachment) {
650 result.setCancelled();
651 }
652
653 public void handleDone(final T data, final Result<T> attachment) {
654 result.setResult(data);
655 }
656
657 void tryOne(final Result<T> attachment) {
658 final IoFuture<? extends T> ioFuture = delegate.open(openListener);
659 ioFuture.addNotifier(this, attachment);
660 }
661 }
662
663 private static class RetryingChannelSource<T extends Channel> implements ChannelSource<T> {
664
665 private final int maxTries;
666 private final ChannelSource<T> delegate;
667
668 RetryingChannelSource(final int maxTries, final ChannelSource<T> delegate) {
669 this.maxTries = maxTries;
670 this.delegate = delegate;
671 }
672
673 public IoFuture<T> open(final ChannelListener<? super T> openListener) {
674 final FutureResult<T> result = new FutureResult<T>();
675 final IoUtils.RetryingNotifier<T> notifier = new IoUtils.RetryingNotifier<T>(maxTries, result, delegate, openListener);
676 notifier.tryOne(result);
677 return result.getIoFuture();
678 }
679 }
680
681
687 public static Cancellable closingCancellable(final Closeable c) {
688 return new ClosingCancellable(c);
689 }
690
691 private static class ClosingCancellable implements Cancellable {
692
693 private final Closeable c;
694
695 ClosingCancellable(final Closeable c) {
696 this.c = c;
697 }
698
699 public Cancellable cancel() {
700 safeClose(c);
701 return this;
702 }
703 }
704
705
710 public static Cancellable nullCancellable() {
711 return NULL_CANCELLABLE;
712 }
713
714 private static class ResultNotifier<T> extends IoFuture.HandlingNotifier<T, Result<T>> {
715
716 public void handleCancelled(final Result<T> result) {
717 result.setCancelled();
718 }
719
720 public void handleFailed(final IOException exception, final Result<T> result) {
721 result.setException(exception);
722 }
723
724 public void handleDone(final T value, final Result<T> result) {
725 result.setResult(value);
726 }
727 }
728
729
734 public static Random getThreadLocalRandom() {
735 return ThreadLocalRandom.current();
736 }
737 }
738