1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Copyright 2008 Red Hat, Inc. and/or its affiliates.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio.channels;
20
21 import java.io.FileNotFoundException;
22 import java.io.FileOutputStream;
23 import java.io.IOError;
24 import java.io.InterruptedIOException;
25 import java.nio.channels.Channel;
26 import java.nio.channels.FileChannel;
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Locale;
30 import org.xnio.Buffers;
31
32 import java.io.IOException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.WritableByteChannel;
35 import java.nio.channels.GatheringByteChannel;
36 import java.nio.channels.ReadableByteChannel;
37 import java.nio.channels.ScatteringByteChannel;
38 import java.util.concurrent.TimeUnit;
39 import org.xnio.ChannelListener;
40 import org.xnio.Option;
41 import org.xnio.XnioIoThread;
42
43 /**
44  * A utility class containing static methods to support channel usage.
45  *
46  * @apiviz.exclude
47  */

48 public final class Channels {
49
50     private Channels() {
51     }
52
53     /**
54      * Simple utility method to execute a blocking flush on a writable channel.  The method blocks until there are no
55      * remaining bytes in the send queue.
56      *
57      * @param channel the writable channel
58      * @throws IOException if an I/O exception occurs
59      *
60      * @since 2.0
61      */

62     public static void flushBlocking(SuspendableWriteChannel channel) throws IOException {
63         while (! channel.flush()) {
64             channel.awaitWritable();
65         }
66     }
67
68     /**
69      * Simple utility method to execute a blocking flush on a writable channel. The method blocks until there are no
70      * remaining bytes in the send queue or the timeout is reached.
71      *
72      * @param channel the writable channel
73      * @param time the amount of time to wait
74      * @param unit the unit of time to wait
75      * @return true if the channel was successfully flushed, false if the timeout was reached
76      * @throws IOException if an I/O exception occurs
77      *
78      * @since 3.8
79      */

80     public static boolean flushBlocking(SuspendableWriteChannel channel, long time, TimeUnit unit) throws IOException {
81         // In the fast path, the timeout is not used because bytes can be flushed without blocking.
82         if (channel.flush()) {
83             return true;
84         }
85         long remaining = unit.toNanos(time);
86         long now = System.nanoTime();
87         do {
88             // awaitWritable may return spuriously so looping is required.
89             channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
90             // flush prior to recalculating remaining time to avoid a nanoTime
91             // invocation in the optimal path.
92             if (channel.flush()) {
93                 return true;
94             }
95             // Nanotime must only be used in comparison with another nanotime value
96             // This implementation allows us to avoid immediate subsequent nanoTime calls
97         } while ((remaining -= Math.max(-now + (now = System.nanoTime()), 0L)) > 0L);
98         return false;
99     }
100
101     /**
102      * Simple utility method to execute a blocking write shutdown on a writable channel.  The method blocks until the
103      * channel's output side is fully shut down.
104      *
105      * @param channel the writable channel
106      * @throws IOException if an I/O exception occurs
107      *
108      * @since 2.0
109      */

110     public static void shutdownWritesBlocking(SuspendableWriteChannel channel) throws IOException {
111         channel.shutdownWrites();
112         flushBlocking(channel);
113     }
114
115     /**
116      * Simple utility method to execute a blocking write shutdown on a writable channel.  The method blocks until the
117      * channel's output side is fully shut down or the timeout is reached.
118      *
119      * @param channel the writable channel
120      * @param time the amount of time to wait
121      * @param unit the unit of time to wait
122      * @return true if the channel was successfully flushed, false if the timeout was reached
123      * @throws IOException if an I/O exception occurs
124      *
125      * @since 3.8
126      */

127     public static boolean shutdownWritesBlocking(SuspendableWriteChannel channel, long time, TimeUnit unit) throws IOException {
128         channel.shutdownWrites();
129         return flushBlocking(channel, time, unit);
130     }
131
132     /**
133      * Simple utility method to execute a blocking write on a byte channel.  The method blocks until the bytes in the
134      * buffer have been fully written.  To ensure that the data is sent, the {@link #flushBlocking(SuspendableWriteChannel)}
135      * method should be called after all writes are complete.
136      *
137      * @param channel the channel to write on
138      * @param buffer the data to write
139      * @param <C> the channel type
140      * @return the number of bytes written
141      * @throws IOException if an I/O exception occurs
142      * @since 1.2
143      */

144     public static <C extends WritableByteChannel & SuspendableWriteChannel> int writeBlocking(C channel, ByteBuffer buffer) throws IOException {
145         int t = 0;
146         while (buffer.hasRemaining()) {
147             final int res = channel.write(buffer);
148             if (res == 0) {
149                 channel.awaitWritable();
150             } else {
151                 t += res;
152             }
153         }
154         return t;
155     }
156
157     /**
158      * Simple utility method to execute a blocking write on a byte channel with a timeout.  The method blocks until
159      * either the bytes in the buffer have been fully written, or the timeout expires, whichever comes first.
160      *
161      * @param channel the channel to write on
162      * @param buffer the data to write
163      * @param time the amount of time to wait
164      * @param unit the unit of time to wait
165      * @param <C> the channel type
166      * @return the number of bytes written
167      * @throws IOException if an I/O exception occurs
168      * @since 1.2
169      */

170     public static <C extends WritableByteChannel & SuspendableWriteChannel> int writeBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
171         long remaining = unit.toNanos(time);
172         long now = System.nanoTime();
173         int t = 0;
174         while (buffer.hasRemaining() && remaining > 0L) {
175             int res = channel.write(buffer);
176             if (res == 0) {
177                 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
178                 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
179             } else {
180                 t += res;
181             }
182         }
183         return t;
184     }
185
186     /**
187      * Simple utility method to execute a blocking write on a gathering byte channel.  The method blocks until the
188      * bytes in the buffer have been fully written.
189      *
190      * @param channel the channel to write on
191      * @param buffers the data to write
192      * @param offs the index of the first buffer to write
193      * @param len the number of buffers to write
194      * @param <C> the channel type
195      * @return the number of bytes written
196      * @throws IOException if an I/O exception occurs
197      * @since 1.2
198      */

