1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Copyright 2008 Red Hat, Inc. and/or its affiliates.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.DatagramSocket;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.ReadableByteChannel;
29 import java.nio.channels.Selector;
30 import java.nio.channels.Channel;
31 import java.nio.channels.WritableByteChannel;
32 import java.util.Random;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ThreadLocalRandom;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.zip.ZipFile;
42 import org.xnio.channels.SuspendableReadChannel;
43
44 import java.util.logging.Handler;
45
46 import static org.xnio._private.Messages.closeMsg;
47 import static org.xnio._private.Messages.msg;
48
49 /**
50  * General I/O utility methods.
51  *
52  * @apiviz.exclude
53  */

54 public final class IoUtils {
55
56     private static final Executor NULL_EXECUTOR = new Executor() {
57         private final String string = String.format("null executor <%s>", Integer.toHexString(hashCode()));
58
59         public void execute(final Runnable command) {
60             // no operation
61         }
62
63         public String toString() {
64             return string;
65         }
66     };
67     private static final Executor DIRECT_EXECUTOR = new Executor() {
68         private final String string = String.format("direct executor <%s>", Integer.toHexString(hashCode()));
69
70         public void execute(final Runnable command) {
71             command.run();
72         }
73
74         public String toString() {
75             return string;
76         }
77     };
78     private static final Closeable NULL_CLOSEABLE = new Closeable() {
79         private final String string = String.format("null closeable <%s>", Integer.toHexString(hashCode()));
80         public void close() throws IOException {
81             // no operation
82         }
83
84         public String toString() {
85             return string;
86         }
87     };
88     private static final Cancellable NULL_CANCELLABLE = new Cancellable() {
89         public Cancellable cancel() {
90             return this;
91         }
92     };
93     @SuppressWarnings("rawtypes")
94     private static final IoUtils.ResultNotifier RESULT_NOTIFIER = new IoUtils.ResultNotifier();
95
96     private IoUtils() {}
97
98     /**
99      * Get the direct executor.  This is an executor that executes the provided task in the same thread.
100      *
101      * @return a direct executor
102      */

103     public static Executor directExecutor() {
104         return DIRECT_EXECUTOR;
105     }
106
107     /**
108      * Get the null executor.  This is an executor that never actually executes the provided task.
109      *
110      * @return a null executor
111      */

112     public static Executor nullExecutor() {
113         return NULL_EXECUTOR;
114     }
115
116     /**
117      * Get the null closeable.  This is a simple {@code Closeable} instance that does nothing when its {@code close()}
118      * method is invoked.
119      *
120      * @return the null closeable
121      */

122     public static Closeable nullCloseable() {
123         return NULL_CLOSEABLE;
124     }
125
126     /**
127      * Close a resource, logging an error if an error occurs.
128      *
129      * @param resource the resource to close
130      */

131     public static void safeClose(final AutoCloseable resource) {
132         try {
133             if (resource != null) {
134                 closeMsg.closingResource(resource);
135                 resource.close();
136             }
137         } catch (ClosedChannelException ignored) {
138         } catch (Throwable t) {
139             closeMsg.resourceCloseFailed(t, resource);
140         }
141     }
142
143     /**
144      * Close a resource, logging an error if an error occurs.
145      *
146      * @param resource the resource to close
147      */

148     public static void safeClose(final Closeable resource) {
149         try {
150             if (resource != null) {
151                 closeMsg.closingResource(resource);
152                 resource.close();
153             }
154         } catch (ClosedChannelException ignored) {
155             msg.tracef("safeClose, ignoring ClosedChannelException exception");
156         } catch (Throwable t) {
157             closeMsg.resourceCloseFailed(t, resource);
158         }
159     }
160
161     /**
162      * Close a series of resources, logging errors if they occur.
163      *
164      * @param resources the resources to close
165      */

166     public static void safeClose(final Closeable... resources) {
167         for (Closeable resource : resources) {
168             safeClose(resource);
169         }
170     }
171
172     /**
173      * Close a resource, logging an error if an error occurs.
174      *
175      * @param resource the resource to close
176      */

177     public static void safeClose(final Socket resource) {
178         try {
179             if (resource != null) {
180                 closeMsg.closingResource(resource);
181                 resource.close();
182             }
183         } catch (ClosedChannelException ignored) {
184         } catch (Throwable t) {
185             closeMsg.resourceCloseFailed(t, resource);
186         }
187     }
188
189     /**
190      * Close a resource, logging an error if an error occurs.
191      *
192      * @param resource the resource to close
193      */

194     public static void safeClose(final DatagramSocket resource) {
195         try {
196             if (resource != null) {
197                 closeMsg.closingResource(resource);
198                 resource.close();
199             }
200         } catch (Throwable t) {
201             closeMsg.resourceCloseFailed(t, resource);
202         }
203     }
204
205     /**
206      * Close a resource, logging an error if an error occurs.
207      *
208      * @param resource the resource to close
209      */

210     public static void safeClose(final Selector resource) {
211         try {
212             if (resource != null) {
213                 closeMsg.closingResource(resource);
214                 resource.close();
215             }
216         } catch (ClosedChannelException ignored) {
217         } catch (Throwable t) {
218             closeMsg.resourceCloseFailed(t, resource);
219         }
220     }
221
222     /**
223      * Close a resource, logging an error if an error occurs.
224      *
225      * @param resource the resource to close
226      */

227     public static void safeClose(final ServerSocket resource) {
228         try {
229             if (resource != null) {
230                 closeMsg.closingResource(resource);
231                 resource.close();
232             }
233         } catch (ClosedChannelException ignored) {
234         } catch (Throwable t) {
235             closeMsg.resourceCloseFailed(t, resource);
236         }
237     }
238
239     /**
240      * Close a resource, logging an error if an error occurs.
241      *
242      * @param resource the resource to close
243      */

244     public static void safeClose(final ZipFile resource) {
245         try {
246             if (resource != null) {
247                 closeMsg.closingResource(resource);
248                 resource.close();
249             }
250         } catch (Throwable t) {
251             closeMsg.resourceCloseFailed(t, resource);
252         }
253     }
254
255     /**
256      * Close a resource, logging an error if an error occurs.
257      *
258      * @param resource the resource to close
259      */

260     public static void safeClose(final Handler resource) {
261         try {
262             if (resource != null) {
263                 closeMsg.closingResource(resource);
264                 resource.close();
265             }
266         } catch (Throwable t) {
267             closeMsg.resourceCloseFailed(t, resource);
268         }
269     }
270
271     /**
272      * Close a future resource, logging an error if an error occurs.  Attempts to cancel the operation if it is
273      * still in progress.
274      *
275      * @param futureResource the resource to close
276      */

277     public static void safeClose(final IoFuture<? extends Closeable> futureResource) {
278         if (futureResource != null) {
279             futureResource.cancel().addNotifier(closingNotifier(), null);
280         }
281     }
282
283     private static final IoFuture.Notifier<Object, Closeable> ATTACHMENT_CLOSING_NOTIFIER = new IoFuture.Notifier<Object, Closeable>() {
284         public void notify(final IoFuture<?> future, final Closeable attachment) {
285             IoUtils.safeClose(attachment);
286         }
287     };
288
289     private static final IoFuture.Notifier<Closeable, Void> CLOSING_NOTIFIER = new IoFuture.HandlingNotifier<Closeable, Void>() {
290         public void handleDone(final Closeable result, final Void attachment) {
291             IoUtils.safeClose(result);
292         }
293     };
294
295     /**
296      * Get a notifier that closes the attachment.
297      *
298      * @return a notifier which will close its attachment
299      */

300     public static IoFuture.Notifier<Object, Closeable> attachmentClosingNotifier() {
301         return ATTACHMENT_CLOSING_NOTIFIER;
302     }
303
304     /**
305      * Get a notifier that closes the result.
306      *
307      * @return a notifier which will close the result of the operation (if successful)
308      */

309     public static IoFuture.Notifier<Closeable, Void> closingNotifier() {
310         return CLOSING_NOTIFIER;
311     }
312
313     /**
314      * Get a notifier that runs the supplied action.
315      *
316      * @param runnable the notifier type
317      * @param <T> the future type (not used)
318      * @return a notifier which will run the given command
319      */

320     public static <T> IoFuture.Notifier<T, Void> runnableNotifier(final Runnable runnable) {
321         return new IoFuture.Notifier<T, Void>() {
322             public void notify(final IoFuture<? extends T> future, final Void attachment) {
323                 runnable.run();
324             }
325         };
326     }
327
328     /**
329      * Get the result notifier.  This notifier will forward the result of the {@code IoFuture} to the attached
330      * {@code Result}.
331      *
332      * @param <T> the result type
333      * @return the notifier
334      */

335     @SuppressWarnings({ "unchecked" })
336     public static <T> IoFuture.Notifier<T, Result<T>> resultNotifier() {
337         return RESULT_NOTIFIER;
338     }
339
340     /**
341      * Get the notifier that invokes the channel listener given as an attachment.
342      *
343      * @param <T> the channel type
344      * @return the notifier
345      */

346     @SuppressWarnings({ "unchecked" })
347     public static <T extends Channel> IoFuture.Notifier<T, ChannelListener<? super T>> channelListenerNotifier() {
348         return CHANNEL_LISTENER_NOTIFIER;
349     }
350
351     @SuppressWarnings("rawtypes")
352     private static final IoFuture.Notifier CHANNEL_LISTENER_NOTIFIER = new IoFuture.HandlingNotifier<Channel, ChannelListener<? super Channel>>() {
353         @SuppressWarnings({ "unchecked" })
354         public void handleDone(final Channel channel, final ChannelListener channelListener) {
355             channelListener.handleEvent(channel);
356         }
357     };
358
359     /**
360      * Get a {@code java.util.concurrent}-style {@code Future} instance wrapper for an {@code IoFuture} instance.
361      *
362      * @param ioFuture the {@code IoFuture} to wrap
363      * @return a {@code Future}
364      */

365     public static <T> Future<T> getFuture(final IoFuture<T> ioFuture) {
366         return new Future<T>() {
367             public boolean cancel(final boolean mayInterruptIfRunning) {
368                 ioFuture.cancel();
369                 return ioFuture.await() == IoFuture.Status.CANCELLED;
370             }
371
372             public boolean isCancelled() {
373                 return ioFuture.getStatus() == IoFuture.Status.CANCELLED;
374             }
375
376             public boolean isDone() {
377                 return ioFuture.getStatus() == IoFuture.Status.DONE;
378             }
379
380             public T get() throws InterruptedException, ExecutionException {
381                 try {
382                     return ioFuture.getInterruptibly();
383                 } catch (IOException e) {
384                     throw new ExecutionException(e);
385                 }
386             }
387
388             public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
389                 try {
390                     if (ioFuture.awaitInterruptibly(timeout, unit) == IoFuture.Status.WAITING) {
391                         throw msg.opTimedOut();
392                     }
393                     return ioFuture.getInterruptibly();
394                 } catch (IOException e) {
395                     throw new ExecutionException(e);
396                 }
397             }
398
399             public String toString() {
400                 return String.format("java.util.concurrent.Future wrapper <%s> for %s", Integer.toHexString(hashCode()), ioFuture);
401             }
402         };
403     }
404
405     private static final IoFuture.Notifier<Object, CountDownLatch> COUNT_DOWN_NOTIFIER = new IoFuture.Notifier<Object, CountDownLatch>() {
406         public void notify(final IoFuture<?> future, final CountDownLatch latch) {
407             latch.countDown();
408         }
409     };
410
411     /**
412      * Wait for all the futures to complete.
413      *
414      * @param futures the futures to wait for
415      */

