1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Copyright 2011 Red Hat, Inc. and/or its affiliates, and individual
5  * contributors as indicated by the @author tags.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */

19
20 package org.xnio;
21
22 import java.io.Closeable;
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.Channel;
27 import java.nio.channels.FileChannel;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32 import org.xnio.channels.AcceptingChannel;
33 import org.xnio.channels.Channels;
34 import org.xnio.channels.ConnectedChannel;
35 import org.xnio.channels.StreamSinkChannel;
36 import org.xnio.channels.StreamSourceChannel;
37 import org.xnio.channels.SuspendableReadChannel;
38 import org.xnio.channels.SuspendableWriteChannel;
39 import org.xnio.channels.WritableMessageChannel;
40
41 import static org.xnio._private.Messages.listenerMsg;
42 import static org.xnio._private.Messages.msg;
43
44 /**
45  * Channel listener utility methods.
46  *
47  * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
48  */

49 @SuppressWarnings("unused")
50 public final class ChannelListeners {
51
52     private static final ChannelListener<Channel> NULL_LISTENER = new ChannelListener<Channel>() {
53         public void handleEvent(final Channel channel) {
54         }
55
56         public String toString() {
57             return "Null channel listener";
58         }
59     };
60     private static final ChannelListener.Setter<?> NULL_SETTER = new ChannelListener.Setter<Channel>() {
61         public void set(final ChannelListener<? super Channel> channelListener) {
62         }
63
64         public String toString() {
65             return "Null channel listener setter";
66         }
67     };
68     private static ChannelListener<Channel> CLOSING_CHANNEL_LISTENER = new ChannelListener<Channel>() {
69         public void handleEvent(final Channel channel) {
70             IoUtils.safeClose(channel);
71         }
72
73         public String toString() {
74             return "Closing channel listener";
75         }
76     };
77
78     private ChannelListeners() {
79     }
80
81     /**
82      * Invoke a channel listener on a given channel, logging any errors.
83      *
84      * @param channel the channel
85      * @param channelListener the channel listener
86      * @param <T> the channel type
87      * @return {@code trueif the listener completed successfully, or {@code falseif it failed
88      */

89     public static <T extends Channel> boolean invokeChannelListener(T channel, ChannelListener<? super T> channelListener) {
90         if (channelListener != nulltry {
91             listenerMsg.tracef("Invoking listener %s on channel %s", channelListener, channel);
92             channelListener.handleEvent(channel);
93         } catch (Throwable t) {
94             listenerMsg.listenerException(t);
95             return false;
96         }
97         return true;
98     }
99
100     /**
101      * Invoke a channel listener on a given channel, logging any errors, using the given executor.
102      *
103      * @param executor the executor
104      * @param channel the channel
105      * @param channelListener the channel listener
106      * @param <T> the channel type
107      */

108     public static <T extends Channel> void invokeChannelListener(Executor executor, T channel, ChannelListener<? super T> channelListener) {
109         try {
110             executor.execute(getChannelListenerTask(channel, channelListener));
111         } catch (RejectedExecutionException ree) {
112             invokeChannelListener(channel, channelListener);
113         }
114     }
115
116     /**
117      * Safely invoke a channel exception handler, logging any errors.
118      *
119      * @param channel the channel
120      * @param exceptionHandler the exception handler
121      * @param exception the exception to pass in
122      * @param <T> the exception type
123      */

124     public static <T extends Channel> void invokeChannelExceptionHandler(final T channel, final ChannelExceptionHandler<? super T> exceptionHandler, final IOException exception) {
125         try {
126             exceptionHandler.handleException(channel, exception);
127         } catch (Throwable t) {
128             listenerMsg.exceptionHandlerException(t);
129         }
130     }
131
132     /**
133      * Get a task which invokes the given channel listener on the given channel.
134      *
135      * @param channel the channel
136      * @param channelListener the channel listener
137      * @param <T> the channel type
138      * @return the runnable task
139      */

140     public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener<? super T> channelListener) {
141         return new Runnable() {
142             public String toString() {
143                 return "Channel listener task for " + channel + " -> " + channelListener;
144             }
145
146             public void run() {
147                 invokeChannelListener(channel, channelListener);
148             }
149         };
150     }
151
152     /**
153      * Get a task which invokes the given channel listener on the given channel via its setter.
154      *
155      * @param channel the channel
156      * @param setter the setter for the channel listener
157      * @param <T> the channel type
158      * @return the runnable task
159      */

160     public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener.SimpleSetter<T> setter) {
161         return new Runnable() {
162             public String toString() {
163                 return "Channel listener task for " + channel + " -> " + setter;
164             }
165
166             public void run() {
167                 invokeChannelListener(channel, setter.get());
168             }
169         };
170     }
171
172     /**
173      * Get a channel listener which closes the channel when notified.
174      *
175      * @return the channel listener
176      */