199     public static <C extends GatheringByteChannel & SuspendableWriteChannel> long writeBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
200         long t = 0;
201         while (Buffers.hasRemaining(buffers, offs, len)) {
202             final long res = channel.write(buffers, offs, len);
203             if (res == 0) {
204                 channel.awaitWritable();
205             } else {
206                 t += res;
207             }
208         }
209         return t;
210     }
211
212     /**
213      * Simple utility method to execute a blocking write on a gathering byte channel with a timeout.  The method blocks until all
214      * the bytes are written, or until the timeout occurs.
215      *
216      * @param channel the channel to write on
217      * @param buffers the data to write
218      * @param offs the index of the first buffer to write
219      * @param len the number of buffers to write
220      * @param time the amount of time to wait
221      * @param unit the unit of time to wait
222      * @param <C> the channel type
223      * @return the number of bytes written
224      * @throws IOException if an I/O exception occurs
225      * @since 1.2
226      */

227     public static <C extends GatheringByteChannel & SuspendableWriteChannel> long writeBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
228         long remaining = unit.toNanos(time);
229         long now = System.nanoTime();
230         long t = 0;
231         while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0L) {
232             long res = channel.write(buffers, offs, len);
233             if (res == 0) {
234                 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
235                 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
236             } else {
237                 t += res;
238             }
239         }
240         return t;
241     }
242
243     /**
244      * Simple utility method to execute a blocking send on a message channel.  The method blocks until the message is written.
245      *
246      * @param channel the channel to write on
247      * @param buffer the data to write
248      * @param <C> the channel type
249      * @throws IOException if an I/O exception occurs
250      * @since 1.2
251      */

252     public static <C extends WritableMessageChannel> void sendBlocking(C channel, ByteBuffer buffer) throws IOException {
253         while (! channel.send(buffer)) {
254             channel.awaitWritable();
255         }
256     }
257
258     /**
259      * Simple utility method to execute a blocking send on a message channel with a timeout.  The method blocks until the channel
260      * is writable, and then the message is written.
261      *
262      * @param channel the channel to write on
263      * @param buffer the data to write
264      * @param time the amount of time to wait
265      * @param unit the unit of time to wait
266      * @param <C> the channel type
267      * @return the write result
268      * @throws IOException if an I/O exception occurs
269      * @since 1.2
270      */

271     public static <C extends WritableMessageChannel> boolean sendBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
272         long remaining = unit.toNanos(time);
273         long now = System.nanoTime();
274         while (remaining > 0L) {
275             if (!channel.send(buffer)) {
276                 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
277                 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
278             } else {
279                 return true;
280             }
281         }
282         return false;
283     }
284
285     /**
286      * Simple utility method to execute a blocking gathering send on a message channel.  The method blocks until the message is written.
287      *
288      * @param channel the channel to write on
289      * @param buffers the data to write
290      * @param offs the index of the first buffer to write
291      * @param len the number of buffers to write
292      * @param <C> the channel type
293      * @throws IOException if an I/O exception occurs
294      * @since 1.2
295      */

