1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.buffer;
17
18 import io.netty.util.ReferenceCounted;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.StringUtil;
21
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.IOException;
26 import java.io.InputStream;
27
28 /**
29 * An {@link InputStream} which reads data from a {@link ByteBuf}.
30 * <p>
31 * A read operation against this stream will occur at the {@code readerIndex}
32 * of its underlying buffer and the {@code readerIndex} will increase during
33 * the read operation. Please note that it only reads up to the number of
34 * readable bytes determined at the moment of construction. Therefore,
35 * updating {@link ByteBuf#writerIndex()} will not affect the return
36 * value of {@link #available()}.
37 * <p>
38 * This stream implements {@link DataInput} for your convenience.
39 * The endianness of the stream is not always big endian but depends on
40 * the endianness of the underlying buffer.
41 *
42 * @see ByteBufOutputStream
43 */
44 public class ByteBufInputStream extends InputStream implements DataInput {
45 private final ByteBuf buffer;
46 private final int startIndex;
47 private final int endIndex;
48 private boolean closed;
49 /**
50 * To preserve backwards compatibility (which didn't transfer ownership) we support a conditional flag which
51 * indicates if {@link #buffer} should be released when this {@link InputStream} is closed.
52 * However in future releases ownership should always be transferred and callers of this class should call
53 * {@link ReferenceCounted#retain()} if necessary.
54 */
55 private final boolean releaseOnClose;
56
57 /**
58 * Creates a new stream which reads data from the specified {@code buffer}
59 * starting at the current {@code readerIndex} and ending at the current
60 * {@code writerIndex}.
61 * @param buffer The buffer which provides the content for this {@link InputStream}.
62 */
63 public ByteBufInputStream(ByteBuf buffer) {
64 this(buffer, buffer.readableBytes());
65 }
66
67 /**
68 * Creates a new stream which reads data from the specified {@code buffer}
69 * starting at the current {@code readerIndex} and ending at
70 * {@code readerIndex + length}.
71 * @param buffer The buffer which provides the content for this {@link InputStream}.
72 * @param length The length of the buffer to use for this {@link InputStream}.
73 * @throws IndexOutOfBoundsException
74 * if {@code readerIndex + length} is greater than
75 * {@code writerIndex}
76 */
77 public ByteBufInputStream(ByteBuf buffer, int length) {
78 this(buffer, length, false);
79 }
80
81 /**
82 * Creates a new stream which reads data from the specified {@code buffer}
83 * starting at the current {@code readerIndex} and ending at the current
84 * {@code writerIndex}.
85 * @param buffer The buffer which provides the content for this {@link InputStream}.
86 * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
87 * be called on {@code buffer}.
88 */
89 public ByteBufInputStream(ByteBuf buffer, boolean releaseOnClose) {
90 this(buffer, buffer.readableBytes(), releaseOnClose);
91 }
92
93 /**
94 * Creates a new stream which reads data from the specified {@code buffer}
95 * starting at the current {@code readerIndex} and ending at
96 * {@code readerIndex + length}.
97 * @param buffer The buffer which provides the content for this {@link InputStream}.
98 * @param length The length of the buffer to use for this {@link InputStream}.
99 * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
100 * be called on {@code buffer}.
101 * @throws IndexOutOfBoundsException
102 * if {@code readerIndex + length} is greater than
103 * {@code writerIndex}
104 */
105 public ByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose) {
106 ObjectUtil.checkNotNull(buffer, "buffer");
107 if (length < 0) {
108 if (releaseOnClose) {
109 buffer.release();
110 }
111 throw new IllegalArgumentException("length: " + length);
112 }
113 if (length > buffer.readableBytes()) {
114 if (releaseOnClose) {
115 buffer.release();
116 }
117 throw new IndexOutOfBoundsException("Too many bytes to be read - Needs "
118 + length + ", maximum is " + buffer.readableBytes());
119 }
120
121 this.releaseOnClose = releaseOnClose;
122 this.buffer = buffer;
123 startIndex = buffer.readerIndex();
124 endIndex = startIndex + length;
125 buffer.markReaderIndex();
126 }
127
128 /**
129 * Returns the number of read bytes by this stream so far.
130 */
131 public int readBytes() {
132 return buffer.readerIndex() - startIndex;
133 }
134
135 @Override
136 public void close() throws IOException {
137 try {
138 super.close();
139 } finally {
140 // The Closable interface says "If the stream is already closed then invoking this method has no effect."
141 if (releaseOnClose && !closed) {
142 closed = true;
143 buffer.release();
144 }
145 }
146 }
147
148 @Override
149 public int available() throws IOException {
150 return endIndex - buffer.readerIndex();
151 }
152
153 @Override
154 public void mark(int readlimit) {
155 buffer.markReaderIndex();
156 }
157
158 @Override
159 public boolean markSupported() {
160 return true;
161 }
162
163 @Override
164 public int read() throws IOException {
165 int available = available();
166 if (available == 0) {
167 return -1;
168 }
169 return buffer.readByte() & 0xff;
170 }
171
172 @Override
173 public int read(byte[] b, int off, int len) throws IOException {
174 int available = available();
175 if (available == 0) {
176 return -1;
177 }
178
179 len = Math.min(available, len);
180 buffer.readBytes(b, off, len);
181 return len;
182 }
183
184 @Override
185 public void reset() throws IOException {
186 buffer.resetReaderIndex();
187 }
188
189 @Override
190 public long skip(long n) throws IOException {
191 if (n > Integer.MAX_VALUE) {
192 return skipBytes(Integer.MAX_VALUE);
193 } else {
194 return skipBytes((int) n);
195 }
196 }
197
198 @Override
199 public boolean readBoolean() throws IOException {
200 checkAvailable(1);
201 return read() != 0;
202 }
203
204 @Override
205 public byte readByte() throws IOException {
206 int available = available();
207 if (available == 0) {
208 throw new EOFException();
209 }
210 return buffer.readByte();
211 }
212
213 @Override
214 public char readChar() throws IOException {
215 return (char) readShort();
216 }
217
218 @Override
219 public double readDouble() throws IOException {
220 return Double.longBitsToDouble(readLong());
221 }
222
223 @Override
224 public float readFloat() throws IOException {
225 return Float.intBitsToFloat(readInt());
226 }
227
228 @Override
229 public void readFully(byte[] b) throws IOException {
230 readFully(b, 0, b.length);
231 }
232
233 @Override
234 public void readFully(byte[] b, int off, int len) throws IOException {
235 checkAvailable(len);
236 buffer.readBytes(b, off, len);
237 }
238
239 @Override
240 public int readInt() throws IOException {
241 checkAvailable(4);
242 return buffer.readInt();
243 }
244
245 private StringBuilder lineBuf;
246
247 @Override
248 public String readLine() throws IOException {
249 int available = available();
250 if (available == 0) {
251 return null;
252 }
253
254 if (lineBuf != null) {
255 lineBuf.setLength(0);
256 }
257
258 loop: do {
259 int c = buffer.readUnsignedByte();
260 --available;
261 switch (c) {
262 case '\n':
263 break loop;
264
265 case '\r':
266 if (available > 0 && (char) buffer.getUnsignedByte(buffer.readerIndex()) == '\n') {
267 buffer.skipBytes(1);
268 --available;
269 }
270 break loop;
271
272 default:
273 if (lineBuf == null) {
274 lineBuf = new StringBuilder();
275 }
276 lineBuf.append((char) c);
277 }
278 } while (available > 0);
279
280 return lineBuf != null && lineBuf.length() > 0 ? lineBuf.toString() : StringUtil.EMPTY_STRING;
281 }
282
283 @Override
284 public long readLong() throws IOException {
285 checkAvailable(8);
286 return buffer.readLong();
287 }
288
289 @Override
290 public short readShort() throws IOException {
291 checkAvailable(2);
292 return buffer.readShort();
293 }
294
295 @Override
296 public String readUTF() throws IOException {
297 return DataInputStream.readUTF(this);
298 }
299
300 @Override
301 public int readUnsignedByte() throws IOException {
302 return readByte() & 0xff;
303 }
304
305 @Override
306 public int readUnsignedShort() throws IOException {
307 return readShort() & 0xffff;
308 }
309
310 @Override
311 public int skipBytes(int n) throws IOException {
312 int nBytes = Math.min(available(), n);
313 buffer.skipBytes(nBytes);
314 return nBytes;
315 }
316
317 private void checkAvailable(int fieldSize) throws IOException {
318 if (fieldSize < 0) {
319 throw new IndexOutOfBoundsException("fieldSize cannot be a negative number");
320 }
321 if (fieldSize > available()) {
322 throw new EOFException("fieldSize is too long! Length is " + fieldSize
323 + ", but maximum is " + available());
324 }
325 }
326 }
327