177     public static ChannelListener<Channel> closingChannelListener() {
178         return CLOSING_CHANNEL_LISTENER;
179     }
180
181     /**
182      * Get a channel listener which closes the given resource when notified.
183      *
184      * @param resource the resource to close
185      * @return the channel listener
186      */

187     public static ChannelListener<Channel> closingChannelListener(final Closeable resource) {
188         return new ChannelListener<Channel>() {
189             public void handleEvent(final Channel channel) {
190                 IoUtils.safeClose(resource);
191             }
192
193             public String toString() {
194                 return "Closing channel listener for " + resource;
195             }
196         };
197     }
198
199     /**
200      * Get a channel listener which closes the given resources when notified.
201      *
202      * @param resources the resources to close
203      * @return the channel listener
204      */

205     public static ChannelListener<Channel> closingChannelListener(final Closeable... resources) {
206         return new ChannelListener<Channel>() {
207             public void handleEvent(final Channel channel) {
208                 IoUtils.safeClose(resources);
209             }
210
211             public String toString() {
212                 return "Closing channel listener for " + resources.length + " items";
213             }
214         };
215     }
216
217     /**
218      * Get a channel listener which closes the given resource when notified.
219      *
220      * @param delegate the listener to call next
221      * @param resource the resource to close
222      * @return the channel listener
223      */

224     public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable resource) {
225         return new ChannelListener<T>() {
226             public void handleEvent(final T channel) {
227                 IoUtils.safeClose(resource);
228                 delegate.handleEvent(channel);
229             }
230
231             public String toString() {
232                 return "Closing channel listener for " + resource + " -> " + delegate;
233             }
234         };
235     }
236
237     /**
238      * Get a channel listener which closes the given resource when notified.
239      *
240      * @param delegate the listener to call next
241      * @param resources the resource to close
242      * @return the channel listener
243      */

244     public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable... resources) {
245         return new ChannelListener<T>() {
246             public void handleEvent(final T channel) {
247                 IoUtils.safeClose(resources);
248                 delegate.handleEvent(channel);
249             }
250
251             public String toString() {
252                 return "Closing channel listener for " + resources.length + " items -> " + delegate;
253             }
254         };
255     }
256
257     /**
258      * Get a channel listener which does nothing.
259      *
260      * @return the null channel listener
261      */

262     public static ChannelListener<Channel> nullChannelListener() {
263         return NULL_LISTENER;
264     }
265
266     /**
267      * Get a channel exception handler which closes the channel upon exception.
268      *
269      * @return the channel exception handler
270      */

271     public static ChannelExceptionHandler<Channel> closingChannelExceptionHandler() {
272         return CLOSING_HANDLER;
273     }
274
275     /**
276      * Create an open listener adapter which automatically accepts connections and invokes an open listener.
277      *
278      * @param openListener the channel open listener
279      * @param <C> the connected channel type
280      * @return a channel accept listener
281      */

282     public static <C extends ConnectedChannel> ChannelListener<AcceptingChannel<C>> openListenerAdapter(final ChannelListener<? super C> openListener) {
283         if (openListener == null) {
284             throw msg.nullParameter("openListener");
285         }
286         return new ChannelListener<AcceptingChannel<C>>() {
287             public void handleEvent(final AcceptingChannel<C> channel) {
288                 try {
289                     final C accepted = channel.accept();
290                     if (accepted != null) {
291                         invokeChannelListener(accepted, openListener);
292                     }
293                 } catch (IOException e) {
294                     listenerMsg.acceptFailed(channel, e);
295                 }
296             }
297
298             public String toString() {
299                 return "Accepting listener for " + openListener;
300             }
301         };
302     }
303
304     /**
305      * Get a setter based on an atomic reference field updater.  Used by channel implementations to avoid having to
306      * define an anonymous class for each listener field.
307      *
308      * @param channel the channel
309      * @param updater the updater
310      * @param <T> the channel type
311      * @param <C> the holding class
312      * @return the setter
313      * @deprecated Not recommended as a security manager will enforce unreasonable restrictions on the updater.
314      */

315     @Deprecated
316     public static <T extends Channel, C> ChannelListener.Setter<T> getSetter(final C channel, final AtomicReferenceFieldUpdater<C, ChannelListener> updater) {
317         return new ChannelListener.Setter<T>() {
318             public void set(final ChannelListener<? super T> channelListener) {
319                 updater.set(channel, channelListener);
320             }
321
322             public String toString() {
323                 return "Atomic reference field updater setter for " + updater;
324             }
325         };
326     }
327
328     /**
329      * Get a setter based on an atomic reference.  Used by channel implementations to avoid having to
330      * define an anonymous class for each listener field.
331      *
332      * @param atomicReference the atomic reference
333      * @param <T> the channel type
334      * @return the setter
335      */