296     public static <C extends WritableMessageChannel> void sendBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
297         while (! channel.send(buffers, offs, len)) {
298             channel.awaitWritable();
299         }
300     }
301
302     /**
303      * Simple utility method to execute a blocking gathering send on a message channel with a timeout.  The method blocks until either
304      * the message is written or the timeout expires.
305      *
306      * @param channel the channel to write on
307      * @param buffers the data to write
308      * @param offs the index of the first buffer to write
309      * @param len the number of buffers to write
310      * @param time the amount of time to wait
311      * @param unit the unit of time to wait
312      * @param <C> the channel type
313      * @return {@code trueif the message was written before the timeout
314      * @throws IOException if an I/O exception occurs
315      * @since 1.2
316      */

317     public static <C extends WritableMessageChannel> boolean sendBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
318         long remaining = unit.toNanos(time);
319         long now = System.nanoTime();
320         while (remaining > 0L) {
321             if (!channel.send(buffers, offs, len)) {
322                 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
323                 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
324             } else {
325                 return true;
326             }
327         }
328         return false;
329     }
330
331     /**
332      * Simple utility method to execute a blocking read on a readable byte channel.  This method blocks until the
333      * channel is readable, and then the message is read.
334      *
335      * @param channel the channel to read from
336      * @param buffer the buffer into which bytes are to be transferred
337      * @param <C> the channel type
338      * @return the number of bytes read
339      * @throws IOException if an I/O exception occurs
340      * @since 1.2
341      */

342     public static <C extends ReadableByteChannel & SuspendableReadChannel> int readBlocking(C channel, ByteBuffer buffer) throws IOException {
343         int res;
344         while ((res = channel.read(buffer)) == 0 && buffer.hasRemaining()) {
345             channel.awaitReadable();
346         }
347         return res;
348     }
349
350     /**
351      * Simple utility method to execute a blocking read on a readable byte channel with a timeout.  This method blocks until the
352      * channel is readable, and then the message is read.
353      *
354      * @param channel the channel to read from
355      * @param buffer the buffer into which bytes are to be transferred
356      * @param time the amount of time to wait
357      * @param unit the unit of time to wait
358      * @param <C> the channel type
359      * @return the number of bytes read
360      * @throws IOException if an I/O exception occurs
361      * @since 1.2
362      */

363     public static <C extends ReadableByteChannel & SuspendableReadChannel> int readBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
364         // In the fast path, the timeout is not used because bytes can be read without blocking.
365         int res = channel.read(buffer);
366         if (res != 0) {
367             return res;
368         }
369         long remaining = unit.toNanos(time);
370         long now = System.nanoTime();
371         while (buffer.hasRemaining() && remaining > 0) {
372             // awaitReadable may return spuriously, so looping is required.
373             channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
374             // read prior to recalculating remaining time to avoid a nanoTime
375             // invocation in the optimal path.
376             res = channel.read(buffer);
377             if (res != 0) {
378                 return res;
379             }
380             // Nanotime must only be used in comparison with another nanotime value
381             // This implementation allows us to avoid immediate subsequent nanoTime calls
382             remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
383         }
384         return res;
385     }
386
387     /**
388      * Simple utility method to execute a blocking read on a scattering byte channel.  This method blocks until the
389      * channel is readable, and then the message is read.
390      *
391      * @param channel the channel to read from
392      * @param buffers the buffers into which bytes are to be transferred
393      * @param offs the first buffer to use
394      * @param len the number of buffers to use
395      * @param <C> the channel type
396      * @return the number of bytes read
397      * @throws IOException if an I/O exception occurs
398      * @since 1.2
399      */

400     public static <C extends ScatteringByteChannel & SuspendableReadChannel> long readBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
401         long res;
402         while ((res = channel.read(buffers, offs, len)) == 0) {
403             channel.awaitReadable();
404         }
405         return res;
406     }
407
408     /**
409      * Simple utility method to execute a blocking read on a scattering byte channel with a timeout.  This method blocks until the
410      * channel is readable, and then the message is read.
411      *
412      * @param channel the channel to read from
413      * @param buffers the buffers into which bytes are to be transferred
414      * @param offs the first buffer to use
415      * @param len the number of buffers to use
416      * @param time the amount of time to wait
417      * @param unit the unit of time to wait
418      * @param <C> the channel type
419      * @return the number of bytes read
420      * @throws IOException if an I/O exception occurs
421      * @since 1.2
422      */