416     public static void awaitAll(IoFuture<?>... futures) {
417         final int len = futures.length;
418         final CountDownLatch cdl = new CountDownLatch(len);
419         for (IoFuture<?> future : futures) {
420             future.addNotifier(COUNT_DOWN_NOTIFIER, cdl);
421         }
422         boolean intr = false;
423         try {
424             while (cdl.getCount() > 0L) {
425                 try {
426                     cdl.await();
427                 } catch (InterruptedException e) {
428                     intr = true;
429                 }
430             }
431         } finally {
432             if (intr) {
433                 Thread.currentThread().interrupt();
434             }
435         }
436     }
437
438     /**
439      * Wait for all the futures to complete.
440      *
441      * @param futures the futures to wait for
442      * @throws InterruptedException if the current thread is interrupted while waiting
443      */

444     public static void awaitAllInterruptibly(IoFuture<?>... futures) throws InterruptedException {
445         final int len = futures.length;
446         final CountDownLatch cdl = new CountDownLatch(len);
447         for (IoFuture<?> future : futures) {
448             future.addNotifier(COUNT_DOWN_NOTIFIER, cdl);
449         }
450         cdl.await();
451     }
452
453     /**
454      * Create an {@code IoFuture} which wraps another {@code IoFuture}, but returns a different type.
455      *
456      * @param parent the original {@code IoFuture}
457      * @param type the class of the new {@code IoFuture}
458      * @param <I> the type of the original result
459      * @param <O> the type of the wrapped result
460      * @return a wrapper {@code IoFuture}
461      */

