1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.buffer;
17
18 import io.netty.util.ReferenceCounted;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.StringUtil;
21
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.IOException;
26 import java.io.InputStream;
27
28 /**
29  * An {@link InputStream} which reads data from a {@link ByteBuf}.
30  * <p>
31  * A read operation against this stream will occur at the {@code readerIndex}
32  * of its underlying buffer and the {@code readerIndex} will increase during
33  * the read operation.  Please note that it only reads up to the number of
34  * readable bytes determined at the moment of construction.  Therefore,
35  * updating {@link ByteBuf#writerIndex()} will not affect the return
36  * value of {@link #available()}.
37  * <p>
38  * This stream implements {@link DataInput} for your convenience.
39  * The endianness of the stream is not always big endian but depends on
40  * the endianness of the underlying buffer.
41  *
42  * @see ByteBufOutputStream
43  */

44 public class ByteBufInputStream extends InputStream implements DataInput {
45     private final ByteBuf buffer;
46     private final int startIndex;
47     private final int endIndex;
48     private boolean closed;
49     /**
50      * To preserve backwards compatibility (which didn't transfer ownership) we support a conditional flag which
51      * indicates if {@link #buffer} should be released when this {@link InputStream} is closed.
52      * However in future releases ownership should always be transferred and callers of this class should call
53      * {@link ReferenceCounted#retain()} if necessary.
54      */

55     private final boolean releaseOnClose;
56
57     /**
58      * Creates a new stream which reads data from the specified {@code buffer}
59      * starting at the current {@code readerIndex} and ending at the current
60      * {@code writerIndex}.
61      * @param buffer The buffer which provides the content for this {@link InputStream}.
62      */

63     public ByteBufInputStream(ByteBuf buffer) {
64         this(buffer, buffer.readableBytes());
65     }
66
67     /**
68      * Creates a new stream which reads data from the specified {@code buffer}
69      * starting at the current {@code readerIndex} and ending at
70      * {@code readerIndex + length}.
71      * @param buffer The buffer which provides the content for this {@link InputStream}.
72      * @param length The length of the buffer to use for this {@link InputStream}.
73      * @throws IndexOutOfBoundsException
74      *         if {@code readerIndex + length} is greater than
75      *            {@code writerIndex}
76      */

77     public ByteBufInputStream(ByteBuf buffer, int length) {
78         this(buffer, length, false);
79     }
80
81     /**
82      * Creates a new stream which reads data from the specified {@code buffer}
83      * starting at the current {@code readerIndex} and ending at the current
84      * {@code writerIndex}.
85      * @param buffer The buffer which provides the content for this {@link InputStream}.
86      * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
87      *                       be called on {@code buffer}.
88      */

89     public ByteBufInputStream(ByteBuf buffer, boolean releaseOnClose) {
90         this(buffer, buffer.readableBytes(), releaseOnClose);
91     }
92
93     /**
94      * Creates a new stream which reads data from the specified {@code buffer}
95      * starting at the current {@code readerIndex} and ending at
96      * {@code readerIndex + length}.
97      * @param buffer The buffer which provides the content for this {@link InputStream}.
98      * @param length The length of the buffer to use for this {@link InputStream}.
99      * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
100      *                       be called on {@code buffer}.
101      * @throws IndexOutOfBoundsException
102      *         if {@code readerIndex + length} is greater than
103      *            {@code writerIndex}
104      */

105     public ByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose) {
106         ObjectUtil.checkNotNull(buffer, "buffer");
107         if (length < 0) {
108             if (releaseOnClose) {
109                 buffer.release();
110             }
111             throw new IllegalArgumentException("length: " + length);
112         }
113         if (length > buffer.readableBytes()) {
114             if (releaseOnClose) {
115                 buffer.release();
116             }
117             throw new IndexOutOfBoundsException("Too many bytes to be read - Needs "
118                     + length + ", maximum is " + buffer.readableBytes());
119         }
120
121         this.releaseOnClose = releaseOnClose;
122         this.buffer = buffer;
123         startIndex = buffer.readerIndex();
124         endIndex = startIndex + length;
125         buffer.markReaderIndex();
126     }
127
128     /**
129      * Returns the number of read bytes by this stream so far.
130      */

