1
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;
23
24 import java.util.Comparator;
25 import java.util.Queue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.TimeUnit;
28
29
32 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
33 private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
34 new Comparator<ScheduledFutureTask<?>>() {
35 @Override
36 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
37 return o1.compareTo(o2);
38 }
39 };
40
41 static final Runnable WAKEUP_TASK = new Runnable() {
42 @Override
43 public void run() { }
44 };
45
46 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
47
48 long nextTaskId;
49
50 protected AbstractScheduledEventExecutor() {
51 }
52
53 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
54 super(parent);
55 }
56
57 protected static long nanoTime() {
58 return ScheduledFutureTask.nanoTime();
59 }
60
61
67 protected static long deadlineToDelayNanos(long deadlineNanos) {
68 return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
69 }
70
71
75 protected static long initialNanoTime() {
76 return ScheduledFutureTask.initialNanoTime();
77 }
78
79 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
80 if (scheduledTaskQueue == null) {
81 scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
82 SCHEDULED_FUTURE_TASK_COMPARATOR,
83
84 11);
85 }
86 return scheduledTaskQueue;
87 }
88
89 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
90 return queue == null || queue.isEmpty();
91 }
92
93
98 protected void cancelScheduledTasks() {
99 assert inEventLoop();
100 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
101 if (isNullOrEmpty(scheduledTaskQueue)) {
102 return;
103 }
104
105 final ScheduledFutureTask<?>[] scheduledTasks =
106 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
107
108 for (ScheduledFutureTask<?> task: scheduledTasks) {
109 task.cancelWithoutRemove(false);
110 }
111
112 scheduledTaskQueue.clearIgnoringIndexes();
113 }
114
115
118 protected final Runnable pollScheduledTask() {
119 return pollScheduledTask(nanoTime());
120 }
121
122
126 protected final Runnable pollScheduledTask(long nanoTime) {
127 assert inEventLoop();
128
129 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
130 if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
131 return null;
132 }
133 scheduledTaskQueue.remove();
134 scheduledTask.setConsumed();
135 return scheduledTask;
136 }
137
138
141 protected final long nextScheduledTaskNano() {
142 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
143 return scheduledTask != null ? scheduledTask.delayNanos() : -1;
144 }
145
146
150 protected final long nextScheduledTaskDeadlineNanos() {
151 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
152 return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
153 }
154
155 final ScheduledFutureTask<?> peekScheduledTask() {
156 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
157 return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
158 }
159
160
163 protected final boolean hasScheduledTasks() {
164 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
165 return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
166 }
167
168 @Override
169 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
170 ObjectUtil.checkNotNull(command, "command");
171 ObjectUtil.checkNotNull(unit, "unit");
172 if (delay < 0) {
173 delay = 0;
174 }
175 validateScheduled0(delay, unit);
176
177 return schedule(new ScheduledFutureTask<Void>(
178 this,
179 command,
180 deadlineNanos(unit.toNanos(delay))));
181 }
182
183 @Override
184 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
185 ObjectUtil.checkNotNull(callable, "callable");
186 ObjectUtil.checkNotNull(unit, "unit");
187 if (delay < 0) {
188 delay = 0;
189 }
190 validateScheduled0(delay, unit);
191
192 return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
193 }
194
195 @Override
196 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
197 ObjectUtil.checkNotNull(command, "command");
198 ObjectUtil.checkNotNull(unit, "unit");
199 if (initialDelay < 0) {
200 throw new IllegalArgumentException(
201 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
202 }
203 if (period <= 0) {
204 throw new IllegalArgumentException(
205 String.format("period: %d (expected: > 0)", period));
206 }
207 validateScheduled0(initialDelay, unit);
208 validateScheduled0(period, unit);
209
210 return schedule(new ScheduledFutureTask<Void>(
211 this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
212 }
213
214 @Override
215 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
216 ObjectUtil.checkNotNull(command, "command");
217 ObjectUtil.checkNotNull(unit, "unit");
218 if (initialDelay < 0) {
219 throw new IllegalArgumentException(
220 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
221 }
222 if (delay <= 0) {
223 throw new IllegalArgumentException(
224 String.format("delay: %d (expected: > 0)", delay));
225 }
226
227 validateScheduled0(initialDelay, unit);
228 validateScheduled0(delay, unit);
229
230 return schedule(new ScheduledFutureTask<Void>(
231 this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
232 }
233
234 @SuppressWarnings("deprecation")
235 private void validateScheduled0(long amount, TimeUnit unit) {
236 validateScheduled(amount, unit);
237 }
238
239
244 @Deprecated
245 protected void validateScheduled(long amount, TimeUnit unit) {
246
247 }
248
249 final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
250
251 scheduledTaskQueue().add(task.setId(++nextTaskId));
252 }
253
254 private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
255 if (inEventLoop()) {
256 scheduleFromEventLoop(task);
257 } else {
258 final long deadlineNanos = task.deadlineNanos();
259
260 if (beforeScheduledTaskSubmitted(deadlineNanos)) {
261 execute(task);
262 } else {
263 lazyExecute(task);
264
265 if (afterScheduledTaskSubmitted(deadlineNanos)) {
266 execute(WAKEUP_TASK);
267 }
268 }
269 }
270
271 return task;
272 }
273
274 final void removeScheduled(final ScheduledFutureTask<?> task) {
275 assert task.isCancelled();
276 if (inEventLoop()) {
277 scheduledTaskQueue().removeTyped(task);
278 } else {
279
280 lazyExecute(task);
281 }
282 }
283
284
297 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
298 return true;
299 }
300
301
307 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
308 return true;
309 }
310 }
311