1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16
17 package io.netty.buffer;
18
19 import io.netty.util.internal.LongCounter;
20 import io.netty.util.internal.PlatformDependent;
21 import io.netty.util.internal.StringUtil;
22
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import static io.netty.buffer.PoolChunk.isSubpage;
30 import static java.lang.Math.max;
31
32 abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
33     static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
34
35     enum SizeClass {
36         Small,
37         Normal
38     }
39
40     final PooledByteBufAllocator parent;
41
42     final int numSmallSubpagePools;
43     final int directMemoryCacheAlignment;
44     final int directMemoryCacheAlignmentMask;
45     private final PoolSubpage<T>[] smallSubpagePools;
46
47     private final PoolChunkList<T> q050;
48     private final PoolChunkList<T> q025;
49     private final PoolChunkList<T> q000;
50     private final PoolChunkList<T> qInit;
51     private final PoolChunkList<T> q075;
52     private final PoolChunkList<T> q100;
53
54     private final List<PoolChunkListMetric> chunkListMetrics;
55
56     // Metrics for allocations and deallocations
57     private long allocationsNormal;
58     // We need to use the LongCounter here as this is not guarded via synchronized block.
59     private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
60     private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
61     private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
62
63     private long deallocationsSmall;
64     private long deallocationsNormal;
65
66     // We need to use the LongCounter here as this is not guarded via synchronized block.
67     private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
68
69     // Number of thread caches backed by this arena.
70     final AtomicInteger numThreadCaches = new AtomicInteger();
71
72     // TODO: Test if adding padding helps under contention
73     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
74
75     protected PoolArena(PooledByteBufAllocator parent, int pageSize,
76           int pageShifts, int chunkSize, int cacheAlignment) {
77         super(pageSize, pageShifts, chunkSize, cacheAlignment);
78         this.parent = parent;
79         directMemoryCacheAlignment = cacheAlignment;
80         directMemoryCacheAlignmentMask = cacheAlignment - 1;
81
82         numSmallSubpagePools = nSubpages;
83         smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
84         for (int i = 0; i < smallSubpagePools.length; i ++) {
85             smallSubpagePools[i] = newSubpagePoolHead();
86         }
87
88         q100 = new PoolChunkList<T>(thisnull, 100, Integer.MAX_VALUE, chunkSize);
89         q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
90         q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
91         q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
92         q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
93         qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
94
95         q100.prevList(q075);
96         q075.prevList(q050);
97         q050.prevList(q025);
98         q025.prevList(q000);
99         q000.prevList(null);
100         qInit.prevList(qInit);
101
102         List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
103         metrics.add(qInit);
104         metrics.add(q000);
105         metrics.add(q025);
106         metrics.add(q050);
107         metrics.add(q075);
108         metrics.add(q100);
109         chunkListMetrics = Collections.unmodifiableList(metrics);
110     }
111
112     private PoolSubpage<T> newSubpagePoolHead() {
113         PoolSubpage<T> head = new PoolSubpage<T>();
114         head.prev = head;
115         head.next = head;
116         return head;
117     }
118
119     @SuppressWarnings("unchecked")
120     private PoolSubpage<T>[] newSubpagePoolArray(int size) {
121         return new PoolSubpage[size];
122     }
123
124     abstract boolean isDirect();
125
126     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
127         PooledByteBuf<T> buf = newByteBuf(maxCapacity);
128         allocate(cache, buf, reqCapacity);
129         return buf;
130     }
131
132     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
133         final int sizeIdx = size2SizeIdx(reqCapacity);
134
135         if (sizeIdx <= smallMaxSizeIdx) {
136             tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
137         } else if (sizeIdx < nSizes) {
138             tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
139         } else {
140             int normCapacity = directMemoryCacheAlignment > 0
141                     ? normalizeSize(reqCapacity) : reqCapacity;
142             // Huge allocations are never served via the cache so just call allocateHuge
143             allocateHuge(buf, normCapacity);
144         }
145     }
146
147     private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
148                                      final int sizeIdx) {
149
150         if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
151             // was able to allocate out of the cache so move on
152             return;
153         }
154
155         /**
156          * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
157          * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
158          */