336     public static <T extends Channel> ChannelListener.Setter<T> getSetter(final AtomicReference<ChannelListener<? super T>> atomicReference) {
337         return new ChannelListener.Setter<T>() {
338             public void set(final ChannelListener<? super T> channelListener) {
339                 atomicReference.set(channelListener);
340             }
341
342             public String toString() {
343                 return "Atomic reference setter (currently=" + atomicReference.get() + ")";
344             }
345         };
346     }
347
348     /**
349      * Get a channel listener setter which delegates to the given target setter with a different channel type.
350      *
351      * @param target the target setter
352      * @param realChannel the channel to send in to the listener
353      * @param <T> the real channel type
354      * @return the delegating setter
355      */

356     public static <T extends Channel> ChannelListener.Setter<T> getDelegatingSetter(final ChannelListener.Setter<? extends Channel> target, final T realChannel) {
357         return target == null ? null : new DelegatingSetter<T>(target, realChannel);
358     }
359
360     /**
361      * Get a channel listener setter which does nothing.
362      *
363      * @param <T> the channel type
364      * @return a setter which does nothing
365      */

366     @SuppressWarnings({ "unchecked" })
367     public static <T extends Channel> ChannelListener.Setter<T> nullSetter() {
368         return (ChannelListener.Setter<T>) NULL_SETTER;
369     }
370
371     /**
372      * Get a channel listener which executes a delegate channel listener via an executor.  If an exception occurs
373      * submitting the task, the associated channel is closed.
374      *
375      * @param listener the listener to invoke
376      * @param executor the executor with which to invoke the listener
377      * @param <T> the channel type
378      * @return a delegating channel listener
379      */

380     public static <T extends Channel> ChannelListener<T> executorChannelListener(final ChannelListener<T> listener, final Executor executor) {
381         return new ChannelListener<T>() {
382             public void handleEvent(final T channel) {
383                 try {
384                     executor.execute(getChannelListenerTask(channel, listener));
385                 } catch (RejectedExecutionException e) {
386                     listenerMsg.executorSubmitFailed(e, channel);
387                     IoUtils.safeClose(channel);
388                 }
389             }
390
391             public String toString() {
392                 return "Executor channel listener -> " + listener;
393             }
394         };
395     }
396
397     /**
398      * A flushing channel listener.  Flushes the channel and then calls the delegate listener.  Calls the exception
399      * handler if an exception occurs.  The delegate listener should ensure that the channel write listener is appropriately set.
400      * <p>
401      * The returned listener is stateless and may be reused on any number of channels concurrently or sequentially.
402      *
403      * @param delegate the delegate listener
404      * @param exceptionHandler the exception handler
405      * @param <T> the channel type
406      * @return the flushing channel listener
407      */

408     public static <T extends SuspendableWriteChannel> ChannelListener<T> flushingChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
409         return new ChannelListener<T>() {
410             public void handleEvent(final T channel) {
411                 final boolean result;
412                 try {
413                     result = channel.flush();
414                 } catch (IOException e) {
415                     channel.suspendWrites();
416                     invokeChannelExceptionHandler(channel, exceptionHandler, e);
417                     return;
418                 }
419                 if (result) {
420                     Channels.setWriteListener(channel, delegate);
421                     invokeChannelListener(channel, delegate);
422                 } else {
423                     Channels.setWriteListener(channel, this);
424                     channel.resumeWrites();
425                 }
426             }
427
428             public String toString() {
429                 return "Flushing channel listener -> " + delegate;
430             }
431         };
432     }
433
434     /**
435      * A write shutdown channel listener.  Shuts down and flushes the channel and calls the delegate listener.  Calls
436      * the exception handler if an exception occurs.  When the delegate listener is called, the channel's write side will be shut down and flushed.
437      * The delegate listener should ensure that the channel write listener is appropriately set.
438      *
439      * @param delegate the delegate listener
440      * @param exceptionHandler the exception handler
441      * @param <T> the channel type
442      * @return the channel listener
443      */

444     public static <T extends SuspendableWriteChannel> ChannelListener<T> writeShutdownChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
445         final ChannelListener<T> flushingListener = flushingChannelListener(delegate, exceptionHandler);
446         return new ChannelListener<T>() {
447             public void handleEvent(final T channel) {
448                 try {
449                     channel.shutdownWrites();
450                 } catch (IOException e) {
451                     invokeChannelExceptionHandler(channel, exceptionHandler, e);
452                     return;
453                 }
454                 invokeChannelListener(channel, flushingListener);
455             }
456
457             public String toString() {
458                 return "Write shutdown channel listener -> " + delegate;
459             }
460         };
461     }
462
463     /**
464      * A writing channel listener.  Writes the buffer to the channel and then calls the delegate listener.  Calls the exception
465      * handler if an exception occurs.  The delegate listener should ensure that the channel write listener is appropriately set.
466      * <p>
467      * The returned listener is stateful and will not execute properly if reused.
468      *
469      * @param pooled the buffer to write
470      * @param delegate the delegate listener
471      * @param exceptionHandler the exception handler
472      * @param <T> the channel type
473      * @return the writing channel listener
474      */

