1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2014 Red Hat, Inc., and individual contributors
4  * as indicated by the @author tags.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */

18
19 package io.undertow.server;
20
21 import io.undertow.UndertowMessages;
22 import io.undertow.connector.ByteBufferPool;
23 import io.undertow.connector.PooledByteBuffer;
24
25 import java.lang.ref.WeakReference;
26 import java.nio.ByteBuffer;
27 import java.util.ArrayDeque;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33 /**
34  * A byte buffer pool that supports reference counted pools.
35  *
36  * TODO: move this somewhere more appropriate
37  *
38  * @author Stuart Douglas
39  */

40 public class DefaultByteBufferPool implements ByteBufferPool {
41
42     private final ThreadLocal<ThreadLocalData> threadLocalCache = new ThreadLocal<>();
43     // Access requires synchronization on the threadLocalDataList instance
44     private final List<WeakReference<ThreadLocalData>> threadLocalDataList = new ArrayList<>();
45     private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
46
47     private final boolean direct;
48     private final int bufferSize;
49     private final int maximumPoolSize;
50     private final int threadLocalCacheSize;
51     private final int leakDectionPercent;
52     private int count; //racily updated count used in leak detection
53
54     @SuppressWarnings({"unused""FieldCanBeLocal"})
55     private volatile int currentQueueLength = 0;
56     private static final AtomicIntegerFieldUpdater<DefaultByteBufferPool> currentQueueLengthUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultByteBufferPool.class"currentQueueLength");
57
58     @SuppressWarnings({"unused""FieldCanBeLocal"})
59     private volatile int reclaimedThreadLocals = 0;
60     private static final AtomicIntegerFieldUpdater<DefaultByteBufferPool> reclaimedThreadLocalsUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultByteBufferPool.class"reclaimedThreadLocals");
61
62     private volatile boolean closed;
63
64     private final DefaultByteBufferPool arrayBackedPool;
65
66
67     /**
68      * @param direct               If this implementation should use direct buffers
69      * @param bufferSize           The buffer size to use
70      */

71     public DefaultByteBufferPool(boolean direct, int bufferSize) {
72         this(direct, bufferSize, -1, 12, 0);
73     }
74     /**
75      * @param direct               If this implementation should use direct buffers
76      * @param bufferSize           The buffer size to use
77      * @param maximumPoolSize      The maximum pool size, in number of buffers, it does not include buffers in thread local caches
78      * @param threadLocalCacheSize The maximum number of buffers that can be stored in a thread local cache
79      */

80     public DefaultByteBufferPool(boolean direct, int bufferSize, int maximumPoolSize, int threadLocalCacheSize, int leakDecetionPercent) {
81         this.direct = direct;
82         this.bufferSize = bufferSize;
83         this.maximumPoolSize = maximumPoolSize;
84         this.threadLocalCacheSize = threadLocalCacheSize;
85         this.leakDectionPercent = leakDecetionPercent;
86         if(direct) {
87             arrayBackedPool = new DefaultByteBufferPool(false, bufferSize, maximumPoolSize, 0, leakDecetionPercent);
88         } else {
89             arrayBackedPool = this;
90         }
91     }
92
93
94     /**
95      * @param direct               If this implementation should use direct buffers
96      * @param bufferSize           The buffer size to use
97      * @param maximumPoolSize      The maximum pool size, in number of buffers, it does not include buffers in thread local caches
98      * @param threadLocalCacheSize The maximum number of buffers that can be stored in a thread local cache
99      */

