1 /*
2  * Copyright 2012 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.buffer;
17
18 import io.netty.util.collection.IntObjectHashMap;
19 import io.netty.util.collection.IntObjectMap;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.PriorityQueue;
25
26 /**
27  * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
28  *
29  * Notation: The following terms are important to understand the code
30  * > page  - a page is the smallest unit of memory chunk that can be allocated
31  * > run   - a run is a collection of pages
32  * > chunk - a chunk is a collection of runs
33  * > in this code chunkSize = maxPages * pageSize
34  *
35  * To begin we allocate a byte array of size = chunkSize
36  * Whenever a ByteBuf of given size needs to be created we search for the first position
37  * in the byte array that has enough empty space to accommodate the requested size and
38  * return a (long) handle that encodes this offset information, (this memory segment is then
39  * marked as reserved so it is always used by exactly one ByteBuf and no more)
40  *
41  * For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
42  * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
43  * equals the next nearest size in {@link SizeClasses}.
44  *
45  *
46  *  A chunk has the following layout:
47  *
48  *     /-----------------\
49  *     | run             |
50  *     |                 |
51  *     |                 |
52  *     |-----------------|
53  *     | run             |
54  *     |                 |
55  *     |-----------------|
56  *     | unalloctated    |
57  *     | (freed)         |
58  *     |                 |
59  *     |-----------------|
60  *     | subpage         |
61  *     |-----------------|
62  *     | unallocated     |
63  *     | (freed)         |
64  *     | ...             |
65  *     | ...             |
66  *     | ...             |
67  *     |                 |
68  *     |                 |
69  *     |                 |
70  *     \-----------------/
71  *
72  *
73  * handle:
74  * -------
75  * a handle is a long number, the bit layout of a run looks like:
76  *
77  * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
78  *
79  * o: runOffset (page offset in the chunk), 15bit
80  * s: size (number of pages) of this run, 15bit
81  * u: isUsed?, 1bit
82  * e: isSubpage?, 1bit
83  * b: bitmapIdx of subpage, zero if it's not subpage, 32bit
84  *
85  * runsAvailMap:
86  * ------
87  * a map which manages all runs (used and not in used).
88  * For each run, the first runOffset and last runOffset are stored in runsAvailMap.
89  * key: runOffset
90  * value: handle
91  *
92  * runsAvail:
93  * ----------
94  * an array of {@link PriorityQueue}.
95  * Each queue manages same size of runs.
96  * Runs are sorted by offset, so that we always allocate runs with smaller offset.
97  *
98  *
99  * Algorithm:
100  * ----------
101  *
102  *   As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
103  *
104  * Initialization -
105  *  In the beginning we store the initial run which is the whole chunk.
106  *  The initial run:
107  *  runOffset = 0
108  *  size = chunkSize
109  *  isUsed = no
110  *  isSubpage = no
111  *  bitmapIdx = 0
112  *
113  *
114  * Algorithm: [allocateRun(size)]
115  * ----------
116  * 1) find the first avail run using in runsAvails according to size
117  * 2) if pages of run is larger than request pages then split it, and save the tailing run
118  *    for later using
119  *
120  * Algorithm: [allocateSubpage(size)]
121  * ----------
122  * 1) find a not full subpage according to size.
123  *    if it already exists just return, otherwise allocate a new PoolSubpage and call init()
124  *    note that this subpage object is added to subpagesPool in the PoolArena when we init() it
125  * 2) call subpage.allocate()
126  *
127  * Algorithm: [free(handle, length, nioBuffer)]
128  * ----------
129  * 1) if it is a subpage, return the slab back into this subpage
130  * 2) if the subpage is not used or it is a run, then start free this run
131  * 3) merge continuous avail runs
132  * 4) save the merged run
133  *
134  */

