1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16
17 package io.netty.buffer;
18
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.PoolArena.SizeClass;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.ObjectPool;
25 import io.netty.util.internal.ObjectPool.Handle;
26 import io.netty.util.internal.ObjectPool.ObjectCreator;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.logging.InternalLogger;
29 import io.netty.util.internal.logging.InternalLoggerFactory;
30
31 import java.nio.ByteBuffer;
32 import java.util.Queue;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 /**
36  * Acts a Thread cache for allocations. This implementation is moduled after
37  * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted
38  * technics of
39  * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
40  * Scalable memory allocation using jemalloc</a>.
41  */

42 final class PoolThreadCache {
43
44     private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
45     private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
46
47     final PoolArena<byte[]> heapArena;
48     final PoolArena<ByteBuffer> directArena;
49
50     // Hold the caches for the different size classes, which are tiny, small and normal.
51     private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
52     private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
53     private final MemoryRegionCache<byte[]>[] normalHeapCaches;
54     private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
55
56     // Used for bitshifting when calculate the index of normal caches later
57     private final int numShiftsNormalDirect;
58     private final int numShiftsNormalHeap;
59     private final int freeSweepAllocationThreshold;
60     private final AtomicBoolean freed = new AtomicBoolean();
61
62     private int allocations;
63
64     // TODO: Test if adding padding helps under contention
65     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
66
67     PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
68                     int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
69                     int freeSweepAllocationThreshold) {
70         checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
71         this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
72         this.heapArena = heapArena;
73         this.directArena = directArena;
74         if (directArena != null) {
75             smallSubPageDirectCaches = createSubPageCaches(
76                     smallCacheSize, directArena.numSmallSubpagePools);
77
78             numShiftsNormalDirect = log2(directArena.pageSize);
79             normalDirectCaches = createNormalCaches(
80                     normalCacheSize, maxCachedBufferCapacity, directArena);
81
82             directArena.numThreadCaches.getAndIncrement();
83         } else {
84             // No directArea is configured so just null out all caches
85             smallSubPageDirectCaches = null;
86             normalDirectCaches = null;
87             numShiftsNormalDirect = -1;
88         }
89         if (heapArena != null) {
90             // Create the caches for the heap allocations
91             smallSubPageHeapCaches = createSubPageCaches(
92                     smallCacheSize, heapArena.numSmallSubpagePools);
93
94             numShiftsNormalHeap = log2(heapArena.pageSize);
95             normalHeapCaches = createNormalCaches(
96                     normalCacheSize, maxCachedBufferCapacity, heapArena);
97
98             heapArena.numThreadCaches.getAndIncrement();
99         } else {
100             // No heapArea is configured so just null out all caches
101             smallSubPageHeapCaches = null;
102             normalHeapCaches = null;
103             numShiftsNormalHeap = -1;
104         }
105
106         // Only check if there are caches in use.
107         if ((smallSubPageDirectCaches != null || normalDirectCaches != null
108                 || smallSubPageHeapCaches != null || normalHeapCaches != null)
109                 && freeSweepAllocationThreshold < 1) {
110             throw new IllegalArgumentException("freeSweepAllocationThreshold: "
111                     + freeSweepAllocationThreshold + " (expected: > 0)");
112         }
113     }
114
115     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
116             int cacheSize, int numCaches) {
117         if (cacheSize > 0 && numCaches > 0) {
118             @SuppressWarnings("unchecked")
119             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
120             for (int i = 0; i < cache.length; i++) {
121                 // TODO: maybe use cacheSize / cache.length
122                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
123             }
124             return cache;
125         } else {
126             return null;
127         }
128     }
129
130     private static <T> MemoryRegionCache<T>[] createNormalCaches(
131             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
132         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
133             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
134             int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
135
136             @SuppressWarnings("unchecked")
137             MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
138             for (int i = 0; i < cache.length; i++) {
139                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
140             }
141             return cache;
142         } else {
143             return null;
144         }
145     }
146
147     // val > 0
148     static int log2(int val) {
149         return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
150     }
151
152     /**
153      * Try to allocate a small buffer out of the cache. Returns {@code trueif successful {@code false} otherwise
154      */