423     public static <C extends ScatteringByteChannel & SuspendableReadChannel> long readBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
424         // In the fast path, the timeout is not used because bytes can be read
425         // without blocking or interaction with the precise clock.
426         long res = channel.read(buffers, offs, len);
427         if (res != 0L) {
428             return res;
429         }
430         long remaining = unit.toNanos(time);
431         long now = System.nanoTime();
432         while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0) {
433             // awaitReadable may return spuriously, looping is required.
434             channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
435             // read prior to recalculating remaining time to avoid a nanoTime
436             // invocation in the optimal path.
437             res = channel.read(buffers, offs, len);
438             if (res != 0) {
439                 return res;
440             }
441             // Nanotime must only be used in comparison with another nanotime value
442             // This implementation allows us to avoid immediate subsequent nanoTime calls
443             remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
444         }
445         return res;
446     }
447
448     /**
449      * Simple utility method to execute a blocking receive on a readable message channel.  This method blocks until the
450      * channel is readable, and then the message is received.
451      *
452      * @param channel the channel to read from
453      * @param buffer the buffer into which bytes are to be transferred
454      * @param <C> the channel type
455      * @return the number of bytes read
456      * @throws IOException if an I/O exception occurs
457      * @since 1.2
458      */

459     public static <C extends ReadableMessageChannel> int receiveBlocking(C channel, ByteBuffer buffer) throws IOException {
460         int res;
461         while ((res = channel.receive(buffer)) == 0) {
462             channel.awaitReadable();
463         }
464         return res;
465     }
466
467     /**
468      * Simple utility method to execute a blocking receive on a readable message channel with a timeout.  This method blocks until the
469      * channel is readable, and then the message is received.
470      *
471      * @param channel the channel to read from
472      * @param buffer the buffer into which bytes are to be transferred
473      * @param time the amount of time to wait
474      * @param unit the unit of time to wait
475      * @param <C> the channel type
476      * @return the number of bytes read
477      * @throws IOException if an I/O exception occurs
478      * @since 1.2
479      */

480     public static <C extends ReadableMessageChannel> int receiveBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
481         // In the fast path, the timeout is not used because bytes can be read without blocking.
482         int res = channel.receive(buffer);
483         if (res != 0) {
484             return res;
485         }
486         long remaining = unit.toNanos(time);
487         long now = System.nanoTime();
488         while (buffer.hasRemaining() && remaining > 0) {
489             // awaitReadable may return spuriously, looping is required.
490             channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
491             // read prior to recalculating remaining time to avoid a nanoTime
492             // invocation in the optimal path.
493             res = channel.receive(buffer);
494             if (res != 0) {
495                 return res;
496             }
497             // Nanotime must only be used in comparison with another nanotime value
498             // This implementation allows us to avoid immediate subsequent nanoTime calls
499             remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
500         }
501         return res;
502     }
503
504     /**
505      * Simple utility method to execute a blocking receive on a readable message channel.  This method blocks until the
506      * channel is readable, and then the message is received.
507      *
508      * @param channel the channel to read from
509      * @param buffers the buffers into which bytes are to be transferred
510      * @param offs the first buffer to use
511      * @param len the number of buffers to use
512      * @param <C> the channel type
513      * @return the number of bytes read
514      * @throws IOException if an I/O exception occurs
515      * @since 1.2
516      */

517     public static <C extends ReadableMessageChannel> long receiveBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
518         long res;
519         while ((res = channel.receive(buffers, offs, len)) == 0) {
520             channel.awaitReadable();
521         }
522         return res;
523     }
524
525     /**
526      * Simple utility method to execute a blocking receive on a readable message channel with a timeout.  This method blocks until the
527      * channel is readable, and then the message is received.
528      *
529      * @param channel the channel to read from
530      * @param buffers the buffers into which bytes are to be transferred
531      * @param offs the first buffer to use
532      * @param len the number of buffers to use
533      * @param time the amount of time to wait
534      * @param unit the unit of time to wait
535      * @param <C> the channel type
536      * @return the number of bytes read
537      * @throws IOException if an I/O exception occurs
538      * @since 1.2
539      */

