1 /*
2 * JBoss, Home of Professional Open Source
3 *
4 * Copyright 2008 Red Hat, Inc. and/or its affiliates.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.xnio.channels;
20
21 import java.io.FileNotFoundException;
22 import java.io.FileOutputStream;
23 import java.io.IOError;
24 import java.io.InterruptedIOException;
25 import java.nio.channels.Channel;
26 import java.nio.channels.FileChannel;
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Locale;
30 import org.xnio.Buffers;
31
32 import java.io.IOException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.WritableByteChannel;
35 import java.nio.channels.GatheringByteChannel;
36 import java.nio.channels.ReadableByteChannel;
37 import java.nio.channels.ScatteringByteChannel;
38 import java.util.concurrent.TimeUnit;
39 import org.xnio.ChannelListener;
40 import org.xnio.Option;
41 import org.xnio.XnioIoThread;
42
43 /**
44 * A utility class containing static methods to support channel usage.
45 *
46 * @apiviz.exclude
47 */
48 public final class Channels {
49
// Static-only utility holder: the private constructor keeps the class from being instantiated.
private Channels() {
    // intentionally empty
}
52
53 /**
54 * Simple utility method to execute a blocking flush on a writable channel. The method blocks until there are no
55 * remaining bytes in the send queue.
56 *
57 * @param channel the writable channel
58 * @throws IOException if an I/O exception occurs
59 *
60 * @since 2.0
61 */
62 public static void flushBlocking(SuspendableWriteChannel channel) throws IOException {
63 while (! channel.flush()) {
64 channel.awaitWritable();
65 }
66 }
67
68 /**
69 * Simple utility method to execute a blocking flush on a writable channel. The method blocks until there are no
70 * remaining bytes in the send queue or the timeout is reached.
71 *
72 * @param channel the writable channel
73 * @param time the amount of time to wait
74 * @param unit the unit of time to wait
75 * @return true if the channel was successfully flushed, false if the timeout was reached
76 * @throws IOException if an I/O exception occurs
77 *
78 * @since 3.8
79 */
80 public static boolean flushBlocking(SuspendableWriteChannel channel, long time, TimeUnit unit) throws IOException {
81 // In the fast path, the timeout is not used because bytes can be flushed without blocking.
82 if (channel.flush()) {
83 return true;
84 }
85 long remaining = unit.toNanos(time);
86 long now = System.nanoTime();
87 do {
88 // awaitWritable may return spuriously so looping is required.
89 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
90 // flush prior to recalculating remaining time to avoid a nanoTime
91 // invocation in the optimal path.
92 if (channel.flush()) {
93 return true;
94 }
95 // Nanotime must only be used in comparison with another nanotime value
96 // This implementation allows us to avoid immediate subsequent nanoTime calls
97 } while ((remaining -= Math.max(-now + (now = System.nanoTime()), 0L)) > 0L);
98 return false;
99 }
100
101 /**
102 * Simple utility method to execute a blocking write shutdown on a writable channel. The method blocks until the
103 * channel's output side is fully shut down.
104 *
105 * @param channel the writable channel
106 * @throws IOException if an I/O exception occurs
107 *
108 * @since 2.0
109 */
110 public static void shutdownWritesBlocking(SuspendableWriteChannel channel) throws IOException {
111 channel.shutdownWrites();
112 flushBlocking(channel);
113 }
114
115 /**
116 * Simple utility method to execute a blocking write shutdown on a writable channel. The method blocks until the
117 * channel's output side is fully shut down or the timeout is reached.
118 *
119 * @param channel the writable channel
120 * @param time the amount of time to wait
121 * @param unit the unit of time to wait
122 * @return true if the channel was successfully flushed, false if the timeout was reached
123 * @throws IOException if an I/O exception occurs
124 *
125 * @since 3.8
126 */
127 public static boolean shutdownWritesBlocking(SuspendableWriteChannel channel, long time, TimeUnit unit) throws IOException {
128 channel.shutdownWrites();
129 return flushBlocking(channel, time, unit);
130 }
131
132 /**
133 * Simple utility method to execute a blocking write on a byte channel. The method blocks until the bytes in the
134 * buffer have been fully written. To ensure that the data is sent, the {@link #flushBlocking(SuspendableWriteChannel)}
135 * method should be called after all writes are complete.
136 *
137 * @param channel the channel to write on
138 * @param buffer the data to write
139 * @param <C> the channel type
140 * @return the number of bytes written
141 * @throws IOException if an I/O exception occurs
142 * @since 1.2
143 */
144 public static <C extends WritableByteChannel & SuspendableWriteChannel> int writeBlocking(C channel, ByteBuffer buffer) throws IOException {
145 int t = 0;
146 while (buffer.hasRemaining()) {
147 final int res = channel.write(buffer);
148 if (res == 0) {
149 channel.awaitWritable();
150 } else {
151 t += res;
152 }
153 }
154 return t;
155 }
156
157 /**
158 * Simple utility method to execute a blocking write on a byte channel with a timeout. The method blocks until
159 * either the bytes in the buffer have been fully written, or the timeout expires, whichever comes first.
160 *
161 * @param channel the channel to write on
162 * @param buffer the data to write
163 * @param time the amount of time to wait
164 * @param unit the unit of time to wait
165 * @param <C> the channel type
166 * @return the number of bytes written
167 * @throws IOException if an I/O exception occurs
168 * @since 1.2
169 */
170 public static <C extends WritableByteChannel & SuspendableWriteChannel> int writeBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
171 long remaining = unit.toNanos(time);
172 long now = System.nanoTime();
173 int t = 0;
174 while (buffer.hasRemaining() && remaining > 0L) {
175 int res = channel.write(buffer);
176 if (res == 0) {
177 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
178 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
179 } else {
180 t += res;
181 }
182 }
183 return t;
184 }
185
186 /**
187 * Simple utility method to execute a blocking write on a gathering byte channel. The method blocks until the
188 * bytes in the buffer have been fully written.
189 *
190 * @param channel the channel to write on
191 * @param buffers the data to write
192 * @param offs the index of the first buffer to write
193 * @param len the number of buffers to write
194 * @param <C> the channel type
195 * @return the number of bytes written
196 * @throws IOException if an I/O exception occurs
197 * @since 1.2
198 */
199 public static <C extends GatheringByteChannel & SuspendableWriteChannel> long writeBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
200 long t = 0;
201 while (Buffers.hasRemaining(buffers, offs, len)) {
202 final long res = channel.write(buffers, offs, len);
203 if (res == 0) {
204 channel.awaitWritable();
205 } else {
206 t += res;
207 }
208 }
209 return t;
210 }
211
212 /**
213 * Simple utility method to execute a blocking write on a gathering byte channel with a timeout. The method blocks until all
214 * the bytes are written, or until the timeout occurs.
215 *
216 * @param channel the channel to write on
217 * @param buffers the data to write
218 * @param offs the index of the first buffer to write
219 * @param len the number of buffers to write
220 * @param time the amount of time to wait
221 * @param unit the unit of time to wait
222 * @param <C> the channel type
223 * @return the number of bytes written
224 * @throws IOException if an I/O exception occurs
225 * @since 1.2
226 */
227 public static <C extends GatheringByteChannel & SuspendableWriteChannel> long writeBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
228 long remaining = unit.toNanos(time);
229 long now = System.nanoTime();
230 long t = 0;
231 while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0L) {
232 long res = channel.write(buffers, offs, len);
233 if (res == 0) {
234 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
235 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
236 } else {
237 t += res;
238 }
239 }
240 return t;
241 }
242
243 /**
244 * Simple utility method to execute a blocking send on a message channel. The method blocks until the message is written.
245 *
246 * @param channel the channel to write on
247 * @param buffer the data to write
248 * @param <C> the channel type
249 * @throws IOException if an I/O exception occurs
250 * @since 1.2
251 */
252 public static <C extends WritableMessageChannel> void sendBlocking(C channel, ByteBuffer buffer) throws IOException {
253 while (! channel.send(buffer)) {
254 channel.awaitWritable();
255 }
256 }
257
258 /**
259 * Simple utility method to execute a blocking send on a message channel with a timeout. The method blocks until the channel
260 * is writable, and then the message is written.
261 *
262 * @param channel the channel to write on
263 * @param buffer the data to write
264 * @param time the amount of time to wait
265 * @param unit the unit of time to wait
266 * @param <C> the channel type
267 * @return the write result
268 * @throws IOException if an I/O exception occurs
269 * @since 1.2
270 */
271 public static <C extends WritableMessageChannel> boolean sendBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
272 long remaining = unit.toNanos(time);
273 long now = System.nanoTime();
274 while (remaining > 0L) {
275 if (!channel.send(buffer)) {
276 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
277 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
278 } else {
279 return true;
280 }
281 }
282 return false;
283 }
284
285 /**
286 * Simple utility method to execute a blocking gathering send on a message channel. The method blocks until the message is written.
287 *
288 * @param channel the channel to write on
289 * @param buffers the data to write
290 * @param offs the index of the first buffer to write
291 * @param len the number of buffers to write
292 * @param <C> the channel type
293 * @throws IOException if an I/O exception occurs
294 * @since 1.2
295 */
296 public static <C extends WritableMessageChannel> void sendBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
297 while (! channel.send(buffers, offs, len)) {
298 channel.awaitWritable();
299 }
300 }
301
302 /**
303 * Simple utility method to execute a blocking gathering send on a message channel with a timeout. The method blocks until either
304 * the message is written or the timeout expires.
305 *
306 * @param channel the channel to write on
307 * @param buffers the data to write
308 * @param offs the index of the first buffer to write
309 * @param len the number of buffers to write
310 * @param time the amount of time to wait
311 * @param unit the unit of time to wait
312 * @param <C> the channel type
313 * @return {@code true} if the message was written before the timeout
314 * @throws IOException if an I/O exception occurs
315 * @since 1.2
316 */
317 public static <C extends WritableMessageChannel> boolean sendBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
318 long remaining = unit.toNanos(time);
319 long now = System.nanoTime();
320 while (remaining > 0L) {
321 if (!channel.send(buffers, offs, len)) {
322 channel.awaitWritable(remaining, TimeUnit.NANOSECONDS);
323 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
324 } else {
325 return true;
326 }
327 }
328 return false;
329 }
330
331 /**
332 * Simple utility method to execute a blocking read on a readable byte channel. This method blocks until the
333 * channel is readable, and then the message is read.
334 *
335 * @param channel the channel to read from
336 * @param buffer the buffer into which bytes are to be transferred
337 * @param <C> the channel type
338 * @return the number of bytes read
339 * @throws IOException if an I/O exception occurs
340 * @since 1.2
341 */
342 public static <C extends ReadableByteChannel & SuspendableReadChannel> int readBlocking(C channel, ByteBuffer buffer) throws IOException {
343 int res;
344 while ((res = channel.read(buffer)) == 0 && buffer.hasRemaining()) {
345 channel.awaitReadable();
346 }
347 return res;
348 }
349
350 /**
351 * Simple utility method to execute a blocking read on a readable byte channel with a timeout. This method blocks until the
352 * channel is readable, and then the message is read.
353 *
354 * @param channel the channel to read from
355 * @param buffer the buffer into which bytes are to be transferred
356 * @param time the amount of time to wait
357 * @param unit the unit of time to wait
358 * @param <C> the channel type
359 * @return the number of bytes read
360 * @throws IOException if an I/O exception occurs
361 * @since 1.2
362 */
363 public static <C extends ReadableByteChannel & SuspendableReadChannel> int readBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
364 // In the fast path, the timeout is not used because bytes can be read without blocking.
365 int res = channel.read(buffer);
366 if (res != 0) {
367 return res;
368 }
369 long remaining = unit.toNanos(time);
370 long now = System.nanoTime();
371 while (buffer.hasRemaining() && remaining > 0) {
372 // awaitReadable may return spuriously, so looping is required.
373 channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
374 // read prior to recalculating remaining time to avoid a nanoTime
375 // invocation in the optimal path.
376 res = channel.read(buffer);
377 if (res != 0) {
378 return res;
379 }
380 // Nanotime must only be used in comparison with another nanotime value
381 // This implementation allows us to avoid immediate subsequent nanoTime calls
382 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
383 }
384 return res;
385 }
386
387 /**
388 * Simple utility method to execute a blocking read on a scattering byte channel. This method blocks until the
389 * channel is readable, and then the message is read.
390 *
391 * @param channel the channel to read from
392 * @param buffers the buffers into which bytes are to be transferred
393 * @param offs the first buffer to use
394 * @param len the number of buffers to use
395 * @param <C> the channel type
396 * @return the number of bytes read
397 * @throws IOException if an I/O exception occurs
398 * @since 1.2
399 */
400 public static <C extends ScatteringByteChannel & SuspendableReadChannel> long readBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
401 long res;
402 while ((res = channel.read(buffers, offs, len)) == 0) {
403 channel.awaitReadable();
404 }
405 return res;
406 }
407
408 /**
409 * Simple utility method to execute a blocking read on a scattering byte channel with a timeout. This method blocks until the
410 * channel is readable, and then the message is read.
411 *
412 * @param channel the channel to read from
413 * @param buffers the buffers into which bytes are to be transferred
414 * @param offs the first buffer to use
415 * @param len the number of buffers to use
416 * @param time the amount of time to wait
417 * @param unit the unit of time to wait
418 * @param <C> the channel type
419 * @return the number of bytes read
420 * @throws IOException if an I/O exception occurs
421 * @since 1.2
422 */
423 public static <C extends ScatteringByteChannel & SuspendableReadChannel> long readBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
424 // In the fast path, the timeout is not used because bytes can be read
425 // without blocking or interaction with the precise clock.
426 long res = channel.read(buffers, offs, len);
427 if (res != 0L) {
428 return res;
429 }
430 long remaining = unit.toNanos(time);
431 long now = System.nanoTime();
432 while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0) {
433 // awaitReadable may return spuriously, looping is required.
434 channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
435 // read prior to recalculating remaining time to avoid a nanoTime
436 // invocation in the optimal path.
437 res = channel.read(buffers, offs, len);
438 if (res != 0) {
439 return res;
440 }
441 // Nanotime must only be used in comparison with another nanotime value
442 // This implementation allows us to avoid immediate subsequent nanoTime calls
443 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
444 }
445 return res;
446 }
447
448 /**
449 * Simple utility method to execute a blocking receive on a readable message channel. This method blocks until the
450 * channel is readable, and then the message is received.
451 *
452 * @param channel the channel to read from
453 * @param buffer the buffer into which bytes are to be transferred
454 * @param <C> the channel type
455 * @return the number of bytes read
456 * @throws IOException if an I/O exception occurs
457 * @since 1.2
458 */
459 public static <C extends ReadableMessageChannel> int receiveBlocking(C channel, ByteBuffer buffer) throws IOException {
460 int res;
461 while ((res = channel.receive(buffer)) == 0) {
462 channel.awaitReadable();
463 }
464 return res;
465 }
466
467 /**
468 * Simple utility method to execute a blocking receive on a readable message channel with a timeout. This method blocks until the
469 * channel is readable, and then the message is received.
470 *
471 * @param channel the channel to read from
472 * @param buffer the buffer into which bytes are to be transferred
473 * @param time the amount of time to wait
474 * @param unit the unit of time to wait
475 * @param <C> the channel type
476 * @return the number of bytes read
477 * @throws IOException if an I/O exception occurs
478 * @since 1.2
479 */
480 public static <C extends ReadableMessageChannel> int receiveBlocking(C channel, ByteBuffer buffer, long time, TimeUnit unit) throws IOException {
481 // In the fast path, the timeout is not used because bytes can be read without blocking.
482 int res = channel.receive(buffer);
483 if (res != 0) {
484 return res;
485 }
486 long remaining = unit.toNanos(time);
487 long now = System.nanoTime();
488 while (buffer.hasRemaining() && remaining > 0) {
489 // awaitReadable may return spuriously, looping is required.
490 channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
491 // read prior to recalculating remaining time to avoid a nanoTime
492 // invocation in the optimal path.
493 res = channel.receive(buffer);
494 if (res != 0) {
495 return res;
496 }
497 // Nanotime must only be used in comparison with another nanotime value
498 // This implementation allows us to avoid immediate subsequent nanoTime calls
499 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
500 }
501 return res;
502 }
503
504 /**
505 * Simple utility method to execute a blocking receive on a readable message channel. This method blocks until the
506 * channel is readable, and then the message is received.
507 *
508 * @param channel the channel to read from
509 * @param buffers the buffers into which bytes are to be transferred
510 * @param offs the first buffer to use
511 * @param len the number of buffers to use
512 * @param <C> the channel type
513 * @return the number of bytes read
514 * @throws IOException if an I/O exception occurs
515 * @since 1.2
516 */
517 public static <C extends ReadableMessageChannel> long receiveBlocking(C channel, ByteBuffer[] buffers, int offs, int len) throws IOException {
518 long res;
519 while ((res = channel.receive(buffers, offs, len)) == 0) {
520 channel.awaitReadable();
521 }
522 return res;
523 }
524
525 /**
526 * Simple utility method to execute a blocking receive on a readable message channel with a timeout. This method blocks until the
527 * channel is readable, and then the message is received.
528 *
529 * @param channel the channel to read from
530 * @param buffers the buffers into which bytes are to be transferred
531 * @param offs the first buffer to use
532 * @param len the number of buffers to use
533 * @param time the amount of time to wait
534 * @param unit the unit of time to wait
535 * @param <C> the channel type
536 * @return the number of bytes read
537 * @throws IOException if an I/O exception occurs
538 * @since 1.2
539 */
540 public static <C extends ReadableMessageChannel> long receiveBlocking(C channel, ByteBuffer[] buffers, int offs, int len, long time, TimeUnit unit) throws IOException {
541 // In the fast path, the timeout is not used because bytes can be read without blocking.
542 long res = channel.receive(buffers, offs, len);
543 if (res != 0L) {
544 return res;
545 }
546 long remaining = unit.toNanos(time);
547 long now = System.nanoTime();
548 while (Buffers.hasRemaining(buffers, offs, len) && remaining > 0) {
549 // awaitReadable may return spuriously, looping is required.
550 channel.awaitReadable(remaining, TimeUnit.NANOSECONDS);
551 res = channel.receive(buffers, offs, len);
552 if (res != 0) {
553 return res;
554 }
555 // Nanotime must only be used in comparison with another nanotime value
556 // This implementation allows us to avoid immediate subsequent nanoTime calls
557 remaining -= Math.max(-now + (now = System.nanoTime()), 0L);
558 }
559 return res;
560 }
561
562 /**
563 * Simple utility method to execute a blocking accept on an accepting channel. This method blocks until
564 * an accept is possible, and then returns the accepted connection.
565 *
566 * @param channel the accepting channel
567 * @param <C> the connection channel type
568 * @param <A> the accepting channel type
569 * @return the accepted channel
570 * @throws IOException if an I/O error occurs
571 * @since 3.0
572 */
573 public static <C extends ConnectedChannel, A extends AcceptingChannel<C>> C acceptBlocking(A channel) throws IOException {
574 C accepted;
575 while ((accepted = channel.accept()) == null) {
576 channel.awaitAcceptable();
577 }
578 return accepted;
579 }
580
581 /**
582 * Simple utility method to execute a blocking accept on an accepting channel, with a timeout. This method blocks until
583 * an accept is possible, and then returns the accepted connection.
584 *
585 * @param channel the accepting channel
586 * @param time the amount of time to wait
587 * @param unit the unit of time to wait
588 * @param <C> the connection channel type
589 * @param <A> the accepting channel type
590 * @return the accepted channel, or {@code null} if the timeout occurred before a connection was accepted
591 * @throws IOException if an I/O error occurs
592 * @since 3.0
593 */
594 public static <C extends ConnectedChannel, A extends AcceptingChannel<C>> C acceptBlocking(A channel, long time, TimeUnit unit) throws IOException {
595 final C accepted = channel.accept();
596 if (accepted == null) {
597 channel.awaitAcceptable(time, unit);
598 return channel.accept();
599 } else {
600 return accepted;
601 }
602 }
603
604 /**
605 * Transfer bytes between two channels efficiently, blocking if necessary.
606 *
607 * @param destination the destination channel
608 * @param source the source file channel
609 * @param startPosition the start position in the source file
610 * @param count the number of bytes to transfer
611 * @throws IOException if an I/O error occurs
612 */
613 public static void transferBlocking(StreamSinkChannel destination, FileChannel source, long startPosition, final long count) throws IOException {
614 long remaining = count;
615 long res;
616 while (remaining > 0L) {
617 while ((res = destination.transferFrom(source, startPosition, remaining)) == 0L) {
618 try {
619 destination.awaitWritable();
620 } catch (InterruptedIOException e) {
621 final long bytes = count - remaining;
622 if (bytes > (long) Integer.MAX_VALUE) {
623 e.bytesTransferred = -1;
624 } else {
625 e.bytesTransferred = (int) bytes;
626 }
627 }
628 }
629 remaining -= res;
630 startPosition += res;
631 }
632 }
633
634 /**
635 * Transfer bytes between two channels efficiently, blocking if necessary.
636 *
637 * @param destination the destination file channel
638 * @param source the source channel
639 * @param startPosition the start position in the destination file
640 * @param count the number of bytes to transfer
641 * @throws IOException if an I/O error occurs
642 */
643 public static void transferBlocking(FileChannel destination, StreamSourceChannel source, long startPosition, final long count) throws IOException {
644 long remaining = count;
645 long res;
646 while (remaining > 0L) {
647 while ((res = source.transferTo(startPosition, remaining, destination)) == 0L) {
648 try {
649 source.awaitReadable();
650 } catch (InterruptedIOException e) {
651 final long bytes = count - remaining;
652 if (bytes > (long) Integer.MAX_VALUE) {
653 e.bytesTransferred = -1;
654 } else {
655 e.bytesTransferred = (int) bytes;
656 }
657 }
658 }
659 remaining -= res;
660 startPosition += res;
661 }
662 }
663
664 /**
665 * Transfer bytes between two channels efficiently, blocking if necessary.
666 *
667 * @param destination the destination channel
668 * @param source the source channel
669 * @param throughBuffer the buffer to transfer through,
670 * @param count the number of bytes to transfer
671 * @return the number of bytes actually transferred (will be fewer than {@code count} if EOF was reached)
672 * @throws IOException if the transfer fails
673 */
674 public static long transferBlocking(StreamSinkChannel destination, StreamSourceChannel source, ByteBuffer throughBuffer, long count) throws IOException {
675 long t = 0L;
676 long res;
677 while (t < count) {
678 try {
679 while ((res = source.transferTo(count, throughBuffer, destination)) == 0L) {
680 if (throughBuffer.hasRemaining()) {
681 writeBlocking(destination, throughBuffer);
682 } else {
683 source.awaitReadable();
684 }
685 }
686 t += res;
687 } catch (InterruptedIOException e) {
688 int transferred = e.bytesTransferred;
689 t += transferred;
690 if (transferred < 0 || t > (long) Integer.MAX_VALUE) {
691 e.bytesTransferred = -1;
692 } else {
693 e.bytesTransferred = (int) t;
694 }
695 throw e;
696 }
697 if (res == -1L) {
698 return t == 0L ? -1L : t;
699 }
700 }
701 return t;
702 }
703
704 /**
705 * Set the close listener for a channel (type-safe).
706 *
707 * @param channel the channel
708 * @param listener the listener to set
709 * @param <T> the channel type
710 */
711 public static <T extends CloseableChannel> void setCloseListener(T channel, ChannelListener<? super T> listener) {
712 @SuppressWarnings("unchecked")
713 ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getCloseSetter();
714 setter.set(listener);
715 }
716
717 /**
718 * Set the accept listener for a channel (type-safe).
719 *
720 * @param channel the channel
721 * @param listener the listener to set
722 * @param <T> the channel type
723 */
724 public static <T extends AcceptingChannel<?>> void setAcceptListener(T channel, ChannelListener<? super T> listener) {
725 @SuppressWarnings("unchecked")
726 ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getAcceptSetter();
727 setter.set(listener);
728 }
729
730 /**
731 * Set the read listener for a channel (type-safe).
732 *
733 * @param channel the channel
734 * @param listener the listener to set
735 * @param <T> the channel type
736 */
737 public static <T extends SuspendableReadChannel> void setReadListener(T channel, ChannelListener<? super T> listener) {
738 @SuppressWarnings("unchecked")
739 ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getReadSetter();
740 setter.set(listener);
741 }
742
743 /**
744 * Set the write listener for a channel (type-safe).
745 *
746 * @param channel the channel
747 * @param listener the listener to set
748 * @param <T> the channel type
749 */
750 public static <T extends SuspendableWriteChannel> void setWriteListener(T channel, ChannelListener<? super T> listener) {
751 @SuppressWarnings("unchecked")
752 ChannelListener.Setter<? extends T> setter = (ChannelListener.Setter<? extends T>) channel.getWriteSetter();
753 setter.set(listener);
754 }
755
756 /**
757 * Create a wrapper for a byte channel which does not expose other methods.
758 *
759 * @param original the original
760 * @return the wrapped channel
761 */
762 public static ByteChannel wrapByteChannel(final ByteChannel original) {
763 return new ByteChannel() {
764 public int read(final ByteBuffer dst) throws IOException {
765 return original.read(dst);
766 }
767
768 public boolean isOpen() {
769 return original.isOpen();
770 }
771
772 public void close() throws IOException {
773 original.close();
774 }
775
776 public int write(final ByteBuffer src) throws IOException {
777 return original.write(src);
778 }
779
780 public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
781 return original.write(srcs, offset, length);
782 }
783
784 public long write(final ByteBuffer[] srcs) throws IOException {
785 return original.write(srcs);
786 }
787
788 public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
789 return original.read(dsts, offset, length);
790 }
791
792 public long read(final ByteBuffer[] dsts) throws IOException {
793 return original.read(dsts);
794 }
795 };
796 }
797
798 /**
799 * Get an option value from a configurable target. If the method throws an exception then the default value
800 * is returned.
801 *
802 * @param configurable the configurable target
803 * @param option the option
804 * @param defaultValue the default value
805 * @param <T> the option value type
806 * @return the value
807 */
808 public static <T> T getOption(Configurable configurable, Option<T> option, T defaultValue) {
809 try {
810 final T value = configurable.getOption(option);
811 return value == null ? defaultValue : value;
812 } catch (IOException e) {
813 return defaultValue;
814 }
815 }
816
817 /**
818 * Get an option value from a configurable target. If the method throws an exception then the default value
819 * is returned.
820 *
821 * @param configurable the configurable target
822 * @param option the option
823 * @param defaultValue the default value
824 * @return the value
825 */
826 public static boolean getOption(Configurable configurable, Option<Boolean> option, boolean defaultValue) {
827 try {
828 final Boolean value = configurable.getOption(option);
829 return value == null ? defaultValue : value.booleanValue();
830 } catch (IOException e) {
831 return defaultValue;
832 }
833 }
834
835 /**
836 * Get an option value from a configurable target. If the method throws an exception then the default value
837 * is returned.
838 *
839 * @param configurable the configurable target
840 * @param option the option
841 * @param defaultValue the default value
842 * @return the value
843 */
844 public static int getOption(Configurable configurable, Option<Integer> option, int defaultValue) {
845 try {
846 final Integer value = configurable.getOption(option);
847 return value == null ? defaultValue : value.intValue();
848 } catch (IOException e) {
849 return defaultValue;
850 }
851 }
852
853 /**
854 * Get an option value from a configurable target. If the method throws an exception then the default value
855 * is returned.
856 *
857 * @param configurable the configurable target
858 * @param option the option
859 * @param defaultValue the default value
860 * @return the value
861 */
862 public static long getOption(Configurable configurable, Option<Long> option, long defaultValue) {
863 try {
864 final Long value = configurable.getOption(option);
865 return value == null ? defaultValue : value.longValue();
866 } catch (IOException e) {
867 return defaultValue;
868 }
869 }
870
871 /**
872 * Unwrap a nested channel type. If the channel does not wrap the target type, {@code null} is returned.
873 *
874 * @param targetType the class to unwrap
875 * @param channel the channel
876 * @param <T> the type to unwrap
877 * @return the unwrapped type, or {@code null} if the given type is not wrapped
878 * @see WrappedChannel
879 */
880 public static <T extends Channel> T unwrap(Class<T> targetType, Channel channel) {
881 for (;;) {
882 if (channel == null) {
883 return null;
884 } else if (targetType.isInstance(channel)) {
885 return targetType.cast(channel);
886 } else if (channel instanceof WrappedChannel) {
887 channel = ((WrappedChannel<?>)channel).getChannel();
888 } else {
889 return null;
890 }
891 }
892 }
893
894 private static final FileChannel NULL_FILE_CHANNEL;
895 private static final ByteBuffer DRAIN_BUFFER = ByteBuffer.allocateDirect(16384);
896
897 /**
898 * Attempt to drain the given number of bytes from the stream source channel.
899 *
900 * @param channel the channel to drain
901 * @param count the number of bytes
902 * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
903 * @throws IOException if an error occurs
904 */
905 public static long drain(StreamSourceChannel channel, long count) throws IOException {
906 long total = 0L, lres;
907 int ires;
908 ByteBuffer buffer = null;
909 for (;;) {
910 if (count == 0L) return total;
911 if (NULL_FILE_CHANNEL != null) {
912 while (count > 0) {
913 if ((lres = channel.transferTo(0, count, NULL_FILE_CHANNEL)) == 0L) {
914 break;
915 }
916 total += lres;
917 count -= lres;
918 }
919 // jump out quick if we drained the fast way
920 if (total > 0L) return total;
921 }
922 if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
923 if ((long) buffer.limit() > count) buffer.limit((int) count);
924 ires = channel.read(buffer);
925 buffer.clear();
926 switch (ires) {
927 case -1: return total == 0L ? -1L : total;
928 case 0: return total;
929 default: total += (long) ires; count -= (long) ires;
930 }
931 }
932 }
933
934 /**
935 * Attempt to drain the given number of bytes from the readable byte channel.
936 *
937 * @param channel the channel to drain
938 * @param count the number of bytes
939 * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
940 * @throws IOException if an error occurs
941 */
942 public static long drain(ReadableByteChannel channel, long count) throws IOException {
943 if (channel instanceof StreamSourceChannel) {
944 return drain((StreamSourceChannel) channel, count);
945 } else {
946 long total = 0L, lres;
947 int ires;
948 ByteBuffer buffer = null;
949 for (;;) {
950 if (count == 0L) return total;
951 if (NULL_FILE_CHANNEL != null) {
952 while (count > 0) {
953 if ((lres = NULL_FILE_CHANNEL.transferFrom(channel, 0, count)) == 0L) {
954 break;
955 }
956 total += lres;
957 count -= lres;
958 }
959 // jump out quick if we drained the fast way
960 if (total > 0L) return total;
961 }
962 if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
963 if ((long) buffer.limit() > count) buffer.limit((int) count);
964 ires = channel.read(buffer);
965 buffer.clear();
966 switch (ires) {
967 case -1: return total == 0L ? -1L : total;
968 case 0: return total;
969 default: total += (long) ires; count -= (long) ires;
970 }
971 }
972 }
973 }
974
975 /**
976 * Attempt to drain the given number of bytes from the file channel. This does nothing more than force a
977 * read of bytes in the file.
978 *
979 * @param channel the channel to drain
980 * @param position the position to drain from
981 * @param count the number of bytes
982 * @return the number of bytes drained, 0 if reading the channel would block, or -1 if the EOF was reached
983 * @throws IOException if an error occurs
984 */
985 public static long drain(FileChannel channel, long position, long count) throws IOException {
986 if (channel instanceof StreamSourceChannel) {
987 return drain((StreamSourceChannel) channel, count);
988 } else {
989 long total = 0L, lres;
990 int ires;
991 ByteBuffer buffer = null;
992 for (;;) {
993 if (count == 0L) return total;
994 if (NULL_FILE_CHANNEL != null) {
995 while (count > 0) {
996 if ((lres = channel.transferTo(position, count, NULL_FILE_CHANNEL)) == 0L) {
997 break;
998 }
999 total += lres;
1000 count -= lres;
1001 }
1002 // jump out quick if we drained the fast way
1003 if (total > 0L) return total;
1004 }
1005 if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
1006 if ((long) buffer.limit() > count) buffer.limit((int) count);
1007 ires = channel.read(buffer);
1008 buffer.clear();
1009 switch (ires) {
1010 case -1: return total == 0L ? -1L : total;
1011 case 0: return total;
1012 default: total += (long) ires;
1013 }
1014 }
1015 }
1016 }
1017
1018 /**
1019 * Resume reads asynchronously. Queues a task on the channel's I/O thread to resume. Note that if a channel
1020 * has multiple threads associated with it, the results may not be desirable.
1021 *
1022 * @param channel the channel to resume
1023 */
1024 public static void resumeReadsAsync(final SuspendableReadChannel channel) {
1025 final XnioIoThread ioThread = channel.getIoThread();
1026 if (ioThread == Thread.currentThread()) {
1027 channel.resumeReads();
1028 } else {
1029 ioThread.execute(new Runnable() {
1030 public void run() {
1031 channel.resumeReads();
1032 }
1033 });
1034 }
1035 }
1036
1037 /**
1038 * Resume writes asynchronously. Queues a task on the channel's I/O thread to resume. Note that if a channel
1039 * has multiple threads associated with it, the results may not be desirable.
1040 *
1041 * @param channel the channel to resume
1042 */
1043 public static void resumeWritesAsync(final SuspendableWriteChannel channel) {
1044 final XnioIoThread ioThread = channel.getIoThread();
1045 if (ioThread == Thread.currentThread()) {
1046 channel.resumeWrites();
1047 } else {
1048 ioThread.execute(new Runnable() {
1049 public void run() {
1050 channel.resumeWrites();
1051 }
1052 });
1053 }
1054 }
1055
1056 /**
1057 * Writes out the data in the buffer to the channel. If all the data is written out
1058 * then the channel will have its writes shutdown.
1059 *
1060 * @param channel The channel
1061 * @param src The buffer
1062 * @return The number of bytes written
1063 * @throws IOException
1064 */
1065 public static int writeFinalBasic(StreamSinkChannel channel, ByteBuffer src) throws IOException {
1066 int res = channel.write(src);
1067 if(!src.hasRemaining()) {
1068 channel.shutdownWrites();
1069 }
1070 return res;
1071 }
1072
1073 /**
1074 * Writes out the data in the buffer to the channel. If all the data is written out
1075 * then the channel will have its writes shutdown.
1076 *
1077 * @param channel The channel
1078 * @param srcs The buffers
1079 * @param offset The offset into the srcs array
1080 * @param length The number buffers to write
1081 * @return The number of bytes written
1082 * @throws IOException
1083 */
1084 public static long writeFinalBasic(StreamSinkChannel channel, ByteBuffer[] srcs, int offset, int length) throws IOException {
1085 final long res = channel.write(srcs, offset, length);
1086 if (!Buffers.hasRemaining(srcs, offset, length)) {
1087 channel.shutdownWrites();
1088 }
1089 return res;
1090 }
1091
// Open the platform's null device once, under a privileged action, and keep its channel for the
// lifetime of the class. A failure to open the device is rethrown as an IOError.
static {
    NULL_FILE_CHANNEL = AccessController.doPrivileged(new PrivilegedAction<FileChannel>() {
        public FileChannel run() {
            final String os = System.getProperty("os.name", "unknown").toLowerCase(Locale.US);
            final String nullDevice = os.contains("windows") ? "NUL:" : "/dev/null";
            final FileOutputStream sink;
            try {
                sink = new FileOutputStream(nullDevice);
            } catch (FileNotFoundException e) {
                throw new IOError(e);
            }
            return sink.getChannel();
        }
    });
}
1108 }
1109