1
16
17 package io.netty.util;
18
19 import io.netty.util.concurrent.FastThreadLocal;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25 import java.lang.ref.WeakReference;
26 import java.util.Arrays;
27 import java.util.Map;
28 import java.util.WeakHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
32 import static java.lang.Math.max;
33 import static java.lang.Math.min;
34
35
40 public abstract class Recycler<T> {
41
42 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
43
44 @SuppressWarnings("rawtypes")
45 private static final Handle NOOP_HANDLE = new Handle() {
46 @Override
47 public void recycle(Object object) {
48
49 }
50 };
51 private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
52 private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
53 private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
54 private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
55 private static final int INITIAL_CAPACITY;
56 private static final int MAX_SHARED_CAPACITY_FACTOR;
57 private static final int MAX_DELAYED_QUEUES_PER_THREAD;
58 private static final int LINK_CAPACITY;
59 private static final int RATIO;
60 private static final int DELAYED_QUEUE_RATIO;
61
62 static {
63
64
65
66 int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
67 SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
68 if (maxCapacityPerThread < 0) {
69 maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
70 }
71
72 DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
73
74 MAX_SHARED_CAPACITY_FACTOR = max(2,
75 SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
76 2));
77
78 MAX_DELAYED_QUEUES_PER_THREAD = max(0,
79 SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
80
81 NettyRuntime.availableProcessors() * 2));
82
83 LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
84 max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
85
86
87
88
89 RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
90 DELAYED_QUEUE_RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.delayedQueue.ratio", RATIO));
91
92 if (logger.isDebugEnabled()) {
93 if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
94 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
95 logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
96 logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
97 logger.debug("-Dio.netty.recycler.ratio: disabled");
98 logger.debug("-Dio.netty.recycler.delayedQueue.ratio: disabled");
99 } else {
100 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
101 logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
102 logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
103 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
104 logger.debug("-Dio.netty.recycler.delayedQueue.ratio: {}", DELAYED_QUEUE_RATIO);
105 }
106 }
107
108 INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
109 }
110
111 private final int maxCapacityPerThread;
112 private final int maxSharedCapacityFactor;
113 private final int interval;
114 private final int maxDelayedQueuesPerThread;
115 private final int delayedQueueInterval;
116
117 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
118 @Override
119 protected Stack<T> initialValue() {
120 return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
121 interval, maxDelayedQueuesPerThread, delayedQueueInterval);
122 }
123
124 @Override
125 protected void onRemoval(Stack<T> value) {
126
127 if (value.threadRef.get() == Thread.currentThread()) {
128 if (DELAYED_RECYCLED.isSet()) {
129 DELAYED_RECYCLED.get().remove(value);
130 }
131 }
132 }
133 };
134
/** Creates a recycler that uses the class-wide default for every setting. */
protected Recycler() {
    this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}

/**
 * Creates a recycler with an explicit per-thread capacity; all other settings use the defaults.
 *
 * @param maxCapacityPerThread per-thread capacity; values {@code <= 0} disable pooling
 */
protected Recycler(int maxCapacityPerThread) {
    this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
}

/**
 * Same as {@link #Recycler(int)}, with an explicit shared-capacity factor.
 *
 * @param maxCapacityPerThread    per-thread capacity; values {@code <= 0} disable pooling
 * @param maxSharedCapacityFactor divisor applied to the capacity to size the shared budget
 */
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
    this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
}

