1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.channel;
17
18 import java.util.ArrayList;
19 import java.util.List;
20
21 import static io.netty.util.internal.ObjectUtil.checkPositive;
22 import static java.lang.Math.max;
23 import static java.lang.Math.min;
24
25 /**
26  * The {@link RecvByteBufAllocator} that automatically increases and
27  * decreases the predicted buffer size on feed back.
28  * <p>
29  * It gradually increases the expected number of readable bytes if the previous
30  * read fully filled the allocated buffer.  It gradually decreases the expected
31  * number of readable bytes if the read operation was not able to fill a certain
32  * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
33  * returning the same prediction.
34  */

35 public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
36
37     static final int DEFAULT_MINIMUM = 64;
38     // Use an initial value that is bigger than the common MTU of 1500
39     static final int DEFAULT_INITIAL = 2048;
40     static final int DEFAULT_MAXIMUM = 65536;
41
42     private static final int INDEX_INCREMENT = 4;
43     private static final int INDEX_DECREMENT = 1;
44
45     private static final int[] SIZE_TABLE;
46
47     static {
48         List<Integer> sizeTable = new ArrayList<Integer>();
49         for (int i = 16; i < 512; i += 16) {
50             sizeTable.add(i);
51         }
52
53         for (int i = 512; i > 0; i <<= 1) {
54             sizeTable.add(i);
55         }
56
57         SIZE_TABLE = new int[sizeTable.size()];
58         for (int i = 0; i < SIZE_TABLE.length; i ++) {
59             SIZE_TABLE[i] = sizeTable.get(i);
60         }
61     }
62
63     /**
64      * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
65      */

66     @Deprecated
67     public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
68
69     private static int getSizeTableIndex(final int size) {
70         for (int low = 0, high = SIZE_TABLE.length - 1;;) {
71             if (high < low) {
72                 return low;
73             }
74             if (high == low) {
75                 return high;
76             }
77
78             int mid = low + high >>> 1;
79             int a = SIZE_TABLE[mid];
80             int b = SIZE_TABLE[mid + 1];
81             if (size > b) {
82                 low = mid + 1;
83             } else if (size < a) {
84                 high = mid - 1;
85             } else if (size == a) {
86                 return mid;
87             } else {
88                 return mid + 1;
89             }
90         }
91     }
92
93     private final class HandleImpl extends MaxMessageHandle {
94         private final int minIndex;
95         private final int maxIndex;
96         private int index;
97         private int nextReceiveBufferSize;
98         private boolean decreaseNow;
99
100         HandleImpl(int minIndex, int maxIndex, int initial) {
101             this.minIndex = minIndex;
102             this.maxIndex = maxIndex;
103
104             index = getSizeTableIndex(initial);
105             nextReceiveBufferSize = SIZE_TABLE[index];
106         }
107
108         @Override
109         public void lastBytesRead(int bytes) {
110             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
111             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
112             // the selector to check for more data. Going back to the selector can add significant latency for large
113             // data transfers.
114             if (bytes == attemptedBytesRead()) {
115                 record(bytes);
116             }
117             super.lastBytesRead(bytes);
118         }
119
120         @Override
121         public int guess() {
122             return nextReceiveBufferSize;
123         }
124
125         private void record(int actualReadBytes) {
126             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
127                 if (decreaseNow) {
128                     index = max(index - INDEX_DECREMENT, minIndex);
129                     nextReceiveBufferSize = SIZE_TABLE[index];
130                     decreaseNow = false;
131                 } else {
132                     decreaseNow = true;
133                 }
134             } else if (actualReadBytes >= nextReceiveBufferSize) {
135                 index = min(index + INDEX_INCREMENT, maxIndex);
136                 nextReceiveBufferSize = SIZE_TABLE[index];
137                 decreaseNow = false;
138             }
139         }
140
141         @Override
142         public void readComplete() {
143             record(totalBytesRead());
144         }
145     }
146
147     private final int minIndex;
148     private final int maxIndex;
149     private final int initial;
150
151     /**
152      * Creates a new predictor with the default parameters.  With the default
153      * parameters, the expected buffer size starts from {@code 1024}, does not
154      * go down below {@code 64}, and does not go up above {@code 65536}.
155      */

156     public AdaptiveRecvByteBufAllocator() {
157         this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
158     }
159
160     /**
161      * Creates a new predictor with the specified parameters.
162      *
163      * @param minimum  the inclusive lower bound of the expected buffer size
164      * @param initial  the initial buffer size when no feed back was received
165      * @param maximum  the inclusive upper bound of the expected buffer size
166      */

167     public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
168         checkPositive(minimum, "minimum");
169         if (initial < minimum) {
170             throw new IllegalArgumentException("initial: " + initial);
171         }
172         if (maximum < initial) {
173             throw new IllegalArgumentException("maximum: " + maximum);
174         }
175
176         int minIndex = getSizeTableIndex(minimum);
177         if (SIZE_TABLE[minIndex] < minimum) {
178             this.minIndex = minIndex + 1;
179         } else {
180             this.minIndex = minIndex;
181         }
182
183         int maxIndex = getSizeTableIndex(maximum);
184         if (SIZE_TABLE[maxIndex] > maximum) {
185             this.maxIndex = maxIndex - 1;
186         } else {
187             this.maxIndex = maxIndex;
188         }
189
190         this.initial = initial;
191     }
192
193     @SuppressWarnings("deprecation")
194     @Override
195     public Handle newHandle() {
196         return new HandleImpl(minIndex, maxIndex, initial);
197     }
198
199     @Override
200     public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
201         super.respectMaybeMoreData(respectMaybeMoreData);
202         return this;
203     }
204 }
205