1
16
17 package io.netty.buffer;
18
19 import io.netty.util.internal.LongCounter;
20 import io.netty.util.internal.PlatformDependent;
21 import io.netty.util.internal.StringUtil;
22
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import static io.netty.buffer.PoolChunk.isSubpage;
30 import static java.lang.Math.max;
31
32 abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
33 static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
34
35 enum SizeClass {
36 Small,
37 Normal
38 }
39
40 final PooledByteBufAllocator parent;
41
42 final int numSmallSubpagePools;
43 final int directMemoryCacheAlignment;
44 final int directMemoryCacheAlignmentMask;
45 private final PoolSubpage<T>[] smallSubpagePools;
46
47 private final PoolChunkList<T> q050;
48 private final PoolChunkList<T> q025;
49 private final PoolChunkList<T> q000;
50 private final PoolChunkList<T> qInit;
51 private final PoolChunkList<T> q075;
52 private final PoolChunkList<T> q100;
53
54 private final List<PoolChunkListMetric> chunkListMetrics;
55
56
57 private long allocationsNormal;
58
59 private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
60 private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
61 private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
62
63 private long deallocationsSmall;
64 private long deallocationsNormal;
65
66
67 private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
68
69
70 final AtomicInteger numThreadCaches = new AtomicInteger();
71
72
73
74
75 protected PoolArena(PooledByteBufAllocator parent, int pageSize,
76 int pageShifts, int chunkSize, int cacheAlignment) {
77 super(pageSize, pageShifts, chunkSize, cacheAlignment);
78 this.parent = parent;
79 directMemoryCacheAlignment = cacheAlignment;
80 directMemoryCacheAlignmentMask = cacheAlignment - 1;
81
82 numSmallSubpagePools = nSubpages;
83 smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
84 for (int i = 0; i < smallSubpagePools.length; i ++) {
85 smallSubpagePools[i] = newSubpagePoolHead();
86 }
87
88 q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
89 q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
90 q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
91 q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
92 q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
93 qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
94
95 q100.prevList(q075);
96 q075.prevList(q050);
97 q050.prevList(q025);
98 q025.prevList(q000);
99 q000.prevList(null);
100 qInit.prevList(qInit);
101
102 List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
103 metrics.add(qInit);
104 metrics.add(q000);
105 metrics.add(q025);
106 metrics.add(q050);
107 metrics.add(q075);
108 metrics.add(q100);
109 chunkListMetrics = Collections.unmodifiableList(metrics);
110 }
111
112 private PoolSubpage<T> newSubpagePoolHead() {
113 PoolSubpage<T> head = new PoolSubpage<T>();
114 head.prev = head;
115 head.next = head;
116 return head;
117 }
118
119 @SuppressWarnings("unchecked")
120 private PoolSubpage<T>[] newSubpagePoolArray(int size) {
121 return new PoolSubpage[size];
122 }
123
124 abstract boolean isDirect();
125
126 PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
127 PooledByteBuf<T> buf = newByteBuf(maxCapacity);
128 allocate(cache, buf, reqCapacity);
129 return buf;
130 }
131
132 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
133 final int sizeIdx = size2SizeIdx(reqCapacity);
134
135 if (sizeIdx <= smallMaxSizeIdx) {
136 tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
137 } else if (sizeIdx < nSizes) {
138 tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
139 } else {
140 int normCapacity = directMemoryCacheAlignment > 0
141 ? normalizeSize(reqCapacity) : reqCapacity;
142
143 allocateHuge(buf, normCapacity);
144 }
145 }
146
147 private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
148 final int sizeIdx) {
149
150 if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
151
152 return;
153 }
154
155
159 final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
160 final boolean needsNormalAllocation;
161 synchronized (head) {
162 final PoolSubpage<T> s = head.next;
163 needsNormalAllocation = s == head;
164 if (!needsNormalAllocation) {
165 assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
166 long handle = s.allocate();
167 assert handle >= 0;
168 s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
169 }
170 }
171
172 if (needsNormalAllocation) {
173 synchronized (this) {
174 allocateNormal(buf, reqCapacity, sizeIdx, cache);
175 }
176 }
177
178 incSmallAllocation();
179 }
180
181 private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
182 final int sizeIdx) {
183 if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
184
185 return;
186 }
187 synchronized (this) {
188 allocateNormal(buf, reqCapacity, sizeIdx, cache);
189 ++allocationsNormal;
190 }
191 }
192
193
194 private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
195 if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
196 q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
197 q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
198 qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
199 q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
200 return;
201 }
202
203
204 PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
205 boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
206 assert success;
207 qInit.add(c);
208 }
209
210 private void incSmallAllocation() {
211 allocationsSmall.increment();
212 }
213
214 private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
215 PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
216 activeBytesHuge.add(chunk.chunkSize());
217 buf.initUnpooled(chunk, reqCapacity);
218 allocationsHuge.increment();
219 }
220
221 void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
222 if (chunk.unpooled) {
223 int size = chunk.chunkSize();
224 destroyChunk(chunk);
225 activeBytesHuge.add(-size);
226 deallocationsHuge.increment();
227 } else {
228 SizeClass sizeClass = sizeClass(handle);
229 if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
230
231 return;
232 }
233
234 freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
235 }
236 }
237
238 private SizeClass sizeClass(long handle) {
239 return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
240 }
241
242 void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
243 boolean finalizer) {
244 final boolean destroyChunk;
245 synchronized (this) {
246
247
248 if (!finalizer) {
249 switch (sizeClass) {
250 case Normal:
251 ++deallocationsNormal;
252 break;
253 case Small:
254 ++deallocationsSmall;
255 break;
256 default:
257 throw new Error();
258 }
259 }
260 destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
261 }
262 if (destroyChunk) {
263
264 destroyChunk(chunk);
265 }
266 }
267
268 PoolSubpage<T> findSubpagePoolHead(int sizeIdx) {
269 return smallSubpagePools[sizeIdx];
270 }
271
272 void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
273 assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();
274
275 int oldCapacity = buf.length;
276 if (oldCapacity == newCapacity) {
277 return;
278 }
279
280 PoolChunk<T> oldChunk = buf.chunk;
281 ByteBuffer oldNioBuffer = buf.tmpNioBuf;
282 long oldHandle = buf.handle;
283 T oldMemory = buf.memory;
284 int oldOffset = buf.offset;
285 int oldMaxLength = buf.maxLength;
286
287
288 allocate(parent.threadCache(), buf, newCapacity);
289 int bytesToCopy;
290 if (newCapacity > oldCapacity) {
291 bytesToCopy = oldCapacity;
292 } else {
293 buf.trimIndicesToCapacity(newCapacity);
294 bytesToCopy = newCapacity;
295 }
296 memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
297 if (freeOldMemory) {
298 free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
299 }
300 }
301
302 @Override
303 public int numThreadCaches() {
304 return numThreadCaches.get();
305 }
306
307 @Override
308 public int numTinySubpages() {
309 return 0;
310 }
311
312 @Override
313 public int numSmallSubpages() {
314 return smallSubpagePools.length;
315 }
316
317 @Override
318 public int numChunkLists() {
319 return chunkListMetrics.size();
320 }
321
322 @Override
323 public List<PoolSubpageMetric> tinySubpages() {
324 return Collections.emptyList();
325 }
326
327 @Override
328 public List<PoolSubpageMetric> smallSubpages() {
329 return subPageMetricList(smallSubpagePools);
330 }
331
332 @Override
333 public List<PoolChunkListMetric> chunkLists() {
334 return chunkListMetrics;
335 }
336
337 private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
338 List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>();
339 for (PoolSubpage<?> head : pages) {
340 if (head.next == head) {
341 continue;
342 }
343 PoolSubpage<?> s = head.next;
344 for (;;) {
345 metrics.add(s);
346 s = s.next;
347 if (s == head) {
348 break;
349 }
350 }
351 }
352 return metrics;
353 }
354
355 @Override
356 public long numAllocations() {
357 final long allocsNormal;
358 synchronized (this) {
359 allocsNormal = allocationsNormal;
360 }
361 return allocationsSmall.value() + allocsNormal + allocationsHuge.value();
362 }
363
364 @Override
365 public long numTinyAllocations() {
366 return 0;
367 }
368
369 @Override
370 public long numSmallAllocations() {
371 return allocationsSmall.value();
372 }
373
374 @Override
375 public synchronized long numNormalAllocations() {
376 return allocationsNormal;
377 }
378
379 @Override
380 public long numDeallocations() {
381 final long deallocs;
382 synchronized (this) {
383 deallocs = deallocationsSmall + deallocationsNormal;
384 }
385 return deallocs + deallocationsHuge.value();
386 }
387
388 @Override
389 public long numTinyDeallocations() {
390 return 0;
391 }
392
393 @Override
394 public synchronized long numSmallDeallocations() {
395 return deallocationsSmall;
396 }
397
398 @Override
399 public synchronized long numNormalDeallocations() {
400 return deallocationsNormal;
401 }
402
403 @Override
404 public long numHugeAllocations() {
405 return allocationsHuge.value();
406 }
407
408 @Override
409 public long numHugeDeallocations() {
410 return deallocationsHuge.value();
411 }
412
413 @Override
414 public long numActiveAllocations() {
415 long val = allocationsSmall.value() + allocationsHuge.value()
416 - deallocationsHuge.value();
417 synchronized (this) {
418 val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
419 }
420 return max(val, 0);
421 }
422
423 @Override
424 public long numActiveTinyAllocations() {
425 return 0;
426 }
427
428 @Override
429 public long numActiveSmallAllocations() {
430 return max(numSmallAllocations() - numSmallDeallocations(), 0);
431 }
432
433 @Override
434 public long numActiveNormalAllocations() {
435 final long val;
436 synchronized (this) {
437 val = allocationsNormal - deallocationsNormal;
438 }
439 return max(val, 0);
440 }
441
442 @Override
443 public long numActiveHugeAllocations() {
444 return max(numHugeAllocations() - numHugeDeallocations(), 0);
445 }
446
447 @Override
448 public long numActiveBytes() {
449 long val = activeBytesHuge.value();
450 synchronized (this) {
451 for (int i = 0; i < chunkListMetrics.size(); i++) {
452 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
453 val += m.chunkSize();
454 }
455 }
456 }
457 return max(0, val);
458 }
459
460 protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
461 protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
462 protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
463 protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf<T> dst, int length);
464 protected abstract void destroyChunk(PoolChunk<T> chunk);
465
466 @Override
467 public synchronized String toString() {
468 StringBuilder buf = new StringBuilder()
469 .append("Chunk(s) at 0~25%:")
470 .append(StringUtil.NEWLINE)
471 .append(qInit)
472 .append(StringUtil.NEWLINE)
473 .append("Chunk(s) at 0~50%:")
474 .append(StringUtil.NEWLINE)
475 .append(q000)
476 .append(StringUtil.NEWLINE)
477 .append("Chunk(s) at 25~75%:")
478 .append(StringUtil.NEWLINE)
479 .append(q025)
480 .append(StringUtil.NEWLINE)
481 .append("Chunk(s) at 50~100%:")
482 .append(StringUtil.NEWLINE)
483 .append(q050)
484 .append(StringUtil.NEWLINE)
485 .append("Chunk(s) at 75~100%:")
486 .append(StringUtil.NEWLINE)
487 .append(q075)
488 .append(StringUtil.NEWLINE)
489 .append("Chunk(s) at 100%:")
490 .append(StringUtil.NEWLINE)
491 .append(q100)
492 .append(StringUtil.NEWLINE)
493 .append("small subpages:");
494 appendPoolSubPages(buf, smallSubpagePools);
495 buf.append(StringUtil.NEWLINE);
496
497 return buf.toString();
498 }
499
500 private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
501 for (int i = 0; i < subpages.length; i ++) {
502 PoolSubpage<?> head = subpages[i];
503 if (head.next == head) {
504 continue;
505 }
506
507 buf.append(StringUtil.NEWLINE)
508 .append(i)
509 .append(": ");
510 PoolSubpage<?> s = head.next;
511 for (;;) {
512 buf.append(s);
513 s = s.next;
514 if (s == head) {
515 break;
516 }
517 }
518 }
519 }
520
521 @Override
522 protected final void finalize() throws Throwable {
523 try {
524 super.finalize();
525 } finally {
526 destroyPoolSubPages(smallSubpagePools);
527 destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
528 }
529 }
530
531 private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
532 for (PoolSubpage<?> page : pages) {
533 page.destroy();
534 }
535 }
536
537 private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
538 for (PoolChunkList<T> chunkList: chunkLists) {
539 chunkList.destroy(this);
540 }
541 }
542
543 static final class HeapArena extends PoolArena<byte[]> {
544
545 HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
546 int chunkSize, int directMemoryCacheAlignment) {
547 super(parent, pageSize, pageShifts, chunkSize,
548 directMemoryCacheAlignment);
549 }
550
551 private static byte[] newByteArray(int size) {
552 return PlatformDependent.allocateUninitializedArray(size);
553 }
554
555 @Override
556 boolean isDirect() {
557 return false;
558 }
559
560 @Override
561 protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
562 return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
563 }
564
565 @Override
566 protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
567 return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
568 }
569
570 @Override
571 protected void destroyChunk(PoolChunk<byte[]> chunk) {
572
573 }
574
575 @Override
576 protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
577 return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
578 : PooledHeapByteBuf.newInstance(maxCapacity);
579 }
580
581 @Override
582 protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf<byte[]> dst, int length) {
583 if (length == 0) {
584 return;
585 }
586
587 System.arraycopy(src, srcOffset, dst.memory, dst.offset, length);
588 }
589 }
590
591 static final class DirectArena extends PoolArena<ByteBuffer> {
592
593 DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
594 int chunkSize, int directMemoryCacheAlignment) {
595 super(parent, pageSize, pageShifts, chunkSize,
596 directMemoryCacheAlignment);
597 }
598
599 @Override
600 boolean isDirect() {
601 return true;
602 }
603
604
605 int offsetCacheLine(ByteBuffer memory) {
606
607
608 int remainder = HAS_UNSAFE
609 ? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
610 : 0;
611
612
613 return directMemoryCacheAlignment - remainder;
614 }
615
616 @Override
617 protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
618 int pageShifts, int chunkSize) {
619 if (directMemoryCacheAlignment == 0) {
620 return new PoolChunk<ByteBuffer>(this,
621 allocateDirect(chunkSize), pageSize, pageShifts,
622 chunkSize, maxPageIdx, 0);
623 }
624 final ByteBuffer memory = allocateDirect(chunkSize
625 + directMemoryCacheAlignment);
626 return new PoolChunk<ByteBuffer>(this, memory, pageSize,
627 pageShifts, chunkSize, maxPageIdx,
628 offsetCacheLine(memory));
629 }
630
631 @Override
632 protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
633 if (directMemoryCacheAlignment == 0) {
634 return new PoolChunk<ByteBuffer>(this,
635 allocateDirect(capacity), capacity, 0);
636 }
637 final ByteBuffer memory = allocateDirect(capacity
638 + directMemoryCacheAlignment);
639 return new PoolChunk<ByteBuffer>(this, memory, capacity,
640 offsetCacheLine(memory));
641 }
642
643 private static ByteBuffer allocateDirect(int capacity) {
644 return PlatformDependent.useDirectBufferNoCleaner() ?
645 PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
646 }
647
648 @Override
649 protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
650 if (PlatformDependent.useDirectBufferNoCleaner()) {
651 PlatformDependent.freeDirectNoCleaner(chunk.memory);
652 } else {
653 PlatformDependent.freeDirectBuffer(chunk.memory);
654 }
655 }
656
657 @Override
658 protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
659 if (HAS_UNSAFE) {
660 return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
661 } else {
662 return PooledDirectByteBuf.newInstance(maxCapacity);
663 }
664 }
665
666 @Override
667 protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
668 if (length == 0) {
669 return;
670 }
671
672 if (HAS_UNSAFE) {
673 PlatformDependent.copyMemory(
674 PlatformDependent.directBufferAddress(src) + srcOffset,
675 PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
676 } else {
677
678 src = src.duplicate();
679 ByteBuffer dst = dstBuf.internalNioBuffer();
680 src.position(srcOffset).limit(srcOffset + length);
681 dst.position(dstBuf.offset);
682 dst.put(src);
683 }
684 }
685 }
686 }
687