131     public int readBytes() {
132         return buffer.readerIndex() - startIndex;
133     }
134
135     @Override
136     public void close() throws IOException {
137         try {
138             super.close();
139         } finally {
140             // The Closable interface says "If the stream is already closed then invoking this method has no effect."
141             if (releaseOnClose && !closed) {
142                 closed = true;
143                 buffer.release();
144             }
145         }
146     }
147
148     @Override
149     public int available() throws IOException {
150         return endIndex - buffer.readerIndex();
151     }
152
153     @Override
154     public void mark(int readlimit) {
155         buffer.markReaderIndex();
156     }
157
158     @Override
159     public boolean markSupported() {
160         return true;
161     }
162
163     @Override
164     public int read() throws IOException {
165         int available = available();
166         if (available == 0) {
167             return -1;
168         }
169         return buffer.readByte() & 0xff;
170     }
171
172     @Override
173     public int read(byte[] b, int off, int len) throws IOException {
174         int available = available();
175         if (available == 0) {
176             return -1;
177         }
178
179         len = Math.min(available, len);
180         buffer.readBytes(b, off, len);
181         return len;
182     }
183
184     @Override
185     public void reset() throws IOException {
186         buffer.resetReaderIndex();
187     }
188
189     @Override
190     public long skip(long n) throws IOException {
191         if (n > Integer.MAX_VALUE) {
192             return skipBytes(Integer.MAX_VALUE);
193         } else {
194             return skipBytes((int) n);
195         }
196     }
197
198     @Override
199     public boolean readBoolean() throws IOException {
200         checkAvailable(1);
201         return read() != 0;
202     }
203
204     @Override
205     public byte readByte() throws IOException {
206         int available = available();
207         if (available == 0) {
208             throw new EOFException();
209         }
210         return buffer.readByte();
211     }
212
213     @Override
214     public char readChar() throws IOException {
215         return (char) readShort();
216     }
217
218     @Override
219     public double readDouble() throws IOException {
220         return Double.longBitsToDouble(readLong());
221     }
222
223     @Override
224     public float readFloat() throws IOException {
225         return Float.intBitsToFloat(readInt());
226     }
227
228     @Override
229     public void readFully(byte[] b) throws IOException {
230         readFully(b, 0, b.length);
231     }
232
233     @Override
234     public void readFully(byte[] b, int off, int len) throws IOException {
235         checkAvailable(len);
236         buffer.readBytes(b, off, len);
237     }
238
239     @Override
240     public int readInt() throws IOException {
241         checkAvailable(4);
242         return buffer.readInt();
243     }
244
245     private StringBuilder lineBuf;
246
247     @Override
248     public String readLine() throws IOException {
249         int available = available();
250         if (available == 0) {
251             return null;
252         }
253
254         if (lineBuf != null) {
255             lineBuf.setLength(0);
256         }
257
258         loop: do {
259             int c = buffer.readUnsignedByte();
260             --available;
261             switch (c) {
262                 case '\n':
263                     break loop;
264
265                 case '\r':
266                     if (available > 0 && (char) buffer.getUnsignedByte(buffer.readerIndex()) == '\n') {
267                         buffer.skipBytes(1);
268                         --available;
269                     }
270                     break loop;
271
272                 default:
273                     if (lineBuf == null) {
274                         lineBuf = new StringBuilder();
275                     }
276                     lineBuf.append((char) c);
277             }
278         } while (available > 0);
279
280         return lineBuf != null && lineBuf.length() > 0 ? lineBuf.toString() : StringUtil.EMPTY_STRING;
281     }
282
283     @Override
284     public long readLong() throws IOException {
285         checkAvailable(8);
286         return buffer.readLong();
287     }
288
289     @Override
290     public short readShort() throws IOException {
291         checkAvailable(2);
292         return buffer.readShort();
293     }
294
295     @Override
296     public String readUTF() throws IOException {
297         return DataInputStream.readUTF(this);
298     }
299
300     @Override
301     public int readUnsignedByte() throws IOException {
302         return readByte() & 0xff;
303     }
304
305     @Override
306     public int readUnsignedShort() throws IOException {
307         return readShort() & 0xffff;
308     }
309
310     @Override
311     public int skipBytes(int n) throws IOException {
312         int nBytes = Math.min(available(), n);
313         buffer.skipBytes(nBytes);
314         return nBytes;
315     }
316
317     private void checkAvailable(int fieldSize) throws IOException {
318         if (fieldSize < 0) {
319             throw new IndexOutOfBoundsException("fieldSize cannot be a negative number");
320         }
321         if (fieldSize > available()) {
322             throw new EOFException("fieldSize is too long! Length is " + fieldSize
323                     + ", but maximum is " + available());
324         }
325     }
326 }
327