1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.input;
18
19 import static org.apache.commons.io.IOUtils.EOF;
20
21 import java.io.FilterInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.commons.io.IOUtils;
26 import org.apache.commons.io.build.AbstractStreamBuilder;
27 import org.apache.commons.io.function.Erase;
28 import org.apache.commons.io.function.IOConsumer;
29 import org.apache.commons.io.function.IOIntConsumer;
30
31 /**
32 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
33 * <p>
34 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
35 * such as read(byte[]) to read(byte[], int, int).
36 * </p>
37 * <p>
38 * In addition, this class allows you to:
39 * </p>
40 * <ul>
41 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
42 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
43 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
44 * <li>{@link #unwrap()} itself</li>
45 * </ul>
46 */
47 public abstract class ProxyInputStream extends FilterInputStream {
48
49 /**
50 * Abstracts builder properties for subclasses.
51 *
52 * @param <T> The InputStream type.
53 * @param <B> The builder type.
54 * @since 2.18.0
55 */
56 protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
57
58 private IOIntConsumer afterRead;
59
60 /**
61 * Constructs a builder of {@code T}.
62 */
63 protected AbstractBuilder() {
64 // empty
65 }
66
67 /**
68 * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
69 *
70 * @return the {@link ProxyInputStream#afterRead(int)} consumer.
71 */
72 public IOIntConsumer getAfterRead() {
73 return afterRead;
74 }
75
76 /**
77 * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
78 * <p>
79 * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
80 * </p>
81 * <p>
82 * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
83 * not called.
84 * </p>
85 * <p>
86 * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
87 * supplement it.
88 * </p>
89 *
90 * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
91 * @return this instance.
92 */
93 public B setAfterRead(final IOIntConsumer afterRead) {
94 this.afterRead = afterRead;
95 return asThis();
96 }
97
98 }
99
100 /**
101 * Tracks whether {@link #close()} has been called or not.
102 */
103 private volatile boolean closed;
104
105 /**
106 * Handles exceptions.
107 */
108 private final IOConsumer<IOException> exceptionHandler;
109
110 private final IOIntConsumer afterRead;
111
112 /**
113 * Constructs a new ProxyInputStream.
114 *
115 * @param builder How to build an instance.
116 * @throws IOException if an I/O error occurs.
117 * @since 2.18.0
118 */
119 @SuppressWarnings("resource")
120 protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
121 // the delegate is stored in a protected superclass instance variable named 'in'.
122 this(builder.getInputStream(), builder);
123 }
124
125 /**
126 * Constructs a new ProxyInputStream.
127 *
128 * @param proxy the InputStream to proxy.
129 */
130 public ProxyInputStream(final InputStream proxy) {
131 // the delegate is stored in a protected superclass variable named 'in'.
132 super(proxy);
133 this.exceptionHandler = Erase::rethrow;
134 this.afterRead = IOIntConsumer.NOOP;
135 }
136
137 /**
138 * Constructs a new ProxyInputStream.
139 *
140 * @param proxy the InputStream to proxy.
141 * @param builder How to build an instance.
142 * @since 2.18.0
143 */
144 protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
145 // the delegate is stored in a protected superclass instance variable named 'in'.
146 super(proxy);
147 this.exceptionHandler = Erase::rethrow;
148 this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
149 }
150
151 /**
152 * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
153 * {@link IOUtils#EOF EOF} if the end of stream was reached.
154 * <p>
155 * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
156 * </p>
157 * <p>
158 * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
159 * </p>
160 * <p>
161 * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
162 * post-processing steps also to them.
163 * </p>
164 *
165 * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
166 * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
167 * @since 2.0
168 */
169 protected void afterRead(final int n) throws IOException {
170 afterRead.accept(n);
171 }
172
173 /**
174 * Invokes the delegate's {@link InputStream#available()} method.
175 *
176 * @return the number of available bytes, 0 if the stream is closed.
177 * @throws IOException if an I/O error occurs.
178 */
179 @Override
180 public int available() throws IOException {
181 if (in != null && !isClosed()) {
182 try {
183 return in.available();
184 } catch (final IOException e) {
185 handleIOException(e);
186 }
187 }
188 return 0;
189 }
190
191 /**
192 * Invoked by the {@code read} methods before the call is proxied. The number
193 * of bytes that the caller wanted to read (1 for the {@link #read()}
194 * method, buffer length for {@link #read(byte[])}, etc.) is given as
195 * an argument.
196 * <p>
197 * Subclasses can override this method to add common pre-processing
198 * functionality without having to override all the read methods.
199 * The default implementation does nothing.
200 * </p>
201 * <p>
202 * Note this method is <em>not</em> called from {@link #skip(long)} or
203 * {@link #reset()}. You need to explicitly override those methods if
204 * you want to add pre-processing steps also to them.
205 * </p>
206 *
207 * @param n number of bytes that the caller asked to be read.
208 * @throws IOException if the pre-processing fails in a subclass.
209 * @since 2.0
210 */
211 @SuppressWarnings("unused") // Possibly thrown from subclasses.
212 protected void beforeRead(final int n) throws IOException {
213 // no-op default
214 }
215
216 /**
217 * Checks if this instance is closed and throws an IOException if so.
218 *
219 * @throws IOException if this instance is closed.
220 */
221 void checkOpen() throws IOException {
222 Input.checkOpen(!isClosed());
223 }
224
225 /**
226 * Invokes the delegate's {@link InputStream#close()} method.
227 *
228 * @throws IOException if an I/O error occurs.
229 */
230 @Override
231 public void close() throws IOException {
232 IOUtils.close(in, this::handleIOException);
233 closed = true;
234 }
235
236 /**
237 * Handles any IOExceptions thrown; by default, throws the given exception.
238 * <p>
239 * This method provides a point to implement custom exception
240 * handling. The default behavior is to re-throw the exception.
241 * </p>
242 *
243 * @param e The IOException thrown.
244 * @throws IOException if an I/O error occurs.
245 * @since 2.0
246 */
247 protected void handleIOException(final IOException e) throws IOException {
248 exceptionHandler.accept(e);
249 }
250
251 /**
252 * Tests whether this instance is closed.
253 *
254 * @return whether this instance is closed.
255 */
256 boolean isClosed() {
257 return closed;
258 }
259
260 /**
261 * Invokes the delegate's {@link InputStream#mark(int)} method.
262 *
263 * @param readLimit read ahead limit.
264 */
265 @Override
266 public synchronized void mark(final int readLimit) {
267 if (in != null) {
268 in.mark(readLimit);
269 }
270 }
271
272 /**
273 * Invokes the delegate's {@link InputStream#markSupported()} method.
274 *
275 * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
276 * @see #mark(int)
277 * @see #reset()
278 */
279 @Override
280 public boolean markSupported() {
281 return in != null && in.markSupported();
282 }
283
284 /**
285 * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
286 *
287 * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
288 * @throws IOException if an I/O error occurs.
289 */
290 @Override
291 public int read() throws IOException {
292 try {
293 beforeRead(1);
294 final int b = in.read();
295 afterRead(b != EOF ? 1 : EOF);
296 return b;
297 } catch (final IOException e) {
298 handleIOException(e);
299 return EOF;
300 }
301 }
302
303 /**
304 * Invokes the delegate's {@link InputStream#read(byte[])} method.
305 *
306 * @param b the buffer to read the bytes into.
307 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
308 * @throws IOException
309 * <ul>
310 * <li>If the first byte cannot be read for any reason other than the end of the file,
311 * <li>if the input stream has been closed, or</li>
312 * <li>if some other I/O error occurs.</li>
313 * </ul>
314 */
315 @Override
316 public int read(final byte[] b) throws IOException {
317 try {
318 beforeRead(IOUtils.length(b));
319 final int n = in.read(b);
320 afterRead(n);
321 return n;
322 } catch (final IOException e) {
323 handleIOException(e);
324 return EOF;
325 }
326 }
327
328 /**
329 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
330 *
331 * @param b the buffer to read the bytes into.
332 * @param off The start offset.
333 * @param len The number of bytes to read.
334 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
335 * @throws IOException
336 * <ul>
337 * <li>If the first byte cannot be read for any reason other than the end of the file,
338 * <li>if the input stream has been closed, or</li>
339 * <li>if some other I/O error occurs.</li>
340 * </ul>
341 */
342 @Override
343 public int read(final byte[] b, final int off, final int len) throws IOException {
344 try {
345 beforeRead(len);
346 final int n = in.read(b, off, len);
347 afterRead(n);
348 return n;
349 } catch (final IOException e) {
350 handleIOException(e);
351 return EOF;
352 }
353 }
354
355 /**
356 * Invokes the delegate's {@link InputStream#reset()} method.
357 *
358 * @throws IOException if this stream has not been marked or if the mark has been invalidated.
359 */
360 @Override
361 public synchronized void reset() throws IOException {
362 try {
363 in.reset();
364 } catch (final IOException e) {
365 handleIOException(e);
366 }
367 }
368
369 /**
370 * Sets the underlying input stream.
371 *
372 * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
373 * @return this instance.
374 * @since 2.19.0
375 */
376 public ProxyInputStream setReference(final InputStream in) {
377 this.in = in;
378 return this;
379 }
380
381 /**
382 * Invokes the delegate's {@link InputStream#skip(long)} method.
383 *
384 * @param n the number of bytes to skip.
385 * @return the actual number of bytes skipped.
386 * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
387 */
388 @Override
389 public long skip(final long n) throws IOException {
390 try {
391 return in.skip(n);
392 } catch (final IOException e) {
393 handleIOException(e);
394 return 0;
395 }
396 }
397
398 /**
399 * Unwraps this instance by returning the underlying {@link InputStream}.
400 * <p>
401 * Use with caution; useful to query the underlying {@link InputStream}.
402 * </p>
403 *
404 * @return the underlying {@link InputStream}.
405 * @since 2.16.0
406 */
407 public InputStream unwrap() {
408 return in;
409 }
410
411 }
412