1
19
20 package org.xnio;
21
22 import java.io.Closeable;
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.Channel;
27 import java.nio.channels.FileChannel;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32 import org.xnio.channels.AcceptingChannel;
33 import org.xnio.channels.Channels;
34 import org.xnio.channels.ConnectedChannel;
35 import org.xnio.channels.StreamSinkChannel;
36 import org.xnio.channels.StreamSourceChannel;
37 import org.xnio.channels.SuspendableReadChannel;
38 import org.xnio.channels.SuspendableWriteChannel;
39 import org.xnio.channels.WritableMessageChannel;
40
41 import static org.xnio._private.Messages.listenerMsg;
42 import static org.xnio._private.Messages.msg;
43
44
49 @SuppressWarnings("unused")
50 public final class ChannelListeners {
51
52 private static final ChannelListener<Channel> NULL_LISTENER = new ChannelListener<Channel>() {
53 public void handleEvent(final Channel channel) {
54 }
55
56 public String toString() {
57 return "Null channel listener";
58 }
59 };
60 private static final ChannelListener.Setter<?> NULL_SETTER = new ChannelListener.Setter<Channel>() {
61 public void set(final ChannelListener<? super Channel> channelListener) {
62 }
63
64 public String toString() {
65 return "Null channel listener setter";
66 }
67 };
68 private static ChannelListener<Channel> CLOSING_CHANNEL_LISTENER = new ChannelListener<Channel>() {
69 public void handleEvent(final Channel channel) {
70 IoUtils.safeClose(channel);
71 }
72
73 public String toString() {
74 return "Closing channel listener";
75 }
76 };
77
/** Not instantiable: this class only exposes static members. */
private ChannelListeners() {
    // no instances
}
80
81
89 public static <T extends Channel> boolean invokeChannelListener(T channel, ChannelListener<? super T> channelListener) {
90 if (channelListener != null) try {
91 listenerMsg.tracef("Invoking listener %s on channel %s", channelListener, channel);
92 channelListener.handleEvent(channel);
93 } catch (Throwable t) {
94 listenerMsg.listenerException(t);
95 return false;
96 }
97 return true;
98 }
99
100
108 public static <T extends Channel> void invokeChannelListener(Executor executor, T channel, ChannelListener<? super T> channelListener) {
109 try {
110 executor.execute(getChannelListenerTask(channel, channelListener));
111 } catch (RejectedExecutionException ree) {
112 invokeChannelListener(channel, channelListener);
113 }
114 }
115
116
124 public static <T extends Channel> void invokeChannelExceptionHandler(final T channel, final ChannelExceptionHandler<? super T> exceptionHandler, final IOException exception) {
125 try {
126 exceptionHandler.handleException(channel, exception);
127 } catch (Throwable t) {
128 listenerMsg.exceptionHandlerException(t);
129 }
130 }
131
132
140 public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener<? super T> channelListener) {
141 return new Runnable() {
142 public String toString() {
143 return "Channel listener task for " + channel + " -> " + channelListener;
144 }
145
146 public void run() {
147 invokeChannelListener(channel, channelListener);
148 }
149 };
150 }
151
152
160 public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener.SimpleSetter<T> setter) {
161 return new Runnable() {
162 public String toString() {
163 return "Channel listener task for " + channel + " -> " + setter;
164 }
165
166 public void run() {
167 invokeChannelListener(channel, setter.get());
168 }
169 };
170 }
171
172
177 public static ChannelListener<Channel> closingChannelListener() {
178 return CLOSING_CHANNEL_LISTENER;
179 }
180
181
187 public static ChannelListener<Channel> closingChannelListener(final Closeable resource) {
188 return new ChannelListener<Channel>() {
189 public void handleEvent(final Channel channel) {
190 IoUtils.safeClose(resource);
191 }
192
193 public String toString() {
194 return "Closing channel listener for " + resource;
195 }
196 };
197 }
198
199
205 public static ChannelListener<Channel> closingChannelListener(final Closeable... resources) {
206 return new ChannelListener<Channel>() {
207 public void handleEvent(final Channel channel) {
208 IoUtils.safeClose(resources);
209 }
210
211 public String toString() {
212 return "Closing channel listener for " + resources.length + " items";
213 }
214 };
215 }
216
217
224 public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable resource) {
225 return new ChannelListener<T>() {
226 public void handleEvent(final T channel) {
227 IoUtils.safeClose(resource);
228 delegate.handleEvent(channel);
229 }
230
231 public String toString() {
232 return "Closing channel listener for " + resource + " -> " + delegate;
233 }
234 };
235 }
236
237
244 public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable... resources) {
245 return new ChannelListener<T>() {
246 public void handleEvent(final T channel) {
247 IoUtils.safeClose(resources);
248 delegate.handleEvent(channel);
249 }
250
251 public String toString() {
252 return "Closing channel listener for " + resources.length + " items -> " + delegate;
253 }
254 };
255 }
256
257
262 public static ChannelListener<Channel> nullChannelListener() {
263 return NULL_LISTENER;
264 }
265
266
271 public static ChannelExceptionHandler<Channel> closingChannelExceptionHandler() {
272 return CLOSING_HANDLER;
273 }
274
275
282 public static <C extends ConnectedChannel> ChannelListener<AcceptingChannel<C>> openListenerAdapter(final ChannelListener<? super C> openListener) {
283 if (openListener == null) {
284 throw msg.nullParameter("openListener");
285 }
286 return new ChannelListener<AcceptingChannel<C>>() {
287 public void handleEvent(final AcceptingChannel<C> channel) {
288 try {
289 final C accepted = channel.accept();
290 if (accepted != null) {
291 invokeChannelListener(accepted, openListener);
292 }
293 } catch (IOException e) {
294 listenerMsg.acceptFailed(channel, e);
295 }
296 }
297
298 public String toString() {
299 return "Accepting listener for " + openListener;
300 }
301 };
302 }
303
304
315 @Deprecated
316 public static <T extends Channel, C> ChannelListener.Setter<T> getSetter(final C channel, final AtomicReferenceFieldUpdater<C, ChannelListener> updater) {
317 return new ChannelListener.Setter<T>() {
318 public void set(final ChannelListener<? super T> channelListener) {
319 updater.set(channel, channelListener);
320 }
321
322 public String toString() {
323 return "Atomic reference field updater setter for " + updater;
324 }
325 };
326 }
327
328
336 public static <T extends Channel> ChannelListener.Setter<T> getSetter(final AtomicReference<ChannelListener<? super T>> atomicReference) {
337 return new ChannelListener.Setter<T>() {
338 public void set(final ChannelListener<? super T> channelListener) {
339 atomicReference.set(channelListener);
340 }
341
342 public String toString() {
343 return "Atomic reference setter (currently=" + atomicReference.get() + ")";
344 }
345 };
346 }
347
348
356 public static <T extends Channel> ChannelListener.Setter<T> getDelegatingSetter(final ChannelListener.Setter<? extends Channel> target, final T realChannel) {
357 return target == null ? null : new DelegatingSetter<T>(target, realChannel);
358 }
359
360
366 @SuppressWarnings({ "unchecked" })
367 public static <T extends Channel> ChannelListener.Setter<T> nullSetter() {
368 return (ChannelListener.Setter<T>) NULL_SETTER;
369 }
370
371
380 public static <T extends Channel> ChannelListener<T> executorChannelListener(final ChannelListener<T> listener, final Executor executor) {
381 return new ChannelListener<T>() {
382 public void handleEvent(final T channel) {
383 try {
384 executor.execute(getChannelListenerTask(channel, listener));
385 } catch (RejectedExecutionException e) {
386 listenerMsg.executorSubmitFailed(e, channel);
387 IoUtils.safeClose(channel);
388 }
389 }
390
391 public String toString() {
392 return "Executor channel listener -> " + listener;
393 }
394 };
395 }
396
397
408 public static <T extends SuspendableWriteChannel> ChannelListener<T> flushingChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
409 return new ChannelListener<T>() {
410 public void handleEvent(final T channel) {
411 final boolean result;
412 try {
413 result = channel.flush();
414 } catch (IOException e) {
415 channel.suspendWrites();
416 invokeChannelExceptionHandler(channel, exceptionHandler, e);
417 return;
418 }
419 if (result) {
420 Channels.setWriteListener(channel, delegate);
421 invokeChannelListener(channel, delegate);
422 } else {
423 Channels.setWriteListener(channel, this);
424 channel.resumeWrites();
425 }
426 }
427
428 public String toString() {
429 return "Flushing channel listener -> " + delegate;
430 }
431 };
432 }
433
434
444 public static <T extends SuspendableWriteChannel> ChannelListener<T> writeShutdownChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
445 final ChannelListener<T> flushingListener = flushingChannelListener(delegate, exceptionHandler);
446 return new ChannelListener<T>() {
447 public void handleEvent(final T channel) {
448 try {
449 channel.shutdownWrites();
450 } catch (IOException e) {
451 invokeChannelExceptionHandler(channel, exceptionHandler, e);
452 return;
453 }
454 invokeChannelListener(channel, flushingListener);
455 }
456
457 public String toString() {
458 return "Write shutdown channel listener -> " + delegate;
459 }
460 };
461 }
462
463
475 public static <T extends StreamSinkChannel> ChannelListener<T> writingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
476 return new ChannelListener<T>() {
477 public void handleEvent(final T channel) {
478 final ByteBuffer buffer = pooled.getResource();
479 int result;
480 boolean ok = false;
481 do {
482 try {
483 result = channel.write(buffer);
484 ok = true;
485 } catch (IOException e) {
486 channel.suspendWrites();
487 pooled.free();
488 invokeChannelExceptionHandler(channel, exceptionHandler, e);
489 return;
490 } finally {
491 if (! ok) {
492 pooled.free();
493 }
494 }
495 if (result == 0) {
496 Channels.setWriteListener(channel, this);
497 channel.resumeWrites();
498 return;
499 }
500 } while (buffer.hasRemaining());
501 pooled.free();
502 invokeChannelListener(channel, delegate);
503 }
504
505 public String toString() {
506 return "Writing channel listener -> " + delegate;
507 }
508 };
509 }
510
511
523 public static <T extends WritableMessageChannel> ChannelListener<T> sendingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
524 return new ChannelListener<T>() {
525 public void handleEvent(final T channel) {
526 final ByteBuffer buffer = pooled.getResource();
527 boolean free = true;
528 try {
529 if (! (free = channel.send(buffer))) {
530 Channels.setWriteListener(channel, this);
531 channel.resumeWrites();
532 return;
533 }
534 } catch (IOException e) {
535 channel.suspendWrites();
536 pooled.free();
537 invokeChannelExceptionHandler(channel, exceptionHandler, e);
538 return;
539 } finally {
540 if (free) pooled.free();
541 }
542 invokeChannelListener(channel, delegate);
543 }
544
545 public String toString() {
546 return "Sending channel listener -> " + delegate;
547 }
548 };
549 }
550
551
565 public static <T extends StreamSinkChannel> ChannelListener<T> fileSendingChannelListener(final FileChannel source, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
566 if (count == 0L) {
567 return delegatingChannelListener(delegate);
568 }
569 return new ChannelListener<T>() {
570 private long p = position;
571 private long cnt = count;
572
573 public void handleEvent(final T channel) {
574 long result;
575 long cnt = this.cnt;
576 long p = this.p;
577 try {
578 do {
579 try {
580 result = channel.transferFrom(source, p, cnt);
581 } catch (IOException e) {
582 invokeChannelExceptionHandler(channel, exceptionHandler, e);
583 return;
584 }
585 if (result == 0L) {
586 Channels.setWriteListener(channel, this);
587 channel.resumeWrites();
588 return;
589 }
590 p += result;
591 cnt -= result;
592 } while (cnt > 0L);
593
594 invokeChannelListener(channel, delegate);
595 return;
596 } finally {
597 this.p = p;
598 this.cnt = cnt;
599 }
600 }
601
602 public String toString() {
603 return "File sending channel listener (" + source + ") -> " + delegate;
604 }
605 };
606 }
607
608
622 public static <T extends StreamSourceChannel> ChannelListener<T> fileReceivingChannelListener(final FileChannel target, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
623 if (count == 0L) {
624 return delegatingChannelListener(delegate);
625 }
626 return new ChannelListener<T>() {
627 private long p = position;
628 private long cnt = count;
629
630 public void handleEvent(final T channel) {
631 long result;
632 long cnt = this.cnt;
633 long p = this.p;
634 try {
635 do {
636 try {
637 result = channel.transferTo(p, cnt, target);
638 } catch (IOException e) {
639 invokeChannelExceptionHandler(channel, exceptionHandler, e);
640 return;
641 }
642 if (result == 0L) {
643 Channels.setReadListener(channel, this);
644 channel.resumeReads();
645 return;
646 }
647 p += result;
648 cnt -= result;
649 } while (cnt > 0L);
650
651 invokeChannelListener(channel, delegate);
652 return;
653 } finally {
654 this.p = p;
655 this.cnt = cnt;
656 }
657 }
658
659 public String toString() {
660 return "File receiving channel listener (" + target + ") -> " + delegate;
661 }
662 };
663 }
664
665
672 public static <T extends Channel> ChannelListener<T> delegatingChannelListener(final ChannelListener<? super T> delegate) {
673 return new ChannelListener<T>() {
674 public void handleEvent(final T channel) {
675 invokeChannelListener(channel, delegate);
676 }
677
678 public String toString() {
679 return "Delegating channel listener -> " + delegate;
680 }
681 };
682 }
683
684
693 public static <C extends Channel, T extends Channel> ChannelListener<C> delegatingChannelListener(final T channel, final ChannelListener.SimpleSetter<T> setter) {
694 return new SetterDelegatingListener<C, T>(setter, channel);
695 }
696
697
704 public static <T extends SuspendableWriteChannel> ChannelListener<T> writeSuspendingChannelListener(final ChannelListener<? super T> delegate) {
705 return new ChannelListener<T>() {
706 public void handleEvent(final T channel) {
707 channel.suspendWrites();
708 invokeChannelListener(channel, delegate);
709 }
710
711 public String toString() {
712 return "Write-suspending channel listener -> " + delegate;
713 }
714 };
715 }
716
717
724 public static <T extends SuspendableReadChannel> ChannelListener<T> readSuspendingChannelListener(final ChannelListener<? super T> delegate) {
725 return new ChannelListener<T>() {
726 public void handleEvent(final T channel) {
727 channel.suspendReads();
728 invokeChannelListener(channel, delegate);
729 }
730
731 public String toString() {
732 return "Read-suspending channel listener -> " + delegate;
733 }
734 };
735 }
736
737 static final class TransferListener<I extends StreamSourceChannel, O extends StreamSinkChannel> implements ChannelListener<Channel> {
738 private final Pooled<ByteBuffer> pooledBuffer;
739 private final I source;
740 private final O sink;
741 private final ChannelListener<? super I> sourceListener;
742 private final ChannelListener<? super O> sinkListener;
743 private final ChannelExceptionHandler<? super O> writeExceptionHandler;
744 private final ChannelExceptionHandler<? super I> readExceptionHandler;
745 private long count;
746 private volatile int state;
747
748 TransferListener(final long count, final Pooled<ByteBuffer> pooledBuffer, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super O> writeExceptionHandler, final ChannelExceptionHandler<? super I> readExceptionHandler, final int state) {
749 this.count = count;
750 this.pooledBuffer = pooledBuffer;
751 this.source = source;
752 this.sink = sink;
753 this.sourceListener = sourceListener;
754 this.sinkListener = sinkListener;
755 this.writeExceptionHandler = writeExceptionHandler;
756 this.readExceptionHandler = readExceptionHandler;
757 this.state = state;
758 }
759
760 public void handleEvent(final Channel channel) {
761 final ByteBuffer buffer = pooledBuffer.getResource();
762 int state = this.state;
763
764 long count = this.count;
765 long lres;
766 int ires;
767
768 switch (state) {
769 case 0: {
770
771 for (;;) {
772 try {
773 lres = source.transferTo(count, buffer, sink);
774 } catch (IOException e) {
775 readFailed(e);
776 return;
777 }
778 if (lres == 0 && !buffer.hasRemaining()) {
779 this.count = count;
780 return;
781 }
782 if (lres == -1) {
783
784 if (count == Long.MAX_VALUE) {
785
786 done();
787 return;
788 } else {
789 readFailed(new EOFException());
790 return;
791 }
792 }
793 if (count != Long.MAX_VALUE) {
794 count -= lres;
795 }
796 while (buffer.hasRemaining()) {
797 try {
798 ires = sink.write(buffer);
799 } catch (IOException e) {
800 writeFailed(e);
801 return;
802 }
803 if (count != Long.MAX_VALUE) {
804 count -= ires;
805 }
806 if (ires == 0) {
807 this.count = count;
808 this.state = 1;
809 source.suspendReads();
810 sink.resumeWrites();
811 return;
812 }
813 }
814
815 if (count == 0) {
816 done();
817 return;
818 }
819 }
820 }
821 case 1: {
822
823 for (;;) {
824 while (buffer.hasRemaining()) {
825 try {
826 ires = sink.write(buffer);
827 } catch (IOException e) {
828 writeFailed(e);
829 return;
830 }
831 if (count != Long.MAX_VALUE) {
832 count -= ires;
833 }
834 if (ires == 0) {
835 return;
836 }
837 }
838 try {
839 lres = source.transferTo(count, buffer, sink);
840 } catch (IOException e) {
841 readFailed(e);
842 return;
843 }
844 if (lres == 0 && !buffer.hasRemaining()) {
845 this.count = count;
846 this.state = 0;
847 sink.suspendWrites();
848 source.resumeReads();
849 return;
850 }
851 if (lres == -1) {
852
853 if (count == Long.MAX_VALUE) {
854
855 done();
856 return;
857 } else {
858 readFailed(new EOFException());
859 return;
860 }
861 }
862 if (count != Long.MAX_VALUE) {
863 count -= lres;
864 }
865
866 if (count == 0) {
867 done();
868 return;
869 }
870 }
871 }
872 }
873
874 }
875
876 private void writeFailed(final IOException e) {
877 try {
878 source.suspendReads();
879 sink.suspendWrites();
880 invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
881 } finally {
882 pooledBuffer.free();
883 }
884 }
885
886 private void readFailed(final IOException e) {
887 try {
888 source.suspendReads();
889 sink.suspendWrites();
890 invokeChannelExceptionHandler(source, readExceptionHandler, e);
891 } finally {
892 pooledBuffer.free();
893 }
894 }
895
896 private void done() {
897 try {
898 final ChannelListener<? super I> sourceListener = this.sourceListener;
899 final ChannelListener<? super O> sinkListener = this.sinkListener;
900 final I source = this.source;
901 final O sink = this.sink;
902
903 Channels.setReadListener(source, sourceListener);
904 if (sourceListener == null) {
905 source.suspendReads();
906 } else {
907 source.wakeupReads();
908 }
909
910 Channels.setWriteListener(sink, sinkListener);
911 if (sinkListener == null) {
912 sink.suspendWrites();
913 } else {
914 sink.wakeupWrites();
915 }
916 } finally {
917 pooledBuffer.free();
918 }
919 }
920
921 public String toString() {
922 return "Transfer channel listener (" + source + " to " + sink + ") -> (" + sourceListener + " and " + sinkListener + ")";
923 }
924 }
925
926
936 public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(final I source, final O sink, Pool<ByteBuffer> pool) {
937 initiateTransfer(Long.MAX_VALUE, source, sink, CLOSING_CHANNEL_LISTENER, CLOSING_CHANNEL_LISTENER, CLOSING_HANDLER, CLOSING_HANDLER, pool);
938 }
939
940
953 public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(long count, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super I> readExceptionHandler, final ChannelExceptionHandler<? super O> writeExceptionHandler, Pool<ByteBuffer> pool) {
954 if (pool == null) {
955 throw msg.nullParameter("pool");
956 }
957 final Pooled<ByteBuffer> allocated = pool.allocate();
958 boolean free = true;
959 try {
960 final ByteBuffer buffer = allocated.getResource();
961 long transferred;
962 for(;;) {
963 try {
964 transferred = source.transferTo(count, buffer, sink);
965 } catch (IOException e) {
966 invokeChannelExceptionHandler(source, readExceptionHandler, e);
967 return;
968 }
969 if (transferred == 0 && !buffer.hasRemaining()) {
970 break;
971 }
972 if (transferred == -1) {
973 if (count == Long.MAX_VALUE) {
974 Channels.setReadListener(source, sourceListener);
975 if (sourceListener == null) {
976 source.suspendReads();
977 } else {
978 source.wakeupReads();
979 }
980
981 Channels.setWriteListener(sink, sinkListener);
982 if (sinkListener == null) {
983 sink.suspendWrites();
984 } else {
985 sink.wakeupWrites();
986 }
987 } else {
988 source.suspendReads();
989 sink.suspendWrites();
990 invokeChannelExceptionHandler(source, readExceptionHandler, new EOFException());
991 }
992 return;
993 }
994 if (count != Long.MAX_VALUE) {
995 count -= transferred;
996 }
997 while (buffer.hasRemaining()) {
998 final int res;
999 try {
1000 res = sink.write(buffer);
1001 } catch (IOException e) {
1002 invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
1003 return;
1004 }
1005 if (res == 0) {
1006
1007 final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 1);
1008 source.suspendReads();
1009 source.getReadSetter().set(listener);
1010 sink.getWriteSetter().set(listener);
1011 sink.resumeWrites();
1012 free = false;
1013 return;
1014 } else if (count != Long.MAX_VALUE) {
1015 count -= res;
1016 }
1017 }
1018 if (count == 0) {
1019
1020 Channels.setReadListener(source, sourceListener);
1021 if (sourceListener == null) {
1022 source.suspendReads();
1023 } else {
1024 source.wakeupReads();
1025 }
1026
1027 Channels.setWriteListener(sink, sinkListener);
1028 if (sinkListener == null) {
1029 sink.suspendWrites();
1030 } else {
1031 sink.wakeupWrites();
1032 }
1033 return;
1034 }
1035 }
1036
1037 final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 0);
1038 sink.suspendWrites();
1039 sink.getWriteSetter().set(listener);
1040 source.getReadSetter().set(listener);
1041 source.resumeReads();
1042 free = false;
1043 return;
1044 } finally {
1045 if (free) allocated.free();
1046 }
1047 }
1048
1049
1059 public static <T extends StreamSourceChannel> ChannelListener<T> drainListener(long bytes, ChannelListener<? super T> finishListener, ChannelExceptionHandler<? super T> exceptionHandler) {
1060 return new DrainListener<T>(finishListener, exceptionHandler, bytes);
1061 }
1062
1063 private static class DelegatingSetter<T extends Channel> implements ChannelListener.Setter<T> {
1064 private final ChannelListener.Setter<? extends Channel> setter;
1065 private final T realChannel;
1066
1067 DelegatingSetter(final ChannelListener.Setter<? extends Channel> setter, final T realChannel) {
1068 this.setter = setter;
1069 this.realChannel = realChannel;
1070 }
1071
1072 public void set(final ChannelListener<? super T> channelListener) {
1073 setter.set(channelListener == null ? null : new DelegatingChannelListener<T>(channelListener, realChannel));
1074 }
1075
1076 public String toString() {
1077 return "Delegating setter -> " + setter;
1078 }
1079 }
1080
1081 private static class DelegatingChannelListener<T extends Channel> implements ChannelListener<Channel> {
1082
1083 private final ChannelListener<? super T> channelListener;
1084 private final T realChannel;
1085
1086 public DelegatingChannelListener(final ChannelListener<? super T> channelListener, final T realChannel) {
1087 this.channelListener = channelListener;
1088 this.realChannel = realChannel;
1089 }
1090
1091 public void handleEvent(final Channel channel) {
1092 invokeChannelListener(realChannel, channelListener);
1093 }
1094
1095 public String toString() {
1096 return "Delegating channel listener -> " + channelListener;
1097 }
1098 }
1099
1100 private static class SetterDelegatingListener<C extends Channel, T extends Channel> implements ChannelListener<C> {
1101
1102 private final SimpleSetter<T> setter;
1103 private final T channel;
1104
1105 public SetterDelegatingListener(final SimpleSetter<T> setter, final T channel) {
1106 this.setter = setter;
1107 this.channel = channel;
1108 }
1109
1110 public void handleEvent(final C channel) {
1111 invokeChannelListener(this.channel, setter.get());
1112 }
1113
1114 public String toString() {
1115 return "Setter delegating channel listener -> " + setter;
1116 }
1117 }
1118
1119 private static final ChannelExceptionHandler<Channel> CLOSING_HANDLER = new ChannelExceptionHandler<Channel>() {
1120 public void handleException(final Channel channel, final IOException exception) {
1121 IoUtils.safeClose(channel);
1122 }
1123 };
1124
1125 private static class DrainListener<T extends StreamSourceChannel> implements ChannelListener<T> {
1126 private final ChannelListener<? super T> finishListener;
1127 private final ChannelExceptionHandler<? super T> exceptionHandler;
1128 private long count;
1129
1130 private DrainListener(final ChannelListener<? super T> finishListener, final ChannelExceptionHandler<? super T> exceptionHandler, final long count) {
1131 this.finishListener = finishListener;
1132 this.exceptionHandler = exceptionHandler;
1133 this.count = count;
1134 }
1135
1136 public void handleEvent(final T channel) {
1137 try {
1138 long count = this.count;
1139 try {
1140 long res;
1141 for (;;) {
1142 res = Channels.drain(channel, count);
1143 if (res == -1 || res == count) {
1144 this.count = 0L;
1145 invokeChannelListener(channel, finishListener);
1146 return;
1147 } else if (res == 0) {
1148 return;
1149 } else if (count < Long.MAX_VALUE) {
1150
1151 count -= res;
1152 }
1153 }
1154 } finally {
1155 this.count = count;
1156 }
1157 } catch (IOException e) {
1158 this.count = 0L;
1159 if (exceptionHandler != null) {
1160 invokeChannelExceptionHandler(channel, exceptionHandler, e);
1161 } else {
1162 IoUtils.safeShutdownReads(channel);
1163 }
1164 }
1165 }
1166
1167 public String toString() {
1168 return "Draining channel listener (" + count + " bytes) -> " + finishListener;
1169 }
1170 }
1171 }
1172