475     public static <T extends StreamSinkChannel> ChannelListener<T> writingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
476         return new ChannelListener<T>() {
477             public void handleEvent(final T channel) {
478                 final ByteBuffer buffer = pooled.getResource();
479                 int result;
480                 boolean ok = false;
481                 do {
482                     try {
483                         result = channel.write(buffer);
484                         ok = true;
485                     } catch (IOException e) {
486                         channel.suspendWrites();
487                         pooled.free();
488                         invokeChannelExceptionHandler(channel, exceptionHandler, e);
489                         return;
490                     } finally {
491                         if (! ok) {
492                             pooled.free();
493                         }
494                     }
495                     if (result == 0) {
496                         Channels.setWriteListener(channel, this);
497                         channel.resumeWrites();
498                         return;
499                     }
500                 } while (buffer.hasRemaining());
501                 pooled.free();
502                 invokeChannelListener(channel, delegate);
503             }
504
505             public String toString() {
506                 return "Writing channel listener -> " + delegate;
507             }
508         };
509     }
510
511     /**
512      * A sending channel listener.  Writes the buffer to the channel and then calls the delegate listener.  Calls the exception
513      * handler if an exception occurs.  The delegate listener should ensure that the channel write listener is appropriately set.
514      * <p>
515      * The returned listener is stateful and will not execute properly if reused.
516      *
517      * @param pooled the buffer to send
518      * @param delegate the delegate listener
519      * @param exceptionHandler the exception handler
520      * @param <T> the channel type
521      * @return the sending channel listener
522      */

523     public static <T extends WritableMessageChannel> ChannelListener<T> sendingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
524         return new ChannelListener<T>() {
525             public void handleEvent(final T channel) {
526                 final ByteBuffer buffer = pooled.getResource();
527                 boolean free = true;
528                 try {
529                     if (! (free = channel.send(buffer))) {
530                         Channels.setWriteListener(channel, this);
531                         channel.resumeWrites();
532                         return;
533                     }
534                 } catch (IOException e) {
535                     channel.suspendWrites();
536                     pooled.free();
537                     invokeChannelExceptionHandler(channel, exceptionHandler, e);
538                     return;
539                 } finally {
540                     if (free) pooled.free();
541                 }
542                 invokeChannelListener(channel, delegate);
543             }
544
545             public String toString() {
546                 return "Sending channel listener -> " + delegate;
547             }
548         };
549     }
550
551     /**
552      * A file-sending channel listener.  Writes the file to the channel and then calls the delegate listener.  Calls the exception
553      * handler if an exception occurs.  The delegate listener should ensure that the channel write listener is appropriately set.
554      * <p>
555      * The returned listener is stateful and will not execute properly if reused.
556      *
557      * @param source the file to read from
558      * @param position the position in the source file to read from
559      * @param count the number of bytes to read
560      * @param delegate the listener to call when the file is sent
561      * @param exceptionHandler the exception handler to call if a problem occurs
562      * @param <T> the channel type
563      * @return the channel listener
564      */

565     public static <T extends StreamSinkChannel> ChannelListener<T> fileSendingChannelListener(final FileChannel source, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
566         if (count == 0L) {
567             return delegatingChannelListener(delegate);
568         }
569         return new ChannelListener<T>() {
570             private long p = position;
571             private long cnt = count;
572
573             public void handleEvent(final T channel) {
574                 long result;
575                 long cnt = this.cnt;
576                 long p = this.p;
577                 try {
578                     do {
579                         try {
580                             result = channel.transferFrom(source, p, cnt);
581                         } catch (IOException e) {
582                             invokeChannelExceptionHandler(channel, exceptionHandler, e);
583                             return;
584                         }
585                         if (result == 0L) {
586                             Channels.setWriteListener(channel, this);
587                             channel.resumeWrites();
588                             return;
589                         }
590                         p += result;
591                         cnt -= result;
592                     } while (cnt > 0L);
593                     // cnt is 0
594                     invokeChannelListener(channel, delegate);
595                     return;
596                 } finally {
597                     this.p = p;
598                     this.cnt = cnt;
599                 }
600             }
601
602             public String toString() {
603                 return "File sending channel listener (" + source + ") -> " + delegate;
604             }
605         };
606     }
607
608     /**
609      * A file-receiving channel listener.  Writes the file from the channel and then calls the delegate listener.  Calls the exception
610      * handler if an exception occurs.  The delegate listener should ensure that the channel read listener is appropriately set.
611      * <p>
612      * The returned listener is stateful and will not execute properly if reused.
613      *
614      * @param target the file to write to
615      * @param position the position in the target file to write to
616      * @param count the number of bytes to write
617      * @param delegate the listener to call when the file is sent
618      * @param exceptionHandler the exception handler to call if a problem occurs
619      * @param <T> the channel type
620      * @return the channel listener
621      */