540     public static <C extends ReadableMessageChannel> long receiveBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
541         // In the fast path, the timeout is not used because bytes can be read without blocking.
542         long res = channel.receive(buffers, offs, len);
543         if (res != 0L) {
544             return res;
545         }
546         long remaining = unit.toNanos(time);
547         long now = System.nanoTime();
548         while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0) {
549             // awaitReadable may return spuriously, looping is required.
550             channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
551             res = channel.receive(buffers, offs, len);
552             if (res != 0) {
553                 return res;
554             }
555             // Nanotime must only be used in comparison with another nanotime value
556             // This implementation allows us to avoid immediate subsequent nanoTime calls
557             remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
558         }
559         return res;
560     }
561
562     /**
563      * Simple utility method to execute a blocking accept on an accepting channel.  This method blocks until
564      * an accept is possible, and then returns the accepted connection.
565      *
566      * @param channel the accepting channel
567      * @param <C> the connection channel type
568      * @param <A> the accepting channel type
569      * @return the accepted channel
570      * @throws IOException if an I/O error occurs
571      * @since 3.0
572      */

573     public static <C extends ConnectedChannel, A extends AcceptingChannel<C>> C acceptBlocking(A channel) throws IOException {
574         C accepted;
575         while ((accepted = channel.accept()) == null) {
576             channel.awaitAcceptable();
577         }
578         return accepted;
579     }
580
581     /**
582      * Simple utility method to execute a blocking accept on an accepting channel, with a timeout.  This method blocks until
583      * an accept is possible, and then returns the accepted connection.
584      *
585      * @param channel the accepting channel
586      * @param time the amount of time to wait
587      * @param unit the unit of time to wait
588      * @param <C> the connection channel type
589      * @param <A> the accepting channel type
590      * @return the accepted channel, or {@code nullif the timeout occurred before a connection was accepted
591      * @throws IOException if an I/O error occurs
592      * @since 3.0
593      */

594     public static <C extends ConnectedChannel, A extends AcceptingChannel<C>> C acceptBlocking(A channel, long time, TimeUnit unit) throws IOException {
595         final C accepted = channel.accept();
596         if (accepted == null) {
597             channel.awaitAcceptable(time, unit);
598             return channel.accept();
599         } else {
600             return accepted;
601         }
602     }
603
604     /**
605      * Transfer bytes between two channels efficiently, blocking if necessary.
606      *
607      * @param destination the destination channel
608      * @param source the source file channel
609      * @param startPosition the start position in the source file
610      * @param count the number of bytes to transfer
611      * @throws IOException if an I/O error occurs
612      */

613     public static void transferBlocking(StreamSinkChannel destination, FileChannel source, long startPosition, final long count) throws IOException {
614         long remaining = count;
615         long res;
616         while (remaining > 0L) {
617             while ((res = destination.transferFrom(source, startPosition, remaining)) == 0L) {
618                 try {
619                     destination.awaitWritable();
620                 } catch (InterruptedIOException e) {
621                     final long bytes = count - remaining;
622                     if (bytes > (long) Integer.MAX_VALUE) {
623                         e.bytesTransferred = -1;
624                     } else {
625                         e.bytesTransferred = (int) bytes;
626                     }
627                 }
628             }
629             remaining -= res;
630             startPosition += res;
631         }
632     }
633
634     /**
635      * Transfer bytes between two channels efficiently, blocking if necessary.
636      *
637      * @param destination the destination file channel
638      * @param source the source channel
639      * @param startPosition the start position in the destination file
640      * @param count the number of bytes to transfer
641      * @throws IOException if an I/O error occurs
642      */

643     public static void transferBlocking(FileChannel destination, StreamSourceChannel source, long startPosition, final long count) throws IOException {
644         long remaining = count;
645         long res;
646         while (remaining > 0L) {
647             while ((res = source.transferTo(startPosition, remaining, destination)) == 0L) {
648                 try {
649                     source.awaitReadable();
650                 } catch (InterruptedIOException e) {
651                     final long bytes = count - remaining;
652                     if (bytes > (long) Integer.MAX_VALUE) {
653                         e.bytesTransferred = -1;
654                     } else {
655                         e.bytesTransferred = (int) bytes;
656                     }
657                 }
658             }
659             remaining -= res;
660             startPosition += res;
661         }
662     }
663
664     /**
665      * Transfer bytes between two channels efficiently, blocking if necessary.
666      *
667      * @param destination the destination channel
668      * @param source the source channel
669      * @param throughBuffer the buffer to transfer through,
670      * @param count the number of bytes to transfer
671      * @return the number of bytes actually transferred (will be fewer than {@code count} if EOF was reached)
672      * @throws IOException if the transfer fails
673      */