462     public static <I, O> IoFuture<? extends O> cast(final IoFuture<I> parent, final Class<O> type) {
463         return new CastingIoFuture<O, I>(parent, type);
464     }
465
466     /**
467      * Safely shutdown reads on the given channel.
468      *
469      * @param channel the channel
470      */

471     public static void safeShutdownReads(final SuspendableReadChannel channel) {
472         if (channel != null) {
473             try {
474                 channel.shutdownReads();
475             } catch (IOException e) {
476                 closeMsg.resourceReadShutdownFailed(nullnull);
477             }
478         }
479     }
480
481     /**
482      * Platform-independent channel-to-channel transfer method.  Uses regular {@code read} and {@code write} operations
483      * to move bytes from the {@code source} channel to the {@code sink} channel.  After this call, the {@code throughBuffer}
484      * should be checked for remaining bytes; if there are any, they should be written to the {@code sink} channel before
485      * proceeding.  This method may be used with NIO channels, XNIO channels, or a combination of the two.
486      * <p>
487      * If either or both of the given channels are blocking channels, then this method may block.
488      *
489      * @param source the source channel to read bytes from
490      * @param count the number of bytes to transfer (must be >= {@code 0L})
491      * @param throughBuffer the buffer to transfer through (must not be {@code null})
492      * @param sink the sink channel to write bytes to
493      * @return the number of bytes actually transferred (possibly 0)
494      * @throws IOException if an I/O error occurs during the transfer of bytes
495      */