155     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
156         return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
157     }
158
159     /**
160      * Try to allocate a small buffer out of the cache. Returns {@code trueif successful {@code false} otherwise
161      */

162     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
163         return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
164     }
165
166     @SuppressWarnings({ "unchecked""rawtypes" })
167     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
168         if (cache == null) {
169             // no cache found so just return false here
170             return false;
171         }
172         boolean allocated = cache.allocate(buf, reqCapacity, this);
173         if (++ allocations >= freeSweepAllocationThreshold) {
174             allocations = 0;
175             trim();
176         }
177         return allocated;
178     }
179
180     /**
181      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
182      * Returns {@code trueif it fit into the cache {@code false} otherwise.
183      */

184     @SuppressWarnings({ "unchecked""rawtypes" })
185     boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
186                 long handle, int normCapacity, SizeClass sizeClass) {
187         int sizeIdx = area.size2SizeIdx(normCapacity);
188         MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
189         if (cache == null) {
190             return false;
191         }
192         return cache.add(chunk, nioBuffer, handle, normCapacity);
193     }
194
195     private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
196         switch (sizeClass) {
197         case Normal:
198             return cacheForNormal(area, sizeIdx);
199         case Small:
200             return cacheForSmall(area, sizeIdx);
201         default:
202             throw new Error();
203         }
204     }
205
206     /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
207     @Override
208     protected void finalize() throws Throwable {
209         try {
210             super.finalize();
211         } finally {
212             free(true);
213         }
214     }
215
216     /**
217      *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
218      */

219     void free(boolean finalizer) {
220         // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
221         // we only call this one time.
222         if (freed.compareAndSet(falsetrue)) {
223             int numFreed = free(smallSubPageDirectCaches, finalizer) +
224                     free(normalDirectCaches, finalizer) +
225                     free(smallSubPageHeapCaches, finalizer) +
226                     free(normalHeapCaches, finalizer);
227
228             if (numFreed > 0 && logger.isDebugEnabled()) {
229                 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
230                         Thread.currentThread().getName());
231             }
232
233             if (directArena != null) {
234                 directArena.numThreadCaches.getAndDecrement();
235             }
236
237             if (heapArena != null) {
238                 heapArena.numThreadCaches.getAndDecrement();
239             }
240         }
241     }
242
243     private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
244         if (caches == null) {
245             return 0;
246         }
247
248         int numFreed = 0;
249         for (MemoryRegionCache<?> c: caches) {
250             numFreed += free(c, finalizer);
251         }
252         return numFreed;
253     }
254
255     private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
256         if (cache == null) {
257             return 0;
258         }
259         return cache.free(finalizer);
260     }
261
262     void trim() {
263         trim(smallSubPageDirectCaches);
264         trim(normalDirectCaches);
265         trim(smallSubPageHeapCaches);
266         trim(normalHeapCaches);
267     }
268
269     private static void trim(MemoryRegionCache<?>[] caches) {
270         if (caches == null) {
271             return;
272         }
273         for (MemoryRegionCache<?> c: caches) {
274             trim(c);
275         }
276     }
277
278     private static void trim(MemoryRegionCache<?> cache) {
279         if (cache == null) {
280             return;
281         }
282         cache.trim();
283     }
284
285     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
286         if (area.isDirect()) {
287             return cache(smallSubPageDirectCaches, sizeIdx);
288         }
289         return cache(smallSubPageHeapCaches, sizeIdx);
290     }
291
292     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
293         if (area.isDirect()) {
294             return cache(normalDirectCaches, sizeIdx);
295         }
296         return cache(normalHeapCaches, sizeIdx);
297     }
298
299     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
300         if (cache == null || sizeIdx > cache.length - 1) {
301             return null;
302         }
303         return cache[sizeIdx];
304     }
305
306     /**
307      * Cache used for buffers which are backed by TINY or SMALL size.
308      */