674     public static long transferBlocking(StreamSinkChannel destination, StreamSourceChannel source, ByteBuffer throughBuffer, long count) throws IOException {
675         long t = 0L;
676         long res;
677         while (t < count) {
678             try {
679                 while ((res = source.transferTo(count, throughBuffer, destination)) == 0L) {
680                     if (throughBuffer.hasRemaining()) {
681                         writeBlocking(destination, throughBuffer);
682                     } else {
683                         source.awaitReadable();
684                     }
685                 }
686                 t += res;
687             } catch (InterruptedIOException e) {
688                 int transferred = e.bytesTransferred;
689                 t += transferred;
690                 if (transferred < 0 || t > (long) Integer.MAX_VALUE) {
691                     e.bytesTransferred = -1;
692                 } else {
693                     e.bytesTransferred = (int) t;
694                 }
695                 throw e;
696             }
697             if (res == -1L) {
698                 return t == 0L ? -1L : t;
699             }
700         }
701         return t;
702     }
703
704     /**
705      * Set the close listener for a channel (type-safe).
706      *
707      * @param channel the channel
708      * @param listener the listener to set
709      * @param <T> the channel type
710      */

711     public static <T extends CloseableChannel> void setCloseListener(T channel, ChannelListener<? super T> listener) {
712         @SuppressWarnings("unchecked")
713         ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getCloseSetter();
714         setter.set(listener);
715     }
716
717     /**
718      * Set the accept listener for a channel (type-safe).
719      *
720      * @param channel the channel
721      * @param listener the listener to set
722      * @param <T> the channel type
723      */

724     public static <T extends AcceptingChannel<?>> void setAcceptListener(T channel, ChannelListener<? super T> listener) {
725         @SuppressWarnings("unchecked")
726         ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getAcceptSetter();
727         setter.set(listener);
728     }
729
730     /**
731      * Set the read listener for a channel (type-safe).
732      *
733      * @param channel the channel
734      * @param listener the listener to set
735      * @param <T> the channel type
736      */

737     public static <T extends SuspendableReadChannel> void setReadListener(T channel, ChannelListener<? super T> listener) {
738         @SuppressWarnings("unchecked")
739         ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getReadSetter();
740         setter.set(listener);
741     }
742
743     /**
744      * Set the write listener for a channel (type-safe).
745      *
746      * @param channel the channel
747      * @param listener the listener to set
748      * @param <T> the channel type
749      */

750     public static <T extends SuspendableWriteChannel> void setWriteListener(T channel, ChannelListener<? super T> listener) {
751         @SuppressWarnings("unchecked")
752         ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getWriteSetter();
753         setter.set(listener);
754     }
755
756     /**
757      * Create a wrapper for a byte channel which does not expose other methods.
758      *
759      * @param original the original
760      * @return the wrapped channel
761      */

762     public static ByteChannel wrapByteChannel(final ByteChannel original) {
763         return new ByteChannel() {
764             public int read(final ByteBuffer dst) throws IOException {
765                 return original.read(dst);
766             }
767
768             public boolean isOpen() {
769                 return original.isOpen();
770             }
771
772             public void close() throws IOException {
773                 original.close();
774             }
775
776             public int write(final ByteBuffer src) throws IOException {
777                 return original.write(src);
778             }
779
780             public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
781                 return original.write(srcs, offset, length);
782             }
783
784             public long write(final ByteBuffer[] srcs) throws IOException {
785                 return original.write(srcs);
786             }
787
788             public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
789                 return original.read(dsts, offset, length);
790             }
791
792             public long read(final ByteBuffer[] dsts) throws IOException {
793                 return original.read(dsts);
794             }
795         };
796     }
797
798     /**
799      * Get an option value from a configurable target.  If the method throws an exception then the default value
800      * is returned.
801      *
802      * @param configurable the configurable target
803      * @param option the option
804      * @param defaultValue the default value
805      * @param <T> the option value type
806      * @return the value
807      */

808     public static <T> T getOption(Configurable configurable, Option<T> option, T defaultValue) {
809         try {
810             final T value = configurable.getOption(option);
811             return value == null ? defaultValue : value;
812         } catch (IOException e) {
813             return defaultValue;
814         }
815     }
816
817     /**
818      * Get an option value from a configurable target.  If the method throws an exception then the default value
819      * is returned.
820      *
821      * @param configurable the configurable target
822      * @param option the option
823      * @param defaultValue the default value
824      * @return the value
825      */

826     public static boolean getOption(Configurable configurable, Option<Boolean> option, boolean defaultValue) {
827         try {
828             final Boolean value = configurable.getOption(option);
829             return value == null ? defaultValue : value.booleanValue();
830         } catch (IOException e) {
831             return defaultValue;
832         }
833     }
834
835     /**
836      * Get an option value from a configurable target.  If the method throws an exception then the default value
837      * is returned.
838      *
839      * @param configurable the configurable target
840      * @param option the option
841      * @param defaultValue the default value
842      * @return the value
843      */