100     public DefaultByteBufferPool(boolean direct, int bufferSize, int maximumPoolSize, int threadLocalCacheSize) {
101         this(direct, bufferSize, maximumPoolSize, threadLocalCacheSize, 0);
102     }
103
104     @Override
105     public int getBufferSize() {
106         return bufferSize;
107     }
108
109     @Override
110     public boolean isDirect() {
111         return direct;
112     }
113
114     @Override
115     public PooledByteBuffer allocate() {
116         if (closed) {
117             throw UndertowMessages.MESSAGES.poolIsClosed();
118         }
119         ByteBuffer buffer = null;
120         ThreadLocalData local = null;
121         if(threadLocalCacheSize > 0) {
122             local = threadLocalCache.get();
123             if (local != null) {
124                 buffer = local.buffers.poll();
125             } else {
126                 local = new ThreadLocalData();
127                 synchronized (threadLocalDataList) {
128                     if (closed) {
129                         throw UndertowMessages.MESSAGES.poolIsClosed();
130                     }
131                     cleanupThreadLocalData();
132                     threadLocalDataList.add(new WeakReference<>(local));
133                     threadLocalCache.set(local);
134                 }
135
136             }
137         }
138         if (buffer == null) {
139             buffer = queue.poll();
140             if (buffer != null) {
141                 currentQueueLengthUpdater.decrementAndGet(this);
142             }
143         }
144         if (buffer == null) {
145             if (direct) {
146                 buffer = ByteBuffer.allocateDirect(bufferSize);
147             } else {
148                 buffer = ByteBuffer.allocate(bufferSize);
149             }
150         }
151         if(local != null) {
152             if(local.allocationDepth < threadLocalCacheSize) { //prevent overflow if the thread only allocates and never frees
153                 local.allocationDepth++;
154             }
155         }
156         buffer.clear();
157         return new DefaultPooledBuffer(this, buffer, leakDectionPercent == 0 ? false : (++count % 100 < leakDectionPercent));
158     }
159
160     @Override
161     public ByteBufferPool getArrayBackedPool() {
162         return arrayBackedPool;
163     }
164
165     private void cleanupThreadLocalData() {
166         // Called under lock, and only when at least quarter of the capacity has been collected.
167
168         int size = threadLocalDataList.size();
169
170         if (reclaimedThreadLocals > (size / 4)) {
171             int j = 0;
172             for (int i = 0; i < size; i++) {
173                 WeakReference<ThreadLocalData> ref = threadLocalDataList.get(i);
174                 if (ref.get() != null) {
175                     threadLocalDataList.set(j++, ref);
176                 }
177             }
178             for (int i = size - 1; i >= j; i--) {
179                 // A tail remove is inlined to a range change check and a decrement
180                 threadLocalDataList.remove(i);
181             }
182             reclaimedThreadLocalsUpdater.addAndGet(this, -1 * (size - j));
183         }
184     }
185
186     private void freeInternal(ByteBuffer buffer) {
187         if (closed) {
188             DirectByteBufferDeallocator.free(buffer);
189             return//GC will take care of it
190         }
191         ThreadLocalData local = threadLocalCache.get();
192         if(local != null) {
193             if(local.allocationDepth > 0) {
194                 local.allocationDepth--;
195                 if (local.buffers.size() < threadLocalCacheSize) {
196                     local.buffers.add(buffer);
197                     return;
198                 }
199             }
200         }
201         queueIfUnderMax(buffer);
202     }
203
204     private void queueIfUnderMax(ByteBuffer buffer) {
205         int size;
206         do {
207             size = currentQueueLength;
208             if(size > maximumPoolSize) {
209                 DirectByteBufferDeallocator.free(buffer);
210                 return;
211             }
212         } while (!currentQueueLengthUpdater.compareAndSet(this, size, size + 1));
213         queue.add(buffer);
214     }
215
216     @Override
217     public void close() {
218         if (closed) {
219             return;
220         }
221         closed = true;
222         queue.clear();
223
224         synchronized (threadLocalDataList) {
225             for (WeakReference<ThreadLocalData> ref : threadLocalDataList) {
226                 ThreadLocalData local = ref.get();
227                 if (local != null) {
228                     local.buffers.clear();
229                 }
230                 ref.clear();
231             }
232             threadLocalDataList.clear();
233         }
234     }
235
236     @Override
237     protected void finalize() throws Throwable {
238         super.finalize();
239         close();
240     }
241
242     private static class DefaultPooledBuffer implements PooledByteBuffer {
243
244         private final DefaultByteBufferPool pool;
245         private final LeakDetector leakDetector;
246         private ByteBuffer buffer;
247
248         private volatile int referenceCount = 1;
249         private static final AtomicIntegerFieldUpdater<DefaultPooledBuffer> referenceCountUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultPooledBuffer.class"referenceCount");
250
251         DefaultPooledBuffer(DefaultByteBufferPool pool, ByteBuffer buffer, boolean detectLeaks) {
252             this.pool = pool;
253             this.buffer = buffer;
254             this.leakDetector = detectLeaks ? new LeakDetector() : null;
255         }
256
257         @Override
258         public ByteBuffer getBuffer() {
259             if(referenceCount == 0) {
260                 throw UndertowMessages.MESSAGES.bufferAlreadyFreed();
261             }
262             return buffer;
263         }
264
265         @Override
266         public void close() {
267             if(referenceCountUpdater.compareAndSet(this, 1, 0)) {
268                 if(leakDetector != null) {
269                     leakDetector.closed = true;
270                 }
271                 pool.freeInternal(buffer);
272                 this.buffer = null;
273             }
274         }
275
276         @Override
277         public boolean isOpen() {
278             return referenceCount > 0;
279         }
280
281         @Override
282         public String toString() {
283             return "DefaultPooledBuffer{" +
284                     "buffer=" + buffer +
285                     ", referenceCount=" + referenceCount +
286                     '}';
287         }
288     }
289
290     private class ThreadLocalData {
291         ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(threadLocalCacheSize);
292         int allocationDepth = 0;
293
294         @Override
295         protected void finalize() throws Throwable {
296             super.finalize();
297             reclaimedThreadLocalsUpdater.incrementAndGet(DefaultByteBufferPool.this);
298             if (buffers != null) {
299                 // Recycle them
300                 ByteBuffer buffer;
301                 while ((buffer = buffers.poll()) != null) {
302                     queueIfUnderMax(buffer);
303                 }
304             }
305         }
306     }
307
308     private static class LeakDetector {
309
310         volatile boolean closed = false;
311         private final Throwable allocationPoint;
312
313         private LeakDetector() {
314             this.allocationPoint = new Throwable("Buffer leak detected");
315         }
316
317         @Override
318         protected void finalize() throws Throwable {
319             super.finalize();
320             if(!closed) {
321                 allocationPoint.printStackTrace();
322             }
323         }
324     }
325
326 }
327