135 final class PoolChunk<T> implements PoolChunkMetric {
136
137     private static final int OFFSET_BIT_LENGTH = 15;
138     private static final int SIZE_BIT_LENGTH = 15;
139     private static final int INUSED_BIT_LENGTH = 1;
140     private static final int SUBPAGE_BIT_LENGTH = 1;
141     private static final int BITMAP_IDX_BIT_LENGTH = 32;
142
143     static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
144     static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
145     static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
146     static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
147
148     final PoolArena<T> arena;
149     final T memory;
150     final boolean unpooled;
151     final int offset;
152
153     /**
154      * store the first page and last page of each avail run
155      */

156     private final IntObjectMap<Long> runsAvailMap;
157
158     /**
159      * manage all avail runs
160      */

161     private final PriorityQueue<Long>[] runsAvail;
162
163     /**
164      * manage all subpages in this chunk
165      */

166     private final PoolSubpage<T>[] subpages;
167
168     private final int pageSize;
169     private final int pageShifts;
170     private final int chunkSize;
171
172     // Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
173     // around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
174     // may produce extra GC, which can be greatly reduced by caching the duplicates.
175     //
176     // This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
177     private final Deque<ByteBuffer> cachedNioBuffers;
178
179     int freeBytes;
180
181     PoolChunkList<T> parent;
182     PoolChunk<T> prev;
183     PoolChunk<T> next;
184
185     // TODO: Test if adding padding helps under contention
186     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
187
188     @SuppressWarnings("unchecked")
189     PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
190         unpooled = false;
191         this.arena = arena;
192         this.memory = memory;
193         this.pageSize = pageSize;
194         this.pageShifts = pageShifts;
195         this.chunkSize = chunkSize;
196         this.offset = offset;
197         freeBytes = chunkSize;
198
199         runsAvail = newRunsAvailqueueArray(maxPageIdx);
200         runsAvailMap = new IntObjectHashMap<Long>();
201         subpages = new PoolSubpage[chunkSize >> pageShifts];
202
203         //insert initial run, offset = 0, pages = chunkSize / pageSize
204         int pages = chunkSize >> pageShifts;
205         long initHandle = (long) pages << SIZE_SHIFT;
206         insertAvailRun(0, pages, initHandle);
207
208         cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
209     }
210
211     /** Creates a special chunk that is not pooled. */
212     PoolChunk(PoolArena<T> arena, T memory, int size, int offset) {
213         unpooled = true;
214         this.arena = arena;
215         this.memory = memory;
216         this.offset = offset;
217         pageSize = 0;
218         pageShifts = 0;
219         runsAvailMap = null;
220         runsAvail = null;
221         subpages = null;
222         chunkSize = size;
223         cachedNioBuffers = null;
224     }
225
226     @SuppressWarnings("unchecked")
227     private static PriorityQueue<Long>[] newRunsAvailqueueArray(int size) {
228         PriorityQueue<Long>[] queueArray = new PriorityQueue[size];
229         for (int i = 0; i < queueArray.length; i++) {
230             queueArray[i] = new PriorityQueue<Long>();
231         }
232         return queueArray;
233     }
234
235     private void insertAvailRun(int runOffset, int pages, Long handle) {
236         int pageIdxFloor = arena.pages2pageIdxFloor(pages);
237         PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
238         queue.offer(handle);
239
240         //insert first page of run
241         insertAvailRun0(runOffset, handle);
242         if (pages > 1) {
243             //insert last page of run
244             insertAvailRun0(lastPage(runOffset, pages), handle);
245         }
246     }
247
248     private void insertAvailRun0(int runOffset, Long handle) {
249         Long pre = runsAvailMap.put(runOffset, handle);
250         assert pre == null;
251     }
252
253     private void removeAvailRun(long handle) {
254         int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
255         PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
256         removeAvailRun(queue, handle);
257     }
258
259     private void removeAvailRun(PriorityQueue<Long> queue, long handle) {
260         queue.remove(handle);
261
262         int runOffset = runOffset(handle);
263         int pages = runPages(handle);
264         //remove first page of run
265         runsAvailMap.remove(runOffset);
266         if (pages > 1) {
267             //remove last page of run
268             runsAvailMap.remove(lastPage(runOffset, pages));
269         }
270     }
271
272     private static int lastPage(int runOffset, int pages) {
273         return runOffset + pages - 1;
274     }
275
276     private Long getAvailRunByOffset(int runOffset) {
277         return runsAvailMap.get(runOffset);
278     }
279
280     @Override
281     public int usage() {
282         final int freeBytes;
283         synchronized (arena) {
284             freeBytes = this.freeBytes;
285         }
286         return usage(freeBytes);
287     }
288
289     private int usage(int freeBytes) {
290         if (freeBytes == 0) {
291             return 100;
292         }
293
294         int freePercentage = (int) (freeBytes * 100L / chunkSize);
295         if (freePercentage == 0) {
296             return 99;
297         }
298         return 100 - freePercentage;
299     }
300
301     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
302         final long handle;
303         if (sizeIdx <= arena.smallMaxSizeIdx) {
304             // small
305             handle = allocateSubpage(sizeIdx);
306             if (handle < 0) {
307                 return false;
308             }
309             assert isSubpage(handle);
310         } else {
311             // normal
312             // runSize must be multiple of pageSize
313             int runSize = arena.sizeIdx2size(sizeIdx);
314             handle = allocateRun(runSize);
315             if (handle < 0) {
316                 return false;
317             }
318         }
319
320         ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
321         initBuf(buf, nioBuffer, handle, reqCapacity, cache);
322         return true;
323     }
324
325     private long allocateRun(int runSize) {
326         int pages = runSize >> pageShifts;
327         int pageIdx = arena.pages2pageIdx(pages);
328
329         synchronized (runsAvail) {
330             //find first queue which has at least one big enough run
331             int queueIdx = runFirstBestFit(pageIdx);
332             if (queueIdx == -1) {
333                 return -1;
334             }
335
336             //get run with min offset in this queue
337             PriorityQueue<Long> queue = runsAvail[queueIdx];
338             long handle = queue.poll();
339
340             assert !isUsed(handle);
341
342             removeAvailRun(queue, handle);
343
344             if (handle != -1) {
345                 handle = splitLargeRun(handle, pages);
346             }
347
348             freeBytes -= runSize(pageShifts, handle);
349             return handle;
350         }
351     }
352
353     private int calculateRunSize(int sizeIdx) {
354         int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
355         int runSize = 0;
356         int nElements;
357
358         final int elemSize = arena.sizeIdx2size(sizeIdx);
359
360         //find lowest common multiple of pageSize and elemSize
361         do {
362             runSize += pageSize;
363             nElements = runSize / elemSize;
364         } while (nElements < maxElements && runSize != nElements * elemSize);
365
366         while (nElements > maxElements) {
367             runSize -= pageSize;
368             nElements = runSize / elemSize;
369         }
370
371         assert nElements > 0;
372         assert runSize <= chunkSize;
373         assert runSize >= elemSize;
374
375         return runSize;
376     }
377
378     private int runFirstBestFit(int pageIdx) {
379         if (freeBytes == chunkSize) {
380             return arena.nPSizes - 1;
381         }
382         for (int i = pageIdx; i < arena.nPSizes; i++) {
383             PriorityQueue<Long> queue = runsAvail[i];
384             if (queue != null && !queue.isEmpty()) {
385                 return i;
386             }
387         }
388         return -1;
389     }
390
391     private long splitLargeRun(long handle, int needPages) {
392         assert needPages > 0;
393
394         int totalPages = runPages(handle);
395         assert needPages <= totalPages;
396
397         int remPages = totalPages - needPages;
398
399         if (remPages > 0) {
400             int runOffset = runOffset(handle);
401
402             // keep track of trailing unused pages for later use
403             int availOffset = runOffset + needPages;
404             long availRun = toRunHandle(availOffset, remPages, 0);
405             insertAvailRun(availOffset, remPages, availRun);
406
407             // not avail
408             return toRunHandle(runOffset, needPages, 1);
409         }
410
411         //mark it as used
412         handle |= 1L << IS_USED_SHIFT;
413         return handle;
414     }
415
416     /**
417      * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
418      * subpage pool in the PoolArena that owns this PoolChunk
419      *
420      * @param sizeIdx sizeIdx of normalized size
421      *
422      * @return index in memoryMap
423      */

