1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Copyright 2013 Red Hat, Inc. and/or its affiliates.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.xnio.nio;
20
21 import static org.xnio.nio.Log.log;
22
23 import java.io.IOException;
24 import java.net.SocketException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.CancelledKeyException;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.FileChannel;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.SocketChannel;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33 import org.xnio.Bits;
34 import org.xnio.Xnio;
35 import org.xnio.XnioIoThread;
36 import org.xnio.XnioWorker;
37 import org.xnio.channels.ReadTimeoutException;
38 import org.xnio.channels.StreamSinkChannel;
39 import org.xnio.channels.StreamSourceChannel;
40 import org.xnio.channels.WriteTimeoutException;
41 import org.xnio.conduits.Conduits;
42 import org.xnio.conduits.ReadReadyHandler;
43 import org.xnio.conduits.StreamSinkConduit;
44 import org.xnio.conduits.StreamSourceConduit;
45 import org.xnio.conduits.WriteReadyHandler;
46
47 final class NioSocketConduit extends NioHandle implements StreamSourceConduit, StreamSinkConduit {
48     private final SocketChannel socketChannel;
49     private final NioSocketStreamConnection connection;
50     private ReadReadyHandler readReadyHandler;
51     private WriteReadyHandler writeReadyHandler;
52
53     @SuppressWarnings("unused")
54     private volatile int readTimeout;
55     private long lastRead;
56
57     @SuppressWarnings("rawtypes")
58     private static final AtomicIntegerFieldUpdater<NioSocketConduit> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioSocketConduit.class"readTimeout");
59
60     @SuppressWarnings("unused")
61     private volatile int writeTimeout;
62     private long lastWrite;
63
64     @SuppressWarnings("rawtypes")
65     private static final AtomicIntegerFieldUpdater<NioSocketConduit> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioSocketConduit.class"writeTimeout");
66
67     NioSocketConduit(final WorkerThread workerThread, final SelectionKey selectionKey, final NioSocketStreamConnection connection) {
68         super(workerThread, selectionKey);
69         this.connection = connection;
70         this.socketChannel = (SocketChannel) selectionKey.channel();
71     }
72
73     void handleReady(int ops) {
74         try {
75             if (ops == 0) {
76                 // the dreaded bug
77                 final SelectionKey key = getSelectionKey();
78                 final int interestOps = key.interestOps();
79                 if (interestOps != 0) {
80                     ops = interestOps;
81                 } else {
82                     // urp
83                     forceTermination();
84                     return;
85                 }
86             }
87             if (Bits.allAreSet(ops, SelectionKey.OP_READ)) try {
88                 if (isReadShutdown()) suspendReads();
89                 readReadyHandler.readReady();
90             } catch (Throwable ignored) {
91             }
92             if (Bits.allAreSet(ops, SelectionKey.OP_WRITE)) try {
93                 if (isWriteShutdown()) suspendWrites();
94                 writeReadyHandler.writeReady();
95             } catch (Throwable ignored) {
96             }
97         } catch (CancelledKeyException ignored) {}
98     }
99
100     public XnioWorker getWorker() {
101         return getWorkerThread().getWorker();
102     }
103
104     void forceTermination() {
105         final ReadReadyHandler read = readReadyHandler;
106         if (read != null) read.forceTermination();
107         final WriteReadyHandler write = writeReadyHandler;
108         if (write != null) write.forceTermination();
109     }
110
111     void terminated() {
112         final ReadReadyHandler read = readReadyHandler;
113         if (read != null) read.terminated();
114         final WriteReadyHandler write = writeReadyHandler;
115         if (write != null) write.terminated();
116     }
117
118     // Write methods
119
120     int getAndSetWriteTimeout(int newVal) {
121         return writeTimeoutUpdater.getAndSet(this, newVal);
122     }
123
124     int getWriteTimeout() {
125         return writeTimeout;
126     }
127
128     private void checkWriteTimeout(final boolean xfer) throws WriteTimeoutException {
129         int timeout = writeTimeout;
130         if (timeout > 0) {
131             if (xfer) {
132                 lastWrite = System.nanoTime();
133             } else {
134                 long lastRead = this.lastWrite;
135                 if (lastRead > 0L && ((System.nanoTime() - lastRead) / 1000000L) > (long) timeout) {
136                     throw log.writeTimeout();
137                 }
138             }
139         }
140     }
141
142     public final long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
143         long res = src.transferTo(position, count, socketChannel);
144         checkWriteTimeout(res > 0L);
145         return res;
146     }
147
148     public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
149         return Conduits.transfer(source, count, throughBuffer, this);
150     }
151
152     public int write(final ByteBuffer src) throws IOException {
153         int res = socketChannel.write(src);
154         checkWriteTimeout(res > 0);
155         return res;
156     }
157
158     public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
159         if (length == 1) {
160             return write(srcs[offset]);
161         }
162         long res = socketChannel.write(srcs, offset, length);
163         checkWriteTimeout(res > 0L);
164         return res;
165     }
166
167     @Override
168     public int writeFinal(ByteBuffer src) throws IOException {
169         return Conduits.writeFinalBasic(this, src);
170     }
171
172     @Override
173     public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOException {
174         return Conduits.writeFinalBasic(this, srcs, offset, length);
175     }
176
177     public boolean flush() throws IOException {
178         return true;
179     }
180
181     public void terminateWrites() throws IOException {
182         if (connection.writeClosed()) try {
183             if (getSelectionKey().isValid()) {
184                 suspend(SelectionKey.OP_WRITE);
185             }
186             if (socketChannel.isOpen()) try {
187                 socketChannel.socket().shutdownOutput();
188             } catch (SocketException ignored) {
189                 // IBM incorrectly throws this exception on ENOTCONN; it's probably less harmful just to swallow it
190             }
191         } catch (ClosedChannelException ignored) {
192         } finally {
193             writeTerminated();
194         }
195     }
196
197     public void truncateWrites() throws IOException {
198         terminateWrites();
199     }
200
201     void writeTerminated() {
202         final WriteReadyHandler writeReadyHandler = this.writeReadyHandler;
203         if (writeReadyHandler != nulltry {
204             writeReadyHandler.terminated();
205         } catch (Throwable ignored) {}
206     }
207
208     public boolean isWriteShutdown() {
209         return connection.isWriteShutdown();
210     }
211
212     public void resumeWrites() {
213         resume(SelectionKey.OP_WRITE);
214     }
215
216     public void suspendWrites() {
217         suspend(SelectionKey.OP_WRITE);
218     }
219
220     public void wakeupWrites() {
221         wakeup(SelectionKey.OP_WRITE);
222     }
223
224     public boolean isWriteResumed() {
225         return isResumed(SelectionKey.OP_WRITE);
226     }
227
228     public void awaitWritable() throws IOException {
229         Xnio.checkBlockingAllowed();
230         if (isWriteShutdown()) {
231             return;
232         }
233         SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_WRITE);
234     }
235
236     public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
237         Xnio.checkBlockingAllowed();
238         if (isWriteShutdown()) {
239             return;
240         }
241         SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_WRITE, time, timeUnit);
242     }
243
244     public XnioIoThread getWriteThread() {
245         return getWorkerThread();
246     }
247
248     public void setWriteReadyHandler(final WriteReadyHandler handler) {
249         writeReadyHandler = handler;
250     }
251
252     // Read methods
253
254     int getAndSetReadTimeout(int newVal) {
255         return readTimeoutUpdater.getAndSet(this, newVal);
256     }
257
258     int getReadTimeout() {
259         return readTimeout;
260     }
261
262     private void checkReadTimeout(final boolean xfer) throws ReadTimeoutException {
263         int timeout = readTimeout;
264         if (timeout > 0) {
265             if (xfer) {
266                 lastRead = System.nanoTime();
267             } else {
268                 long lastRead = this.lastRead;
269                 if (lastRead > 0L && ((System.nanoTime() - lastRead) / 1000000L) > (long) timeout) {
270                     throw log.readTimeout();
271                 }
272             }
273         }
274     }
275
276     public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
277         long res = target.transferFrom(socketChannel, position, count);
278         checkReadTimeout(res > 0L);
279         return res;
280     }
281
282     public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
283         return Conduits.transfer(this, count, throughBuffer, target);
284     }
285
286     public int read(final ByteBuffer dst) throws IOException {
287         int res;
288         try {
289             res = socketChannel.read(dst);
290         } catch (ClosedChannelException e) {
291             return -1;
292         }
293         if (res != -1) checkReadTimeout(res > 0);
294         else terminateReads();
295         return res;
296     }
297
298     public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
299         if (length == 1) {
300             return read(dsts[offset]);
301         }
302         long res;
303         try {
304             res = socketChannel.read(dsts, offset, length);
305         } catch (ClosedChannelException e) {
306             return -1L;
307         }
308         if (res != -1L) checkReadTimeout(res > 0L);
309         else terminateReads();
310         return res;
311     }
312
313     public void terminateReads() throws IOException {
314         if (connection.readClosed()) try {
315             if (getSelectionKey().isValid()) {
316                 suspend(SelectionKey.OP_READ);
317             }
318             if (socketChannel.isOpen()) try {
319                 socketChannel.socket().shutdownInput();
320             } catch (SocketException ignored) {
321                 // IBM incorrectly throws this exception on ENOTCONN; it's probably less harmful just to swallow it
322             }
323         } catch (ClosedChannelException ignored) {
324         } finally {
325             readTerminated();
326         }
327     }
328
329     void readTerminated() {
330         final ReadReadyHandler readReadyHandler = this.readReadyHandler;
331         if (readReadyHandler != nulltry {
332             readReadyHandler.terminated();
333         } catch (Throwable ignored) {}
334     }
335
336     public boolean isReadShutdown() {
337         return connection.isReadShutdown();
338     }
339
340     public void resumeReads() {
341         resume(SelectionKey.OP_READ);
342     }
343
344     public void suspendReads() {
345         suspend(SelectionKey.OP_READ);
346     }
347
348     public void wakeupReads() {
349         wakeup(SelectionKey.OP_READ);
350     }
351
352     public boolean isReadResumed() {
353         return isResumed(SelectionKey.OP_READ);
354     }
355
356     public void awaitReadable() throws IOException {
357         Xnio.checkBlockingAllowed();
358         SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_READ);
359     }
360
361     public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
362         Xnio.checkBlockingAllowed();
363         SelectorUtils.await((NioXnio)getWorker().getXnio(), socketChannel, SelectionKey.OP_READ, time, timeUnit);
364     }
365
366     public XnioIoThread getReadThread() {
367         return getWorkerThread();
368     }
369
370     public void setReadReadyHandler(final ReadReadyHandler handler) {
371         this.readReadyHandler = handler;
372     }
373
374     SocketChannel getSocketChannel() {
375         return socketChannel;
376     }
377 }
378