1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.conduits;
20
21 import io.undertow.server.AbstractServerConnection;
22 import org.xnio.Buffers;
23 import org.xnio.IoUtils;
24 import io.undertow.connector.PooledByteBuffer;
25 import org.xnio.channels.StreamSinkChannel;
26 import org.xnio.conduits.AbstractStreamSourceConduit;
27 import org.xnio.conduits.ConduitReadableByteChannel;
28 import org.xnio.conduits.StreamSourceConduit;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.FileChannel;
33 import java.util.concurrent.TimeUnit;
34
35 /**
36  * @author Stuart Douglas
37  */

38 public class ReadDataStreamSourceConduit extends AbstractStreamSourceConduit<StreamSourceConduit> {
39
40     private final AbstractServerConnection connection;
41
42     public ReadDataStreamSourceConduit(final StreamSourceConduit next, final AbstractServerConnection connection) {
43         super(next);
44         this.connection = connection;
45     }
46
47     public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
48         return target.transferFrom(new ConduitReadableByteChannel(this), position, count);
49     }
50
51     public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
52         return IoUtils.transfer(new ConduitReadableByteChannel(this), count, throughBuffer, target);
53     }
54
55     @Override
56     public int read(final ByteBuffer dst) throws IOException {
57         PooledByteBuffer eb = connection.getExtraBytes();
58         if (eb != null) {
59             final ByteBuffer buffer = eb.getBuffer();
60             int result = Buffers.copy(dst, buffer);
61             if (!buffer.hasRemaining()) {
62                 eb.close();
63                 connection.setExtraBytes(null);
64             }
65             return result;
66         } else {
67             return super.read(dst);
68         }
69     }
70
71     @Override
72     public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
73         PooledByteBuffer eb = connection.getExtraBytes();
74         if (eb != null) {
75             final ByteBuffer buffer = eb.getBuffer();
76             int result = Buffers.copy(dsts, offs, len, buffer);
77             if (!buffer.hasRemaining()) {
78                 eb.close();
79                 connection.setExtraBytes(null);
80             }
81             return result;
82         } else {
83             return super.read(dsts, offs, len);
84         }
85     }
86
87     @Override
88     public void resumeReads() {
89         if (connection.getExtraBytes() != null) {
90             wakeupReads();
91         } else {
92             super.resumeReads();
93         }
94     }
95
96     @Override
97     public void awaitReadable() throws IOException {
98         if (connection.getExtraBytes() != null) {
99             return;
100         }
101         super.awaitReadable();
102     }
103
104     @Override
105     public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
106         if (connection.getExtraBytes() != null) {
107             return;
108         }
109         super.awaitReadable(time, timeUnit);
110     }
111
112 }
113