309     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
310         SubPageMemoryRegionCache(int size) {
311             super(size, SizeClass.Small);
312         }
313
314         @Override
315         protected void initBuf(
316                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
317                 PoolThreadCache threadCache) {
318             chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
319         }
320     }
321
322     /**
323      * Cache used for buffers which are backed by NORMAL size.
324      */

325     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
326         NormalMemoryRegionCache(int size) {
327             super(size, SizeClass.Normal);
328         }
329
330         @Override
331         protected void initBuf(
332                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
333                 PoolThreadCache threadCache) {
334             chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
335         }
336     }
337
338     private abstract static class MemoryRegionCache<T> {
339         private final int size;
340         private final Queue<Entry<T>> queue;
341         private final SizeClass sizeClass;
342         private int allocations;
343
344         MemoryRegionCache(int size, SizeClass sizeClass) {
345             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
346             queue = PlatformDependent.newFixedMpscQueue(this.size);
347             this.sizeClass = sizeClass;
348         }
349
350         /**
351          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
352          */

353         protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
354                                         PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
355
356         /**
357          * Add to cache if not already full.
358          */

359         @SuppressWarnings("unchecked")
360         public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
361             Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
362             boolean queued = queue.offer(entry);
363             if (!queued) {
364                 // If it was not possible to cache the chunk, immediately recycle the entry
365                 entry.recycle();
366             }
367
368             return queued;
369         }
370
371         /**
372          * Allocate something out of the cache if possible and remove the entry from the cache.
373          */

374         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
375             Entry<T> entry = queue.poll();
376             if (entry == null) {
377                 return false;
378             }
379             initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
380             entry.recycle();
381
382             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
383             ++ allocations;
384             return true;
385         }
386
387         /**
388          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
389          */

390         public final int free(boolean finalizer) {
391             return free(Integer.MAX_VALUE, finalizer);
392         }
393
394         private int free(int max, boolean finalizer) {
395             int numFreed = 0;
396             for (; numFreed < max; numFreed++) {
397                 Entry<T> entry = queue.poll();
398                 if (entry != null) {
399                     freeEntry(entry, finalizer);
400                 } else {
401                     // all cleared
402                     return numFreed;
403                 }
404             }
405             return numFreed;
406         }
407
408         /**
409          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
410          */

411         public final void trim() {
412             int free = size - allocations;
413             allocations = 0;
414
415             // We not even allocated all the number that are
416             if (free > 0) {
417                 free(free, false);
418             }
419         }
420
421         @SuppressWarnings({ "unchecked""rawtypes" })
422         private  void freeEntry(Entry entry, boolean finalizer) {
423             PoolChunk chunk = entry.chunk;
424             long handle = entry.handle;
425             ByteBuffer nioBuffer = entry.nioBuffer;
426
427             if (!finalizer) {
428                 // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
429                 // a finalizer.
430                 entry.recycle();
431             }
432
433             chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass, nioBuffer, finalizer);
434         }
435
436         static final class Entry<T> {
437             final Handle<Entry<?>> recyclerHandle;
438             PoolChunk<T> chunk;
439             ByteBuffer nioBuffer;
440             long handle = -1;
441             int normCapacity;
442
443             Entry(Handle<Entry<?>> recyclerHandle) {
444                 this.recyclerHandle = recyclerHandle;
445             }
446
447             void recycle() {
448                 chunk = null;
449                 nioBuffer = null;
450                 handle = -1;
451                 recyclerHandle.recycle(this);
452             }
453         }
454
455         @SuppressWarnings("rawtypes")
456         private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
457             Entry entry = RECYCLER.get();
458             entry.chunk = chunk;
459             entry.nioBuffer = nioBuffer;
460             entry.handle = handle;
461             entry.normCapacity = normCapacity;
462             return entry;
463         }
464
465         @SuppressWarnings("rawtypes")
466         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
467             @SuppressWarnings("unchecked")
468             @Override
469             public Entry newObject(Handle<Entry> handle) {
470                 return new Entry(handle);
471             }
472         });
473     }
474 }
475