1
18
19 package io.undertow.channels;
20
21 import io.undertow.UndertowLogger;
22 import io.undertow.UndertowMessages;
23 import org.xnio.ChannelListener;
24 import org.xnio.ChannelListeners;
25 import org.xnio.Option;
26 import org.xnio.XnioExecutor;
27 import org.xnio.XnioIoThread;
28 import org.xnio.XnioWorker;
29 import org.xnio.channels.StreamSinkChannel;
30 import org.xnio.channels.StreamSourceChannel;
31 import org.xnio.conduits.ConduitStreamSinkChannel;
32
33 import java.io.IOException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.FileChannel;
36 import java.util.concurrent.TimeUnit;
37
38
44 public abstract class DetachableStreamSinkChannel implements StreamSinkChannel {
45
46
47 protected final StreamSinkChannel delegate;
48 protected ChannelListener.SimpleSetter<DetachableStreamSinkChannel> writeSetter;
49 protected ChannelListener.SimpleSetter<DetachableStreamSinkChannel> closeSetter;
50
51 public DetachableStreamSinkChannel(final StreamSinkChannel delegate) {
52 this.delegate = delegate;
53 }
54
55 protected abstract boolean isFinished();
56
57 @Override
58 public void suspendWrites() {
59 if (isFinished()) {
60 return;
61 }
62 delegate.suspendWrites();
63 }
64
65
66 @Override
67 public boolean isWriteResumed() {
68 if (isFinished()) {
69 return false;
70 }
71 return delegate.isWriteResumed();
72 }
73
74 @Override
75 public void shutdownWrites() throws IOException {
76 if (isFinished()) {
77 return;
78 }
79 delegate.shutdownWrites();
80 }
81
82 @Override
83 public void awaitWritable() throws IOException {
84 if (isFinished()) {
85 throw UndertowMessages.MESSAGES.channelIsClosed();
86 }
87 delegate.awaitWritable();
88 }
89
90 @Override
91 public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
92 if (isFinished()) {
93 throw UndertowMessages.MESSAGES.channelIsClosed();
94 }
95 delegate.awaitWritable(time, timeUnit);
96 }
97
98 @Override
99 public XnioExecutor getWriteThread() {
100 return delegate.getWriteThread();
101 }
102
103 @Override
104 public boolean isOpen() {
105 return !isFinished() && delegate.isOpen();
106 }
107
108 @Override
109 public void close() throws IOException {
110 if (isFinished()) return;
111 delegate.close();
112 }
113
114 @Override
115 public boolean flush() throws IOException {
116 if (isFinished()) {
117 return true;
118 }
119 return delegate.flush();
120 }
121
122 @Override
123 public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
124 if (isFinished()) {
125 throw UndertowMessages.MESSAGES.channelIsClosed();
126 }
127 return delegate.transferFrom(src, position, count);
128 }
129
130 @Override
131 public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
132 if (isFinished()) {
133 throw UndertowMessages.MESSAGES.channelIsClosed();
134 }
135 return delegate.transferFrom(source, count, throughBuffer);
136 }
137
138 @Override
139 public ChannelListener.Setter<? extends StreamSinkChannel> getWriteSetter() {
140 if (writeSetter == null) {
141 writeSetter = new ChannelListener.SimpleSetter<>();
142 if (!isFinished()) {
143 if(delegate instanceof ConduitStreamSinkChannel) {
144 ((ConduitStreamSinkChannel) delegate).setWriteListener(new SetterDelegatingListener((ChannelListener.SimpleSetter)writeSetter, this));
145 } else {
146 delegate.getWriteSetter().set(new SetterDelegatingListener((ChannelListener.SimpleSetter)writeSetter, this));
147 }
148 }
149 }
150 return writeSetter;
151 }
152
153 @Override
154 public ChannelListener.Setter<? extends StreamSinkChannel> getCloseSetter() {
155 if (closeSetter == null) {
156 closeSetter = new ChannelListener.SimpleSetter<>();
157 if (!isFinished()) {
158 delegate.getCloseSetter().set(ChannelListeners.delegatingChannelListener(this, closeSetter));
159 }
160 }
161 return closeSetter;
162 }
163
164 @Override
165 public XnioWorker getWorker() {
166 return delegate.getWorker();
167 }
168
169 @Override
170 public XnioIoThread getIoThread() {
171 return delegate.getIoThread();
172 }
173
174 @Override
175 public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
176 if (isFinished()) {
177 throw UndertowMessages.MESSAGES.channelIsClosed();
178 }
179 return delegate.write(srcs, offset, length);
180 }
181
182 @Override
183 public long write(final ByteBuffer[] srcs) throws IOException {
184 if (isFinished()) {
185 throw UndertowMessages.MESSAGES.channelIsClosed();
186 }
187 return delegate.write(srcs);
188 }
189
190 @Override
191 public int writeFinal(ByteBuffer src) throws IOException {
192 if (isFinished()) {
193 throw UndertowMessages.MESSAGES.channelIsClosed();
194 }
195 return delegate.writeFinal(src);
196 }
197
198 @Override
199 public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
200 if (isFinished()) {
201 throw UndertowMessages.MESSAGES.channelIsClosed();
202 }
203 return delegate.writeFinal(srcs, offset, length);
204 }
205
206 @Override
207 public long writeFinal(ByteBuffer[] srcs) throws IOException {
208 if (isFinished()) {
209 throw UndertowMessages.MESSAGES.channelIsClosed();
210 }
211 return delegate.writeFinal(srcs);
212 }
213
214 @Override
215 public boolean supportsOption(final Option<?> option) {
216 return delegate.supportsOption(option);
217 }
218
219 @Override
220 public <T> T getOption(final Option<T> option) throws IOException {
221 if (isFinished()) {
222 throw UndertowMessages.MESSAGES.channelIsClosed();
223 }
224 return delegate.getOption(option);
225 }
226
227 @Override
228 public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
229 if (isFinished()) {
230 throw UndertowMessages.MESSAGES.channelIsClosed();
231 }
232 return delegate.setOption(option, value);
233 }
234
235 @Override
236 public int write(final ByteBuffer src) throws IOException {
237 if (isFinished()) {
238 throw UndertowMessages.MESSAGES.channelIsClosed();
239 }
240 return delegate.write(src);
241 }
242
243 @Override
244 public void resumeWrites() {
245 if (isFinished()) {
246 return;
247 }
248 delegate.resumeWrites();
249 }
250
251 @Override
252 public void wakeupWrites() {
253 if (isFinished()) {
254 return;
255 }
256 delegate.wakeupWrites();
257 }
258
259 public void responseDone() {
260 if(delegate instanceof ConduitStreamSinkChannel) {
261 ((ConduitStreamSinkChannel) delegate).setCloseListener(null);
262 ((ConduitStreamSinkChannel) delegate).setWriteListener(null);
263 } else {
264 delegate.getCloseSetter().set(null);
265 delegate.getWriteSetter().set(null);
266 }
267 if (delegate.isWriteResumed()) {
268 delegate.suspendWrites();
269 }
270 }
271
272 private static class SetterDelegatingListener implements ChannelListener<StreamSinkChannel> {
273
274 private final SimpleSetter<StreamSinkChannel> setter;
275 private final StreamSinkChannel channel;
276
277 SetterDelegatingListener(final SimpleSetter<StreamSinkChannel> setter, final StreamSinkChannel channel) {
278 this.setter = setter;
279 this.channel = channel;
280 }
281
282 public void handleEvent(final StreamSinkChannel channel) {
283 ChannelListener<? super StreamSinkChannel> channelListener = setter.get();
284 if(channelListener != null) {
285 ChannelListeners.invokeChannelListener(this.channel, channelListener);
286 } else {
287 UndertowLogger.REQUEST_LOGGER.debugf("suspending writes on %s to prevent listener runaway", channel);
288 channel.suspendWrites();
289 }
290 }
291
292 public String toString() {
293 return "Setter delegating channel listener -> " + setter;
294 }
295 }
296
297 }
298