1
14 package io.netty.util.internal.shaded.org.jctools.queues;
15
16 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess.UNSAFE;
17 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess.fieldOffset;
18 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess.*;
19
/**
 * First padding layer of the {@code MpscArrayQueue} class hierarchy.
 * <p>
 * Declares 120 single-byte fields (15 rows of 8) and nothing else; none of them is read or
 * written anywhere in this file.
 * <p>
 * NOTE(review): presumably these fields exist only to space out the object layout so that
 * {@code producerIndex} (declared in the direct subclass) does not share a cache line with
 * fields inherited from {@code ConcurrentCircularArrayQueue} - confirm against the upstream
 * JCTools sources. This layer is one row (b170..b177) shorter than the 128-byte layers further
 * down the hierarchy; the reason is not visible here. Do not add, remove or retype fields
 * without checking the resulting layout.
 */
abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
{
    // 15 rows x 8 bytes = 120 bytes of filler.
    byte b000,b001,b002,b003,b004,b005,b006,b007;
    byte b010,b011,b012,b013,b014,b015,b016,b017;
    byte b020,b021,b022,b023,b024,b025,b026,b027;
    byte b030,b031,b032,b033,b034,b035,b036,b037;
    byte b040,b041,b042,b043,b044,b045,b046,b047;
    byte b050,b051,b052,b053,b054,b055,b056,b057;
    byte b060,b061,b062,b063,b064,b065,b066,b067;
    byte b070,b071,b072,b073,b074,b075,b076,b077;
    byte b100,b101,b102,b103,b104,b105,b106,b107;
    byte b110,b111,b112,b113,b114,b115,b116,b117;
    byte b120,b121,b122,b123,b124,b125,b126,b127;
    byte b130,b131,b132,b133,b134,b135,b136,b137;
    byte b140,b141,b142,b143,b144,b145,b146,b147;
    byte b150,b151,b152,b153,b154,b155,b156,b157;
    byte b160,b161,b162,b163,b164,b165,b166,b167;


    /**
     * Forwards {@code capacity} unchanged to the superclass constructor.
     *
     * @param capacity requested queue capacity
     */
    MpscArrayQueueL1Pad(int capacity)
    {
        super(capacity);
    }
}
44
45
46 abstract class MpscArrayQueueProducerIndexField<E> extends MpscArrayQueueL1Pad<E>
47 {
48 private final static long P_INDEX_OFFSET = fieldOffset(MpscArrayQueueProducerIndexField.class, "producerIndex");
49
50 private volatile long producerIndex;
51
52 MpscArrayQueueProducerIndexField(int capacity)
53 {
54 super(capacity);
55 }
56
57 @Override
58 public final long lvProducerIndex()
59 {
60 return producerIndex;
61 }
62
63 final boolean casProducerIndex(long expect, long newValue)
64 {
65 return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
66 }
67 }
68
/**
 * Padding layer sitting between {@code producerIndex} (declared in the superclass) and
 * {@code producerLimit} (declared in the direct subclass).
 * <p>
 * Declares 128 single-byte fields (16 rows of 8) and nothing else; none of them is read or
 * written anywhere in this file.
 * <p>
 * NOTE(review): presumably the purpose is to keep the two producer-side fields on different
 * cache lines - confirm against the upstream JCTools sources before changing the field count.
 */
abstract class MpscArrayQueueMidPad<E> extends MpscArrayQueueProducerIndexField<E>
{
    // 16 rows x 8 bytes = 128 bytes of filler.
    byte b000,b001,b002,b003,b004,b005,b006,b007;
    byte b010,b011,b012,b013,b014,b015,b016,b017;
    byte b020,b021,b022,b023,b024,b025,b026,b027;
    byte b030,b031,b032,b033,b034,b035,b036,b037;
    byte b040,b041,b042,b043,b044,b045,b046,b047;
    byte b050,b051,b052,b053,b054,b055,b056,b057;
    byte b060,b061,b062,b063,b064,b065,b066,b067;
    byte b070,b071,b072,b073,b074,b075,b076,b077;
    byte b100,b101,b102,b103,b104,b105,b106,b107;
    byte b110,b111,b112,b113,b114,b115,b116,b117;
    byte b120,b121,b122,b123,b124,b125,b126,b127;
    byte b130,b131,b132,b133,b134,b135,b136,b137;
    byte b140,b141,b142,b143,b144,b145,b146,b147;
    byte b150,b151,b152,b153,b154,b155,b156,b157;
    byte b160,b161,b162,b163,b164,b165,b166,b167;
    byte b170,b171,b172,b173,b174,b175,b176,b177;

