1 /*
2  * Copyright 2015 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;
23
24 import java.util.Comparator;
25 import java.util.Queue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.TimeUnit;
28
29 /**
30  * Abstract base class for {@link EventExecutor}s that want to support scheduling.
31  */

32 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
33     private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
34             new Comparator<ScheduledFutureTask<?>>() {
35                 @Override
36                 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
37                     return o1.compareTo(o2);
38                 }
39             };
40
41    static final Runnable WAKEUP_TASK = new Runnable() {
42        @Override
43        public void run() { } // Do nothing
44     };
45
46     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
47
48     long nextTaskId;
49
50     protected AbstractScheduledEventExecutor() {
51     }
52
53     protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
54         super(parent);
55     }
56
57     protected static long nanoTime() {
58         return ScheduledFutureTask.nanoTime();
59     }
60
61     /**
62      * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now
63      * {@code deadlineNanos} would expire.
64      * @param deadlineNanos An arbitrary deadline in nano seconds.
65      * @return the number of nano seconds from now {@code deadlineNanos} would expire.
66      */

67     protected static long deadlineToDelayNanos(long deadlineNanos) {
68         return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
69     }
70
71     /**
72      * The initial value used for delay and computations based upon a monatomic time source.
73      * @return initial value used for delay and computations based upon a monatomic time source.
74      */

75     protected static long initialNanoTime() {
76         return ScheduledFutureTask.initialNanoTime();
77     }
78
79     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
80         if (scheduledTaskQueue == null) {
81             scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
82                     SCHEDULED_FUTURE_TASK_COMPARATOR,
83                     // Use same initial capacity as java.util.PriorityQueue
84                     11);
85         }
86         return scheduledTaskQueue;
87     }
88
89     private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
90         return queue == null || queue.isEmpty();
91     }
92
93     /**
94      * Cancel all scheduled tasks.
95      *
96      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
97      */

98     protected void cancelScheduledTasks() {
99         assert inEventLoop();
100         PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
101         if (isNullOrEmpty(scheduledTaskQueue)) {
102             return;
103         }
104
105         final ScheduledFutureTask<?>[] scheduledTasks =
106                 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
107
108         for (ScheduledFutureTask<?> task: scheduledTasks) {
109             task.cancelWithoutRemove(false);
110         }
111
112         scheduledTaskQueue.clearIgnoringIndexes();
113     }
114
115     /**
116      * @see #pollScheduledTask(long)
117      */

118     protected final Runnable pollScheduledTask() {
119         return pollScheduledTask(nanoTime());
120     }
121
122     /**
123      * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
124      * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
125      */

126     protected final Runnable pollScheduledTask(long nanoTime) {
127         assert inEventLoop();
128
129         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
130         if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
131             return null;
132         }
133         scheduledTaskQueue.remove();
134         scheduledTask.setConsumed();
135         return scheduledTask;
136     }
137
138     /**
139      * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
140      */

141     protected final long nextScheduledTaskNano() {
142         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
143         return scheduledTask != null ? scheduledTask.delayNanos() : -1;
144     }
145
146     /**
147      * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
148      * if no task is scheduled.
149      */

150     protected final long nextScheduledTaskDeadlineNanos() {
151         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
152         return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
153     }
154
155     final ScheduledFutureTask<?> peekScheduledTask() {
156         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
157         return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
158     }
159
160     /**
161      * Returns {@code trueif a scheduled task is ready for processing.
162      */

163     protected final boolean hasScheduledTasks() {
164         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
165         return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
166     }
167
168     @Override
169     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
170         ObjectUtil.checkNotNull(command, "command");
171         ObjectUtil.checkNotNull(unit, "unit");
172         if (delay < 0) {
173             delay = 0;
174         }
175         validateScheduled0(delay, unit);
176
177         return schedule(new ScheduledFutureTask<Void>(
178                 this,
179                 command,
180                 deadlineNanos(unit.toNanos(delay))));
181     }
182
183     @Override
184     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
185         ObjectUtil.checkNotNull(callable, "callable");
186         ObjectUtil.checkNotNull(unit, "unit");
187         if (delay < 0) {
188             delay = 0;
189         }
190         validateScheduled0(delay, unit);
191
192         return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
193     }
194
195     @Override
196     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
197         ObjectUtil.checkNotNull(command, "command");
198         ObjectUtil.checkNotNull(unit, "unit");
199         if (initialDelay < 0) {
200             throw new IllegalArgumentException(
201                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
202         }
203         if (period <= 0) {
204             throw new IllegalArgumentException(
205                     String.format("period: %d (expected: > 0)", period));
206         }
207         validateScheduled0(initialDelay, unit);
208         validateScheduled0(period, unit);
209
210         return schedule(new ScheduledFutureTask<Void>(
211                 this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
212     }
213
214     @Override
215     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
216         ObjectUtil.checkNotNull(command, "command");
217         ObjectUtil.checkNotNull(unit, "unit");
218         if (initialDelay < 0) {
219             throw new IllegalArgumentException(
220                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
221         }
222         if (delay <= 0) {
223             throw new IllegalArgumentException(
224                     String.format("delay: %d (expected: > 0)", delay));
225         }
226
227         validateScheduled0(initialDelay, unit);
228         validateScheduled0(delay, unit);
229
230         return schedule(new ScheduledFutureTask<Void>(
231                 this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
232     }
233
234     @SuppressWarnings("deprecation")
235     private void validateScheduled0(long amount, TimeUnit unit) {
236         validateScheduled(amount, unit);
237     }
238
239     /**
240      * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
241      *
242      * @deprecated will be removed in the future.
243      */

244     @Deprecated
245     protected void validateScheduled(long amount, TimeUnit unit) {
246         // NOOP
247     }
248
249     final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
250         // nextTaskId a long and so there is no chance it will overflow back to 0
251         scheduledTaskQueue().add(task.setId(++nextTaskId));
252     }
253
254     private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
255         if (inEventLoop()) {
256             scheduleFromEventLoop(task);
257         } else {
258             final long deadlineNanos = task.deadlineNanos();
259             // task will add itself to scheduled task queue when run if not expired
260             if (beforeScheduledTaskSubmitted(deadlineNanos)) {
261                 execute(task);
262             } else {
263                 lazyExecute(task);
264                 // Second hook after scheduling to facilitate race-avoidance
265                 if (afterScheduledTaskSubmitted(deadlineNanos)) {
266                     execute(WAKEUP_TASK);
267                 }
268             }
269         }
270
271         return task;
272     }
273
274     final void removeScheduled(final ScheduledFutureTask<?> task) {
275         assert task.isCancelled();
276         if (inEventLoop()) {
277             scheduledTaskQueue().removeTyped(task);
278         } else {
279             // task will remove itself from scheduled task queue when it runs
280             lazyExecute(task);
281         }
282     }
283
284     /**
285      * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
286      * Returns {@code trueif the {@link EventExecutor} thread should be woken immediately to
287      * process the scheduled task (if not already awake).
288      * <p>
289      * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
290      * the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
291      * to wake the {@link EventExecutor} thread if required.
292      *
293      * @param deadlineNanos deadline of the to-be-scheduled task
294      *     relative to {@link AbstractScheduledEventExecutor#nanoTime()}
295      * @return {@code trueif the {@link EventExecutor} thread should be woken, {@code false} otherwise
296      */

297     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
298         return true;
299     }
300
301     /**
302      * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
303      *
304      * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
305      * @return  {@code trueif the {@link EventExecutor} thread should be woken, {@code false} otherwise
306      */

307     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
308         return true;
309     }
310 }
311