1
18
19 package org.xnio.nio;
20
21 import static org.xnio.nio.Log.log;
22
23 import java.io.IOException;
24 import java.net.SocketException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.CancelledKeyException;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.FileChannel;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.SocketChannel;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33 import org.xnio.Bits;
34 import org.xnio.Xnio;
35 import org.xnio.XnioIoThread;
36 import org.xnio.XnioWorker;
37 import org.xnio.channels.ReadTimeoutException;
38 import org.xnio.channels.StreamSinkChannel;
39 import org.xnio.channels.StreamSourceChannel;
40 import org.xnio.channels.WriteTimeoutException;
41 import org.xnio.conduits.Conduits;
42 import org.xnio.conduits.ReadReadyHandler;
43 import org.xnio.conduits.StreamSinkConduit;
44 import org.xnio.conduits.StreamSourceConduit;
45 import org.xnio.conduits.WriteReadyHandler;
46
47 final class NioSocketConduit extends NioHandle implements StreamSourceConduit, StreamSinkConduit {
48 private final SocketChannel socketChannel;
49 private final NioSocketStreamConnection connection;
50 private ReadReadyHandler readReadyHandler;
51 private WriteReadyHandler writeReadyHandler;
52
53 @SuppressWarnings("unused")
54 private volatile int readTimeout;
55 private long lastRead;
56
57 @SuppressWarnings("rawtypes")
58 private static final AtomicIntegerFieldUpdater<NioSocketConduit> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioSocketConduit.class, "readTimeout");
59
60 @SuppressWarnings("unused")
61 private volatile int writeTimeout;
62 private long lastWrite;
63
64 @SuppressWarnings("rawtypes")
65 private static final AtomicIntegerFieldUpdater<NioSocketConduit> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioSocketConduit.class, "writeTimeout");
66
67 NioSocketConduit(final WorkerThread workerThread, final SelectionKey selectionKey, final NioSocketStreamConnection connection) {
68 super(workerThread, selectionKey);
69 this.connection = connection;
70 this.socketChannel = (SocketChannel) selectionKey.channel();
71 }
72
73 void handleReady(int ops) {
74 try {
75 if (ops == 0) {
76
77 final SelectionKey key = getSelectionKey();
78 final int interestOps = key.interestOps();
79 if (interestOps != 0) {
80 ops = interestOps;
81 } else {
82
83 forceTermination();
84 return;
85 }
86 }
87 if (Bits.allAreSet(ops, SelectionKey.OP_READ)) try {
88 if (isReadShutdown()) suspendReads();
89 readReadyHandler.readReady();
90 } catch (Throwable ignored) {
91 }
92 if (Bits.allAreSet(ops, SelectionKey.OP_WRITE)) try {
93 if (isWriteShutdown()) suspendWrites();
94 writeReadyHandler.writeReady();
95 } catch (Throwable ignored) {
96 }
97 } catch (CancelledKeyException ignored) {}
98 }
99
100 public XnioWorker getWorker() {
101 return getWorkerThread().getWorker();
102 }
103
104 void forceTermination() {
105 final ReadReadyHandler read = readReadyHandler;
106 if (read != null) read.forceTermination();
107 final WriteReadyHandler write = writeReadyHandler;
108 if (write != null) write.forceTermination();
109 }
110
111 void terminated() {
112 final ReadReadyHandler read = readReadyHandler;
113 if (read != null) read.terminated();
114 final WriteReadyHandler write = writeReadyHandler;
115 if (write != null) write.terminated();
116 }
117
118
119
120 int getAndSetWriteTimeout(int newVal) {
121 return writeTimeoutUpdater.getAndSet(this, newVal);
122 }
123
124 int getWriteTimeout() {
125 return writeTimeout;
126 }
127
128 private void checkWriteTimeout(final boolean xfer) throws WriteTimeoutException {
129 int timeout = writeTimeout;
130 if (timeout > 0) {
131 if (xfer) {
132 lastWrite = System.nanoTime();
133 } else {
134 long lastRead = this.lastWrite;
135 if (lastRead > 0L && ((System.nanoTime() - lastRead) / 1000000L) > (long) timeout) {
136 throw log.writeTimeout();
137 }
138 }
139 }
140 }
141
142 public final long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
143 long res = src.transferTo(position, count, socketChannel);
144 checkWriteTimeout(res > 0L);
145 return res;
146 }
147
148 public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
149 return Conduits.transfer(source, count, throughBuffer, this);
150 }
151
152 public int write(final ByteBuffer src) throws IOException {
153 int res = socketChannel.write(src);
154 checkWriteTimeout(res > 0);
155 return res;
156 }
157
158 public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
159 if (length == 1) {
160 return write(srcs[offset]);
161 }
162 long res = socketChannel.write(srcs, offset, length);
163 checkWriteTimeout(res > 0L);
164 return res;
165 }
166
167 @Override
168 public int writeFinal(ByteBuffer src) throws IOException {
169 return Conduits.writeFinalBasic(this, src);
170 }
171
172 @Override
173 public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
174 return Conduits.writeFinalBasic(this, srcs, offset, length);
175 }
176
177 public boolean flush() throws IOException {
178 return true;
179 }
180
181 public void terminateWrites() throws IOException {
182 if (connection.writeClosed()) try {
183 if (getSelectionKey().isValid()) {
184 suspend(SelectionKey.OP_WRITE);
185 }
186 if (socketChannel.isOpen()) try {
187 socketChannel.socket().shutdownOutput();
188 } catch (SocketException ignored) {
189
190 }
191 } catch (ClosedChannelException ignored) {
192 } finally {
193 writeTerminated();
194 }
195 }
196
197 public void truncateWrites() throws IOException {
198 terminateWrites();
199 }
200
201 void writeTerminated() {
202 final WriteReadyHandler writeReadyHandler = this.writeReadyHandler;
203 if (writeReadyHandler != null) try {
204 writeReadyHandler.terminated();
205 } catch (Throwable ignored) {}
206 }
207
208 public boolean isWriteShutdown() {
209 return connection.isWriteShutdown();
210 }
211
212 public void resumeWrites() {
213 resume(SelectionKey.OP_WRITE);
214 }
215
216 public void suspendWrites() {
217 suspend(SelectionKey.OP_WRITE);
218 }
219
220 public void wakeupWrites() {
221 wakeup(SelectionKey.OP_WRITE);
222 }
223
224 public boolean isWriteResumed() {
225 return isResumed(SelectionKey.OP_WRITE);
226 }
227
228 public void awaitWritable() throws IOException {
229 Xnio.checkBlockingAllowed();
230 if (isWriteShutdown()) {
231 return;
232 }
233 SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_WRITE);
234 }
235
236 public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
237 Xnio.checkBlockingAllowed();
238 if (isWriteShutdown()) {
239 return;
240 }
241 SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_WRITE, time, timeUnit);
242 }
243
244 public XnioIoThread getWriteThread() {
245 return getWorkerThread();
246 }
247
248 public void setWriteReadyHandler(final WriteReadyHandler handler) {
249 writeReadyHandler = handler;
250 }
251
252
253
254 int getAndSetReadTimeout(int newVal) {
255 return readTimeoutUpdater.getAndSet(this, newVal);
256 }
257
258 int getReadTimeout() {
259 return readTimeout;
260 }
261
262 private void checkReadTimeout(final boolean xfer) throws ReadTimeoutException {
263 int timeout = readTimeout;
264 if (timeout > 0) {
265 if (xfer) {
266 lastRead = System.nanoTime();
267 } else {
268 long lastRead = this.lastRead;
269 if (lastRead > 0L && ((System.nanoTime() - lastRead) / 1000000L) > (long) timeout) {
270 throw log.readTimeout();
271 }
272 }
273 }
274 }
275
276 public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
277 long res = target.transferFrom(socketChannel, position, count);
278 checkReadTimeout(res > 0L);
279 return res;
280 }
281
282 public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
283 return Conduits.transfer(this, count, throughBuffer, target);
284 }
285
286 public int read(final ByteBuffer dst) throws IOException {
287 int res;
288 try {
289 res = socketChannel.read(dst);
290 } catch (ClosedChannelException e) {
291 return -1;
292 }
293 if (res != -1) checkReadTimeout(res > 0);
294 else terminateReads();
295 return res;
296 }
297
298 public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
299 if (length == 1) {
300 return read(dsts[offset]);
301 }
302 long res;
303 try {
304 res = socketChannel.read(dsts, offset, length);
305 } catch (ClosedChannelException e) {
306 return -1L;
307 }
308 if (res != -1L) checkReadTimeout(res > 0L);
309 else terminateReads();
310 return res;
311 }
312
313 public void terminateReads() throws IOException {
314 if (connection.readClosed()) try {
315 if (getSelectionKey().isValid()) {
316 suspend(SelectionKey.OP_READ);
317 }
318 if (socketChannel.isOpen()) try {
319 socketChannel.socket().shutdownInput();
320 } catch (SocketException ignored) {
321
322 }
323 } catch (ClosedChannelException ignored) {
324 } finally {
325 readTerminated();
326 }
327 }
328
329 void readTerminated() {
330 final ReadReadyHandler readReadyHandler = this.readReadyHandler;
331 if (readReadyHandler != null) try {
332 readReadyHandler.terminated();
333 } catch (Throwable ignored) {}
334 }
335
336 public boolean isReadShutdown() {
337 return connection.isReadShutdown();
338 }
339
340 public void resumeReads() {
341 resume(SelectionKey.OP_READ);
342 }
343
344 public void suspendReads() {
345 suspend(SelectionKey.OP_READ);
346 }
347
348 public void wakeupReads() {
349 wakeup(SelectionKey.OP_READ);
350 }
351
352 public boolean isReadResumed() {
353 return isResumed(SelectionKey.OP_READ);
354 }
355
356 public void awaitReadable() throws IOException {
357 Xnio.checkBlockingAllowed();
358 SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_READ);
359 }
360
361 public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
362 Xnio.checkBlockingAllowed();
363 SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_READ, time, timeUnit);
364 }
365
366 public XnioIoThread getReadThread() {
367 return getWorkerThread();
368 }
369
370 public void setReadReadyHandler(final ReadReadyHandler handler) {
371 this.readReadyHandler = handler;
372 }
373
374 SocketChannel getSocketChannel() {
375 return socketChannel;
376 }
377 }
378