496     public static long transfer(final ReadableByteChannel source, final long count, final ByteBuffer throughBuffer, final WritableByteChannel sink) throws IOException {
497         long res;
498         long total = 0L;
499         throughBuffer.limit(0);
500         while (total < count) {
501             throughBuffer.compact();
502             try {
503                 if (count - total < (long) throughBuffer.remaining()) {
504                     throughBuffer.limit((int) (count - total));
505                 }
506                 res = source.read(throughBuffer);
507                 if (res <= 0) {
508                     return total == 0L ? res : total;
509                 }
510             } finally {
511                 throughBuffer.flip();
512             }
513             res = sink.write(throughBuffer);
514             if (res == 0) {
515                 return total;
516             }
517             total += res;
518         }
519         return total;
520     }
521
522     // nested classes
523
524     private static class CastingIoFuture<O, I> implements IoFuture<O> {
525
526         private final IoFuture<I> parent;
527         private final Class<O> type;
528
529         private CastingIoFuture(final IoFuture<I> parent, final Class<O> type) {
530             this.parent = parent;
531             this.type = type;
532         }
533
534         public IoFuture<O> cancel() {
535             parent.cancel();
536             return this;
537         }
538
539         public Status getStatus() {
540             return parent.getStatus();
541         }
542
543         public Status await() {
544             return parent.await();
545         }
546
547         public Status await(final long time, final TimeUnit timeUnit) {
548             return parent.await(time, timeUnit);
549         }
550
551         public Status awaitInterruptibly() throws InterruptedException {
552             return parent.awaitInterruptibly();
553         }
554
555         public Status awaitInterruptibly(final long time, final TimeUnit timeUnit) throws InterruptedException {
556             return parent.awaitInterruptibly(time, timeUnit);
557         }
558
559         public O get() throws IOException, CancellationException {
560             return type.cast(parent.get());
561         }
562
563         public O getInterruptibly() throws IOException, InterruptedException, CancellationException {
564             return type.cast(parent.getInterruptibly());
565         }
566
567         public IOException getException() throws IllegalStateException {
568             return parent.getException();
569         }
570
571         public <A> IoFuture<O> addNotifier(final Notifier<? super O, A> notifier, final A attachment) {
572             parent.addNotifier(new Notifier<I, A>() {
573                 public void notify(final IoFuture<? extends I> future, final A attachment) {
574                     notifier.notify(CastingIoFuture.this, attachment);
575                 }
576             }, attachment);
577             return this;
578         }
579     }
580
581     /**
582      * Get a notifier which forwards the result to another {@code IoFuture}'s manager.
583      *
584      * @param <T> the channel type
585      * @return the notifier
586      */