    /**
     * Forwards {@code capacity} unchanged to the superclass constructor.
     *
     * @param capacity requested queue capacity
     */
    MpscArrayQueueMidPad(int capacity)
    {
        super(capacity);
    }
}
93
94
95 abstract class MpscArrayQueueProducerLimitField<E> extends MpscArrayQueueMidPad<E>
96 {
97 private final static long P_LIMIT_OFFSET = fieldOffset(MpscArrayQueueProducerLimitField.class, "producerLimit");
98
99
100 private volatile long producerLimit;
101
102 MpscArrayQueueProducerLimitField(int capacity)
103 {
104 super(capacity);
105 this.producerLimit = capacity;
106 }
107
108 final long lvProducerLimit()
109 {
110 return producerLimit;
111 }
112
113 final void soProducerLimit(long newValue)
114 {
115 UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
116 }
117 }
118
/**
 * Padding layer sitting between {@code producerLimit} (declared in the superclass) and
 * {@code consumerIndex} (declared in the direct subclass).
 * <p>
 * Declares 120 single-byte fields (15 rows of 8) and nothing else; none of them is read or
 * written anywhere in this file.
 * <p>
 * NOTE(review): presumably the purpose is to keep producer-side and consumer-side fields on
 * different cache lines - confirm against the upstream JCTools sources. This layer is one row
 * (b170..b177) shorter than the 128-byte layers; the reason is not visible here.
 */
abstract class MpscArrayQueueL2Pad<E> extends MpscArrayQueueProducerLimitField<E>
{
    // 15 rows x 8 bytes = 120 bytes of filler.
    byte b000,b001,b002,b003,b004,b005,b006,b007;
    byte b010,b011,b012,b013,b014,b015,b016,b017;
    byte b020,b021,b022,b023,b024,b025,b026,b027;
    byte b030,b031,b032,b033,b034,b035,b036,b037;
    byte b040,b041,b042,b043,b044,b045,b046,b047;
    byte b050,b051,b052,b053,b054,b055,b056,b057;
    byte b060,b061,b062,b063,b064,b065,b066,b067;
    byte b070,b071,b072,b073,b074,b075,b076,b077;
    byte b100,b101,b102,b103,b104,b105,b106,b107;
    byte b110,b111,b112,b113,b114,b115,b116,b117;
    byte b120,b121,b122,b123,b124,b125,b126,b127;
    byte b130,b131,b132,b133,b134,b135,b136,b137;
    byte b140,b141,b142,b143,b144,b145,b146,b147;
    byte b150,b151,b152,b153,b154,b155,b156,b157;
    byte b160,b161,b162,b163,b164,b165,b166,b167;


    /**
     * Forwards {@code capacity} unchanged to the superclass constructor.
     *
     * @param capacity requested queue capacity
     */
    MpscArrayQueueL2Pad(int capacity)
    {
        super(capacity);
    }
}
143
144
145 abstract class MpscArrayQueueConsumerIndexField<E> extends MpscArrayQueueL2Pad<E>
146 {
147 private final static long C_INDEX_OFFSET = fieldOffset(MpscArrayQueueConsumerIndexField.class, "consumerIndex");
148
149 private volatile long consumerIndex;
150
151 MpscArrayQueueConsumerIndexField(int capacity)
152 {
153 super(capacity);
154 }
155
156 @Override
157 public final long lvConsumerIndex()
158 {
159 return consumerIndex;
160 }
161
162 final long lpConsumerIndex()
163 {
164 return UNSAFE.getLong(this, C_INDEX_OFFSET);
165 }
166
167 final void soConsumerIndex(long newValue)
168 {
169 UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
170 }
171 }
172
/**
 * Last padding layer, placed after {@code consumerIndex} (declared in the superclass) and
 * directly below the concrete {@code MpscArrayQueue}.
 * <p>
 * Declares 128 single-byte fields (16 rows of 8) and nothing else; none of them is read or
 * written anywhere in this file.
 * <p>
 * NOTE(review): presumably the purpose is to keep {@code consumerIndex} off the cache line of
 * whatever is laid out after this object's fields - confirm against the upstream JCTools
 * sources before changing the field count.
 */