/**
 * Same as {@link #Recycler(int, int)}, with an explicit ratio and delayed-queue limit;
 * the delayed-queue ratio uses the class-wide default.
 */
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                   int ratio, int maxDelayedQueuesPerThread) {
    this(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread,
            DELAYED_QUEUE_RATIO);
}
152
153 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
154 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
155 interval = max(0, ratio);
156 delayedQueueInterval = max(0, delayedQueueRatio);
157 if (maxCapacityPerThread <= 0) {
158 this.maxCapacityPerThread = 0;
159 this.maxSharedCapacityFactor = 1;
160 this.maxDelayedQueuesPerThread = 0;
161 } else {
162 this.maxCapacityPerThread = maxCapacityPerThread;
163 this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
164 this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
165 }
166 }
167
168 @SuppressWarnings("unchecked")
169 public final T get() {
170 if (maxCapacityPerThread == 0) {
171 return newObject((Handle<T>) NOOP_HANDLE);
172 }
173 Stack<T> stack = threadLocal.get();
174 DefaultHandle<T> handle = stack.pop();
175 if (handle == null) {
176 handle = stack.newHandle();
177 handle.value = newObject(handle);
178 }
179 return (T) handle.value;
180 }
181
182
185 @Deprecated
186 public final boolean recycle(T o, Handle<T> handle) {
187 if (handle == NOOP_HANDLE) {
188 return false;
189 }
190
191 DefaultHandle<T> h = (DefaultHandle<T>) handle;
192 if (h.stack.parent != this) {
193 return false;
194 }
195
196 h.recycle(o);
197 return true;
198 }
199
200 final int threadLocalCapacity() {
201 return threadLocal.get().elements.length;
202 }
203
204 final int threadLocalSize() {
205 return threadLocal.get().size;
206 }
207
208 protected abstract T newObject(Handle<T> handle);
209
210 public interface Handle<T> extends ObjectPool.Handle<T> { }
211
212 private static final class DefaultHandle<T> implements Handle<T> {
213 int lastRecycledId;
214 int recycleId;
215
216 boolean hasBeenRecycled;
217
218 Stack<?> stack;
219 Object value;
220
221 DefaultHandle(Stack<?> stack) {
222 this.stack = stack;
223 }
224
225 @Override
226 public void recycle(Object object) {
227 if (object != value) {
228 throw new IllegalArgumentException("object does not belong to handle");
229 }
230
231 Stack<?> stack = this.stack;
232 if (lastRecycledId != recycleId || stack == null) {
233 throw new IllegalStateException("recycled already");
234 }
235
236 stack.push(this);
237 }
238 }
239
240 private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
241 new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
242 @Override
243 protected Map<Stack<?>, WeakOrderQueue> initialValue() {
244 return new WeakHashMap<Stack<?>, WeakOrderQueue>();
245 }
246 };
247
248
249
250 private static final class WeakOrderQueue extends WeakReference<Thread> {
251
252 static final WeakOrderQueue DUMMY = new WeakOrderQueue();
253
254
255 @SuppressWarnings("serial")
256 static final class Link extends AtomicInteger {
257 final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
258
259 int readIndex;
260 Link next;
261 }
262
263
264 private static final class Head {
265 private final AtomicInteger availableSharedCapacity;
266
267 Link link;
268
269 Head(AtomicInteger availableSharedCapacity) {
270 this.availableSharedCapacity = availableSharedCapacity;
271 }
272
273
276 void reclaimAllSpaceAndUnlink() {
277 Link head = link;
278 link = null;
279 int reclaimSpace = 0;
280 while (head != null) {
281 reclaimSpace += LINK_CAPACITY;
282 Link next = head.next;
283
284 head.next = null;
285 head = next;
286 }
287 if (reclaimSpace > 0) {
288 reclaimSpace(reclaimSpace);
289 }
290 }
291
292 private void reclaimSpace(int space) {
293 availableSharedCapacity.addAndGet(space);
294 }
295
296 void relink(Link link) {
297 reclaimSpace(LINK_CAPACITY);
298 this.link = link;
299 }
300
301
305 Link newLink() {
306 return reserveSpaceForLink(availableSharedCapacity) ? new Link() : null;
307 }
308
309 static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
310 for (;;) {
311 int available = availableSharedCapacity.get();
312 if (available < LINK_CAPACITY) {
313 return false;
314 }
315 if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
316 return true;
317 }
318 }
319 }
320 }
321
322
// Chain of links: consumed from head.link, appended to at tail.
private final Head head;
private Link tail;

// Next queue in the owning stack's list of queues.
private WeakOrderQueue next;
private final int id = ID_GENERATOR.getAndIncrement();
private final int interval;
private int handleRecycleCount;

/** Builds the {@code DUMMY} placeholder: no thread, no links, no shared-capacity counter. */
private WeakOrderQueue() {
    super(null);
    head = new Head(null);
    interval = 0;
}

