1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.TimeUnit;
32
33 /**
34  * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
35  * read, write, or both operation for a while.
36  *
37  * <h3>Supported idle states</h3>
38  * <table border="1">
39  * <tr>
40  * <th>Property</th><th>Meaning</th>
41  * </tr>
42  * <tr>
43  * <td>{@code readerIdleTime}</td>
44  * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45  *     will be triggered when no read was performed for the specified period of
46  *     time.  Specify {@code 0} to disable.</td>
47  * </tr>
48  * <tr>
49  * <td>{@code writerIdleTime}</td>
50  * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51  *     will be triggered when no write was performed for the specified period of
52  *     time.  Specify {@code 0} to disable.</td>
53  * </tr>
54  * <tr>
55  * <td>{@code allIdleTime}</td>
56  * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57  *     will be triggered when neither read nor write was performed for the
58  *     specified period of time.  Specify {@code 0} to disable.</td>
59  * </tr>
60  * </table>
61  *
62  * <pre>
63  * // An example that sends a ping message when there is no outbound traffic
64  * // for 30 seconds.  The connection is closed when there is no inbound traffic
65  * // for 60 seconds.
66  *
67  * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68  *     {@code @Override}
69  *     public void initChannel({@link Channel} channel) {
70  *         channel.pipeline().addLast("idleStateHandler"new {@link IdleStateHandler}(60, 30, 0));
71  *         channel.pipeline().addLast("myHandler"new MyHandler());
72  *     }
73  * }
74  *
75  * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76  * public class MyHandler extends {@link ChannelDuplexHandler} {
77  *     {@code @Override}
78  *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79  *         if (evt instanceof {@link IdleStateEvent}) {
80  *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81  *             if (e.state() == {@link IdleState}.READER_IDLE) {
82  *                 ctx.close();
83  *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84  *                 ctx.writeAndFlush(new PingMessage());
85  *             }
86  *         }
87  *     }
88  * }
89  *
90  * {@link ServerBootstrap} bootstrap = ...;
91  * ...
92  * bootstrap.childHandler(new MyChannelInitializer());
93  * ...
94  * </pre>
95  *
96  * @see ReadTimeoutHandler
97  * @see WriteTimeoutHandler
98  */

99 public class IdleStateHandler extends ChannelDuplexHandler {
100     private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
103     private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104         @Override
105         public void operationComplete(ChannelFuture future) throws Exception {
106             lastWriteTime = ticksInNanos();
107             firstWriterIdleEvent = firstAllIdleEvent = true;
108         }
109     };
110
111     private final boolean observeOutput;
112     private final long readerIdleTimeNanos;
113     private final long writerIdleTimeNanos;
114     private final long allIdleTimeNanos;
115
116     private ScheduledFuture<?> readerIdleTimeout;
117     private long lastReadTime;
118     private boolean firstReaderIdleEvent = true;
119
120     private ScheduledFuture<?> writerIdleTimeout;
121     private long lastWriteTime;
122     private boolean firstWriterIdleEvent = true;
123
124     private ScheduledFuture<?> allIdleTimeout;
125     private boolean firstAllIdleEvent = true;
126
127     private byte state; // 0 - none, 1 - initialized, 2 - destroyed
128     private boolean reading;
129
130     private long lastChangeCheckTimeStamp;
131     private int lastMessageHashCode;
132     private long lastPendingWriteBytes;
133     private long lastFlushProgress;
134
135     /**
136      * Creates a new instance firing {@link IdleStateEvent}s.
137      *
138      * @param readerIdleTimeSeconds
139      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
140      *        will be triggered when no read was performed for the specified
141      *        period of time.  Specify {@code 0} to disable.
142      * @param writerIdleTimeSeconds
143      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
144      *        will be triggered when no write was performed for the specified
145      *        period of time.  Specify {@code 0} to disable.
146      * @param allIdleTimeSeconds
147      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
148      *        will be triggered when neither read nor write was performed for
149      *        the specified period of time.  Specify {@code 0} to disable.
150      */

151     public IdleStateHandler(
152             int readerIdleTimeSeconds,
153             int writerIdleTimeSeconds,
154             int allIdleTimeSeconds) {
155
156         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
157              TimeUnit.SECONDS);
158     }
159
160     /**
161      * @see #IdleStateHandler(booleanlonglonglong, TimeUnit)
162      */

163     public IdleStateHandler(
164             long readerIdleTime, long writerIdleTime, long allIdleTime,
165             TimeUnit unit) {
166         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
167     }
168
169     /**
170      * Creates a new instance firing {@link IdleStateEvent}s.
171      *
172      * @param observeOutput
173      *        whether or not the consumption of {@code bytes} should be taken into
174      *        consideration when assessing write idleness. The default is {@code false}.
175      * @param readerIdleTime
176      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
177      *        will be triggered when no read was performed for the specified
178      *        period of time.  Specify {@code 0} to disable.
179      * @param writerIdleTime
180      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
181      *        will be triggered when no write was performed for the specified
182      *        period of time.  Specify {@code 0} to disable.
183      * @param allIdleTime
184      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
185      *        will be triggered when neither read nor write was performed for
186      *        the specified period of time.  Specify {@code 0} to disable.
187      * @param unit
188      *        the {@link TimeUnit} of {@code readerIdleTime},
189      *        {@code writeIdleTime}, and {@code allIdleTime}
190      */

