1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Queue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28
29 /**
30  * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
31  *
32  */

33 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
34
35     protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
36             SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
37
38     private final Queue<Runnable> tailTasks;
39
40     protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
41         this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
42     }
43
44     protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
45         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
46     }
47
48     protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
49                                     boolean addTaskWakesUp, int maxPendingTasks,
50                                     RejectedExecutionHandler rejectedExecutionHandler) {
51         super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
52         tailTasks = newTaskQueue(maxPendingTasks);
53     }
54
55     protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
56                                     boolean addTaskWakesUp, int maxPendingTasks,
57                                     RejectedExecutionHandler rejectedExecutionHandler) {
58         super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
59         tailTasks = newTaskQueue(maxPendingTasks);
60     }
61
62     protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
63                                     boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
64                                     RejectedExecutionHandler rejectedExecutionHandler) {
65         super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
66         tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
67     }
68
69     @Override
70     public EventLoopGroup parent() {
71         return (EventLoopGroup) super.parent();
72     }
73
74     @Override
75     public EventLoop next() {
76         return (EventLoop) super.next();
77     }
78
79     @Override
80     public ChannelFuture register(Channel channel) {
81         return register(new DefaultChannelPromise(channel, this));
82     }
83
84     @Override
85     public ChannelFuture register(final ChannelPromise promise) {
86         ObjectUtil.checkNotNull(promise, "promise");
87         promise.channel().unsafe().register(this, promise);
88         return promise;
89     }
90
91     @Deprecated
92     @Override
93     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
94         ObjectUtil.checkNotNull(promise, "promise");
95         ObjectUtil.checkNotNull(channel, "channel");
96         channel.unsafe().register(this, promise);
97         return promise;
98     }
99
100     /**
101      * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
102      *
103      * @param task to be added.
104      */

105     @UnstableApi
106     public final void executeAfterEventLoopIteration(Runnable task) {
107         ObjectUtil.checkNotNull(task, "task");
108         if (isShutdown()) {
109             reject();
110         }
111
112         if (!tailTasks.offer(task)) {
113             reject(task);
114         }
115
116         if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
117             wakeup(inEventLoop());
118         }
119     }
120
121     /**
122      * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
123      *
124      * @param task to be removed.
125      *
126      * @return {@code trueif the task was removed as a result of this call.
127      */

128     @UnstableApi
129     final boolean removeAfterEventLoopIterationTask(Runnable task) {
130         return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
131     }
132
133     @Override
134     protected void afterRunningAllTasks() {
135         runAllTasksFrom(tailTasks);
136     }
137
138     @Override
139     protected boolean hasTasks() {
140         return super.hasTasks() || !tailTasks.isEmpty();
141     }
142
143     @Override
144     public int pendingTasks() {
145         return super.pendingTasks() + tailTasks.size();
146     }
147
148     /**
149      * Returns the number of {@link Channel}s registered with this {@link EventLoop} or {@code -1}
150      * if operation is not supported. The returned value is not guaranteed to be exact accurate and
151      * should be viewed as a best effort.
152      */

153     @UnstableApi
154     public int registeredChannels() {
155         return -1;
156     }
157 }
158