1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.channels;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.util.concurrent.TimeUnit;
25
26 import io.undertow.UndertowLogger;
27 import org.xnio.ChannelListener;
28 import org.xnio.ChannelListeners;
29 import org.xnio.Option;
30 import org.xnio.XnioExecutor;
31 import org.xnio.XnioIoThread;
32 import org.xnio.XnioWorker;
33 import org.xnio.channels.StreamSinkChannel;
34 import org.xnio.channels.StreamSourceChannel;
35 import org.xnio.conduits.ConduitStreamSourceChannel;
36
37 import io.undertow.UndertowMessages;
38
39 /**
40  * A stream source channel that can be marked as detached. Once this is marked as detached then
41  * calls will no longer be forwarded to the delegate.
42  *
43  * @author Stuart Douglas
44  */

45 public abstract class DetachableStreamSourceChannel implements StreamSourceChannel{
46
47     protected final StreamSourceChannel delegate;
48
49     protected ChannelListener.SimpleSetter<DetachableStreamSourceChannel> readSetter;
50     protected ChannelListener.SimpleSetter<DetachableStreamSourceChannel> closeSetter;
51
52     public DetachableStreamSourceChannel(final StreamSourceChannel delegate) {
53         this.delegate = delegate;
54     }
55
56     protected abstract boolean isFinished();
57
58     @Override
59     public void resumeReads() {
60         if (isFinished()) {
61             return;
62         }
63         delegate.resumeReads();
64     }
65
66     public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
67         if (isFinished()) {
68             return -1;
69         }
70         return delegate.transferTo(position, count, target);
71     }
72
73     public void awaitReadable() throws IOException {
74         if (isFinished()) {
75             return;
76         }
77         delegate.awaitReadable();
78     }
79
80     public void suspendReads() {
81         if (isFinished()) {
82             return;
83         }
84         delegate.suspendReads();
85     }
86
87     public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
88         if (isFinished()) {
89             return -1;
90         }
91         return delegate.transferTo(count, throughBuffer, target);
92     }
93
94     public XnioWorker getWorker() {
95         return delegate.getWorker();
96     }
97
98     public boolean isReadResumed() {
99         if (isFinished()) {
100             return false;
101         }
102         return delegate.isReadResumed();
103     }
104
105     public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
106
107         if (isFinished()) {
108             throw UndertowMessages.MESSAGES.channelIsClosed();
109         }
110         return delegate.setOption(option, value);
111     }
112
113     public boolean supportsOption(final Option<?> option) {
114         return delegate.supportsOption(option);
115     }
116
117     public void shutdownReads() throws IOException {
118         if (isFinished()) {
119             return;
120         }
121         delegate.shutdownReads();
122     }
123
124     public ChannelListener.Setter<? extends StreamSourceChannel> getReadSetter() {
125         if (readSetter == null) {
126             readSetter = new ChannelListener.SimpleSetter<>();
127             if (!isFinished()) {
128                 if(delegate instanceof ConduitStreamSourceChannel) {
129                     ((ConduitStreamSourceChannel)delegate).setReadListener(new SetterDelegatingListener((ChannelListener.SimpleSetter)readSetter, this));
130                 } else {
131                     delegate.getReadSetter().set(new SetterDelegatingListener((ChannelListener.SimpleSetter)readSetter, this));
132                 }
133             }
134         }
135         return readSetter;
136     }
137
138     public boolean isOpen() {
139         if (isFinished()) {
140             return false;
141         }
142         return delegate.isOpen();
143     }
144
145     public long read(final ByteBuffer[] dsts) throws IOException {
146         if (isFinished()) {
147             return -1;
148         }
149         return delegate.read(dsts);
150     }
151
152     public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
153         if (isFinished()) {
154             return -1;
155         }
156         return delegate.read(dsts, offset, length);
157     }
158
159     public void wakeupReads() {
160         if (isFinished()) {
161             return;
162         }
163         delegate.wakeupReads();
164     }
165
166     public XnioExecutor getReadThread() {
167         return delegate.getReadThread();
168     }
169
170     public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
171         if (isFinished()) {
172             throw UndertowMessages.MESSAGES.channelIsClosed();
173         }
174         delegate.awaitReadable(time, timeUnit);
175     }
176
177     public ChannelListener.Setter<? extends StreamSourceChannel> getCloseSetter() {
178         if (closeSetter == null) {
179             closeSetter = new ChannelListener.SimpleSetter<>();
180             if (!isFinished()) {
181                 if(delegate instanceof ConduitStreamSourceChannel) {
182                     ((ConduitStreamSourceChannel)delegate).setCloseListener(ChannelListeners.delegatingChannelListener(this, closeSetter));
183                 } else {
184                     delegate.getCloseSetter().set(ChannelListeners.delegatingChannelListener(this, closeSetter));
185                 }
186             }
187         }
188         return closeSetter;
189     }
190
191     public void close() throws IOException {
192         if (isFinished()) {
193             return;
194         }
195         delegate.close();
196     }
197
198     public <T> T getOption(final Option<T> option) throws IOException {
199         if (isFinished()) {
200             throw UndertowMessages.MESSAGES.streamIsClosed();
201         }
202         return delegate.getOption(option);
203     }
204
205     public int read(final ByteBuffer dst) throws IOException {
206         if (isFinished()) {
207             return -1;
208         }
209         return delegate.read(dst);
210     }
211
212     @Override
213     public XnioIoThread getIoThread() {
214         return delegate.getIoThread();
215     }
216
217
218     private static class SetterDelegatingListener implements ChannelListener<StreamSourceChannel> {
219
220         private final SimpleSetter<StreamSourceChannel> setter;
221         private final StreamSourceChannel channel;
222
223         SetterDelegatingListener(final SimpleSetter<StreamSourceChannel> setter, final StreamSourceChannel channel) {
224             this.setter = setter;
225             this.channel = channel;
226         }
227
228         public void handleEvent(final StreamSourceChannel channel) {
229             ChannelListener<? super StreamSourceChannel> channelListener = setter.get();
230             if(channelListener != null) {
231                 ChannelListeners.invokeChannelListener(this.channel, channelListener);
232             } else {
233                 UndertowLogger.REQUEST_LOGGER.debugf("suspending reads on %s to prevent listener runaway", channel);
234                 channel.suspendReads();
235             }
236         }
237
238         public String toString() {
239             return "Setter delegating channel listener -> " + setter;
240         }
241     }
242 }
243