1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.channels;
20
21 import io.undertow.UndertowLogger;
22 import io.undertow.UndertowMessages;
23 import org.xnio.ChannelListener;
24 import org.xnio.ChannelListeners;
25 import org.xnio.Option;
26 import org.xnio.XnioExecutor;
27 import org.xnio.XnioIoThread;
28 import org.xnio.XnioWorker;
29 import org.xnio.channels.StreamSinkChannel;
30 import org.xnio.channels.StreamSourceChannel;
31 import org.xnio.conduits.ConduitStreamSinkChannel;
32
33 import java.io.IOException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.FileChannel;
36 import java.util.concurrent.TimeUnit;
37
38 /**
39  * Stream sink channel. When this channel is considered detached it will no longer forward
40  * calls to the delegate
41  *
42  * @author Stuart Douglas
43  */

44 public abstract class DetachableStreamSinkChannel implements StreamSinkChannel {
45
46
47     protected final StreamSinkChannel delegate;
48     protected ChannelListener.SimpleSetter<DetachableStreamSinkChannel> writeSetter;
49     protected ChannelListener.SimpleSetter<DetachableStreamSinkChannel> closeSetter;
50
51     public DetachableStreamSinkChannel(final StreamSinkChannel delegate) {
52         this.delegate = delegate;
53     }
54
55     protected abstract boolean isFinished();
56
57     @Override
58     public void suspendWrites() {
59         if (isFinished()) {
60             return;
61         }
62         delegate.suspendWrites();
63     }
64
65
66     @Override
67     public boolean isWriteResumed() {
68         if (isFinished()) {
69             return false;
70         }
71         return delegate.isWriteResumed();
72     }
73
74     @Override
75     public void shutdownWrites() throws IOException {
76         if (isFinished()) {
77             return;
78         }
79         delegate.shutdownWrites();
80     }
81
82     @Override
83     public void awaitWritable() throws IOException {
84         if (isFinished()) {
85             throw UndertowMessages.MESSAGES.channelIsClosed();
86         }
87         delegate.awaitWritable();
88     }
89
90     @Override
91     public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
92         if (isFinished()) {
93             throw UndertowMessages.MESSAGES.channelIsClosed();
94         }
95         delegate.awaitWritable(time, timeUnit);
96     }
97
98     @Override
99     public XnioExecutor getWriteThread() {
100         return delegate.getWriteThread();
101     }
102
103     @Override
104     public boolean isOpen() {
105         return !isFinished() && delegate.isOpen();
106     }
107
108     @Override
109     public void close() throws IOException {
110         if (isFinished()) return;
111         delegate.close();
112     }
113
114     @Override
115     public boolean flush() throws IOException {
116         if (isFinished()) {
117             return true;
118         }
119         return delegate.flush();
120     }
121
122     @Override
123     public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
124         if (isFinished()) {
125             throw UndertowMessages.MESSAGES.channelIsClosed();
126         }
127         return delegate.transferFrom(src, position, count);
128     }
129
130     @Override
131     public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
132         if (isFinished()) {
133             throw UndertowMessages.MESSAGES.channelIsClosed();
134         }
135         return delegate.transferFrom(source, count, throughBuffer);
136     }
137
138     @Override
139     public ChannelListener.Setter<? extends StreamSinkChannel> getWriteSetter() {
140         if (writeSetter == null) {
141             writeSetter = new ChannelListener.SimpleSetter<>();
142             if (!isFinished()) {
143                 if(delegate instanceof ConduitStreamSinkChannel) {
144                     ((ConduitStreamSinkChannel) delegate).setWriteListener(new SetterDelegatingListener((ChannelListener.SimpleSetter)writeSetter, this));
145                 } else {
146                     delegate.getWriteSetter().set(new SetterDelegatingListener((ChannelListener.SimpleSetter)writeSetter, this));
147                 }
148             }
149         }
150         return writeSetter;
151     }
152
153     @Override
154     public ChannelListener.Setter<? extends StreamSinkChannel> getCloseSetter() {
155         if (closeSetter == null) {
156             closeSetter = new ChannelListener.SimpleSetter<>();
157             if (!isFinished()) {
158                 delegate.getCloseSetter().set(ChannelListeners.delegatingChannelListener(this, closeSetter));
159             }
160         }
161         return closeSetter;
162     }
163
164     @Override
165     public XnioWorker getWorker() {
166         return delegate.getWorker();
167     }
168
169     @Override
170     public XnioIoThread getIoThread() {
171         return delegate.getIoThread();
172     }
173
174     @Override
175     public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
176         if (isFinished()) {
177             throw UndertowMessages.MESSAGES.channelIsClosed();
178         }
179         return delegate.write(srcs, offset, length);
180     }
181
182     @Override
183     public long write(final ByteBuffer[] srcs) throws IOException {
184         if (isFinished()) {
185             throw UndertowMessages.MESSAGES.channelIsClosed();
186         }
187         return delegate.write(srcs);
188     }
189
190     @Override
191     public int writeFinal(ByteBuffer src) throws IOException {
192         if (isFinished()) {
193             throw UndertowMessages.MESSAGES.channelIsClosed();
194         }
195         return delegate.writeFinal(src);
196     }
197
198     @Override
199     public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
200         if (isFinished()) {
201             throw UndertowMessages.MESSAGES.channelIsClosed();
202         }
203         return delegate.writeFinal(srcs, offset, length);
204     }
205
206     @Override
207     public long writeFinal(ByteBuffer[] srcs) throws IOException {
208         if (isFinished()) {
209             throw UndertowMessages.MESSAGES.channelIsClosed();
210         }
211         return delegate.writeFinal(srcs);
212     }
213
214     @Override
215     public boolean supportsOption(final Option<?> option) {
216         return delegate.supportsOption(option);
217     }
218
219     @Override
220     public <T> T getOption(final Option<T> option) throws IOException {
221         if (isFinished()) {
222             throw UndertowMessages.MESSAGES.channelIsClosed();
223         }
224         return delegate.getOption(option);
225     }
226
227     @Override
228     public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
229         if (isFinished()) {
230             throw UndertowMessages.MESSAGES.channelIsClosed();
231         }
232         return delegate.setOption(option, value);
233     }
234
235     @Override
236     public int write(final ByteBuffer src) throws IOException {
237         if (isFinished()) {
238             throw UndertowMessages.MESSAGES.channelIsClosed();
239         }
240         return delegate.write(src);
241     }
242
243     @Override
244     public void resumeWrites() {
245         if (isFinished()) {
246             return;
247         }
248         delegate.resumeWrites();
249     }
250
251     @Override
252     public void wakeupWrites() {
253         if (isFinished()) {
254             return;
255         }
256         delegate.wakeupWrites();
257     }
258
259     public void responseDone() {
260         if(delegate instanceof ConduitStreamSinkChannel) {
261             ((ConduitStreamSinkChannel) delegate).setCloseListener(null);
262             ((ConduitStreamSinkChannel) delegate).setWriteListener(null);
263         } else {
264             delegate.getCloseSetter().set(null);
265             delegate.getWriteSetter().set(null);
266         }
267         if (delegate.isWriteResumed()) {
268             delegate.suspendWrites();
269         }
270     }
271
272     private static class SetterDelegatingListener implements ChannelListener<StreamSinkChannel> {
273
274         private final SimpleSetter<StreamSinkChannel> setter;
275         private final StreamSinkChannel channel;
276
277         SetterDelegatingListener(final SimpleSetter<StreamSinkChannel> setter, final StreamSinkChannel channel) {
278             this.setter = setter;
279             this.channel = channel;
280         }
281
282         public void handleEvent(final StreamSinkChannel channel) {
283             ChannelListener<? super StreamSinkChannel> channelListener = setter.get();
284             if(channelListener != null) {
285                 ChannelListeners.invokeChannelListener(this.channel, channelListener);
286             } else {
287                 UndertowLogger.REQUEST_LOGGER.debugf("suspending writes on %s to prevent listener runaway", channel);
288                 channel.suspendWrites();
289             }
290         }
291
292         public String toString() {
293             return "Setter delegating channel listener -> " + setter;
294         }
295     }
296
297 }
298