1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.util.concurrent;
17
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.LinkedHashSet;
21 import java.util.Set;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 /**
28  * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
29  * the same time.
30  */

31 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
32
33     private final EventExecutor[] children;
34     private final Set<EventExecutor> readonlyChildren;
35     private final AtomicInteger terminatedChildren = new AtomicInteger();
36     private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
37     private final EventExecutorChooserFactory.EventExecutorChooser chooser;
38
39     /**
40      * Create a new instance.
41      *
42      * @param nThreads          the number of threads that will be used by this instance.
43      * @param threadFactory     the ThreadFactory to use, or {@code nullif the default should be used.
44      * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
45      */

46     protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
47         this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
48     }
49
50     /**
51      * Create a new instance.
52      *
53      * @param nThreads          the number of threads that will be used by this instance.
54      * @param executor          the Executor to use, or {@code nullif the default should be used.
55      * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
56      */

57     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
58         this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
59     }
60
61     /**
62      * Create a new instance.
63      *
64      * @param nThreads          the number of threads that will be used by this instance.
65      * @param executor          the Executor to use, or {@code nullif the default should be used.
66      * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
67      * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
68      */

69     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
70                                             EventExecutorChooserFactory chooserFactory, Object... args) {
71         if (nThreads <= 0) {
72             throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
73         }
74
75         if (executor == null) {
76             executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
77         }
78
79         children = new EventExecutor[nThreads];
80
81         for (int i = 0; i < nThreads; i ++) {
82             boolean success = false;
83             try {
84                 children[i] = newChild(executor, args);
85                 success = true;
86             } catch (Exception e) {
87                 // TODO: Think about if this is a good exception type
88                 throw new IllegalStateException("failed to create a child event loop", e);
89             } finally {
90                 if (!success) {
91                     for (int j = 0; j < i; j ++) {
92                         children[j].shutdownGracefully();
93                     }
94
95                     for (int j = 0; j < i; j ++) {
96                         EventExecutor e = children[j];
97                         try {
98                             while (!e.isTerminated()) {
99                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
100                             }
101                         } catch (InterruptedException interrupted) {
102                             // Let the caller handle the interruption.
103                             Thread.currentThread().interrupt();
104                             break;
105                         }
106                     }
107                 }
108             }
109         }
110
111         chooser = chooserFactory.newChooser(children);
112
113         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
114             @Override
115             public void operationComplete(Future<Object> future) throws Exception {
116                 if (terminatedChildren.incrementAndGet() == children.length) {
117                     terminationFuture.setSuccess(null);
118                 }
119             }
120         };
121
122         for (EventExecutor e: children) {
123             e.terminationFuture().addListener(terminationListener);
124         }
125
126         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
127         Collections.addAll(childrenSet, children);
128         readonlyChildren = Collections.unmodifiableSet(childrenSet);
129     }
130
131     protected ThreadFactory newDefaultThreadFactory() {
132         return new DefaultThreadFactory(getClass());
133     }
134
135     @Override
136     public EventExecutor next() {
137         return chooser.next();
138     }
139
140     @Override
141     public Iterator<EventExecutor> iterator() {
142         return readonlyChildren.iterator();
143     }
144
145     /**
146      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
147      * 1:1 to the threads it use.
148      */

149     public final int executorCount() {
150         return children.length;
151     }
152
153     /**
154      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
155      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
156      *
157      */

158     protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
159
160     @Override
161     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
162         for (EventExecutor l: children) {
163             l.shutdownGracefully(quietPeriod, timeout, unit);
164         }
165         return terminationFuture();
166     }
167
168     @Override
169     public Future<?> terminationFuture() {
170         return terminationFuture;
171     }
172
173     @Override
174     @Deprecated
175     public void shutdown() {
176         for (EventExecutor l: children) {
177             l.shutdown();
178         }
179     }
180
181     @Override
182     public boolean isShuttingDown() {
183         for (EventExecutor l: children) {
184             if (!l.isShuttingDown()) {
185                 return false;
186             }
187         }
188         return true;
189     }
190
191     @Override
192     public boolean isShutdown() {
193         for (EventExecutor l: children) {
194             if (!l.isShutdown()) {
195                 return false;
196             }
197         }
198         return true;
199     }
200
201     @Override
202     public boolean isTerminated() {
203         for (EventExecutor l: children) {
204             if (!l.isTerminated()) {
205                 return false;
206             }
207         }
208         return true;
209     }
210
211     @Override
212     public boolean awaitTermination(long timeout, TimeUnit unit)
213             throws InterruptedException {
214         long deadline = System.nanoTime() + unit.toNanos(timeout);
215         loop: for (EventExecutor l: children) {
216             for (;;) {
217                 long timeLeft = deadline - System.nanoTime();
218                 if (timeLeft <= 0) {
219                     break loop;
220                 }
221                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
222                     break;
223                 }
224             }
225         }
226         return isTerminated();
227     }
228 }
229