587     @SuppressWarnings({ "unchecked" })
588     public static <T> IoFuture.Notifier<T, FutureResult<T>> getManagerNotifier() {
589         return MANAGER_NOTIFIER;
590     }
591
592     @SuppressWarnings("rawtypes")
593     private static final ManagerNotifier MANAGER_NOTIFIER = new ManagerNotifier();
594
595     private static class ManagerNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, FutureResult<T>> {
596         public void handleCancelled(final FutureResult<T> manager) {
597             manager.setCancelled();
598         }
599
600         public void handleFailed(final IOException exception, final FutureResult<T> manager) {
601             manager.setException(exception);
602         }
603
604         public void handleDone(final T result, final FutureResult<T> manager) {
605             manager.setResult(result);
606         }
607     }
608
609     /**
610      * A channel source which tries to acquire a channel from a delegate channel source the given number of times before
611      * giving up.
612      *
613      * @param delegate the delegate channel source
614      * @param maxTries the number of times to retry
615      * @param <T> the channel type
616      * @return the retrying channel source
617      */

618     public static <T extends Channel> ChannelSource<T> getRetryingChannelSource(final ChannelSource<T> delegate, final int maxTries) throws IllegalArgumentException {
619         if (maxTries < 1) {
620             throw msg.minRange("maxTries", 1);
621         }
622         return new RetryingChannelSource<T>(maxTries, delegate);
623     }
624
625     private static class RetryingNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, Result<T>> {
626
627         private volatile int remaining;
628         private final int maxTries;
629         private final Result<T> result;
630         private final ChannelSource<T> delegate;
631         private final ChannelListener<? super T> openListener;
632
633         RetryingNotifier(final int maxTries, final Result<T> result, final ChannelSource<T> delegate, final ChannelListener<? super T> openListener) {
634             this.maxTries = maxTries;
635             this.result = result;
636             this.delegate = delegate;
637             this.openListener = openListener;
638             remaining = maxTries;
639         }
640
641         public void handleFailed(final IOException exception, final Result<T> attachment) {
642             if (remaining-- == 0) {
643                 result.setException(new IOException("Failed to create channel after " + maxTries + " tries", exception));
644                 return;
645             }
646             tryOne(attachment);
647         }
648
649         public void handleCancelled(final Result<T> attachment) {
650             result.setCancelled();
651         }
652
653         public void handleDone(final T data, final Result<T> attachment) {
654             result.setResult(data);
655         }
656
657         void tryOne(final Result<T> attachment) {
658             final IoFuture<? extends T> ioFuture = delegate.open(openListener);
659             ioFuture.addNotifier(this, attachment);
660         }
661     }
662
663     private static class RetryingChannelSource<T extends Channel> implements ChannelSource<T> {
664
665         private final int maxTries;
666         private final ChannelSource<T> delegate;
667
668         RetryingChannelSource(final int maxTries, final ChannelSource<T> delegate) {
669             this.maxTries = maxTries;
670             this.delegate = delegate;
671         }
672
673         public IoFuture<T> open(final ChannelListener<? super T> openListener) {
674             final FutureResult<T> result = new FutureResult<T>();
675             final IoUtils.RetryingNotifier<T> notifier = new IoUtils.RetryingNotifier<T>(maxTries, result, delegate, openListener);
676             notifier.tryOne(result);
677             return result.getIoFuture();
678         }
679     }
680
681     /**
682      * A cancellable which closes the given resource on cancel.
683      *
684      * @param c the resource
685      * @return the cancellable
686      */

687     public static Cancellable closingCancellable(final Closeable c) {
688         return new ClosingCancellable(c);
689     }
690
691     private static class ClosingCancellable implements Cancellable {
692
693         private final Closeable c;
694
695         ClosingCancellable(final Closeable c) {
696             this.c = c;
697         }
698
699         public Cancellable cancel() {
700             safeClose(c);
701             return this;
702         }
703     }
704
705     /**
706      * Get the null cancellable.
707      *
708      * @return the null cancellable
709      */

710     public static Cancellable nullCancellable() {
711         return NULL_CANCELLABLE;
712     }
713
714     private static class ResultNotifier<T> extends IoFuture.HandlingNotifier<T, Result<T>> {
715
716         public void handleCancelled(final Result<T> result) {
717             result.setCancelled();
718         }
719
720         public void handleFailed(final IOException exception, final Result<T> result) {
721             result.setException(exception);
722         }
723
724         public void handleDone(final T value, final Result<T> result) {
725             result.setResult(value);
726         }
727     }
728
729     /**
730      * Get a thread-local RNG.  Do not share this instance with other threads.
731      *
732      * @return the thread-local RNG
733      */

734     public static Random getThreadLocalRandom() {
735         return ThreadLocalRandom.current();
736     }
737 }
738