424     private long allocateSubpage(int sizeIdx) {
425         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
426         // This is need as we may add it back and so alter the linked-list structure.
427         PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
428         synchronized (head) {
429             //allocate a new run
430             int runSize = calculateRunSize(sizeIdx);
431             //runSize must be multiples of pageSize
432             long runHandle = allocateRun(runSize);
433             if (runHandle < 0) {
434                 return -1;
435             }
436
437             int runOffset = runOffset(runHandle);
438             int elemSize = arena.sizeIdx2size(sizeIdx);
439
440             PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
441                                runSize(pageShifts, runHandle), elemSize);
442
443             subpages[runOffset] = subpage;
444             return subpage.allocate();
445         }
446     }
447
448     /**
449      * Free a subpage or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage pool
450      * of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize,
451      * we can completely free the owning Page so it is available for subsequent allocations
452      *
453      * @param handle handle to free
454      */

455     void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
456         if (isSubpage(handle)) {
457             int sizeIdx = arena.size2SizeIdx(normCapacity);
458             PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
459
460             PoolSubpage<T> subpage = subpages[runOffset(handle)];
461             assert subpage != null && subpage.doNotDestroy;
462
463             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
464             // This is need as we may add it back and so alter the linked-list structure.
465             synchronized (head) {
466                 if (subpage.free(head, bitmapIdx(handle))) {
467                     //the subpage is still used, do not free it
468                     return;
469                 }
470             }
471         }
472
473         //start free run
474         int pages = runPages(handle);
475
476         synchronized (runsAvail) {
477             // collapse continuous runs, successfully collapsed runs
478             // will be removed from runsAvail and runsAvailMap
479             long finalRun = collapseRuns(handle);
480
481             //set run as not used
482             finalRun &= ~(1L << IS_USED_SHIFT);
483             //if it is a subpage, set it to run
484             finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
485
486             insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
487             freeBytes += pages << pageShifts;
488         }
489
490         if (nioBuffer != null && cachedNioBuffers != null &&
491             cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
492             cachedNioBuffers.offer(nioBuffer);
493         }
494     }
495
496     private long collapseRuns(long handle) {
497         return collapseNext(collapsePast(handle));
498     }
499
500     private long collapsePast(long handle) {
501         for (;;) {
502             int runOffset = runOffset(handle);
503             int runPages = runPages(handle);
504
505             Long pastRun = getAvailRunByOffset(runOffset - 1);
506             if (pastRun == null) {
507                 return handle;
508             }
509
510             int pastOffset = runOffset(pastRun);
511             int pastPages = runPages(pastRun);
512
513             //is continuous
514             if (pastRun != handle && pastOffset + pastPages == runOffset) {
515                 //remove past run
516                 removeAvailRun(pastRun);
517                 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
518             } else {
519                 return handle;
520             }
521         }
522     }
523
524     private long collapseNext(long handle) {
525         for (;;) {
526             int runOffset = runOffset(handle);
527             int runPages = runPages(handle);
528
529             Long nextRun = getAvailRunByOffset(runOffset + runPages);
530             if (nextRun == null) {
531                 return handle;
532             }
533
534             int nextOffset = runOffset(nextRun);
535             int nextPages = runPages(nextRun);
536
537             //is continuous
538             if (nextRun != handle && runOffset + runPages == nextOffset) {
539                 //remove next run
540                 removeAvailRun(nextRun);
541                 handle = toRunHandle(runOffset, runPages + nextPages, 0);
542             } else {
543                 return handle;
544             }
545         }
546     }
547
548     private static long toRunHandle(int runOffset, int runPages, int inUsed) {
549         return (long) runOffset << RUN_OFFSET_SHIFT
550                | (long) runPages << SIZE_SHIFT
551                | (long) inUsed << IS_USED_SHIFT;
552     }
553
554     void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
555                  PoolThreadCache threadCache) {
556         if (isRun(handle)) {
557             buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
558                      reqCapacity, runSize(pageShifts, handle), arena.parent.threadCache());
559         } else {
560             initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
561         }
562     }
563
564     void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
565                             PoolThreadCache threadCache) {
566         int runOffset = runOffset(handle);
567         int bitmapIdx = bitmapIdx(handle);
568
569         PoolSubpage<T> s = subpages[runOffset];
570         assert s.doNotDestroy;
571         assert reqCapacity <= s.elemSize;
572
573         buf.init(this, nioBuffer, handle,
574                  (runOffset << pageShifts) + bitmapIdx * s.elemSize + offset,
575                  reqCapacity, s.elemSize, threadCache);
576     }
577
578     @Override
579     public int chunkSize() {
580         return chunkSize;
581     }
582
583     @Override
584     public int freeBytes() {
585         synchronized (arena) {
586             return freeBytes;
587         }
588     }
589
590     @Override
591     public String toString() {
592         final int freeBytes;
593         synchronized (arena) {
594             freeBytes = this.freeBytes;
595         }
596
597         return new StringBuilder()
598                 .append("Chunk(")
599                 .append(Integer.toHexString(System.identityHashCode(this)))
600                 .append(": ")
601                 .append(usage(freeBytes))
602                 .append("%, ")
603                 .append(chunkSize - freeBytes)
604                 .append('/')
605                 .append(chunkSize)
606                 .append(')')
607                 .toString();
608     }
609
610     void destroy() {
611         arena.destroyChunk(this);
612     }
613
614     static int runOffset(long handle) {
615         return (int) (handle >> RUN_OFFSET_SHIFT);
616     }
617
618     static int runSize(int pageShifts, long handle) {
619         return runPages(handle) << pageShifts;
620     }
621
622     static int runPages(long handle) {
623         return (int) (handle >> SIZE_SHIFT & 0x7fff);
624     }
625
626     static boolean isUsed(long handle) {
627         return (handle >> IS_USED_SHIFT & 1) == 1L;
628     }
629
630     static boolean isRun(long handle) {
631         return !isSubpage(handle);
632     }
633
634     static boolean isSubpage(long handle) {
635         return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
636     }
637
638     static int bitmapIdx(long handle) {
639         return (int) handle;
640     }
641 }
642