622     public static <T extends StreamSourceChannel> ChannelListener<T> fileReceivingChannelListener(final FileChannel target, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
623         if (count == 0L) {
624             return delegatingChannelListener(delegate);
625         }
626         return new ChannelListener<T>() {
627             private long p = position;
628             private long cnt = count;
629
630             public void handleEvent(final T channel) {
631                 long result;
632                 long cnt = this.cnt;
633                 long p = this.p;
634                 try {
635                     do {
636                         try {
637                             result = channel.transferTo(p, cnt, target);
638                         } catch (IOException e) {
639                             invokeChannelExceptionHandler(channel, exceptionHandler, e);
640                             return;
641                         }
642                         if (result == 0L) {
643                             Channels.setReadListener(channel, this);
644                             channel.resumeReads();
645                             return;
646                         }
647                         p += result;
648                         cnt -= result;
649                     } while (cnt > 0L);
650                     // cnt = 0
651                     invokeChannelListener(channel, delegate);
652                     return;
653                 } finally {
654                     this.p = p;
655                     this.cnt = cnt;
656                 }
657             }
658
659             public String toString() {
660                 return "File receiving channel listener (" + target + ") -> " + delegate;
661             }
662         };
663     }
664
665     /**
666      * A delegating channel listener which passes an event to another listener of the same or a super type.
667      *
668      * @param delegate the delegate channel listener
669      * @param <T> the channel type
670      * @return the listener
671      */

672     public static <T extends Channel> ChannelListener<T> delegatingChannelListener(final ChannelListener<? super T> delegate) {
673         return new ChannelListener<T>() {
674             public void handleEvent(final T channel) {
675                 invokeChannelListener(channel, delegate);
676             }
677
678             public String toString() {
679                 return "Delegating channel listener -> " + delegate;
680             }
681         };
682     }
683
684     /**
685      * A delegating channel listener which passes an event to the listener stored in the given setter.
686      *
687      * @param channel the channel to pass in
688      * @param setter the channel listener setter
689      * @param <C> the listener channel type
690      * @param <T> the passed in channel type
691      * @return the listener
692      */

693     public static <C extends Channel, T extends Channel> ChannelListener<C> delegatingChannelListener(final T channel, final ChannelListener.SimpleSetter<T> setter) {
694         return new SetterDelegatingListener<C, T>(setter, channel);
695     }
696
697     /**
698      * A write-suspending channel listener.  The returned listener will suspend writes when called.  Useful for chaining
699      * writing listeners to a flush listener to this listener. The delegate listener should ensure that the channel write listener is appropriately set.
700      *
701      * @param delegate the delegate channel listener
702      * @return the suspending channel listener
703      */

704     public static <T extends SuspendableWriteChannel> ChannelListener<T> writeSuspendingChannelListener(final ChannelListener<? super T> delegate) {
705         return new ChannelListener<T>() {
706             public void handleEvent(final T channel) {
707                 channel.suspendWrites();
708                 invokeChannelListener(channel, delegate);
709             }
710
711             public String toString() {
712                 return "Write-suspending channel listener -> " + delegate;
713             }
714         };
715     }
716
717     /**
718      * A read-suspending channel listener.  The returned listener will suspend read when called.
719      * The delegate listener should ensure that the channel read listener is appropriately set.
720      *
721      * @param delegate the delegate channel listener
722      * @return the suspending channel listener
723      */