844     public static int getOption(Configurable configurable, Option<Integer> option, int defaultValue) {
845         try {
846             final Integer value = configurable.getOption(option);
847             return value == null ? defaultValue : value.intValue();
848         } catch (IOException e) {
849             return defaultValue;
850         }
851     }
852
853     /**
854      * Get an option value from a configurable target.  If the method throws an exception then the default value
855      * is returned.
856      *
857      * @param configurable the configurable target
858      * @param option the option
859      * @param defaultValue the default value
860      * @return the value
861      */

862     public static long getOption(Configurable configurable, Option<Long> option, long defaultValue) {
863         try {
864             final Long value = configurable.getOption(option);
865             return value == null ? defaultValue : value.longValue();
866         } catch (IOException e) {
867             return defaultValue;
868         }
869     }
870
871     /**
872      * Unwrap a nested channel type.  If the channel does not wrap the target type, {@code null} is returned.
873      *
874      * @param targetType the class to unwrap
875      * @param channel the channel
876      * @param <T> the type to unwrap
877      * @return the unwrapped type, or {@code nullif the given type is not wrapped
878      * @see WrappedChannel
879      */

880     public static <T extends Channel> T unwrap(Class<T> targetType, Channel channel) {
881         for (;;) {
882             if (channel == null) {
883                 return null;
884             } else if (targetType.isInstance(channel)) {
885                 return targetType.cast(channel);
886             } else if (channel instanceof WrappedChannel) {
887                 channel = ((WrappedChannel<?>)channel).getChannel();
888             } else {
889                 return null;
890             }
891         }
892     }
893
894     private static final FileChannel NULL_FILE_CHANNEL;
895     private static final ByteBuffer DRAIN_BUFFER = ByteBuffer.allocateDirect(16384);
896
897     /**
898      * Attempt to drain the given number of bytes from the stream source channel.
899      *
900      * @param channel the channel to drain
901      * @param count the number of bytes
902      * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
903      * @throws IOException if an error occurs
904      */

905     public static long drain(StreamSourceChannel channel, long count) throws IOException {
906         long total = 0L, lres;
907         int ires;
908         ByteBuffer buffer = null;
909         for (;;) {
910             if (count == 0L) return total;
911             if (NULL_FILE_CHANNEL != null) {
912                 while (count > 0) {
913                     if ((lres = channel.transferTo(0, count, NULL_FILE_CHANNEL)) == 0L) {
914                         break;
915                     }
916                     total += lres;
917                     count -= lres;
918                 }
919                 // jump out quick if we drained the fast way
920                 if (total > 0L) return total;
921             }
922             if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
923             if ((long) buffer.limit() > count) buffer.limit((int) count);
924             ires = channel.read(buffer);
925             buffer.clear();
926             switch (ires) {
927                 case -1: return total == 0L ? -1L : total;
928                 case 0: return total;
929                 default: total += (long) ires; count -= (long) ires;
930             }
931         }
932     }
933
934     /**
935      * Attempt to drain the given number of bytes from the readable byte channel.
936      *
937      * @param channel the channel to drain
938      * @param count the number of bytes
939      * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
940      * @throws IOException if an error occurs
941      */

942     public static long drain(ReadableByteChannel channel, long count) throws IOException {
943         if (channel instanceof StreamSourceChannel) {
944             return drain((StreamSourceChannel) channel, count);
945         } else {
946             long total = 0L, lres;
947             int ires;
948             ByteBuffer buffer = null;
949             for (;;) {
950                 if (count == 0L) return total;
951                 if (NULL_FILE_CHANNEL != null) {
952                     while (count > 0) {
953                         if ((lres = NULL_FILE_CHANNEL.transferFrom(channel, 0, count)) == 0L) {
954                             break;
955                         }
956                         total += lres;
957                         count -= lres;
958                     }
959                     // jump out quick if we drained the fast way
960                     if (total > 0L) return total;
961                 }
962                 if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
963                 if ((long) buffer.limit() > count) buffer.limit((int) count);
964                 ires = channel.read(buffer);
965                 buffer.clear();
966                 switch (ires) {
967                     case -1: return total == 0L ? -1L : total;
968                     case 0: return total;
969                     default: total += (long) ires; count -= (long) ires;
970                 }
971             }
972         }
973     }
974
975     /**
976      * Attempt to drain the given number of bytes from the file channel.  This does nothing more than force a
977      * read of bytes in the file.
978      *
979      * @param channel the channel to drain
980      * @param position the position to drain from
981      * @param count the number of bytes
982      * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
983      * @throws IOException if an error occurs
984      */