191     public IdleStateHandler(boolean observeOutput,
192             long readerIdleTime, long writerIdleTime, long allIdleTime,
193             TimeUnit unit) {
194         ObjectUtil.checkNotNull(unit, "unit");
195
196         this.observeOutput = observeOutput;
197
198         if (readerIdleTime <= 0) {
199             readerIdleTimeNanos = 0;
200         } else {
201             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202         }
203         if (writerIdleTime <= 0) {
204             writerIdleTimeNanos = 0;
205         } else {
206             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207         }
208         if (allIdleTime <= 0) {
209             allIdleTimeNanos = 0;
210         } else {
211             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212         }
213     }
214
215     /**
216      * Return the readerIdleTime that was given when instance this class in milliseconds.
217      *
218      */

219     public long getReaderIdleTimeInMillis() {
220         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221     }
222
223     /**
224      * Return the writerIdleTime that was given when instance this class in milliseconds.
225      *
226      */

227     public long getWriterIdleTimeInMillis() {
228         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229     }
230
231     /**
232      * Return the allIdleTime that was given when instance this class in milliseconds.
233      *
234      */

235     public long getAllIdleTimeInMillis() {
236         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237     }
238
239     @Override
240     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242             // channelActive() event has been fired already, which means this.channelActive() will
243             // not be invoked. We have to initialize here instead.
244             initialize(ctx);
245         } else {
246             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
247             // and initialization will occur there.
248         }
249     }
250
251     @Override
252     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253         destroy();
254     }
255
256     @Override
257     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258         // Initialize early if channel is active already.
259         if (ctx.channel().isActive()) {
260             initialize(ctx);
261         }
262         super.channelRegistered(ctx);
263     }
264
265     @Override
266     public void channelActive(ChannelHandlerContext ctx) throws Exception {
267         // This method will be invoked only if this handler was added
268         // before channelActive() event is fired.  If a user adds this handler
269         // after the channelActive() event, initialize() will be called by beforeAdd().
270         initialize(ctx);
271         super.channelActive(ctx);
272     }
273
274     @Override
275     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276         destroy();
277         super.channelInactive(ctx);
278     }
279
280     @Override
281     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283             reading = true;
284             firstReaderIdleEvent = firstAllIdleEvent = true;
285         }
286         ctx.fireChannelRead(msg);
287     }
288
289     @Override
290     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292             lastReadTime = ticksInNanos();
293             reading = false;
294         }
295         ctx.fireChannelReadComplete();
296     }
297
298     @Override
299     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300         // Allow writing with void promise if handler is only configured for read timeout events.
301         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302             ctx.write(msg, promise.unvoid()).addListener(writeListener);
303         } else {
304             ctx.write(msg, promise);
305         }
306     }
307
308     private void initialize(ChannelHandlerContext ctx) {
309         // Avoid the case where destroy() is called before scheduling timeouts.
310         // See: https://github.com/netty/netty/issues/143
311         switch (state) {
312         case 1:
313         case 2:
314             return;
315         }
316
317         state = 1;
318         initOutputChanged(ctx);
319
320         lastReadTime = lastWriteTime = ticksInNanos();
321         if (readerIdleTimeNanos > 0) {
322             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
323                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
324         }
325         if (writerIdleTimeNanos > 0) {
326             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
327                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
328         }
329         if (allIdleTimeNanos > 0) {
330             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
331                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
332         }
333     }
334
335     /**
336      * This method is visible for testing!
337      */

338     long ticksInNanos() {
339         return System.nanoTime();
340     }
341
342     /**
343      * This method is visible for testing!
344      */

345     ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
346         return ctx.executor().schedule(task, delay, unit);
347     }
348
349     private void destroy() {
350         state = 2;
351
352         if (readerIdleTimeout != null) {
353             readerIdleTimeout.cancel(false);
354             readerIdleTimeout = null;
355         }
356         if (writerIdleTimeout != null) {
357             writerIdleTimeout.cancel(false);
358             writerIdleTimeout = null;
359         }
360         if (allIdleTimeout != null) {
361             allIdleTimeout.cancel(false);
362             allIdleTimeout = null;
363         }
364     }
365
366     /**
367      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
368      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
369      */

370     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
371         ctx.fireUserEventTriggered(evt);
372     }
373
374     /**
375      * Returns a {@link IdleStateEvent}.
376      */

377     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
378         switch (state) {
379             case ALL_IDLE:
380                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
381             case READER_IDLE:
382                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
383             case WRITER_IDLE:
384                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
385             default:
386                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
387         }
388     }
389
390     /**
391      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
392      */

