1
18
19 package io.undertow.server;
20
21 import io.undertow.UndertowMessages;
22 import io.undertow.connector.ByteBufferPool;
23 import io.undertow.connector.PooledByteBuffer;
24
25 import java.lang.ref.WeakReference;
26 import java.nio.ByteBuffer;
27 import java.util.ArrayDeque;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33
40 public class DefaultByteBufferPool implements ByteBufferPool {
41
42 private final ThreadLocal<ThreadLocalData> threadLocalCache = new ThreadLocal<>();
43
44 private final List<WeakReference<ThreadLocalData>> threadLocalDataList = new ArrayList<>();
45 private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
46
47 private final boolean direct;
48 private final int bufferSize;
49 private final int maximumPoolSize;
50 private final int threadLocalCacheSize;
51 private final int leakDectionPercent;
52 private int count;
53
54 @SuppressWarnings({"unused", "FieldCanBeLocal"})
55 private volatile int currentQueueLength = 0;
56 private static final AtomicIntegerFieldUpdater<DefaultByteBufferPool> currentQueueLengthUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultByteBufferPool.class, "currentQueueLength");
57
58 @SuppressWarnings({"unused", "FieldCanBeLocal"})
59 private volatile int reclaimedThreadLocals = 0;
60 private static final AtomicIntegerFieldUpdater<DefaultByteBufferPool> reclaimedThreadLocalsUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultByteBufferPool.class, "reclaimedThreadLocals");
61
62 private volatile boolean closed;
63
64 private final DefaultByteBufferPool arrayBackedPool;
65
66
67
71 public DefaultByteBufferPool(boolean direct, int bufferSize) {
72 this(direct, bufferSize, -1, 12, 0);
73 }
74
80 public DefaultByteBufferPool(boolean direct, int bufferSize, int maximumPoolSize, int threadLocalCacheSize, int leakDecetionPercent) {
81 this.direct = direct;
82 this.bufferSize = bufferSize;
83 this.maximumPoolSize = maximumPoolSize;
84 this.threadLocalCacheSize = threadLocalCacheSize;
85 this.leakDectionPercent = leakDecetionPercent;
86 if(direct) {
87 arrayBackedPool = new DefaultByteBufferPool(false, bufferSize, maximumPoolSize, 0, leakDecetionPercent);
88 } else {
89 arrayBackedPool = this;
90 }
91 }
92
93
94
100 public DefaultByteBufferPool(boolean direct, int bufferSize, int maximumPoolSize, int threadLocalCacheSize) {
101 this(direct, bufferSize, maximumPoolSize, threadLocalCacheSize, 0);
102 }
103
104 @Override
105 public int getBufferSize() {
106 return bufferSize;
107 }
108
109 @Override
110 public boolean isDirect() {
111 return direct;
112 }
113
114 @Override
115 public PooledByteBuffer allocate() {
116 if (closed) {
117 throw UndertowMessages.MESSAGES.poolIsClosed();
118 }
119 ByteBuffer buffer = null;
120 ThreadLocalData local = null;
121 if(threadLocalCacheSize > 0) {
122 local = threadLocalCache.get();
123 if (local != null) {
124 buffer = local.buffers.poll();
125 } else {
126 local = new ThreadLocalData();
127 synchronized (threadLocalDataList) {
128 if (closed) {
129 throw UndertowMessages.MESSAGES.poolIsClosed();
130 }
131 cleanupThreadLocalData();
132 threadLocalDataList.add(new WeakReference<>(local));
133 threadLocalCache.set(local);
134 }
135
136 }
137 }
138 if (buffer == null) {
139 buffer = queue.poll();
140 if (buffer != null) {
141 currentQueueLengthUpdater.decrementAndGet(this);
142 }
143 }
144 if (buffer == null) {
145 if (direct) {
146 buffer = ByteBuffer.allocateDirect(bufferSize);
147 } else {
148 buffer = ByteBuffer.allocate(bufferSize);
149 }
150 }
151 if(local != null) {
152 if(local.allocationDepth < threadLocalCacheSize) {
153 local.allocationDepth++;
154 }
155 }
156 buffer.clear();
157 return new DefaultPooledBuffer(this, buffer, leakDectionPercent == 0 ? false : (++count % 100 < leakDectionPercent));
158 }
159
160 @Override
161 public ByteBufferPool getArrayBackedPool() {
162 return arrayBackedPool;
163 }
164
165 private void cleanupThreadLocalData() {
166
167
168 int size = threadLocalDataList.size();
169
170 if (reclaimedThreadLocals > (size / 4)) {
171 int j = 0;
172 for (int i = 0; i < size; i++) {
173 WeakReference<ThreadLocalData> ref = threadLocalDataList.get(i);
174 if (ref.get() != null) {
175 threadLocalDataList.set(j++, ref);
176 }
177 }
178 for (int i = size - 1; i >= j; i--) {
179
180 threadLocalDataList.remove(i);
181 }
182 reclaimedThreadLocalsUpdater.addAndGet(this, -1 * (size - j));
183 }
184 }
185
186 private void freeInternal(ByteBuffer buffer) {
187 if (closed) {
188 DirectByteBufferDeallocator.free(buffer);
189 return;
190 }
191 ThreadLocalData local = threadLocalCache.get();
192 if(local != null) {
193 if(local.allocationDepth > 0) {
194 local.allocationDepth--;
195 if (local.buffers.size() < threadLocalCacheSize) {
196 local.buffers.add(buffer);
197 return;
198 }
199 }
200 }
201 queueIfUnderMax(buffer);
202 }
203
204 private void queueIfUnderMax(ByteBuffer buffer) {
205 int size;
206 do {
207 size = currentQueueLength;
208 if(size > maximumPoolSize) {
209 DirectByteBufferDeallocator.free(buffer);
210 return;
211 }
212 } while (!currentQueueLengthUpdater.compareAndSet(this, size, size + 1));
213 queue.add(buffer);
214 }
215
216 @Override
217 public void close() {
218 if (closed) {
219 return;
220 }
221 closed = true;
222 queue.clear();
223
224 synchronized (threadLocalDataList) {
225 for (WeakReference<ThreadLocalData> ref : threadLocalDataList) {
226 ThreadLocalData local = ref.get();
227 if (local != null) {
228 local.buffers.clear();
229 }
230 ref.clear();
231 }
232 threadLocalDataList.clear();
233 }
234 }
235
236 @Override
237 protected void finalize() throws Throwable {
238 super.finalize();
239 close();
240 }
241
242 private static class DefaultPooledBuffer implements PooledByteBuffer {
243
244 private final DefaultByteBufferPool pool;
245 private final LeakDetector leakDetector;
246 private ByteBuffer buffer;
247
248 private volatile int referenceCount = 1;
249 private static final AtomicIntegerFieldUpdater<DefaultPooledBuffer> referenceCountUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultPooledBuffer.class, "referenceCount");
250
251 DefaultPooledBuffer(DefaultByteBufferPool pool, ByteBuffer buffer, boolean detectLeaks) {
252 this.pool = pool;
253 this.buffer = buffer;
254 this.leakDetector = detectLeaks ? new LeakDetector() : null;
255 }
256
257 @Override
258 public ByteBuffer getBuffer() {
259 if(referenceCount == 0) {
260 throw UndertowMessages.MESSAGES.bufferAlreadyFreed();
261 }
262 return buffer;
263 }
264
265 @Override
266 public void close() {
267 if(referenceCountUpdater.compareAndSet(this, 1, 0)) {
268 if(leakDetector != null) {
269 leakDetector.closed = true;
270 }
271 pool.freeInternal(buffer);
272 this.buffer = null;
273 }
274 }
275
276 @Override
277 public boolean isOpen() {
278 return referenceCount > 0;
279 }
280
281 @Override
282 public String toString() {
283 return "DefaultPooledBuffer{" +
284 "buffer=" + buffer +
285 ", referenceCount=" + referenceCount +
286 '}';
287 }
288 }
289
290 private class ThreadLocalData {
291 ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(threadLocalCacheSize);
292 int allocationDepth = 0;
293
294 @Override
295 protected void finalize() throws Throwable {
296 super.finalize();
297 reclaimedThreadLocalsUpdater.incrementAndGet(DefaultByteBufferPool.this);
298 if (buffers != null) {
299
300 ByteBuffer buffer;
301 while ((buffer = buffers.poll()) != null) {
302 queueIfUnderMax(buffer);
303 }
304 }
305 }
306 }
307
308 private static class LeakDetector {
309
310 volatile boolean closed = false;
311 private final Throwable allocationPoint;
312
313 private LeakDetector() {
314 this.allocationPoint = new Throwable("Buffer leak detected");
315 }
316
317 @Override
318 protected void finalize() throws Throwable {
319 super.finalize();
320 if(!closed) {
321 allocationPoint.printStackTrace();
322 }
323 }
324 }
325
326 }
327