1 /*
2  * Licensed under the Apache License, Version 2.0 (the "License");
3  * you may not use this file except in compliance with the License.
4  * You may obtain a copy of the License at
5  *
6  * http://www.apache.org/licenses/LICENSE-2.0
7  *
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  */

14 package io.netty.util.internal.shaded.org.jctools.queues;
15
16 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess.UNSAFE;
17 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess.fieldOffset;
18 import static io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess.*;
19
20 abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
21 {
22     byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
23     byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
24     byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
25     byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
26     byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
27     byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
28     byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
29     byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
30     byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
31     byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
32     byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
33     byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
34     byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
35     byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
36     byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
37     // byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
38
39     MpscArrayQueueL1Pad(int capacity)
40     {
41         super(capacity);
42     }
43 }
44
45 //$gen:ordered-fields
46 abstract class MpscArrayQueueProducerIndexField<E> extends MpscArrayQueueL1Pad<E>
47 {
48     private final static long P_INDEX_OFFSET = fieldOffset(MpscArrayQueueProducerIndexField.class"producerIndex");
49
50     private volatile long producerIndex;
51
52     MpscArrayQueueProducerIndexField(int capacity)
53     {
54         super(capacity);
55     }
56
57     @Override
58     public final long lvProducerIndex()
59     {
60         return producerIndex;
61     }
62
63     final boolean casProducerIndex(long expect, long newValue)
64     {
65         return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
66     }
67 }
68
69 abstract class MpscArrayQueueMidPad<E> extends MpscArrayQueueProducerIndexField<E>
70 {
71     byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
72     byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
73     byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
74     byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
75     byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
76     byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
77     byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
78     byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
79     byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
80     byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
81     byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
82     byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
83     byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
84     byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
85     byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
86     byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
87
88     MpscArrayQueueMidPad(int capacity)
89     {
90         super(capacity);
91     }
92 }
93
94 //$gen:ordered-fields
95 abstract class MpscArrayQueueProducerLimitField<E> extends MpscArrayQueueMidPad<E>
96 {
97     private final static long P_LIMIT_OFFSET = fieldOffset(MpscArrayQueueProducerLimitField.class"producerLimit");
98
99     // First unavailable index the producer may claim up to before rereading the consumer index
100     private volatile long producerLimit;
101
102     MpscArrayQueueProducerLimitField(int capacity)
103     {
104         super(capacity);
105         this.producerLimit = capacity;
106     }
107
108     final long lvProducerLimit()
109     {
110         return producerLimit;
111     }
112
113     final void soProducerLimit(long newValue)
114     {
115         UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
116     }
117 }
118
119 abstract class MpscArrayQueueL2Pad<E> extends MpscArrayQueueProducerLimitField<E>
120 {
121     byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
122     byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
123     byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
124     byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
125     byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
126     byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
127     byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
128     byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
129     byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
130     byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
131     byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
132     byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
133     byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
134     byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
135     byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
136     // byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
137
138     MpscArrayQueueL2Pad(int capacity)
139     {
140         super(capacity);
141     }
142 }
143
144 //$gen:ordered-fields
145 abstract class MpscArrayQueueConsumerIndexField<E> extends MpscArrayQueueL2Pad<E>
146 {
147     private final static long C_INDEX_OFFSET = fieldOffset(MpscArrayQueueConsumerIndexField.class"consumerIndex");
148
149     private volatile long consumerIndex;
150
151     MpscArrayQueueConsumerIndexField(int capacity)
152     {
153         super(capacity);
154     }
155
156     @Override
157     public final long lvConsumerIndex()
158     {
159         return consumerIndex;
160     }
161
162     final long lpConsumerIndex()
163     {
164         return UNSAFE.getLong(this, C_INDEX_OFFSET);
165     }
166
167     final void soConsumerIndex(long newValue)
168     {
169         UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
170     }
171 }
172
173 abstract class MpscArrayQueueL3Pad<E> extends MpscArrayQueueConsumerIndexField<E>
174 {
175     byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
176     byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
177     byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
178     byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
179     byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
180     byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
181     byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
182     byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
183     byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
184     byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
185     byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
186     byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
187     byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
188     byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
189     byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
190     byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
191
192     MpscArrayQueueL3Pad(int capacity)
193     {
194         super(capacity);
195     }
196 }
197
198 /**
199  * A Multi-Producer-Single-Consumer queue based on a {@link io.netty.util.internal.shaded.org.jctools.queues.ConcurrentCircularArrayQueue}. This
200  * implies that any thread may call the offer method, but only a single thread may call poll/peek for correctness to
201  * maintained. <br>
202  * This implementation follows patterns documented on the package level for False Sharing protection.<br>
203  * This implementation is using the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
204  * method for polling from the queue (with minor change to correctly publish the index) and an extension of
205  * the Leslie Lamport concurrent queue algorithm (originated by Martin Thompson) on the producer side.
206  */