393     private void initOutputChanged(ChannelHandlerContext ctx) {
394         if (observeOutput) {
395             Channel channel = ctx.channel();
396             Unsafe unsafe = channel.unsafe();
397             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
398
399             if (buf != null) {
400                 lastMessageHashCode = System.identityHashCode(buf.current());
401                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
402                 lastFlushProgress = buf.currentProgress();
403             }
404         }
405     }
406
407     /**
408      * Returns {@code trueif and only if the {@link IdleStateHandler} was constructed
409      * with {@link #observeOutput} enabled and there has been an observed change in the
410      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
411      *
412      * https://github.com/netty/netty/issues/6150
413      */

414     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
415         if (observeOutput) {
416
417             // We can take this shortcut if the ChannelPromises that got passed into write()
418             // appear to complete. It indicates "change" on message level and we simply assume
419             // that there's change happening on byte level. If the user doesn't observe channel
420             // writability events then they'll eventually OOME and there's clearly a different
421             // problem and idleness is least of their concerns.
422             if (lastChangeCheckTimeStamp != lastWriteTime) {
423                 lastChangeCheckTimeStamp = lastWriteTime;
424
425                 // But this applies only if it's the non-first call.
426                 if (!first) {
427                     return true;
428                 }
429             }
430
431             Channel channel = ctx.channel();
432             Unsafe unsafe = channel.unsafe();
433             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
434
435             if (buf != null) {
436                 int messageHashCode = System.identityHashCode(buf.current());
437                 long pendingWriteBytes = buf.totalPendingWriteBytes();
438
439                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
440                     lastMessageHashCode = messageHashCode;
441                     lastPendingWriteBytes = pendingWriteBytes;
442
443                     if (!first) {
444                         return true;
445                     }
446                 }
447
448                 long flushProgress = buf.currentProgress();
449                 if (flushProgress != lastFlushProgress) {
450                     lastFlushProgress = flushProgress;
451
452                     if (!first) {
453                         return true;
454                     }
455                 }
456             }
457         }
458
459         return false;
460     }
461
462     private abstract static class AbstractIdleTask implements Runnable {
463
464         private final ChannelHandlerContext ctx;
465
466         AbstractIdleTask(ChannelHandlerContext ctx) {
467             this.ctx = ctx;
468         }
469
470         @Override
471         public void run() {
472             if (!ctx.channel().isOpen()) {
473                 return;
474             }
475
476             run(ctx);
477         }
478
479         protected abstract void run(ChannelHandlerContext ctx);
480     }
481
482     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
483
484         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
485             super(ctx);
486         }
487
488         @Override
489         protected void run(ChannelHandlerContext ctx) {
490             long nextDelay = readerIdleTimeNanos;
491             if (!reading) {
492                 nextDelay -= ticksInNanos() - lastReadTime;
493             }
494
495             if (nextDelay <= 0) {
496                 // Reader is idle - set a new timeout and notify the callback.
497                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
498
499                 boolean first = firstReaderIdleEvent;
500                 firstReaderIdleEvent = false;
501
502                 try {
503                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
504                     channelIdle(ctx, event);
505                 } catch (Throwable t) {
506                     ctx.fireExceptionCaught(t);
507                 }
508             } else {
509                 // Read occurred before the timeout - set a new timeout with shorter delay.
510                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
511             }
512         }
513     }
514
515     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
516
517         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
518             super(ctx);
519         }
520
521         @Override
522         protected void run(ChannelHandlerContext ctx) {
523
524             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
525             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
526             if (nextDelay <= 0) {
527                 // Writer is idle - set a new timeout and notify the callback.
528                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
529
530                 boolean first = firstWriterIdleEvent;
531                 firstWriterIdleEvent = false;
532
533                 try {
534                     if (hasOutputChanged(ctx, first)) {
535                         return;
536                     }
537
538                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
539                     channelIdle(ctx, event);
540                 } catch (Throwable t) {
541                     ctx.fireExceptionCaught(t);
542                 }
543             } else {
544                 // Write occurred before the timeout - set a new timeout with shorter delay.
545                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
546             }
547         }
548     }
549
550     private final class AllIdleTimeoutTask extends AbstractIdleTask {
551
552         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
553             super(ctx);
554         }
555
556         @Override
557         protected void run(ChannelHandlerContext ctx) {
558
559             long nextDelay = allIdleTimeNanos;
560             if (!reading) {
561                 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
562             }
563             if (nextDelay <= 0) {
564                 // Both reader and writer are idle - set a new timeout and
565                 // notify the callback.
566                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
567
568                 boolean first = firstAllIdleEvent;
569                 firstAllIdleEvent = false;
570
571                 try {
572                     if (hasOutputChanged(ctx, first)) {
573                         return;
574                     }
575
576                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
577                     channelIdle(ctx, event);
578                 } catch (Throwable t) {
579                     ctx.fireExceptionCaught(t);
580                 }
581             } else {
582                 // Either read or write occurred before the timeout - set a new
583                 // timeout with shorter delay.
584                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
585             }
586         }
587     }
588 }
589