1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16
17 package io.netty.bootstrap;
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultChannelPromise;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.EventLoopGroup;
28 import io.netty.channel.ReflectiveChannelFactory;
29 import io.netty.util.AttributeKey;
30 import io.netty.util.concurrent.EventExecutor;
31 import io.netty.util.concurrent.GlobalEventExecutor;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.SocketUtils;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36
37 import java.net.InetAddress;
38 import java.net.InetSocketAddress;
39 import java.net.SocketAddress;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.LinkedHashMap;
43 import java.util.Map;
44 import java.util.concurrent.ConcurrentHashMap;
45
46 /**
47  * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
48  * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
49  *
50  * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
51  * transports such as datagram (UDP).</p>
52  */

53 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
54     @SuppressWarnings("unchecked")
55     static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
56     @SuppressWarnings("unchecked")
57     static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
58
59     volatile EventLoopGroup group;
60     @SuppressWarnings("deprecation")
61     private volatile ChannelFactory<? extends C> channelFactory;
62     private volatile SocketAddress localAddress;
63
64     // The order in which ChannelOptions are applied is important they may depend on each other for validation
65     // purposes.
66     private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
67     private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
68     private volatile ChannelHandler handler;
69
70     AbstractBootstrap() {
71         // Disallow extending from a different package.
72     }
73
74     AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
75         group = bootstrap.group;
76         channelFactory = bootstrap.channelFactory;
77         handler = bootstrap.handler;
78         localAddress = bootstrap.localAddress;
79         synchronized (bootstrap.options) {
80             options.putAll(bootstrap.options);
81         }
82         attrs.putAll(bootstrap.attrs);
83     }
84
85     /**
86      * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
87      * {@link Channel}
88      */

89     public B group(EventLoopGroup group) {
90         ObjectUtil.checkNotNull(group, "group");
91         if (this.group != null) {
92             throw new IllegalStateException("group set already");
93         }
94         this.group = group;
95         return self();
96     }
97
98     @SuppressWarnings("unchecked")
99     private B self() {
100         return (B) this;
101     }
102
103     /**
104      * The {@link Class} which is used to create {@link Channel} instances from.
105      * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
106      * {@link Channel} implementation has no no-args constructor.
107      */

108     public B channel(Class<? extends C> channelClass) {
109         return channelFactory(new ReflectiveChannelFactory<C>(
110                 ObjectUtil.checkNotNull(channelClass, "channelClass")
111         ));
112     }
113
114     /**
115      * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
116      */

117     @Deprecated
118     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
119         ObjectUtil.checkNotNull(channelFactory, "channelFactory");
120         if (this.channelFactory != null) {
121             throw new IllegalStateException("channelFactory set already");
122         }
123
124         this.channelFactory = channelFactory;
125         return self();
126     }
127
128     /**
129      * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
130      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
131      * is not working for you because of some more complex needs. If your {@link Channel} implementation
132      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} to
133      * simplify your code.
134      */

135     @SuppressWarnings({ "unchecked""deprecation" })
136     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
137         return channelFactory((ChannelFactory<C>) channelFactory);
138     }
139
140     /**
141      * The {@link SocketAddress} which is used to bind the local "end" to.
142      */

143     public B localAddress(SocketAddress localAddress) {
144         this.localAddress = localAddress;
145         return self();
146     }
147
148     /**
149      * @see #localAddress(SocketAddress)
150      */

151     public B localAddress(int inetPort) {
152         return localAddress(new InetSocketAddress(inetPort));
153     }
154
155     /**
156      * @see #localAddress(SocketAddress)
157      */

158     public B localAddress(String inetHost, int inetPort) {
159         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
160     }
161
162     /**
163      * @see #localAddress(SocketAddress)
164      */

165     public B localAddress(InetAddress inetHost, int inetPort) {
166         return localAddress(new InetSocketAddress(inetHost, inetPort));
167     }
168
169     /**
170      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
171      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
172      */

