1
18
19 package io.undertow.channels;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.util.concurrent.TimeUnit;
25
26 import io.undertow.UndertowLogger;
27 import org.xnio.ChannelListener;
28 import org.xnio.ChannelListeners;
29 import org.xnio.Option;
30 import org.xnio.XnioExecutor;
31 import org.xnio.XnioIoThread;
32 import org.xnio.XnioWorker;
33 import org.xnio.channels.StreamSinkChannel;
34 import org.xnio.channels.StreamSourceChannel;
35 import org.xnio.conduits.ConduitStreamSourceChannel;
36
37 import io.undertow.UndertowMessages;
38
39
45 public abstract class DetachableStreamSourceChannel implements StreamSourceChannel{
46
47 protected final StreamSourceChannel delegate;
48
49 protected ChannelListener.SimpleSetter<DetachableStreamSourceChannel> readSetter;
50 protected ChannelListener.SimpleSetter<DetachableStreamSourceChannel> closeSetter;
51
52 public DetachableStreamSourceChannel(final StreamSourceChannel delegate) {
53 this.delegate = delegate;
54 }
55
56 protected abstract boolean isFinished();
57
58 @Override
59 public void resumeReads() {
60 if (isFinished()) {
61 return;
62 }
63 delegate.resumeReads();
64 }
65
66 public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
67 if (isFinished()) {
68 return -1;
69 }
70 return delegate.transferTo(position, count, target);
71 }
72
73 public void awaitReadable() throws IOException {
74 if (isFinished()) {
75 return;
76 }
77 delegate.awaitReadable();
78 }
79
80 public void suspendReads() {
81 if (isFinished()) {
82 return;
83 }
84 delegate.suspendReads();
85 }
86
87 public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
88 if (isFinished()) {
89 return -1;
90 }
91 return delegate.transferTo(count, throughBuffer, target);
92 }
93
94 public XnioWorker getWorker() {
95 return delegate.getWorker();
96 }
97
98 public boolean isReadResumed() {
99 if (isFinished()) {
100 return false;
101 }
102 return delegate.isReadResumed();
103 }
104
105 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
106
107 if (isFinished()) {
108 throw UndertowMessages.MESSAGES.channelIsClosed();
109 }
110 return delegate.setOption(option, value);
111 }
112
113 public boolean supportsOption(final Option<?> option) {
114 return delegate.supportsOption(option);
115 }
116
117 public void shutdownReads() throws IOException {
118 if (isFinished()) {
119 return;
120 }
121 delegate.shutdownReads();
122 }
123
124 public ChannelListener.Setter<? extends StreamSourceChannel> getReadSetter() {
125 if (readSetter == null) {
126 readSetter = new ChannelListener.SimpleSetter<>();
127 if (!isFinished()) {
128 if(delegate instanceof ConduitStreamSourceChannel) {
129 ((ConduitStreamSourceChannel)delegate).setReadListener(new SetterDelegatingListener((ChannelListener.SimpleSetter)readSetter, this));
130 } else {
131 delegate.getReadSetter().set(new SetterDelegatingListener((ChannelListener.SimpleSetter)readSetter, this));
132 }
133 }
134 }
135 return readSetter;
136 }
137
138 public boolean isOpen() {
139 if (isFinished()) {
140 return false;
141 }
142 return delegate.isOpen();
143 }
144
145 public long read(final ByteBuffer[] dsts) throws IOException {
146 if (isFinished()) {
147 return -1;
148 }
149 return delegate.read(dsts);
150 }
151
152 public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
153 if (isFinished()) {
154 return -1;
155 }
156 return delegate.read(dsts, offset, length);
157 }
158
159 public void wakeupReads() {
160 if (isFinished()) {
161 return;
162 }
163 delegate.wakeupReads();
164 }
165
166 public XnioExecutor getReadThread() {
167 return delegate.getReadThread();
168 }
169
170 public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
171 if (isFinished()) {
172 throw UndertowMessages.MESSAGES.channelIsClosed();
173 }
174 delegate.awaitReadable(time, timeUnit);
175 }
176
177 public ChannelListener.Setter<? extends StreamSourceChannel> getCloseSetter() {
178 if (closeSetter == null) {
179 closeSetter = new ChannelListener.SimpleSetter<>();
180 if (!isFinished()) {
181 if(delegate instanceof ConduitStreamSourceChannel) {
182 ((ConduitStreamSourceChannel)delegate).setCloseListener(ChannelListeners.delegatingChannelListener(this, closeSetter));
183 } else {
184 delegate.getCloseSetter().set(ChannelListeners.delegatingChannelListener(this, closeSetter));
185 }
186 }
187 }
188 return closeSetter;
189 }
190
191 public void close() throws IOException {
192 if (isFinished()) {
193 return;
194 }
195 delegate.close();
196 }
197
198 public <T> T getOption(final Option<T> option) throws IOException {
199 if (isFinished()) {
200 throw UndertowMessages.MESSAGES.streamIsClosed();
201 }
202 return delegate.getOption(option);
203 }
204
205 public int read(final ByteBuffer dst) throws IOException {
206 if (isFinished()) {
207 return -1;
208 }
209 return delegate.read(dst);
210 }
211
212 @Override
213 public XnioIoThread getIoThread() {
214 return delegate.getIoThread();
215 }
216
217
218 private static class SetterDelegatingListener implements ChannelListener<StreamSourceChannel> {
219
220 private final SimpleSetter<StreamSourceChannel> setter;
221 private final StreamSourceChannel channel;
222
223 SetterDelegatingListener(final SimpleSetter<StreamSourceChannel> setter, final StreamSourceChannel channel) {
224 this.setter = setter;
225 this.channel = channel;
226 }
227
228 public void handleEvent(final StreamSourceChannel channel) {
229 ChannelListener<? super StreamSourceChannel> channelListener = setter.get();
230 if(channelListener != null) {
231 ChannelListeners.invokeChannelListener(this.channel, channelListener);
232 } else {
233 UndertowLogger.REQUEST_LOGGER.debugf("suspending reads on %s to prevent listener runaway", channel);
234 channel.suspendReads();
235 }
236 }
237
238 public String toString() {
239 return "Setter delegating channel listener -> " + setter;
240 }
241 }
242 }
243