1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.internal.ObjectUtil;
20
21 import java.util.IdentityHashMap;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26
27 import static io.netty.channel.ChannelOption.ALLOCATOR;
28 import static io.netty.channel.ChannelOption.AUTO_CLOSE;
29 import static io.netty.channel.ChannelOption.AUTO_READ;
30 import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
31 import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
32 import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
33 import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
34 import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
35 import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
36 import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
37 import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
38 import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
39 import static io.netty.util.internal.ObjectUtil.checkNotNull;
40 import static io.netty.util.internal.ObjectUtil.checkPositive;
41 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
42
43 /**
44  * The default {@link ChannelConfig} implementation.
45  */

46 public class DefaultChannelConfig implements ChannelConfig {
47     private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
48
49     private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
50
51     private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
52             AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class"autoRead");
53     private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
54             AtomicReferenceFieldUpdater.newUpdater(
55                     DefaultChannelConfig.class, WriteBufferWaterMark.class"writeBufferWaterMark");
56
57     protected final Channel channel;
58
59     private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
60     private volatile RecvByteBufAllocator rcvBufAllocator;
61     private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
62
63     private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
64     private volatile int writeSpinCount = 16;
65     @SuppressWarnings("FieldMayBeFinal")
66     private volatile int autoRead = 1;
67     private volatile boolean autoClose = true;
68     private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
69     private volatile boolean pinEventExecutor = true;
70
71     public DefaultChannelConfig(Channel channel) {
72         this(channel, new AdaptiveRecvByteBufAllocator());
73     }
74
75     protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
76         setRecvByteBufAllocator(allocator, channel.metadata());
77         this.channel = channel;
78     }
79
80     @Override
81     @SuppressWarnings("deprecation")
82     public Map<ChannelOption<?>, Object> getOptions() {
83         return getOptions(
84                 null,
85                 CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
86                 ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
87                 WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
88                 SINGLE_EVENTEXECUTOR_PER_GROUP);
89     }
90
91     protected Map<ChannelOption<?>, Object> getOptions(
92             Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
93         if (result == null) {
94             result = new IdentityHashMap<ChannelOption<?>, Object>();
95         }
96         for (ChannelOption<?> o: options) {
97             result.put(o, getOption(o));
98         }
99         return result;
100     }
101
102     @SuppressWarnings("unchecked")
103     @Override
104     public boolean setOptions(Map<ChannelOption<?>, ?> options) {
105         ObjectUtil.checkNotNull(options, "options");
106
107         boolean setAllOptions = true;
108         for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
109             if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
110                 setAllOptions = false;
111             }
112         }
113
114         return setAllOptions;
115     }
116
117     @Override
118     @SuppressWarnings({ "unchecked""deprecation" })
119     public <T> T getOption(ChannelOption<T> option) {
120         ObjectUtil.checkNotNull(option, "option");
121
122         if (option == CONNECT_TIMEOUT_MILLIS) {
123             return (T) Integer.valueOf(getConnectTimeoutMillis());
124         }
125         if (option == MAX_MESSAGES_PER_READ) {
126             return (T) Integer.valueOf(getMaxMessagesPerRead());
127         }
128         if (option == WRITE_SPIN_COUNT) {
129             return (T) Integer.valueOf(getWriteSpinCount());
130         }
131         if (option == ALLOCATOR) {
132             return (T) getAllocator();
133         }
134         if (option == RCVBUF_ALLOCATOR) {
135             return (T) getRecvByteBufAllocator();
136         }
137         if (option == AUTO_READ) {
138             return (T) Boolean.valueOf(isAutoRead());
139         }
140         if (option == AUTO_CLOSE) {
141             return (T) Boolean.valueOf(isAutoClose());
142         }
143         if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
144             return (T) Integer.valueOf(getWriteBufferHighWaterMark());
145         }
146         if (option == WRITE_BUFFER_LOW_WATER_MARK) {
147             return (T) Integer.valueOf(getWriteBufferLowWaterMark());
148         }
149         if (option == WRITE_BUFFER_WATER_MARK) {
150             return (T) getWriteBufferWaterMark();
151         }
152         if (option == MESSAGE_SIZE_ESTIMATOR) {
153             return (T) getMessageSizeEstimator();
154         }
155         if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
156             return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
157         }
158         return null;
159     }
160
161     @Override
162     @SuppressWarnings("deprecation")
163     public <T> boolean setOption(ChannelOption<T> option, T value) {
164         validate(option, value);
165
166         if (option == CONNECT_TIMEOUT_MILLIS) {
167             setConnectTimeoutMillis((Integer) value);
168         } else if (option == MAX_MESSAGES_PER_READ) {
169             setMaxMessagesPerRead((Integer) value);
170         } else if (option == WRITE_SPIN_COUNT) {
171             setWriteSpinCount((Integer) value);
172         } else if (option == ALLOCATOR) {
173             setAllocator((ByteBufAllocator) value);
174         } else if (option == RCVBUF_ALLOCATOR) {
175             setRecvByteBufAllocator((RecvByteBufAllocator) value);
176         } else if (option == AUTO_READ) {
177             setAutoRead((Boolean) value);
178         } else if (option == AUTO_CLOSE) {
179             setAutoClose((Boolean) value);
180         } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
181             setWriteBufferHighWaterMark((Integer) value);
182         } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
183             setWriteBufferLowWaterMark((Integer) value);
184         } else if (option == WRITE_BUFFER_WATER_MARK) {
185             setWriteBufferWaterMark((WriteBufferWaterMark) value);
186         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
187             setMessageSizeEstimator((MessageSizeEstimator) value);
188         } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
189             setPinEventExecutorPerGroup((Boolean) value);
190         } else {
191             return false;
192         }
193
194         return true;
195     }
196
197     protected <T> void validate(ChannelOption<T> option, T value) {
198         ObjectUtil.checkNotNull(option, "option").validate(value);
199     }
200
201     @Override
202     public int getConnectTimeoutMillis() {
203         return connectTimeoutMillis;
204     }
205
206     @Override
207     public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
208         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
209         this.connectTimeoutMillis = connectTimeoutMillis;
210         return this;
211     }
212
213     /**
214      * {@inheritDoc}
215      * <p>
216      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
217      * {@link MaxMessagesRecvByteBufAllocator}.
218      */