173     public <T> B option(ChannelOption<T> option, T value) {
174         ObjectUtil.checkNotNull(option, "option");
175         synchronized (options) {
176             if (value == null) {
177                 options.remove(option);
178             } else {
179                 options.put(option, value);
180             }
181         }
182         return self();
183     }
184
185     /**
186      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
187      * {@code null}, the attribute of the specified {@code key} is removed.
188      */

189     public <T> B attr(AttributeKey<T> key, T value) {
190         ObjectUtil.checkNotNull(key, "key");
191         if (value == null) {
192             attrs.remove(key);
193         } else {
194             attrs.put(key, value);
195         }
196         return self();
197     }
198
199     /**
200      * Validate all the parameters. Sub-classes may override this, but should
201      * call the super method in that case.
202      */

203     public B validate() {
204         if (group == null) {
205             throw new IllegalStateException("group not set");
206         }
207         if (channelFactory == null) {
208             throw new IllegalStateException("channel or channelFactory not set");
209         }
210         return self();
211     }
212
213     /**
214      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
215      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
216      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
217      */

218     @Override
219     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
220     public abstract B clone();
221
222     /**
223      * Create a new {@link Channel} and register it with an {@link EventLoop}.
224      */

225     public ChannelFuture register() {
226         validate();
227         return initAndRegister();
228     }
229
230     /**
231      * Create a new {@link Channel} and bind it.
232      */

233     public ChannelFuture bind() {
234         validate();
235         SocketAddress localAddress = this.localAddress;
236         if (localAddress == null) {
237             throw new IllegalStateException("localAddress not set");
238         }
239         return doBind(localAddress);
240     }
241
242     /**
243      * Create a new {@link Channel} and bind it.
244      */

245     public ChannelFuture bind(int inetPort) {
246         return bind(new InetSocketAddress(inetPort));
247     }
248
249     /**
250      * Create a new {@link Channel} and bind it.
251      */

252     public ChannelFuture bind(String inetHost, int inetPort) {
253         return bind(SocketUtils.socketAddress(inetHost, inetPort));
254     }
255
256     /**
257      * Create a new {@link Channel} and bind it.
258      */

259     public ChannelFuture bind(InetAddress inetHost, int inetPort) {
260         return bind(new InetSocketAddress(inetHost, inetPort));
261     }
262
263     /**
264      * Create a new {@link Channel} and bind it.
265      */

266     public ChannelFuture bind(SocketAddress localAddress) {
267         validate();
268         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
269     }
270
271     private ChannelFuture doBind(final SocketAddress localAddress) {
272         final ChannelFuture regFuture = initAndRegister();
273         final Channel channel = regFuture.channel();
274         if (regFuture.cause() != null) {
275             return regFuture;
276         }
277
278         if (regFuture.isDone()) {
279             // At this point we know that the registration was complete and successful.
280             ChannelPromise promise = channel.newPromise();
281             doBind0(regFuture, channel, localAddress, promise);
282             return promise;
283         } else {
284             // Registration future is almost always fulfilled already, but just in case it's not.
285             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
286             regFuture.addListener(new ChannelFutureListener() {
287                 @Override
288                 public void operationComplete(ChannelFuture future) throws Exception {
289                     Throwable cause = future.cause();
290                     if (cause != null) {
291                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
292                         // IllegalStateException once we try to access the EventLoop of the Channel.
293                         promise.setFailure(cause);
294                     } else {
295                         // Registration was successful, so set the correct executor to use.
296                         // See https://github.com/netty/netty/issues/2586
297                         promise.registered();
298
299                         doBind0(regFuture, channel, localAddress, promise);
300                     }
301                 }
302             });
303             return promise;
304         }
305     }
306
307     final ChannelFuture initAndRegister() {
308         Channel channel = null;
309         try {
310             channel = channelFactory.newChannel();
311             init(channel);
312         } catch (Throwable t) {
313             if (channel != null) {
314                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
315                 channel.unsafe().closeForcibly();
316                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
317                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
318             }
319             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
320             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
321         }
322
323         ChannelFuture regFuture = config().group().register(channel);
324         if (regFuture.cause() != null) {
325             if (channel.isRegistered()) {
326                 channel.close();
327             } else {
328                 channel.unsafe().closeForcibly();
329             }
330         }
331
332         // If we are here and the promise is not failed, it's one of the following cases:
333         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
334         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
335         // 2) If we attempted registration from the other thread, the registration request has been successfully
336         //    added to the event loop's task queue for later execution.
337         //    i.e. It's safe to attempt bind() or connect() now:
338         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
339         //         because register(), bind(), and connect() are all bound to the same thread.
340
341         return regFuture;
342     }
343
344     abstract void init(Channel channel) throws Exception;
345
346     private static void doBind0(
347             final ChannelFuture regFuture, final Channel channel,
348             final SocketAddress localAddress, final ChannelPromise promise) {
349
350         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
351         // the pipeline in its channelRegistered() implementation.
352         channel.eventLoop().execute(new Runnable() {
353             @Override
354             public void run() {
355                 if (regFuture.isSuccess()) {
356                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
357                 } else {
358                     promise.setFailure(regFuture.cause());
359                 }
360             }
361         });
362     }
363
364     /**
365      * the {@link ChannelHandler} to use for serving the requests.
366      */