724     public static <T extends SuspendableReadChannel> ChannelListener<T> readSuspendingChannelListener(final ChannelListener<? super T> delegate) {
725         return new ChannelListener<T>() {
726             public void handleEvent(final T channel) {
727                 channel.suspendReads();
728                 invokeChannelListener(channel, delegate);
729             }
730
731             public String toString() {
732                 return "Read-suspending channel listener -> " + delegate;
733             }
734         };
735     }
736
737     static final class TransferListener<I extends StreamSourceChannel, O extends StreamSinkChannel> implements ChannelListener<Channel> {
738         private final Pooled<ByteBuffer> pooledBuffer;
739         private final I source;
740         private final O sink;
741         private final ChannelListener<? super I> sourceListener;
742         private final ChannelListener<? super O> sinkListener;
743         private final ChannelExceptionHandler<? super O> writeExceptionHandler;
744         private final ChannelExceptionHandler<? super I> readExceptionHandler;
745         private long count;
746         private volatile int state;
747
748         TransferListener(final long count, final Pooled<ByteBuffer> pooledBuffer, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super O> writeExceptionHandler, final ChannelExceptionHandler<? super I> readExceptionHandler, final int state) {
749             this.count = count;
750             this.pooledBuffer = pooledBuffer;
751             this.source = source;
752             this.sink = sink;
753             this.sourceListener = sourceListener;
754             this.sinkListener = sinkListener;
755             this.writeExceptionHandler = writeExceptionHandler;
756             this.readExceptionHandler = readExceptionHandler;
757             this.state = state;
758         }
759
760         public void handleEvent(final Channel channel) {
761             final ByteBuffer buffer = pooledBuffer.getResource();
762             int state = this.state;
763             // always read after and write before state
764             long count = this.count;
765             long lres;
766             int ires;
767
768             switch (state) {
769                 case 0: {
770                     // read listener
771                     for (;;) {
772                         try {
773                             lres = source.transferTo(count, buffer, sink);
774                         } catch (IOException e) {
775                             readFailed(e);
776                             return;
777                         }
778                         if (lres == 0 && !buffer.hasRemaining()) {
779                             this.count = count;
780                             return;
781                         }
782                         if (lres == -1) {
783                             // possibly unexpected EOF
784                             if (count == Long.MAX_VALUE) {
785                                 // it's OK; just be done
786                                 done();
787                                 return;
788                             } else {
789                                 readFailed(new EOFException());
790                                 return;
791                             }
792                         }
793                         if (count != Long.MAX_VALUE) {
794                             count -= lres;
795                         }
796                         while (buffer.hasRemaining()) {
797                             try {
798                                 ires = sink.write(buffer);
799                             } catch (IOException e) {
800                                 writeFailed(e);
801                                 return;
802                             }
803                             if (count != Long.MAX_VALUE) {
804                                 count -= ires;
805                             }
806                             if (ires == 0) {
807                                 this.count = count;
808                                 this.state = 1;
809                                 source.suspendReads();
810                                 sink.resumeWrites();
811                                 return;
812                             }
813                         }
814
815                         if (count == 0) {
816                             done();
817                             return;
818                         }
819                     }
820                 }
821                 case 1: {
822                     // write listener
823                     for (;;) {
824                         while (buffer.hasRemaining()) {
825                             try {
826                                 ires = sink.write(buffer);
827                             } catch (IOException e) {
828                                 writeFailed(e);
829                                 return;
830                             }
831                             if (count != Long.MAX_VALUE) {
832                                 count -= ires;
833                             }
834                             if (ires == 0) {
835                                 return;
836                             }
837                         }
838                         try {
839                             lres = source.transferTo(count, buffer, sink);
840                         } catch (IOException e) {
841                             readFailed(e);
842                             return;
843                         }
844                         if (lres == 0 && !buffer.hasRemaining()) {
845                             this.count = count;
846                             this.state = 0;
847                             sink.suspendWrites();
848                             source.resumeReads();
849                             return;
850                         }
851                         if (lres == -1) {
852                             // possibly unexpected EOF
853                             if (count == Long.MAX_VALUE) {
854                                 // it's OK; just be done
855                                 done();
856                                 return;
857                             } else {
858                                 readFailed(new EOFException());
859                                 return;
860                             }
861                         }
862                         if (count != Long.MAX_VALUE) {
863                             count -= lres;
864                         }
865
866                         if (count == 0) {
867                             done();
868                             return;
869                         }
870                     }
871                 }
872             }
873
874         }
875
876         private void writeFailed(final IOException e) {
877             try {
878                 source.suspendReads();
879                 sink.suspendWrites();
880                 invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
881             } finally {
882                 pooledBuffer.free();
883             }
884         }
885
886         private void readFailed(final IOException e) {
887             try {
888                 source.suspendReads();
889                 sink.suspendWrites();
890                 invokeChannelExceptionHandler(source, readExceptionHandler, e);
891             } finally {
892                 pooledBuffer.free();
893             }
894         }
895
896         private void done() {
897             try {
898                 final ChannelListener<? super I> sourceListener = this.sourceListener;
899                 final ChannelListener<? super O> sinkListener = this.sinkListener;
900                 final I source = this.source;
901                 final O sink = this.sink;
902
903                 Channels.setReadListener(source, sourceListener);
904                 if (sourceListener == null) {
905                     source.suspendReads();
906                 } else {
907                     source.wakeupReads();
908                 }
909
910                 Channels.setWriteListener(sink, sinkListener);
911                 if (sinkListener == null) {
912                     sink.suspendWrites();
913                 } else {
914                     sink.wakeupWrites();
915                 }
916             } finally {
917                 pooledBuffer.free();
918             }
919         }
920
921         public String toString() {
922             return "Transfer channel listener (" + source + " to " + sink + ") -> (" + sourceListener + " and " + sinkListener + ")";
923         }
924     }
925
926     /**
927      * Initiate a low-copy transfer between two stream channels.  The pool should be a direct buffer pool for best
928      * performance.  The channels will be closed when the transfer completes or if there is an error.
929      *
930      * @param source the source channel
931      * @param sink the target channel
932      * @param pool the pool from which the transfer buffer should be allocated
933      * @param <I> the source stream type
934      * @param <O> the sink stream type
935      */