abstract class MpscArrayQueueL3Pad<E> extends MpscArrayQueueConsumerIndexField<E>
{
    // 16 rows x 8 bytes = 128 bytes of filler.
    byte b000,b001,b002,b003,b004,b005,b006,b007;
    byte b010,b011,b012,b013,b014,b015,b016,b017;
    byte b020,b021,b022,b023,b024,b025,b026,b027;
    byte b030,b031,b032,b033,b034,b035,b036,b037;
    byte b040,b041,b042,b043,b044,b045,b046,b047;
    byte b050,b051,b052,b053,b054,b055,b056,b057;
    byte b060,b061,b062,b063,b064,b065,b066,b067;
    byte b070,b071,b072,b073,b074,b075,b076,b077;
    byte b100,b101,b102,b103,b104,b105,b106,b107;
    byte b110,b111,b112,b113,b114,b115,b116,b117;
    byte b120,b121,b122,b123,b124,b125,b126,b127;
    byte b130,b131,b132,b133,b134,b135,b136,b137;
    byte b140,b141,b142,b143,b144,b145,b146,b147;
    byte b150,b151,b152,b153,b154,b155,b156,b157;
    byte b160,b161,b162,b163,b164,b165,b166,b167;
    byte b170,b171,b172,b173,b174,b175,b176,b177;

