1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.util.UncheckedBooleanSupplier;
23
24 /**
25 * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
26 * and also prevents overflow.
27 */
28 public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
29 private volatile int maxMessagesPerRead;
30 private volatile boolean respectMaybeMoreData = true;
31
32 public DefaultMaxMessagesRecvByteBufAllocator() {
33 this(1);
34 }
35
36 public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
37 maxMessagesPerRead(maxMessagesPerRead);
38 }
39
40 @Override
41 public int maxMessagesPerRead() {
42 return maxMessagesPerRead;
43 }
44
45 @Override
46 public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
47 checkPositive(maxMessagesPerRead, "maxMessagesPerRead");
48 this.maxMessagesPerRead = maxMessagesPerRead;
49 return this;
50 }
51
52 /**
53 * Determine if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
54 * @param respectMaybeMoreData
55 * <ul>
56 * <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
57 * the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
58 * quantum and have to wait for the selector to notify us of more data.</li>
59 * <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
60 * attempt to read.</li>
61 * </ul>
62 * @return {@code this}.
63 */
64 public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
65 this.respectMaybeMoreData = respectMaybeMoreData;
66 return this;
67 }
68
69 /**
70 * Get if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
71 * @return
72 * <ul>
73 * <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
74 * the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
75 * quantum and have to wait for the selector to notify us of more data.</li>
76 * <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
77 * attempt to read.</li>
78 * </ul>
79 */
80 public final boolean respectMaybeMoreData() {
81 return respectMaybeMoreData;
82 }
83
84 /**
85 * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
86 */
87 public abstract class MaxMessageHandle implements ExtendedHandle {
88 private ChannelConfig config;
89 private int maxMessagePerRead;
90 private int totalMessages;
91 private int totalBytesRead;
92 private int attemptedBytesRead;
93 private int lastBytesRead;
94 private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
95 private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
96 @Override
97 public boolean get() {
98 return attemptedBytesRead == lastBytesRead;
99 }
100 };
101
102 /**
103 * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
104 */
105 @Override
106 public void reset(ChannelConfig config) {
107 this.config = config;
108 maxMessagePerRead = maxMessagesPerRead();
109 totalMessages = totalBytesRead = 0;
110 }
111
112 @Override
113 public ByteBuf allocate(ByteBufAllocator alloc) {
114 return alloc.ioBuffer(guess());
115 }
116
117 @Override
118 public final void incMessagesRead(int amt) {
119 totalMessages += amt;
120 }
121
122 @Override
123 public void lastBytesRead(int bytes) {
124 lastBytesRead = bytes;
125 if (bytes > 0) {
126 totalBytesRead += bytes;
127 }
128 }
129
130 @Override
131 public final int lastBytesRead() {
132 return lastBytesRead;
133 }
134
135 @Override
136 public boolean continueReading() {
137 return continueReading(defaultMaybeMoreSupplier);
138 }
139
140 @Override
141 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
142 return config.isAutoRead() &&
143 (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
144 totalMessages < maxMessagePerRead &&
145 totalBytesRead > 0;
146 }
147
148 @Override
149 public void readComplete() {
150 }
151
152 @Override
153 public int attemptedBytesRead() {
154 return attemptedBytesRead;
155 }
156
157 @Override
158 public void attemptedBytesRead(int bytes) {
159 attemptedBytesRead = bytes;
160 }
161
162 protected final int totalBytesRead() {
163 return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
164 }
165 }
166 }
167