936     public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(final I source, final O sink, Pool<ByteBuffer> pool) {
937         initiateTransfer(Long.MAX_VALUE, source, sink, CLOSING_CHANNEL_LISTENER, CLOSING_CHANNEL_LISTENER, CLOSING_HANDLER, CLOSING_HANDLER, pool);
938     }
939
940     /**
941      * Initiate a low-copy transfer between two stream channels.  The pool should be a direct buffer pool for best
942      * performance.
943      *
944      * @param count the number of bytes to transfer, or {@link Long#MAX_VALUE} to transfer all remaining bytes
945      * @param source the source channel
946      * @param sink the target channel
947      * @param sourceListener the source listener to set and call when the transfer is complete, or {@code null} to clear the listener at that time
948      * @param sinkListener the target listener to set and call when the transfer is complete, or {@code null} to clear the listener at that time
949      * @param readExceptionHandler the read exception handler to call if an error occurs during a read operation
950      * @param writeExceptionHandler the write exception handler to call if an error occurs during a write operation
951      * @param pool the pool from which the transfer buffer should be allocated
952      */

953     public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(long count, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super I> readExceptionHandler, final ChannelExceptionHandler<? super O> writeExceptionHandler, Pool<ByteBuffer> pool) {
954         if (pool == null) {
955             throw msg.nullParameter("pool");
956         }
957         final Pooled<ByteBuffer> allocated = pool.allocate();
958         boolean free = true;
959         try {
960             final ByteBuffer buffer = allocated.getResource();
961             long transferred;
962             for(;;) {
963                 try {
964                     transferred = source.transferTo(count, buffer, sink);
965                 } catch (IOException e) {
966                     invokeChannelExceptionHandler(source, readExceptionHandler, e);
967                     return;
968                 }
969                 if (transferred == 0 && !buffer.hasRemaining()) {
970                     break;
971                 }
972                 if (transferred == -1) {
973                     if (count == Long.MAX_VALUE) {
974                         Channels.setReadListener(source, sourceListener);
975                         if (sourceListener == null) {
976                             source.suspendReads();
977                         } else {
978                             source.wakeupReads();
979                         }
980
981                         Channels.setWriteListener(sink, sinkListener);
982                         if (sinkListener == null) {
983                             sink.suspendWrites();
984                         } else {
985                             sink.wakeupWrites();
986                         }
987                     } else {
988                         source.suspendReads();
989                         sink.suspendWrites();
990                         invokeChannelExceptionHandler(source, readExceptionHandler, new EOFException());
991                     }
992                     return;
993                 }
994                 if (count != Long.MAX_VALUE) {
995                     count -= transferred;
996                 }
997                 while (buffer.hasRemaining()) {
998                     final int res;
999                     try {
1000                         res = sink.write(buffer);
1001                     } catch (IOException e) {
1002                         invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
1003                         return;
1004                     }
1005                     if (res == 0) {
1006                         // write first listener
1007                         final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 1);
1008                         source.suspendReads();
1009                         source.getReadSetter().set(listener);
1010                         sink.getWriteSetter().set(listener);
1011                         sink.resumeWrites();
1012                         free = false;
1013                         return;
1014                     } else if (count != Long.MAX_VALUE) {
1015                         count -= res;
1016                     }
1017                 }
1018                 if (count == 0) {
1019                     //we are done
1020                     Channels.setReadListener(source, sourceListener);
1021                     if (sourceListener == null) {
1022                         source.suspendReads();
1023                     } else {
1024                         source.wakeupReads();
1025                     }
1026
1027                     Channels.setWriteListener(sink, sinkListener);
1028                     if (sinkListener == null) {
1029                         sink.suspendWrites();
1030                     } else {
1031                         sink.wakeupWrites();
1032                     }
1033                     return;
1034                 }
1035             }
1036             // read first listener
1037             final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 0);
1038             sink.suspendWrites();
1039             sink.getWriteSetter().set(listener);
1040             source.getReadSetter().set(listener);
1041             source.resumeReads();
1042             free = false;
1043             return;
1044         } finally {
1045             if (free) allocated.free();
1046         }
1047     }
1048
1049     /**
1050      * Create a channel listener which automatically drains the given number of bytes from the channel and then calls
1051      * a listener.
1052      *
1053      * @param bytes the number of bytes to drain, or {@code Long.MAX_VALUE} to drain the channel completely
1054      * @param finishListener the listener to call when the drain is complete
1055      * @param exceptionHandler the handler to call if the drain fails
1056      * @param <T> the channel type
1057      * @return the channel listener
1058      */