367     public B handler(ChannelHandler handler) {
368         this.handler = ObjectUtil.checkNotNull(handler, "handler");
369         return self();
370     }
371
372     /**
373      * Returns the configured {@link EventLoopGroup} or {@code nullif non is configured yet.
374      *
375      * @deprecated Use {@link #config()} instead.
376      */

377     @Deprecated
378     public final EventLoopGroup group() {
379         return group;
380     }
381
382     /**
383      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
384      * of the bootstrap.
385      */

386     public abstract AbstractBootstrapConfig<B, C> config();
387
388     final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
389         synchronized (options) {
390             return options.entrySet().toArray(EMPTY_OPTION_ARRAY);
391         }
392     }
393
394     final Map<ChannelOption<?>, Object> options0() {
395         return options;
396     }
397
398     final Map<AttributeKey<?>, Object> attrs0() {
399         return attrs;
400     }
401
402     final SocketAddress localAddress() {
403         return localAddress;
404     }
405
406     @SuppressWarnings("deprecation")
407     final ChannelFactory<? extends C> channelFactory() {
408         return channelFactory;
409     }
410
411     final ChannelHandler handler() {
412         return handler;
413     }
414
415     final Map<ChannelOption<?>, Object> options() {
416         synchronized (options) {
417             return copiedMap(options);
418         }
419     }
420
421     final Map<AttributeKey<?>, Object> attrs() {
422         return copiedMap(attrs);
423     }
424
425     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
426         if (map.isEmpty()) {
427             return Collections.emptyMap();
428         }
429         return Collections.unmodifiableMap(new HashMap<K, V>(map));
430     }
431
432     static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
433         for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
434             @SuppressWarnings("unchecked")
435             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
436             channel.attr(key).set(e.getValue());
437         }
438     }
439
440     static void setChannelOptions(
441             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
442         for (Map.Entry<ChannelOption<?>, Object> e: options) {
443             setChannelOption(channel, e.getKey(), e.getValue(), logger);
444         }
445     }
446
447     @SuppressWarnings("unchecked")
448     private static void setChannelOption(
449             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
450         try {
451             if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
452                 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
453             }
454         } catch (Throwable t) {
455             logger.warn(
456                     "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
457         }
458     }
459
460     @Override
461     public String toString() {
462         StringBuilder buf = new StringBuilder()
463             .append(StringUtil.simpleClassName(this))
464             .append('(').append(config()).append(')');
465         return buf.toString();
466     }
467
468     static final class PendingRegistrationPromise extends DefaultChannelPromise {
469
470         // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
471         // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
472         private volatile boolean registered;
473
474         PendingRegistrationPromise(Channel channel) {
475             super(channel);
476         }
477
478         void registered() {
479             registered = true;
480         }
481
482         @Override
483         protected EventExecutor executor() {
484             if (registered) {
485                 // If the registration was a success executor is set.
486                 //
487                 // See https://github.com/netty/netty/issues/2586
488                 return super.executor();
489             }
490             // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
491             return GlobalEventExecutor.INSTANCE;
492         }
493     }
494 }
495