985     public static long drain(FileChannel channel, long position, long count) throws IOException {
986         if (channel instanceof StreamSourceChannel) {
987             return drain((StreamSourceChannel) channel, count);
988         } else {
989             long total = 0L, lres;
990             int ires;
991             ByteBuffer buffer = null;
992             for (;;) {
993                 if (count == 0L) return total;
994                 if (NULL_FILE_CHANNEL != null) {
995                     while (count > 0) {
996                         if ((lres = channel.transferTo(position, count, NULL_FILE_CHANNEL)) == 0L) {
997                             break;
998                         }
999                         total += lres;
1000                         count -= lres;
1001                     }
1002                     // jump out quick if we drained the fast way
1003                     if (total > 0L) return total;
1004                 }
1005                 if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
1006                 if ((long) buffer.limit() > count) buffer.limit((int) count);
1007                 ires = channel.read(buffer);
1008                 buffer.clear();
1009                 switch (ires) {
1010                     case -1: return total == 0L ? -1L : total;
1011                     case 0: return total;
1012                     default: total += (long) ires;
1013                 }
1014             }
1015         }
1016     }
1017
1018     /**
1019      * Resume reads asynchronously.  Queues a task on the channel's I/O thread to resume.  Note that if a channel
1020      * has multiple threads associated with it, the results may not be desirable.
1021      *
1022      * @param channel the channel to resume
1023      */

1024     public static void resumeReadsAsync(final SuspendableReadChannel channel) {
1025         final XnioIoThread ioThread = channel.getIoThread();
1026         if (ioThread == Thread.currentThread()) {
1027             channel.resumeReads();
1028         } else {
1029             ioThread.execute(new Runnable() {
1030                 public void run() {
1031                     channel.resumeReads();
1032                 }
1033             });
1034         }
1035     }
1036
1037     /**
1038      * Resume writes asynchronously.  Queues a task on the channel's I/O thread to resume.  Note that if a channel
1039      * has multiple threads associated with it, the results may not be desirable.
1040      *
1041      * @param channel the channel to resume
1042      */

1043     public static void resumeWritesAsync(final SuspendableWriteChannel channel) {
1044         final XnioIoThread ioThread = channel.getIoThread();
1045         if (ioThread == Thread.currentThread()) {
1046             channel.resumeWrites();
1047         } else {
1048             ioThread.execute(new Runnable() {
1049                 public void run() {
1050                     channel.resumeWrites();
1051                 }
1052             });
1053         }
1054     }
1055
1056     /**
1057      * Writes out the data in the buffer to the channel. If all the data is written out
1058      * then the channel will have its writes shutdown.
1059      *
1060      * @param channel The channel
1061      * @param src The buffer
1062      * @return The number of bytes written
1063      * @throws IOException
1064      */

1065     public static int writeFinalBasic(StreamSinkChannel channel, ByteBuffer src) throws IOException {
1066         int res = channel.write(src);
1067         if(!src.hasRemaining()) {
1068             channel.shutdownWrites();
1069         }
1070         return res;
1071     }
1072
1073     /**
1074      * Writes out the data in the buffer to the channel. If all the data is written out
1075      * then the channel will have its writes shutdown.
1076      *
1077      * @param channel The channel
1078      * @param srcs The buffers
1079      * @param offset The offset into the srcs array
1080      * @param length The number buffers to write
1081      * @return The number of bytes written
1082      * @throws IOException
1083      */

1084     public static long writeFinalBasic(StreamSinkChannel channel, ByteBuffer[] srcs, int offset, int length) throws IOException {
1085         final long res = channel.write(srcs, offset, length);
1086         if (!Buffers.hasRemaining(srcs, offset, length)) {
1087             channel.shutdownWrites();
1088         }
1089         return res;
1090     }
1091
1092     static {
1093         NULL_FILE_CHANNEL = AccessController.doPrivileged(new PrivilegedAction<FileChannel>() {
1094             public FileChannel run() {
1095                 final String osName = System.getProperty("os.name""unknown").toLowerCase(Locale.US);
1096                 try {
1097                     if (osName.contains("windows")) {
1098                         return new FileOutputStream("NUL:").getChannel();
1099                     } else {
1100                         return new FileOutputStream("/dev/null").getChannel();
1101                     }
1102                 } catch (FileNotFoundException e) {
1103                     throw new IOError(e);
1104                 }
1105             }
1106         });
1107     }
1108 }
1109