159         final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
160         final boolean needsNormalAllocation;
161         synchronized (head) {
162             final PoolSubpage<T> s = head.next;
163             needsNormalAllocation = s == head;
164             if (!needsNormalAllocation) {
165                 assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
166                 long handle = s.allocate();
167                 assert handle >= 0;
168                 s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
169             }
170         }
171
172         if (needsNormalAllocation) {
173             synchronized (this) {
174                 allocateNormal(buf, reqCapacity, sizeIdx, cache);
175             }
176         }
177
178         incSmallAllocation();
179     }
180
181     private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
182                                       final int sizeIdx) {
183         if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
184             // was able to allocate out of the cache so move on
185             return;
186         }
187         synchronized (this) {
188             allocateNormal(buf, reqCapacity, sizeIdx, cache);
189             ++allocationsNormal;
190         }
191     }
192
193     // Method must be called inside synchronized(this) { ... } block
194     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
195         if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
196             q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
197             q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
198             qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
199             q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
200             return;
201         }
202
203         // Add a new chunk.
204         PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
205         boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
206         assert success;
207         qInit.add(c);
208     }
209
210     private void incSmallAllocation() {
211         allocationsSmall.increment();
212     }
213
214     private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
215         PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
216         activeBytesHuge.add(chunk.chunkSize());
217         buf.initUnpooled(chunk, reqCapacity);
218         allocationsHuge.increment();
219     }
220
221     void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
222         if (chunk.unpooled) {
223             int size = chunk.chunkSize();
224             destroyChunk(chunk);
225             activeBytesHuge.add(-size);
226             deallocationsHuge.increment();
227         } else {
228             SizeClass sizeClass = sizeClass(handle);
229             if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
230                 // cached so not free it.
231                 return;
232             }
233
234             freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
235         }
236     }
237
238     private SizeClass sizeClass(long handle) {
239         return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
240     }
241
242     void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
243                    boolean finalizer) {
244         final boolean destroyChunk;
245         synchronized (this) {
246             // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
247             // may fail due lazy class-loading in for example tomcat.
248             if (!finalizer) {
249                 switch (sizeClass) {
250                     case Normal:
251                         ++deallocationsNormal;
252                         break;
253                     case Small:
254                         ++deallocationsSmall;
255                         break;
256                     default:
257                         throw new Error();
258                 }
259             }
260             destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
261         }
262         if (destroyChunk) {
263             // destroyChunk not need to be called while holding the synchronized lock.
264             destroyChunk(chunk);
265         }
266     }
267
268     PoolSubpage<T> findSubpagePoolHead(int sizeIdx) {
269         return smallSubpagePools[sizeIdx];
270     }
271
272     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
273         assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();
274
275         int oldCapacity = buf.length;
276         if (oldCapacity == newCapacity) {
277             return;
278         }
279
280         PoolChunk<T> oldChunk = buf.chunk;
281         ByteBuffer oldNioBuffer = buf.tmpNioBuf;
282         long oldHandle = buf.handle;
283         T oldMemory = buf.memory;
284         int oldOffset = buf.offset;
285         int oldMaxLength = buf.maxLength;
286
287         // This does not touch buf's reader/writer indices
288         allocate(parent.threadCache(), buf, newCapacity);
289         int bytesToCopy;
290         if (newCapacity > oldCapacity) {
291             bytesToCopy = oldCapacity;
292         } else {
293             buf.trimIndicesToCapacity(newCapacity);
294             bytesToCopy = newCapacity;
295         }
296         memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
297         if (freeOldMemory) {
298             free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
299         }
300     }
301
302     @Override
303     public int numThreadCaches() {
304         return numThreadCaches.get();
305     }
306
307     @Override
308     public int numTinySubpages() {
309         return 0;
310     }
311
312     @Override
313     public int numSmallSubpages() {
314         return smallSubpagePools.length;
315     }
316
317     @Override
318     public int numChunkLists() {
319         return chunkListMetrics.size();
320     }
321
322     @Override
323     public List<PoolSubpageMetric> tinySubpages() {
324         return Collections.emptyList();
325     }
326
327     @Override
328     public List<PoolSubpageMetric> smallSubpages() {
329         return subPageMetricList(smallSubpagePools);
330     }
331
332     @Override
333     public List<PoolChunkListMetric> chunkLists() {
334         return chunkListMetrics;
335     }
336
337     private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
338         List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>();
339         for (PoolSubpage<?> head : pages) {
340             if (head.next == head) {
341                 continue;
342             }
343             PoolSubpage<?> s = head.next;
344             for (;;) {
345                 metrics.add(s);
346                 s = s.next;
347                 if (s == head) {
348                     break;
349                 }
350             }
351         }
352         return metrics;
353     }
354
355     @Override
356     public long numAllocations() {
357         final long allocsNormal;
358         synchronized (this) {
359             allocsNormal = allocationsNormal;
360         }
361         return allocationsSmall.value() + allocsNormal + allocationsHuge.value();
362     }
363
364     @Override
365     public long numTinyAllocations() {
366         return 0;
367     }
368
369     @Override
370     public long numSmallAllocations() {
371         return allocationsSmall.value();
372     }
373
374     @Override
375     public synchronized long numNormalAllocations() {
376         return allocationsNormal;
377     }
378
379     @Override
380     public long numDeallocations() {
381         final long deallocs;
382         synchronized (this) {
383             deallocs = deallocationsSmall + deallocationsNormal;
384         }
385         return deallocs + deallocationsHuge.value();
386     }
387
388     @Override
389     public long numTinyDeallocations() {
390         return 0;
391     }
392
393     @Override
394     public synchronized long numSmallDeallocations() {
395         return deallocationsSmall;
396     }
397
398     @Override
399     public synchronized long numNormalDeallocations() {
400         return deallocationsNormal;
401     }
402
403     @Override
404     public long numHugeAllocations() {
405         return allocationsHuge.value();
406     }
407
408     @Override
409     public long numHugeDeallocations() {
410         return deallocationsHuge.value();
411     }
412
413     @Override
414     public  long numActiveAllocations() {
415         long val = allocationsSmall.value() + allocationsHuge.value()
416                 - deallocationsHuge.value();
417         synchronized (this) {
418             val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
419         }
420         return max(val, 0);
421     }
422
423     @Override
424     public long numActiveTinyAllocations() {
425         return 0;
426     }
427
428     @Override
429     public long numActiveSmallAllocations() {
430         return max(numSmallAllocations() - numSmallDeallocations(), 0);
431     }
432
433     @Override
434     public long numActiveNormalAllocations() {
435         final long val;
436         synchronized (this) {
437             val = allocationsNormal - deallocationsNormal;
438         }
439         return max(val, 0);
440     }
441
442     @Override
443     public long numActiveHugeAllocations() {
444         return max(numHugeAllocations() - numHugeDeallocations(), 0);
445     }
446
447     @Override
448     public long numActiveBytes() {
449         long val = activeBytesHuge.value();
450         synchronized (this) {
451             for (int i = 0; i < chunkListMetrics.size(); i++) {
452                 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
453                     val += m.chunkSize();
454                 }
455             }
456         }
457         return max(0, val);
458     }
459
460     protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
461     protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
462     protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
463     protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf<T> dst, int length);
464     protected abstract void destroyChunk(PoolChunk<T> chunk);
465
466     @Override
467     public synchronized String toString() {
468         StringBuilder buf = new StringBuilder()
469             .append("Chunk(s) at 0~25%:")
470             .append(StringUtil.NEWLINE)
471             .append(qInit)
472             .append(StringUtil.NEWLINE)
473             .append("Chunk(s) at 0~50%:")
474             .append(StringUtil.NEWLINE)
475             .append(q000)
476             .append(StringUtil.NEWLINE)
477             .append("Chunk(s) at 25~75%:")
478             .append(StringUtil.NEWLINE)
479             .append(q025)
480             .append(StringUtil.NEWLINE)
481             .append("Chunk(s) at 50~100%:")
482             .append(StringUtil.NEWLINE)
483             .append(q050)
484             .append(StringUtil.NEWLINE)
485             .append("Chunk(s) at 75~100%:")
486             .append(StringUtil.NEWLINE)
487             .append(q075)
488             .append(StringUtil.NEWLINE)
489             .append("Chunk(s) at 100%:")
490             .append(StringUtil.NEWLINE)
491             .append(q100)
492             .append(StringUtil.NEWLINE)
493             .append("small subpages:");
494         appendPoolSubPages(buf, smallSubpagePools);
495         buf.append(StringUtil.NEWLINE);
496
497         return buf.toString();
498     }
499
500     private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
501         for (int i = 0; i < subpages.length; i ++) {
502             PoolSubpage<?> head = subpages[i];
503             if (head.next == head) {
504                 continue;
505             }
506
507             buf.append(StringUtil.NEWLINE)
508                     .append(i)
509                     .append(": ");
510             PoolSubpage<?> s = head.next;
511             for (;;) {
512                 buf.append(s);
513                 s = s.next;
514                 if (s == head) {
515                     break;
516                 }
517             }
518         }
519     }
520
521     @Override
522     protected final void finalize() throws Throwable {
523         try {
524             super.finalize();
525         } finally {
526             destroyPoolSubPages(smallSubpagePools);
527             destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
528         }
529     }
530
531     private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
532         for (PoolSubpage<?> page : pages) {
533             page.destroy();
534         }
535     }
536
537     private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
538         for (PoolChunkList<T> chunkList: chunkLists) {
539             chunkList.destroy(this);
540         }
541     }
542
543     static final class HeapArena extends PoolArena<byte[]> {
544
545         HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
546                   int chunkSize, int directMemoryCacheAlignment) {
547             super(parent, pageSize, pageShifts, chunkSize,
548                   directMemoryCacheAlignment);
549         }
550
551         private static byte[] newByteArray(int size) {
552             return PlatformDependent.allocateUninitializedArray(size);
553         }
554
555         @Override
556         boolean isDirect() {
557             return false;
558         }
559
560         @Override
561         protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
562             return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
563         }
564
565         @Override
566         protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
567             return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
568         }
569
570         @Override
571         protected void destroyChunk(PoolChunk<byte[]> chunk) {
572             // Rely on GC.
573         }
574
575         @Override
576         protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
577             return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
578                     : PooledHeapByteBuf.newInstance(maxCapacity);
579         }
580
581         @Override
582         protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf<byte[]> dst, int length) {
583             if (length == 0) {
584                 return;
585             }
586
587             System.arraycopy(src, srcOffset, dst.memory, dst.offset, length);
588         }
589     }
590
591     static final class DirectArena extends PoolArena<ByteBuffer> {
592
593         DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
594                     int chunkSize, int directMemoryCacheAlignment) {
595             super(parent, pageSize, pageShifts, chunkSize,
596                   directMemoryCacheAlignment);
597         }
598
599         @Override
600         boolean isDirect() {
601             return true;
602         }
603
604         // mark as package-private, only for unit test
605         int offsetCacheLine(ByteBuffer memory) {
606             // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will
607             // throw an NPE.
608             int remainder = HAS_UNSAFE
609                     ? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
610                     : 0;
611
612             // offset = alignment - address & (alignment - 1)
613             return directMemoryCacheAlignment - remainder;
614         }
615
616         @Override
617         protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
618             int pageShifts, int chunkSize) {
619             if (directMemoryCacheAlignment == 0) {
620                 return new PoolChunk<ByteBuffer>(this,
621                         allocateDirect(chunkSize), pageSize, pageShifts,
622                         chunkSize, maxPageIdx, 0);
623             }
624             final ByteBuffer memory = allocateDirect(chunkSize
625                     + directMemoryCacheAlignment);
626             return new PoolChunk<ByteBuffer>(this, memory, pageSize,
627                     pageShifts, chunkSize, maxPageIdx,
628                     offsetCacheLine(memory));
629         }
630
631         @Override
632         protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
633             if (directMemoryCacheAlignment == 0) {
634                 return new PoolChunk<ByteBuffer>(this,
635                         allocateDirect(capacity), capacity, 0);
636             }
637             final ByteBuffer memory = allocateDirect(capacity
638                     + directMemoryCacheAlignment);
639             return new PoolChunk<ByteBuffer>(this, memory, capacity,
640                     offsetCacheLine(memory));
641         }
642
643         private static ByteBuffer allocateDirect(int capacity) {
644             return PlatformDependent.useDirectBufferNoCleaner() ?
645                     PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
646         }
647
648         @Override
649         protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
650             if (PlatformDependent.useDirectBufferNoCleaner()) {
651                 PlatformDependent.freeDirectNoCleaner(chunk.memory);
652             } else {
653                 PlatformDependent.freeDirectBuffer(chunk.memory);
654             }
655         }
656
657         @Override
658         protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
659             if (HAS_UNSAFE) {
660                 return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
661             } else {
662                 return PooledDirectByteBuf.newInstance(maxCapacity);
663             }
664         }
665
666         @Override
667         protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
668             if (length == 0) {
669                 return;
670             }
671
672             if (HAS_UNSAFE) {
673                 PlatformDependent.copyMemory(
674                         PlatformDependent.directBufferAddress(src) + srcOffset,
675                         PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
676             } else {
677                 // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
678                 src = src.duplicate();
679                 ByteBuffer dst = dstBuf.internalNioBuffer();
680                 src.position(srcOffset).limit(srcOffset + length);
681                 dst.position(dstBuf.offset);
682                 dst.put(src);
683             }
684         }
685     }
686 }
687