1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.TimeUnit;
32
33 /**
 * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
 * a read, a write, or both operations for a while.
36 *
37 * <h3>Supported idle states</h3>
38 * <table border="1">
39 * <tr>
40 * <th>Property</th><th>Meaning</th>
41 * </tr>
42 * <tr>
43 * <td>{@code readerIdleTime}</td>
44 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45 * will be triggered when no read was performed for the specified period of
46 * time. Specify {@code 0} to disable.</td>
47 * </tr>
48 * <tr>
49 * <td>{@code writerIdleTime}</td>
50 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51 * will be triggered when no write was performed for the specified period of
52 * time. Specify {@code 0} to disable.</td>
53 * </tr>
54 * <tr>
55 * <td>{@code allIdleTime}</td>
56 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57 * will be triggered when neither read nor write was performed for the
58 * specified period of time. Specify {@code 0} to disable.</td>
59 * </tr>
60 * </table>
61 *
62 * <pre>
63 * // An example that sends a ping message when there is no outbound traffic
64 * // for 30 seconds. The connection is closed when there is no inbound traffic
65 * // for 60 seconds.
66 *
 * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68 * {@code @Override}
69 * public void initChannel({@link Channel} channel) {
70 * channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
71 * channel.pipeline().addLast("myHandler", new MyHandler());
72 * }
73 * }
74 *
75 * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76 * public class MyHandler extends {@link ChannelDuplexHandler} {
77 * {@code @Override}
78 * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79 * if (evt instanceof {@link IdleStateEvent}) {
80 * {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81 * if (e.state() == {@link IdleState}.READER_IDLE) {
82 * ctx.close();
83 * } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84 * ctx.writeAndFlush(new PingMessage());
85 * }
86 * }
87 * }
88 * }
89 *
90 * {@link ServerBootstrap} bootstrap = ...;
91 * ...
92 * bootstrap.childHandler(new MyChannelInitializer());
93 * ...
94 * </pre>
95 *
96 * @see ReadTimeoutHandler
97 * @see WriteTimeoutHandler
98 */
99 public class IdleStateHandler extends ChannelDuplexHandler {
100 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102 // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
103 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104 @Override
105 public void operationComplete(ChannelFuture future) throws Exception {
106 lastWriteTime = ticksInNanos();
107 firstWriterIdleEvent = firstAllIdleEvent = true;
108 }
109 };
110
111 private final boolean observeOutput;
112 private final long readerIdleTimeNanos;
113 private final long writerIdleTimeNanos;
114 private final long allIdleTimeNanos;
115
116 private ScheduledFuture<?> readerIdleTimeout;
117 private long lastReadTime;
118 private boolean firstReaderIdleEvent = true;
119
120 private ScheduledFuture<?> writerIdleTimeout;
121 private long lastWriteTime;
122 private boolean firstWriterIdleEvent = true;
123
124 private ScheduledFuture<?> allIdleTimeout;
125 private boolean firstAllIdleEvent = true;
126
127 private byte state; // 0 - none, 1 - initialized, 2 - destroyed
128 private boolean reading;
129
130 private long lastChangeCheckTimeStamp;
131 private int lastMessageHashCode;
132 private long lastPendingWriteBytes;
133 private long lastFlushProgress;
134
135 /**
136 * Creates a new instance firing {@link IdleStateEvent}s.
137 *
138 * @param readerIdleTimeSeconds
139 * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
140 * will be triggered when no read was performed for the specified
141 * period of time. Specify {@code 0} to disable.
142 * @param writerIdleTimeSeconds
143 * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
144 * will be triggered when no write was performed for the specified
145 * period of time. Specify {@code 0} to disable.
146 * @param allIdleTimeSeconds
147 * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
148 * will be triggered when neither read nor write was performed for
149 * the specified period of time. Specify {@code 0} to disable.
150 */
151 public IdleStateHandler(
152 int readerIdleTimeSeconds,
153 int writerIdleTimeSeconds,
154 int allIdleTimeSeconds) {
155
156 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
157 TimeUnit.SECONDS);
158 }
159
160 /**
161 * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
162 */
163 public IdleStateHandler(
164 long readerIdleTime, long writerIdleTime, long allIdleTime,
165 TimeUnit unit) {
166 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
167 }
168
169 /**
170 * Creates a new instance firing {@link IdleStateEvent}s.
171 *
172 * @param observeOutput
173 * whether or not the consumption of {@code bytes} should be taken into
174 * consideration when assessing write idleness. The default is {@code false}.
175 * @param readerIdleTime
176 * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
177 * will be triggered when no read was performed for the specified
178 * period of time. Specify {@code 0} to disable.
179 * @param writerIdleTime
180 * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
181 * will be triggered when no write was performed for the specified
182 * period of time. Specify {@code 0} to disable.
183 * @param allIdleTime
184 * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
185 * will be triggered when neither read nor write was performed for
186 * the specified period of time. Specify {@code 0} to disable.
187 * @param unit
188 * the {@link TimeUnit} of {@code readerIdleTime},
189 * {@code writeIdleTime}, and {@code allIdleTime}
190 */
191 public IdleStateHandler(boolean observeOutput,
192 long readerIdleTime, long writerIdleTime, long allIdleTime,
193 TimeUnit unit) {
194 ObjectUtil.checkNotNull(unit, "unit");
195
196 this.observeOutput = observeOutput;
197
198 if (readerIdleTime <= 0) {
199 readerIdleTimeNanos = 0;
200 } else {
201 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeNanos = 0;
205 } else {
206 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeNanos = 0;
210 } else {
211 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212 }
213 }
214
215 /**
216 * Return the readerIdleTime that was given when instance this class in milliseconds.
217 *
218 */
219 public long getReaderIdleTimeInMillis() {
220 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221 }
222
223 /**
224 * Return the writerIdleTime that was given when instance this class in milliseconds.
225 *
226 */
227 public long getWriterIdleTimeInMillis() {
228 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229 }
230
231 /**
232 * Return the allIdleTime that was given when instance this class in milliseconds.
233 *
234 */
235 public long getAllIdleTimeInMillis() {
236 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237 }
238
239 @Override
240 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242 // channelActive() event has been fired already, which means this.channelActive() will
243 // not be invoked. We have to initialize here instead.
244 initialize(ctx);
245 } else {
246 // channelActive() event has not been fired yet. this.channelActive() will be invoked
247 // and initialization will occur there.
248 }
249 }
250
251 @Override
252 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253 destroy();
254 }
255
256 @Override
257 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258 // Initialize early if channel is active already.
259 if (ctx.channel().isActive()) {
260 initialize(ctx);
261 }
262 super.channelRegistered(ctx);
263 }
264
265 @Override
266 public void channelActive(ChannelHandlerContext ctx) throws Exception {
267 // This method will be invoked only if this handler was added
268 // before channelActive() event is fired. If a user adds this handler
269 // after the channelActive() event, initialize() will be called by beforeAdd().
270 initialize(ctx);
271 super.channelActive(ctx);
272 }
273
274 @Override
275 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276 destroy();
277 super.channelInactive(ctx);
278 }
279
280 @Override
281 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283 reading = true;
284 firstReaderIdleEvent = firstAllIdleEvent = true;
285 }
286 ctx.fireChannelRead(msg);
287 }
288
289 @Override
290 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292 lastReadTime = ticksInNanos();
293 reading = false;
294 }
295 ctx.fireChannelReadComplete();
296 }
297
298 @Override
299 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300 // Allow writing with void promise if handler is only configured for read timeout events.
301 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302 ctx.write(msg, promise.unvoid()).addListener(writeListener);
303 } else {
304 ctx.write(msg, promise);
305 }
306 }
307
308 private void initialize(ChannelHandlerContext ctx) {
309 // Avoid the case where destroy() is called before scheduling timeouts.
310 // See: https://github.com/netty/netty/issues/143
311 switch (state) {
312 case 1:
313 case 2:
314 return;
315 }
316
317 state = 1;
318 initOutputChanged(ctx);
319
320 lastReadTime = lastWriteTime = ticksInNanos();
321 if (readerIdleTimeNanos > 0) {
322 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
323 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
324 }
325 if (writerIdleTimeNanos > 0) {
326 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
327 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
328 }
329 if (allIdleTimeNanos > 0) {
330 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
331 allIdleTimeNanos, TimeUnit.NANOSECONDS);
332 }
333 }
334
335 /**
336 * This method is visible for testing!
337 */
338 long ticksInNanos() {
339 return System.nanoTime();
340 }
341
342 /**
343 * This method is visible for testing!
344 */
345 ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
346 return ctx.executor().schedule(task, delay, unit);
347 }
348
349 private void destroy() {
350 state = 2;
351
352 if (readerIdleTimeout != null) {
353 readerIdleTimeout.cancel(false);
354 readerIdleTimeout = null;
355 }
356 if (writerIdleTimeout != null) {
357 writerIdleTimeout.cancel(false);
358 writerIdleTimeout = null;
359 }
360 if (allIdleTimeout != null) {
361 allIdleTimeout.cancel(false);
362 allIdleTimeout = null;
363 }
364 }
365
366 /**
367 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
368 * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
369 */
370 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
371 ctx.fireUserEventTriggered(evt);
372 }
373
374 /**
375 * Returns a {@link IdleStateEvent}.
376 */
377 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
378 switch (state) {
379 case ALL_IDLE:
380 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
381 case READER_IDLE:
382 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
383 case WRITER_IDLE:
384 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
385 default:
386 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
387 }
388 }
389
390 /**
391 * @see #hasOutputChanged(ChannelHandlerContext, boolean)
392 */
393 private void initOutputChanged(ChannelHandlerContext ctx) {
394 if (observeOutput) {
395 Channel channel = ctx.channel();
396 Unsafe unsafe = channel.unsafe();
397 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
398
399 if (buf != null) {
400 lastMessageHashCode = System.identityHashCode(buf.current());
401 lastPendingWriteBytes = buf.totalPendingWriteBytes();
402 lastFlushProgress = buf.currentProgress();
403 }
404 }
405 }
406
407 /**
408 * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
409 * with {@link #observeOutput} enabled and there has been an observed change in the
410 * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
411 *
412 * https://github.com/netty/netty/issues/6150
413 */
414 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
415 if (observeOutput) {
416
417 // We can take this shortcut if the ChannelPromises that got passed into write()
418 // appear to complete. It indicates "change" on message level and we simply assume
419 // that there's change happening on byte level. If the user doesn't observe channel
420 // writability events then they'll eventually OOME and there's clearly a different
421 // problem and idleness is least of their concerns.
422 if (lastChangeCheckTimeStamp != lastWriteTime) {
423 lastChangeCheckTimeStamp = lastWriteTime;
424
425 // But this applies only if it's the non-first call.
426 if (!first) {
427 return true;
428 }
429 }
430
431 Channel channel = ctx.channel();
432 Unsafe unsafe = channel.unsafe();
433 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
434
435 if (buf != null) {
436 int messageHashCode = System.identityHashCode(buf.current());
437 long pendingWriteBytes = buf.totalPendingWriteBytes();
438
439 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
440 lastMessageHashCode = messageHashCode;
441 lastPendingWriteBytes = pendingWriteBytes;
442
443 if (!first) {
444 return true;
445 }
446 }
447
448 long flushProgress = buf.currentProgress();
449 if (flushProgress != lastFlushProgress) {
450 lastFlushProgress = flushProgress;
451
452 if (!first) {
453 return true;
454 }
455 }
456 }
457 }
458
459 return false;
460 }
461
462 private abstract static class AbstractIdleTask implements Runnable {
463
464 private final ChannelHandlerContext ctx;
465
466 AbstractIdleTask(ChannelHandlerContext ctx) {
467 this.ctx = ctx;
468 }
469
470 @Override
471 public void run() {
472 if (!ctx.channel().isOpen()) {
473 return;
474 }
475
476 run(ctx);
477 }
478
479 protected abstract void run(ChannelHandlerContext ctx);
480 }
481
482 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
483
484 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
485 super(ctx);
486 }
487
488 @Override
489 protected void run(ChannelHandlerContext ctx) {
490 long nextDelay = readerIdleTimeNanos;
491 if (!reading) {
492 nextDelay -= ticksInNanos() - lastReadTime;
493 }
494
495 if (nextDelay <= 0) {
496 // Reader is idle - set a new timeout and notify the callback.
497 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
498
499 boolean first = firstReaderIdleEvent;
500 firstReaderIdleEvent = false;
501
502 try {
503 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
504 channelIdle(ctx, event);
505 } catch (Throwable t) {
506 ctx.fireExceptionCaught(t);
507 }
508 } else {
509 // Read occurred before the timeout - set a new timeout with shorter delay.
510 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
511 }
512 }
513 }
514
515 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
516
517 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
518 super(ctx);
519 }
520
521 @Override
522 protected void run(ChannelHandlerContext ctx) {
523
524 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
525 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
526 if (nextDelay <= 0) {
527 // Writer is idle - set a new timeout and notify the callback.
528 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
529
530 boolean first = firstWriterIdleEvent;
531 firstWriterIdleEvent = false;
532
533 try {
534 if (hasOutputChanged(ctx, first)) {
535 return;
536 }
537
538 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
539 channelIdle(ctx, event);
540 } catch (Throwable t) {
541 ctx.fireExceptionCaught(t);
542 }
543 } else {
544 // Write occurred before the timeout - set a new timeout with shorter delay.
545 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
546 }
547 }
548 }
549
550 private final class AllIdleTimeoutTask extends AbstractIdleTask {
551
552 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
553 super(ctx);
554 }
555
556 @Override
557 protected void run(ChannelHandlerContext ctx) {
558
559 long nextDelay = allIdleTimeNanos;
560 if (!reading) {
561 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
562 }
563 if (nextDelay <= 0) {
564 // Both reader and writer are idle - set a new timeout and
565 // notify the callback.
566 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
567
568 boolean first = firstAllIdleEvent;
569 firstAllIdleEvent = false;
570
571 try {
572 if (hasOutputChanged(ctx, first)) {
573 return;
574 }
575
576 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
577 channelIdle(ctx, event);
578 } catch (Throwable t) {
579 ctx.fireExceptionCaught(t);
580 }
581 } else {
582 // Either read or write occurred before the timeout - set a new
583 // timeout with shorter delay.
584 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
585 }
586 }
587 }
588 }
589