1
16
17 package io.netty.buffer;
18
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.PoolArena.SizeClass;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.ObjectPool;
25 import io.netty.util.internal.ObjectPool.Handle;
26 import io.netty.util.internal.ObjectPool.ObjectCreator;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.logging.InternalLogger;
29 import io.netty.util.internal.logging.InternalLoggerFactory;
30
31 import java.nio.ByteBuffer;
32 import java.util.Queue;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35
42 final class PoolThreadCache {
43
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
45 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
46
47 final PoolArena<byte[]> heapArena;
48 final PoolArena<ByteBuffer> directArena;
49
50
51 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
52 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
53 private final MemoryRegionCache<byte[]>[] normalHeapCaches;
54 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
55
56
57 private final int numShiftsNormalDirect;
58 private final int numShiftsNormalHeap;
59 private final int freeSweepAllocationThreshold;
60 private final AtomicBoolean freed = new AtomicBoolean();
61
62 private int allocations;
63
64
65
66
67 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
68 int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
69 int freeSweepAllocationThreshold) {
70 checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
71 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
72 this.heapArena = heapArena;
73 this.directArena = directArena;
74 if (directArena != null) {
75 smallSubPageDirectCaches = createSubPageCaches(
76 smallCacheSize, directArena.numSmallSubpagePools);
77
78 numShiftsNormalDirect = log2(directArena.pageSize);
79 normalDirectCaches = createNormalCaches(
80 normalCacheSize, maxCachedBufferCapacity, directArena);
81
82 directArena.numThreadCaches.getAndIncrement();
83 } else {
84
85 smallSubPageDirectCaches = null;
86 normalDirectCaches = null;
87 numShiftsNormalDirect = -1;
88 }
89 if (heapArena != null) {
90
91 smallSubPageHeapCaches = createSubPageCaches(
92 smallCacheSize, heapArena.numSmallSubpagePools);
93
94 numShiftsNormalHeap = log2(heapArena.pageSize);
95 normalHeapCaches = createNormalCaches(
96 normalCacheSize, maxCachedBufferCapacity, heapArena);
97
98 heapArena.numThreadCaches.getAndIncrement();
99 } else {
100
101 smallSubPageHeapCaches = null;
102 normalHeapCaches = null;
103 numShiftsNormalHeap = -1;
104 }
105
106
107 if ((smallSubPageDirectCaches != null || normalDirectCaches != null
108 || smallSubPageHeapCaches != null || normalHeapCaches != null)
109 && freeSweepAllocationThreshold < 1) {
110 throw new IllegalArgumentException("freeSweepAllocationThreshold: "
111 + freeSweepAllocationThreshold + " (expected: > 0)");
112 }
113 }
114
115 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
116 int cacheSize, int numCaches) {
117 if (cacheSize > 0 && numCaches > 0) {
118 @SuppressWarnings("unchecked")
119 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
120 for (int i = 0; i < cache.length; i++) {
121
122 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
123 }
124 return cache;
125 } else {
126 return null;
127 }
128 }
129
130 private static <T> MemoryRegionCache<T>[] createNormalCaches(
131 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
132 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
133 int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
134 int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
135
136 @SuppressWarnings("unchecked")
137 MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
138 for (int i = 0; i < cache.length; i++) {
139 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
140 }
141 return cache;
142 } else {
143 return null;
144 }
145 }
146
147
148 static int log2(int val) {
149 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
150 }
151
152
155 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
156 return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
157 }
158
159
162 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
163 return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
164 }
165
166 @SuppressWarnings({ "unchecked", "rawtypes" })
167 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
168 if (cache == null) {
169
170 return false;
171 }
172 boolean allocated = cache.allocate(buf, reqCapacity, this);
173 if (++ allocations >= freeSweepAllocationThreshold) {
174 allocations = 0;
175 trim();
176 }
177 return allocated;
178 }
179
180
184 @SuppressWarnings({ "unchecked", "rawtypes" })
185 boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
186 long handle, int normCapacity, SizeClass sizeClass) {
187 int sizeIdx = area.size2SizeIdx(normCapacity);
188 MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
189 if (cache == null) {
190 return false;
191 }
192 return cache.add(chunk, nioBuffer, handle, normCapacity);
193 }
194
195 private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
196 switch (sizeClass) {
197 case Normal:
198 return cacheForNormal(area, sizeIdx);
199 case Small:
200 return cacheForSmall(area, sizeIdx);
201 default:
202 throw new Error();
203 }
204 }
205
206
207 @Override
208 protected void finalize() throws Throwable {
209 try {
210 super.finalize();
211 } finally {
212 free(true);
213 }
214 }
215
216
219 void free(boolean finalizer) {
220
221
222 if (freed.compareAndSet(false, true)) {
223 int numFreed = free(smallSubPageDirectCaches, finalizer) +
224 free(normalDirectCaches, finalizer) +
225 free(smallSubPageHeapCaches, finalizer) +
226 free(normalHeapCaches, finalizer);
227
228 if (numFreed > 0 && logger.isDebugEnabled()) {
229 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
230 Thread.currentThread().getName());
231 }
232
233 if (directArena != null) {
234 directArena.numThreadCaches.getAndDecrement();
235 }
236
237 if (heapArena != null) {
238 heapArena.numThreadCaches.getAndDecrement();
239 }
240 }
241 }
242
243 private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
244 if (caches == null) {
245 return 0;
246 }
247
248 int numFreed = 0;
249 for (MemoryRegionCache<?> c: caches) {
250 numFreed += free(c, finalizer);
251 }
252 return numFreed;
253 }
254
255 private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
256 if (cache == null) {
257 return 0;
258 }
259 return cache.free(finalizer);
260 }
261
262 void trim() {
263 trim(smallSubPageDirectCaches);
264 trim(normalDirectCaches);
265 trim(smallSubPageHeapCaches);
266 trim(normalHeapCaches);
267 }
268
269 private static void trim(MemoryRegionCache<?>[] caches) {
270 if (caches == null) {
271 return;
272 }
273 for (MemoryRegionCache<?> c: caches) {
274 trim(c);
275 }
276 }
277
278 private static void trim(MemoryRegionCache<?> cache) {
279 if (cache == null) {
280 return;
281 }
282 cache.trim();
283 }
284
285 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
286 if (area.isDirect()) {
287 return cache(smallSubPageDirectCaches, sizeIdx);
288 }
289 return cache(smallSubPageHeapCaches, sizeIdx);
290 }
291
292 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
293 if (area.isDirect()) {
294 return cache(normalDirectCaches, sizeIdx);
295 }
296 return cache(normalHeapCaches, sizeIdx);
297 }
298
299 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
300 if (cache == null || sizeIdx > cache.length - 1) {
301 return null;
302 }
303 return cache[sizeIdx];
304 }
305
306
309 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
310 SubPageMemoryRegionCache(int size) {
311 super(size, SizeClass.Small);
312 }
313
314 @Override
315 protected void initBuf(
316 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
317 PoolThreadCache threadCache) {
318 chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
319 }
320 }
321
322
325 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
326 NormalMemoryRegionCache(int size) {
327 super(size, SizeClass.Normal);
328 }
329
330 @Override
331 protected void initBuf(
332 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
333 PoolThreadCache threadCache) {
334 chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
335 }
336 }
337
338 private abstract static class MemoryRegionCache<T> {
339 private final int size;
340 private final Queue<Entry<T>> queue;
341 private final SizeClass sizeClass;
342 private int allocations;
343
344 MemoryRegionCache(int size, SizeClass sizeClass) {
345 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
346 queue = PlatformDependent.newFixedMpscQueue(this.size);
347 this.sizeClass = sizeClass;
348 }
349
350
353 protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
354 PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
355
356
359 @SuppressWarnings("unchecked")
360 public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
361 Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
362 boolean queued = queue.offer(entry);
363 if (!queued) {
364
365 entry.recycle();
366 }
367
368 return queued;
369 }
370
371
374 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
375 Entry<T> entry = queue.poll();
376 if (entry == null) {
377 return false;
378 }
379 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
380 entry.recycle();
381
382
383 ++ allocations;
384 return true;
385 }
386
387
390 public final int free(boolean finalizer) {
391 return free(Integer.MAX_VALUE, finalizer);
392 }
393
394 private int free(int max, boolean finalizer) {
395 int numFreed = 0;
396 for (; numFreed < max; numFreed++) {
397 Entry<T> entry = queue.poll();
398 if (entry != null) {
399 freeEntry(entry, finalizer);
400 } else {
401
402 return numFreed;
403 }
404 }
405 return numFreed;
406 }
407
408
411 public final void trim() {
412 int free = size - allocations;
413 allocations = 0;
414
415
416 if (free > 0) {
417 free(free, false);
418 }
419 }
420
421 @SuppressWarnings({ "unchecked", "rawtypes" })
422 private void freeEntry(Entry entry, boolean finalizer) {
423 PoolChunk chunk = entry.chunk;
424 long handle = entry.handle;
425 ByteBuffer nioBuffer = entry.nioBuffer;
426
427 if (!finalizer) {
428
429
430 entry.recycle();
431 }
432
433 chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass, nioBuffer, finalizer);
434 }
435
436 static final class Entry<T> {
437 final Handle<Entry<?>> recyclerHandle;
438 PoolChunk<T> chunk;
439 ByteBuffer nioBuffer;
440 long handle = -1;
441 int normCapacity;
442
443 Entry(Handle<Entry<?>> recyclerHandle) {
444 this.recyclerHandle = recyclerHandle;
445 }
446
447 void recycle() {
448 chunk = null;
449 nioBuffer = null;
450 handle = -1;
451 recyclerHandle.recycle(this);
452 }
453 }
454
455 @SuppressWarnings("rawtypes")
456 private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
457 Entry entry = RECYCLER.get();
458 entry.chunk = chunk;
459 entry.nioBuffer = nioBuffer;
460 entry.handle = handle;
461 entry.normCapacity = normCapacity;
462 return entry;
463 }
464
465 @SuppressWarnings("rawtypes")
466 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
467 @SuppressWarnings("unchecked")
468 @Override
469 public Entry newObject(Handle<Entry> handle) {
470 return new Entry(handle);
471 }
472 });
473 }
474 }
475