1
18
19 package io.undertow.conduits;
20
21 import io.undertow.server.AbstractServerConnection;
22 import org.xnio.Buffers;
23 import org.xnio.IoUtils;
24 import io.undertow.connector.PooledByteBuffer;
25 import org.xnio.channels.StreamSinkChannel;
26 import org.xnio.conduits.AbstractStreamSourceConduit;
27 import org.xnio.conduits.ConduitReadableByteChannel;
28 import org.xnio.conduits.StreamSourceConduit;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.FileChannel;
33 import java.util.concurrent.TimeUnit;
34
35
38 public class ReadDataStreamSourceConduit extends AbstractStreamSourceConduit<StreamSourceConduit> {
39
40 private final AbstractServerConnection connection;
41
42 public ReadDataStreamSourceConduit(final StreamSourceConduit next, final AbstractServerConnection connection) {
43 super(next);
44 this.connection = connection;
45 }
46
47 public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
48 return target.transferFrom(new ConduitReadableByteChannel(this), position, count);
49 }
50
51 public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
52 return IoUtils.transfer(new ConduitReadableByteChannel(this), count, throughBuffer, target);
53 }
54
55 @Override
56 public int read(final ByteBuffer dst) throws IOException {
57 PooledByteBuffer eb = connection.getExtraBytes();
58 if (eb != null) {
59 final ByteBuffer buffer = eb.getBuffer();
60 int result = Buffers.copy(dst, buffer);
61 if (!buffer.hasRemaining()) {
62 eb.close();
63 connection.setExtraBytes(null);
64 }
65 return result;
66 } else {
67 return super.read(dst);
68 }
69 }
70
71 @Override
72 public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
73 PooledByteBuffer eb = connection.getExtraBytes();
74 if (eb != null) {
75 final ByteBuffer buffer = eb.getBuffer();
76 int result = Buffers.copy(dsts, offs, len, buffer);
77 if (!buffer.hasRemaining()) {
78 eb.close();
79 connection.setExtraBytes(null);
80 }
81 return result;
82 } else {
83 return super.read(dsts, offs, len);
84 }
85 }
86
87 @Override
88 public void resumeReads() {
89 if (connection.getExtraBytes() != null) {
90 wakeupReads();
91 } else {
92 super.resumeReads();
93 }
94 }
95
96 @Override
97 public void awaitReadable() throws IOException {
98 if (connection.getExtraBytes() != null) {
99 return;
100 }
101 super.awaitReadable();
102 }
103
104 @Override
105 public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
106 if (connection.getExtraBytes() != null) {
107 return;
108 }
109 super.awaitReadable(time, timeUnit);
110 }
111
112 }
113