207 public class MpscArrayQueue<E> extends MpscArrayQueueL3Pad<E>
208 {
209
210     public MpscArrayQueue(final int capacity)
211     {
212         super(capacity);
213     }
214
215     /**
216      * {@link #offer}} if {@link #size()} is less than threshold.
217      *
218      * @param e         the object to offer onto the queue, not null
219      * @param threshold the maximum allowable size
220      * @return true if the offer is successful, false if queue size exceeds threshold
221      * @since 1.0.1
222      */

223     public boolean offerIfBelowThreshold(final E e, int threshold)
224     {
225         if (null == e)
226         {
227             throw new NullPointerException();
228         }
229
230         final long mask = this.mask;
231         final long capacity = mask + 1;
232
233         long producerLimit = lvProducerLimit();
234         long pIndex;
235         do
236         {
237             pIndex = lvProducerIndex();
238             long available = producerLimit - pIndex;
239             long size = capacity - available;
240             if (size >= threshold)
241             {
242                 final long cIndex = lvConsumerIndex();
243                 size = pIndex - cIndex;
244                 if (size >= threshold)
245                 {
246                     return false// the size exceeds threshold
247                 }
248                 else
249                 {
250                     // update producer limit to the next index that we must recheck the consumer index
251                     producerLimit = cIndex + capacity;
252
253                     // this is racy, but the race is benign
254                     soProducerLimit(producerLimit);
255                 }
256             }
257         }
258         while (!casProducerIndex(pIndex, pIndex + 1));
259         /*
260          * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
261          * the index visibility to poll() we would need to handle the case where the element is not visible.
262          */

263
264         // Won CAS, move on to storing
265         final long offset = calcCircularRefElementOffset(pIndex, mask);
266         soRefElement(buffer, offset, e);
267         return true// AWESOME :)
268     }
269
270     /**
271      * {@inheritDoc} <br>
272      * <p>
273      * IMPLEMENTATION NOTES:<br>
274      * Lock free offer using a single CAS. As class name suggests access is permitted to many threads
275      * concurrently.
276      *
277      * @see java.util.Queue#offer
278      * @see io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue#offer
279      */

280     @Override
281     public boolean offer(final E e)
282     {
283         if (null == e)
284         {
285             throw new NullPointerException();
286         }
287
288         // use a cached view on consumer index (potentially updated in loop)
289         final long mask = this.mask;
290         long producerLimit = lvProducerLimit();
291         long pIndex;
292         do
293         {
294             pIndex = lvProducerIndex();
295             if (pIndex >= producerLimit)
296             {
297                 final long cIndex = lvConsumerIndex();
298                 producerLimit = cIndex + mask + 1;
299
300                 if (pIndex >= producerLimit)
301                 {
302                     return false// FULL :(
303                 }
304                 else
305                 {
306                     // update producer limit to the next index that we must recheck the consumer index
307                     // this is racy, but the race is benign
308                     soProducerLimit(producerLimit);
309                 }
310             }
311         }
312         while (!casProducerIndex(pIndex, pIndex + 1));
313         /*
314          * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
315          * the index visibility to poll() we would need to handle the case where the element is not visible.
316          */

317
318         // Won CAS, move on to storing
319         final long offset = calcCircularRefElementOffset(pIndex, mask);
320         soRefElement(buffer, offset, e);
321         return true// AWESOME :)
322     }
323
324     /**
325      * A wait free alternative to offer which fails on CAS failure.
326      *
327      * @param e new element, not null
328      * @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful
329      */

330     public final int failFastOffer(final E e)
331     {
332         if (null == e)
333         {
334             throw new NullPointerException();
335         }
336         final long mask = this.mask;
337         final long capacity = mask + 1;
338         final long pIndex = lvProducerIndex();
339         long producerLimit = lvProducerLimit();
340         if (pIndex >= producerLimit)
341         {
342             final long cIndex = lvConsumerIndex();
343             producerLimit = cIndex + capacity;
344             if (pIndex >= producerLimit)
345             {
346                 return 1; // FULL :(
347             }
348             else
349             {
350                 // update producer limit to the next index that we must recheck the consumer index
351                 soProducerLimit(producerLimit);
352             }
353         }
354
355         // look Ma, no loop!
356         if (!casProducerIndex(pIndex, pIndex + 1))
357         {
358             return -1; // CAS FAIL :(
359         }
360
361         // Won CAS, move on to storing
362         final long offset = calcCircularRefElementOffset(pIndex, mask);
363         soRefElement(buffer, offset, e);
364         return 0; // AWESOME :)
365     }
366
367     /**
368      * {@inheritDoc}
369      * <p>
370      * IMPLEMENTATION NOTES:<br>
371      * Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread.
372      *
373      * @see java.util.Queue#poll
374      * @see io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue#poll
375      */

376     @Override
377     public E poll()
378     {
379         final long cIndex = lpConsumerIndex();
380         final long offset = calcCircularRefElementOffset(cIndex, mask);
381         // Copy field to avoid re-reading after volatile load
382         final E[] buffer = this.buffer;
383
384         // If we can't see the next available element we can't poll
385         E e = lvRefElement(buffer, offset);
386         if (null == e)
387         {
388             /*
389              * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
390              * winning the CAS on offer but before storing the element in the queue. Other producers may go on
391              * to fill up the queue after this element.
392              */

393             if (cIndex != lvProducerIndex())
394             {
395                 do
396                 {
397                     e = lvRefElement(buffer, offset);
398                 }
399                 while (e == null);
400             }
401             else
402             {
403                 return null;
404             }
405         }
406
407         spRefElement(buffer, offset, null);
408         soConsumerIndex(cIndex + 1);
409         return e;
410     }
411
412     /**
413      * {@inheritDoc}
414      * <p>
415      * IMPLEMENTATION NOTES:<br>
416      * Lock free peek using ordered loads. As class name suggests access is limited to a single thread.
417      *
418      * @see java.util.Queue#poll
419      * @see io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue#poll
420      */

421     @Override
422     public E peek()
423     {
424         // Copy field to avoid re-reading after volatile load
425         final E[] buffer = this.buffer;
426
427         final long cIndex = lpConsumerIndex();
428         final long offset = calcCircularRefElementOffset(cIndex, mask);
429         E e = lvRefElement(buffer, offset);
430         if (null == e)
431         {
432             /*
433              * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
434              * winning the CAS on offer but before storing the element in the queue. Other producers may go on
435              * to fill up the queue after this element.
436              */

437             if (cIndex != lvProducerIndex())
438             {
439                 do
440                 {
441                     e = lvRefElement(buffer, offset);
442                 }
443                 while (e == null);
444             }
445             else
446             {
447                 return null;
448             }
449         }
450         return e;
451     }
452
453     @Override
454     public boolean relaxedOffer(E e)
455     {
456         return offer(e);
457     }
458
459     @Override
460     public E relaxedPoll()
461     {
462         final E[] buffer = this.buffer;
463         final long cIndex = lpConsumerIndex();
464         final long offset = calcCircularRefElementOffset(cIndex, mask);
465
466         // If we can't see the next available element we can't poll
467         E e = lvRefElement(buffer, offset);
468         if (null == e)
469         {
470             return null;
471         }
472
473         spRefElement(buffer, offset, null);
474         soConsumerIndex(cIndex + 1);
475         return e;
476     }
477
478     @Override
479     public E relaxedPeek()
480     {
481         final E[] buffer = this.buffer;
482         final long mask = this.mask;
483         final long cIndex = lpConsumerIndex();
484         return lvRefElement(buffer, calcCircularRefElementOffset(cIndex, mask));
485     }
486
487     @Override
488     public int drain(final Consumer<E> c, final int limit)
489     {
490         if (null == c)
491             throw new IllegalArgumentException("c is null");
492         if (limit < 0)
493             throw new IllegalArgumentException("limit is negative: " + limit);
494         if (limit == 0)
495             return 0;
496
497         final E[] buffer = this.buffer;
498         final long mask = this.mask;
499         final long cIndex = lpConsumerIndex();
500
501         for (int i = 0; i < limit; i++)
502         {
503             final long index = cIndex + i;
504             final long offset = calcCircularRefElementOffset(index, mask);
505             final E e = lvRefElement(buffer, offset);
506             if (null == e)
507             {
508                 return i;
509             }
510             spRefElement(buffer, offset, null);
511             soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
512             c.accept(e);
513         }
514         return limit;
515     }
516
517     @Override
518     public int fill(Supplier<E> s, int limit)
519     {
520         if (null == s)
521             throw new IllegalArgumentException("supplier is null");
522         if (limit < 0)
523             throw new IllegalArgumentException("limit is negative:" + limit);
524         if (limit == 0)
525             return 0;
526
527         final long mask = this.mask;
528         final long capacity = mask + 1;
529         long producerLimit = lvProducerLimit();
530         long pIndex;
531         int actualLimit;
532         do
533         {
534             pIndex = lvProducerIndex();
535             long available = producerLimit - pIndex;
536             if (available <= 0)
537             {
538                 final long cIndex = lvConsumerIndex();
539                 producerLimit = cIndex + capacity;
540                 available = producerLimit - pIndex;
541                 if (available <= 0)
542                 {
543                     return 0; // FULL :(
544                 }
545                 else
546                 {
547                     // update producer limit to the next index that we must recheck the consumer index
548                     soProducerLimit(producerLimit);
549                 }
550             }
551             actualLimit = Math.min((int) available, limit);
552         }
553         while (!casProducerIndex(pIndex, pIndex + actualLimit));
554         // right, now we claimed a few slots and can fill them with goodness
555         final E[] buffer = this.buffer;
556         for (int i = 0; i < actualLimit; i++)
557         {
558             // Won CAS, move on to storing
559             final long offset = calcCircularRefElementOffset(pIndex + i, mask);
560             soRefElement(buffer, offset, s.get());
561         }
562         return actualLimit;
563     }
564
565     @Override
566     public int drain(Consumer<E> c)
567     {
568         return drain(c, capacity());
569     }
570
571     @Override
572     public int fill(Supplier<E> s)
573     {
574         return MessagePassingQueueUtil.fillBounded(this, s);
575     }
576
577     @Override
578     public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
579     {
580         MessagePassingQueueUtil.drain(this, c, w, exit);
581     }
582
583     @Override
584     public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit)
585     {
586         MessagePassingQueueUtil.fill(this, s, wait, exit);
587     }
588 }
589