219     @Override
220     @Deprecated
221     public int getMaxMessagesPerRead() {
222         try {
223             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
224             return allocator.maxMessagesPerRead();
225         } catch (ClassCastException e) {
226             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
227                     "MaxMessagesRecvByteBufAllocator", e);
228         }
229     }
230
231     /**
232      * {@inheritDoc}
233      * <p>
234      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
235      * {@link MaxMessagesRecvByteBufAllocator}.
236      */

237     @Override
238     @Deprecated
239     public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
240         try {
241             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
242             allocator.maxMessagesPerRead(maxMessagesPerRead);
243             return this;
244         } catch (ClassCastException e) {
245             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
246                     "MaxMessagesRecvByteBufAllocator", e);
247         }
248     }
249
250     @Override
251     public int getWriteSpinCount() {
252         return writeSpinCount;
253     }
254
255     @Override
256     public ChannelConfig setWriteSpinCount(int writeSpinCount) {
257         checkPositive(writeSpinCount, "writeSpinCount");
258         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
259         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
260         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
261         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
262         if (writeSpinCount == Integer.MAX_VALUE) {
263             --writeSpinCount;
264         }
265         this.writeSpinCount = writeSpinCount;
266         return this;
267     }
268
269     @Override
270     public ByteBufAllocator getAllocator() {
271         return allocator;
272     }
273
274     @Override
275     public ChannelConfig setAllocator(ByteBufAllocator allocator) {
276         this.allocator = ObjectUtil.checkNotNull(allocator, "allocator");
277         return this;
278     }
279
280     @SuppressWarnings("unchecked")
281     @Override
282     public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
283         return (T) rcvBufAllocator;
284     }
285
286     @Override
287     public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
288         rcvBufAllocator = checkNotNull(allocator, "allocator");
289         return this;
290     }
291
292     /**
293      * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
294      * @param allocator the allocator to set.
295      * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
296      * is of type {@link MaxMessagesRecvByteBufAllocator}.
297      */

298     private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
299         if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
300             ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
301         } else if (allocator == null) {
302             throw new NullPointerException("allocator");
303         }
304         setRecvByteBufAllocator(allocator);
305     }
306
307     @Override
308     public boolean isAutoRead() {
309         return autoRead == 1;
310     }
311
312     @Override
313     public ChannelConfig setAutoRead(boolean autoRead) {
314         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
315         if (autoRead && !oldAutoRead) {
316             channel.read();
317         } else if (!autoRead && oldAutoRead) {
318             autoReadCleared();
319         }
320         return this;
321     }
322
323     /**
324      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
325      * {@code true} before.
326      */

327     protected void autoReadCleared() { }
328
329     @Override
330     public boolean isAutoClose() {
331         return autoClose;
332     }
333
334     @Override
335     public ChannelConfig setAutoClose(boolean autoClose) {
336         this.autoClose = autoClose;
337         return this;
338     }
339
340     @Override
341     public int getWriteBufferHighWaterMark() {
342         return writeBufferWaterMark.high();
343     }
344
345     @Override
346     public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
347         checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
348         for (;;) {
349             WriteBufferWaterMark waterMark = writeBufferWaterMark;
350             if (writeBufferHighWaterMark < waterMark.low()) {
351                 throw new IllegalArgumentException(
352                         "writeBufferHighWaterMark cannot be less than " +
353                                 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
354                                 writeBufferHighWaterMark);
355             }
356             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
357                     new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
358                 return this;
359             }
360         }
361     }
362
363     @Override
364     public int getWriteBufferLowWaterMark() {
365         return writeBufferWaterMark.low();
366     }
367
368     @Override
369     public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
370         checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
371         for (;;) {
372             WriteBufferWaterMark waterMark = writeBufferWaterMark;
373             if (writeBufferLowWaterMark > waterMark.high()) {
374                 throw new IllegalArgumentException(
375                         "writeBufferLowWaterMark cannot be greater than " +
376                                 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
377                                 writeBufferLowWaterMark);
378             }
379             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
380                     new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
381                 return this;
382             }
383         }
384     }
385
386     @Override
387     public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
388         this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
389         return this;
390     }
391
392     @Override
393     public WriteBufferWaterMark getWriteBufferWaterMark() {
394         return writeBufferWaterMark;
395     }
396
397     @Override
398     public MessageSizeEstimator getMessageSizeEstimator() {
399         return msgSizeEstimator;
400     }
401
402     @Override
403     public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
404         this.msgSizeEstimator = ObjectUtil.checkNotNull(estimator, "estimator");
405         return this;
406     }
407
408     private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
409         this.pinEventExecutor = pinEventExecutor;
410         return this;
411     }
412
413     private boolean getPinEventExecutorPerGroup() {
414         return pinEventExecutor;
415     }
416
417 }
418