    /**
     * Forwards {@code capacity} unchanged to the superclass constructor.
     *
     * @param capacity requested queue capacity
     */
    MpscArrayQueueL3Pad(int capacity)
    {
        super(capacity);
    }
}
197
198
207 public class MpscArrayQueue<E> extends MpscArrayQueueL3Pad<E>
208 {
209
210 public MpscArrayQueue(final int capacity)
211 {
212 super(capacity);
213 }
214
215
223 public boolean offerIfBelowThreshold(final E e, int threshold)
224 {
225 if (null == e)
226 {
227 throw new NullPointerException();
228 }
229
230 final long mask = this.mask;
231 final long capacity = mask + 1;
232
233 long producerLimit = lvProducerLimit();
234 long pIndex;
235 do
236 {
237 pIndex = lvProducerIndex();
238 long available = producerLimit - pIndex;
239 long size = capacity - available;
240 if (size >= threshold)
241 {
242 final long cIndex = lvConsumerIndex();
243 size = pIndex - cIndex;
244 if (size >= threshold)
245 {
246 return false;
247 }
248 else
249 {
250
251 producerLimit = cIndex + capacity;
252
253
254 soProducerLimit(producerLimit);
255 }
256 }
257 }
258 while (!casProducerIndex(pIndex, pIndex + 1));
259
263
264
265 final long offset = calcCircularRefElementOffset(pIndex, mask);
266 soRefElement(buffer, offset, e);
267 return true;
268 }
269
270
280 @Override
281 public boolean offer(final E e)
282 {
283 if (null == e)
284 {
285 throw new NullPointerException();
286 }
287
288
289 final long mask = this.mask;
290 long producerLimit = lvProducerLimit();
291 long pIndex;
292 do
293 {
294 pIndex = lvProducerIndex();
295 if (pIndex >= producerLimit)
296 {
297 final long cIndex = lvConsumerIndex();
298 producerLimit = cIndex + mask + 1;
299
300 if (pIndex >= producerLimit)
301 {
302 return false;
303 }
304 else
305 {
306
307
308 soProducerLimit(producerLimit);
309 }
310 }
311 }
312 while (!casProducerIndex(pIndex, pIndex + 1));
313
317
318
319 final long offset = calcCircularRefElementOffset(pIndex, mask);
320 soRefElement(buffer, offset, e);
321 return true;
322 }
323
324
330 public final int failFastOffer(final E e)
331 {
332 if (null == e)
333 {
334 throw new NullPointerException();
335 }
336 final long mask = this.mask;
337 final long capacity = mask + 1;
338 final long pIndex = lvProducerIndex();
339 long producerLimit = lvProducerLimit();
340 if (pIndex >= producerLimit)
341 {
342 final long cIndex = lvConsumerIndex();
343 producerLimit = cIndex + capacity;
344 if (pIndex >= producerLimit)
345 {
346 return 1;
347 }
348 else
349 {
350
351 soProducerLimit(producerLimit);
352 }
353 }
354
355
356 if (!casProducerIndex(pIndex, pIndex + 1))
357 {
358 return -1;
359 }
360
361
362 final long offset = calcCircularRefElementOffset(pIndex, mask);
363 soRefElement(buffer, offset, e);
364 return 0;
365 }
366
367
376 @Override
377 public E poll()
378 {
379 final long cIndex = lpConsumerIndex();
380 final long offset = calcCircularRefElementOffset(cIndex, mask);
381
382 final E[] buffer = this.buffer;
383
384
385 E e = lvRefElement(buffer, offset);
386 if (null == e)
387 {
388
393 if (cIndex != lvProducerIndex())
394 {
395 do
396 {
397 e = lvRefElement(buffer, offset);
398 }
399 while (e == null);
400 }
401 else
402 {
403 return null;
404 }
405 }
406
407 spRefElement(buffer, offset, null);
408 soConsumerIndex(cIndex + 1);
409 return e;
410 }
411
412
421 @Override
422 public E peek()
423 {
424
425 final E[] buffer = this.buffer;
426
427 final long cIndex = lpConsumerIndex();
428 final long offset = calcCircularRefElementOffset(cIndex, mask);
429 E e = lvRefElement(buffer, offset);
430 if (null == e)
431 {
432
437 if (cIndex != lvProducerIndex())
438 {
439 do
440 {
441 e = lvRefElement(buffer, offset);
442 }
443 while (e == null);
444 }
445 else
446 {
447 return null;
448 }
449 }
450 return e;
451 }
452
453 @Override
454 public boolean relaxedOffer(E e)
455 {
456 return offer(e);
457 }
458
459 @Override
460 public E relaxedPoll()
461 {
462 final E[] buffer = this.buffer;
463 final long cIndex = lpConsumerIndex();
464 final long offset = calcCircularRefElementOffset(cIndex, mask);
465
466
467 E e = lvRefElement(buffer, offset);
468 if (null == e)
469 {
470 return null;
471 }
472
473 spRefElement(buffer, offset, null);
474 soConsumerIndex(cIndex + 1);
475 return e;
476 }
477
478 @Override
479 public E relaxedPeek()
480 {
481 final E[] buffer = this.buffer;
482 final long mask = this.mask;
483 final long cIndex = lpConsumerIndex();
484 return lvRefElement(buffer, calcCircularRefElementOffset(cIndex, mask));
485 }
486
487 @Override
488 public int drain(final Consumer<E> c, final int limit)
489 {
490 if (null == c)
491 throw new IllegalArgumentException("c is null");
492 if (limit < 0)
493 throw new IllegalArgumentException("limit is negative: " + limit);
494 if (limit == 0)
495 return 0;
496
497 final E[] buffer = this.buffer;
498 final long mask = this.mask;
499 final long cIndex = lpConsumerIndex();
500
501 for (int i = 0; i < limit; i++)
502 {
503 final long index = cIndex + i;
504 final long offset = calcCircularRefElementOffset(index, mask);
505 final E e = lvRefElement(buffer, offset);
506 if (null == e)
507 {
508 return i;
509 }
510 spRefElement(buffer, offset, null);
511 soConsumerIndex(index + 1);
512 c.accept(e);
513 }
514 return limit;
515 }
516
517 @Override
518 public int fill(Supplier<E> s, int limit)
519 {
520 if (null == s)
521 throw new IllegalArgumentException("supplier is null");
522 if (limit < 0)
523 throw new IllegalArgumentException("limit is negative:" + limit);
524 if (limit == 0)
525 return 0;
526
527 final long mask = this.mask;
528 final long capacity = mask + 1;
529 long producerLimit = lvProducerLimit();
530 long pIndex;
531 int actualLimit;
532 do
533 {
534 pIndex = lvProducerIndex();
535 long available = producerLimit - pIndex;
536 if (available <= 0)
537 {
538 final long cIndex = lvConsumerIndex();
539 producerLimit = cIndex + capacity;
540 available = producerLimit - pIndex;
541 if (available <= 0)
542 {
543 return 0;
544 }
545 else
546 {
547
548 soProducerLimit(producerLimit);
549 }
550 }
551 actualLimit = Math.min((int) available, limit);
552 }
553 while (!casProducerIndex(pIndex, pIndex + actualLimit));
554
555 final E[] buffer = this.buffer;
556 for (int i = 0; i < actualLimit; i++)
557 {
558
559 final long offset = calcCircularRefElementOffset(pIndex + i, mask);
560 soRefElement(buffer, offset, s.get());
561 }
562 return actualLimit;
563 }
564
565 @Override
566 public int drain(Consumer<E> c)
567 {
568 return drain(c, capacity());
569 }
570
571 @Override
572 public int fill(Supplier<E> s)
573 {
574 return MessagePassingQueueUtil.fillBounded(this, s);
575 }
576
577 @Override
578 public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
579 {
580 MessagePassingQueueUtil.drain(this, c, w, exit);
581 }
582
583 @Override
584 public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit)
585 {
586 MessagePassingQueueUtil.fill(this, s, wait, exit);
587 }
588 }
589