1059     public static <T extends StreamSourceChannel> ChannelListener<T> drainListener(long bytes, ChannelListener<? super T> finishListener, ChannelExceptionHandler<? super T> exceptionHandler) {
1060         return new DrainListener<T>(finishListener, exceptionHandler, bytes);
1061     }
1062
1063     private static class DelegatingSetter<T extends Channel> implements ChannelListener.Setter<T> {
1064         private final ChannelListener.Setter<? extends Channel> setter;
1065         private final T realChannel;
1066
1067         DelegatingSetter(final ChannelListener.Setter<? extends Channel> setter, final T realChannel) {
1068             this.setter = setter;
1069             this.realChannel = realChannel;
1070         }
1071
1072         public void set(final ChannelListener<? super T> channelListener) {
1073             setter.set(channelListener == null ? null : new DelegatingChannelListener<T>(channelListener, realChannel));
1074         }
1075
1076         public String toString() {
1077             return "Delegating setter -> " + setter;
1078         }
1079     }
1080
1081     private static class DelegatingChannelListener<T extends Channel> implements ChannelListener<Channel> {
1082
1083         private final ChannelListener<? super T> channelListener;
1084         private final T realChannel;
1085
1086         public DelegatingChannelListener(final ChannelListener<? super T> channelListener, final T realChannel) {
1087             this.channelListener = channelListener;
1088             this.realChannel = realChannel;
1089         }
1090
1091         public void handleEvent(final Channel channel) {
1092             invokeChannelListener(realChannel, channelListener);
1093         }
1094
1095         public String toString() {
1096             return "Delegating channel listener -> " + channelListener;
1097         }
1098     }
1099
1100     private static class SetterDelegatingListener<C extends Channel, T extends Channel> implements ChannelListener<C> {
1101
1102         private final SimpleSetter<T> setter;
1103         private final T channel;
1104
1105         public SetterDelegatingListener(final SimpleSetter<T> setter, final T channel) {
1106             this.setter = setter;
1107             this.channel = channel;
1108         }
1109
1110         public void handleEvent(final C channel) {
1111             invokeChannelListener(this.channel, setter.get());
1112         }
1113
1114         public String toString() {
1115             return "Setter delegating channel listener -> " + setter;
1116         }
1117     }
1118
1119     private static final ChannelExceptionHandler<Channel> CLOSING_HANDLER = new ChannelExceptionHandler<Channel>() {
1120         public void handleException(final Channel channel, final IOException exception) {
1121             IoUtils.safeClose(channel);
1122         }
1123     };
1124
1125     private static class DrainListener<T extends StreamSourceChannel> implements ChannelListener<T> {
1126         private final ChannelListener<? super T> finishListener;
1127         private final ChannelExceptionHandler<? super T> exceptionHandler;
1128         private long count;
1129
1130         private DrainListener(final ChannelListener<? super T> finishListener, final ChannelExceptionHandler<? super T> exceptionHandler, final long count) {
1131             this.finishListener = finishListener;
1132             this.exceptionHandler = exceptionHandler;
1133             this.count = count;
1134         }
1135
1136         public void handleEvent(final T channel) {
1137             try {
1138                 long count = this.count;
1139                 try {
1140                     long res;
1141                     for (;;) {
1142                         res = Channels.drain(channel, count);
1143                         if (res == -1 || res == count) {
1144                             this.count = 0L;
1145                             invokeChannelListener(channel, finishListener);
1146                             return;
1147                         } else if (res == 0) {
1148                             return;
1149                         } else if (count < Long.MAX_VALUE) {
1150                             // MAX_VALUE means drain to EOF
1151                             count -= res;
1152                         }
1153                     }
1154                 } finally {
1155                     this.count = count;
1156                 }
1157             } catch (IOException e) {
1158                 this.count = 0L;
1159                 if (exceptionHandler != null) {
1160                     invokeChannelExceptionHandler(channel, exceptionHandler, e);
1161                 } else {
1162                     IoUtils.safeShutdownReads(channel);
1163                 }
1164             }
1165         }
1166
1167         public String toString() {
1168             return "Draining channel listener (" + count + " bytes) -> " + finishListener;
1169         }
1170     }
1171 }
1172