/** Builds a queue with one empty link, drawing on the shared capacity of {@code stack}. */
private WeakOrderQueue(Stack<?> stack, Thread thread) {
    super(thread);

    final Link firstLink = new Link();
    tail = firstLink;
    head = new Head(stack.availableSharedCapacity);
    head.link = firstLink;

    interval = stack.delayedQueueInterval;
    // Start at the interval so the first add() is not dropped.
    handleRecycleCount = interval;
}
349
350 static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
351
352 if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
353 return null;
354 }
355 final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
356
357
358 stack.setHead(queue);
359
360 return queue;
361 }
362
363 WeakOrderQueue getNext() {
364 return next;
365 }
366
367 void setNext(WeakOrderQueue next) {
368 assert next != this;
369 this.next = next;
370 }
371
372 void reclaimAllSpaceAndUnlink() {
373 head.reclaimAllSpaceAndUnlink();
374 this.next = null;
375 }
376
377 void add(DefaultHandle<?> handle) {
378 handle.lastRecycledId = id;
379
380
381
382
383 if (handleRecycleCount < interval) {
384 handleRecycleCount++;
385
386 return;
387 }
388 handleRecycleCount = 0;
389
390 Link tail = this.tail;
391 int writeIndex;
392 if ((writeIndex = tail.get()) == LINK_CAPACITY) {
393 Link link = head.newLink();
394 if (link == null) {
395
396 return;
397 }
398
399 this.tail = tail = tail.next = link;
400
401 writeIndex = tail.get();
402 }
403 tail.elements[writeIndex] = handle;
404 handle.stack = null;
405
406
407 tail.lazySet(writeIndex + 1);
408 }
409
410 boolean hasFinalData() {
411 return tail.readIndex != tail.get();
412 }
413
414
415 @SuppressWarnings("rawtypes")
416 boolean transfer(Stack<?> dst) {
417 Link head = this.head.link;
418 if (head == null) {
419 return false;
420 }
421
422 if (head.readIndex == LINK_CAPACITY) {
423 if (head.next == null) {
424 return false;
425 }
426 head = head.next;
427 this.head.relink(head);
428 }
429
430 final int srcStart = head.readIndex;
431 int srcEnd = head.get();
432 final int srcSize = srcEnd - srcStart;
433 if (srcSize == 0) {
434 return false;
435 }
436
437 final int dstSize = dst.size;
438 final int expectedCapacity = dstSize + srcSize;
439
440 if (expectedCapacity > dst.elements.length) {
441 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
442 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
443 }
444
445 if (srcStart != srcEnd) {
446 final DefaultHandle[] srcElems = head.elements;
447 final DefaultHandle[] dstElems = dst.elements;
448 int newDstSize = dstSize;
449 for (int i = srcStart; i < srcEnd; i++) {
450 DefaultHandle<?> element = srcElems[i];
451 if (element.recycleId == 0) {
452 element.recycleId = element.lastRecycledId;
453 } else if (element.recycleId != element.lastRecycledId) {
454 throw new IllegalStateException("recycled already");
455 }
456 srcElems[i] = null;
457
458 if (dst.dropHandle(element)) {
459
460 continue;
461 }
462 element.stack = dst;
463 dstElems[newDstSize ++] = element;
464 }
465
466 if (srcEnd == LINK_CAPACITY && head.next != null) {
467
468 this.head.relink(head.next);
469 }
470
471 head.readIndex = srcEnd;
472 if (dst.size == newDstSize) {
473 return false;
474 }
475 dst.size = newDstSize;
476 return true;
477 } else {
478
479 return false;
480 }
481 }
482 }
483
484 private static final class Stack<T> {
485
486
487
488
489
final Recycler<T> parent;

// Held weakly; compared against the calling thread in push() and in the recycler's onRemoval().
final WeakReference<Thread> threadRef;
// Budget that the queues feeding this stack draw on when they allocate links.
final AtomicInteger availableSharedCapacity;
private final int maxDelayedQueues;

private final int maxCapacity;
private final int interval;
private final int delayedQueueInterval;
DefaultHandle<?>[] elements;
int size;
private int handleRecycleCount;
// Scavenging position within the list of queues that starts at `head`.
private WeakOrderQueue cursor;
private WeakOrderQueue prev;
private volatile WeakOrderQueue head;

/** Creates the stack for {@code thread} with the settings of {@code parent}. */
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
      int interval, int maxDelayedQueues, int delayedQueueInterval) {
    this.parent = parent;
    this.threadRef = new WeakReference<Thread>(thread);

    this.maxCapacity = maxCapacity;
    this.maxDelayedQueues = maxDelayedQueues;
    this.interval = interval;
    this.delayedQueueInterval = delayedQueueInterval;
    // Start at the interval so the first dropHandle() call does not drop.
    this.handleRecycleCount = interval;

    // The shared budget is never smaller than one link.
    final int sharedCapacity = max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY);
    this.availableSharedCapacity = new AtomicInteger(sharedCapacity);
    this.elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
}
523
524
525 synchronized void setHead(WeakOrderQueue queue) {
526 queue.setNext(head);
527 head = queue;
528 }
529
530 int increaseCapacity(int expectedCapacity) {
531 int newCapacity = elements.length;
532 int maxCapacity = this.maxCapacity;
533 do {
534 newCapacity <<= 1;
535 } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
536
537 newCapacity = min(newCapacity, maxCapacity);
538 if (newCapacity != elements.length) {
539 elements = Arrays.copyOf(elements, newCapacity);
540 }
541
542 return newCapacity;
543 }
544
545 @SuppressWarnings({ "unchecked", "rawtypes" })
546 DefaultHandle<T> pop() {
547 int size = this.size;
548 if (size == 0) {
549 if (!scavenge()) {
550 return null;
551 }
552 size = this.size;
553 if (size <= 0) {
554
555 return null;
556 }
557 }
558 size --;
559 DefaultHandle ret = elements[size];
560 elements[size] = null;
561
562
563
564 this.size = size;
565
566 if (ret.lastRecycledId != ret.recycleId) {
567 throw new IllegalStateException("recycled multiple times");
568 }
569 ret.recycleId = 0;
570 ret.lastRecycledId = 0;
571 return ret;
572 }
573
574 private boolean scavenge() {
575
576 if (scavengeSome()) {
577 return true;
578 }
579
580
581 prev = null;
582 cursor = head;
583 return false;
584 }
585
586 private boolean scavengeSome() {
587 WeakOrderQueue prev;
588 WeakOrderQueue cursor = this.cursor;
589 if (cursor == null) {
590 prev = null;
591 cursor = head;
592 if (cursor == null) {
593 return false;
594 }
595 } else {
596 prev = this.prev;
597 }
598
599 boolean success = false;
600 do {
601 if (cursor.transfer(this)) {
602 success = true;
603 break;
604 }
605 WeakOrderQueue next = cursor.getNext();
606 if (cursor.get() == null) {
607
608
609
610 if (cursor.hasFinalData()) {
611 for (;;) {
612 if (cursor.transfer(this)) {
613 success = true;
614 } else {
615 break;
616 }
617 }
618 }
619
620 if (prev != null) {
621
622 cursor.reclaimAllSpaceAndUnlink();
623 prev.setNext(next);
624 }
625 } else {
626 prev = cursor;
627 }
628
629 cursor = next;
630
631 } while (cursor != null && !success);
632
633 this.prev = prev;
634 this.cursor = cursor;
635 return success;
636 }
637
638 void push(DefaultHandle<?> item) {
639 Thread currentThread = Thread.currentThread();
640 if (threadRef.get() == currentThread) {
641
642 pushNow(item);
643 } else {
644
645
646
647 pushLater(item, currentThread);
648 }
649 }
650
651 private void pushNow(DefaultHandle<?> item) {
652 if ((item.recycleId | item.lastRecycledId) != 0) {
653 throw new IllegalStateException("recycled already");
654 }
655 item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
656
657 int size = this.size;
658 if (size >= maxCapacity || dropHandle(item)) {
659
660 return;
661 }
662 if (size == elements.length) {
663 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
664 }
665
666 elements[size] = item;
667 this.size = size + 1;
668 }
669
670 private void pushLater(DefaultHandle<?> item, Thread thread) {
671 if (maxDelayedQueues == 0) {
672
673 return;
674 }
675
676
677
678
679 Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
680 WeakOrderQueue queue = delayedRecycled.get(this);
681 if (queue == null) {
682 if (delayedRecycled.size() >= maxDelayedQueues) {
683
684 delayedRecycled.put(this, WeakOrderQueue.DUMMY);
685 return;
686 }
687
688 if ((queue = newWeakOrderQueue(thread)) == null) {
689
690 return;
691 }
692 delayedRecycled.put(this, queue);
693 } else if (queue == WeakOrderQueue.DUMMY) {
694
695 return;
696 }
697
698 queue.add(item);
699 }
700
701
704 private WeakOrderQueue newWeakOrderQueue(Thread thread) {
705 return WeakOrderQueue.newQueue(this, thread);
706 }
707
708 boolean dropHandle(DefaultHandle<?> handle) {
709 if (!handle.hasBeenRecycled) {
710 if (handleRecycleCount < interval) {
711 handleRecycleCount++;
712
713 return true;
714 }
715 handleRecycleCount = 0;
716 handle.hasBeenRecycled = true;
717 }
718 return false;
719 }
720
721 DefaultHandle<T> newHandle() {
722 return new DefaultHandle<T>(this);
723 }
724 }
725 }
726