1
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.InternalThreadLocalMap;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
31
32 import static io.netty.util.internal.ObjectUtil.checkNotNull;
33 import static java.util.concurrent.TimeUnit.MILLISECONDS;
34
35 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
36 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
37 private static final InternalLogger rejectedExecutionLogger =
38 InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
39 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
40 SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
41 @SuppressWarnings("rawtypes")
42 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
43 AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
44 private static final Object SUCCESS = new Object();
45 private static final Object UNCANCELLABLE = new Object();
46 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
47 new CancellationException(), DefaultPromise.class, "cancel(...)"));
48 private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
49
50 private volatile Object result;
51 private final EventExecutor executor;
52
58 private Object listeners;
59
62 private short waiters;
63
64
68 private boolean notifyingListeners;
69
70
82 public DefaultPromise(EventExecutor executor) {
83 this.executor = checkNotNull(executor, "executor");
84 }
85
86
/**
 * Creates a promise whose {@code executor} field is {@code null}.
 * NOTE(review): subclasses using this constructor presumably override {@link #executor()},
 * because {@code notifyListeners()} dereferences its return value -- confirm.
 */
protected DefaultPromise() {
    this.executor = null;
}
93
94 @Override
95 public Promise<V> setSuccess(V result) {
96 if (setSuccess0(result)) {
97 return this;
98 }
99 throw new IllegalStateException("complete already: " + this);
100 }
101
102 @Override
103 public boolean trySuccess(V result) {
104 return setSuccess0(result);
105 }
106
107 @Override
108 public Promise<V> setFailure(Throwable cause) {
109 if (setFailure0(cause)) {
110 return this;
111 }
112 throw new IllegalStateException("complete already: " + this, cause);
113 }
114
115 @Override
116 public boolean tryFailure(Throwable cause) {
117 return setFailure0(cause);
118 }
119
120 @Override
121 public boolean setUncancellable() {
122 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
123 return true;
124 }
125 Object result = this.result;
126 return !isDone0(result) || !isCancelled0(result);
127 }
128
129 @Override
130 public boolean isSuccess() {
131 Object result = this.result;
132 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
133 }
134
135 @Override
136 public boolean isCancellable() {
137 return result == null;
138 }
139
140 private static final class LeanCancellationException extends CancellationException {
141 private static final long serialVersionUID = 2794674970981187807L;
142
143 @Override
144 public Throwable fillInStackTrace() {
145 setStackTrace(CANCELLATION_STACK);
146 return this;
147 }
148
149 @Override
150 public String toString() {
151 return CancellationException.class.getName();
152 }
153 }
154
155 @Override
156 public Throwable cause() {
157 return cause0(result);
158 }
159
160 private Throwable cause0(Object result) {
161 if (!(result instanceof CauseHolder)) {
162 return null;
163 }
164 if (result == CANCELLATION_CAUSE_HOLDER) {
165 CancellationException ce = new LeanCancellationException();
166 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
167 return ce;
168 }
169 result = this.result;
170 }
171 return ((CauseHolder) result).cause;
172 }
173
174 @Override
175 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
176 checkNotNull(listener, "listener");
177
178 synchronized (this) {
179 addListener0(listener);
180 }
181
182 if (isDone()) {
183 notifyListeners();
184 }
185
186 return this;
187 }
188
189 @Override
190 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
191 checkNotNull(listeners, "listeners");
192
193 synchronized (this) {
194 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
195 if (listener == null) {
196 break;
197 }
198 addListener0(listener);
199 }
200 }
201
202 if (isDone()) {
203 notifyListeners();
204 }
205
206 return this;
207 }
208
209 @Override
210 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
211 checkNotNull(listener, "listener");
212
213 synchronized (this) {
214 removeListener0(listener);
215 }
216
217 return this;
218 }
219
220 @Override
221 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
222 checkNotNull(listeners, "listeners");
223
224 synchronized (this) {
225 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
226 if (listener == null) {
227 break;
228 }
229 removeListener0(listener);
230 }
231 }
232
233 return this;
234 }
235
236 @Override
237 public Promise<V> await() throws InterruptedException {
238 if (isDone()) {
239 return this;
240 }
241
242 if (Thread.interrupted()) {
243 throw new InterruptedException(toString());
244 }
245
246 checkDeadLock();
247
248 synchronized (this) {
249 while (!isDone()) {
250 incWaiters();
251 try {
252 wait();
253 } finally {
254 decWaiters();
255 }
256 }
257 }
258 return this;
259 }
260
261 @Override
262 public Promise<V> awaitUninterruptibly() {
263 if (isDone()) {
264 return this;
265 }
266
267 checkDeadLock();
268
269 boolean interrupted = false;
270 synchronized (this) {
271 while (!isDone()) {
272 incWaiters();
273 try {
274 wait();
275 } catch (InterruptedException e) {
276
277 interrupted = true;
278 } finally {
279 decWaiters();
280 }
281 }
282 }
283
284 if (interrupted) {
285 Thread.currentThread().interrupt();
286 }
287
288 return this;
289 }
290
291 @Override
292 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
293 return await0(unit.toNanos(timeout), true);
294 }
295
296 @Override
297 public boolean await(long timeoutMillis) throws InterruptedException {
298 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
299 }
300
301 @Override
302 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
303 try {
304 return await0(unit.toNanos(timeout), false);
305 } catch (InterruptedException e) {
306
307 throw new InternalError();
308 }
309 }
310
311 @Override
312 public boolean awaitUninterruptibly(long timeoutMillis) {
313 try {
314 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
315 } catch (InterruptedException e) {
316
317 throw new InternalError();
318 }
319 }
320
321 @SuppressWarnings("unchecked")
322 @Override
323 public V getNow() {
324 Object result = this.result;
325 if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
326 return null;
327 }
328 return (V) result;
329 }
330
331 @SuppressWarnings("unchecked")
332 @Override
333 public V get() throws InterruptedException, ExecutionException {
334 Object result = this.result;
335 if (!isDone0(result)) {
336 await();
337 result = this.result;
338 }
339 if (result == SUCCESS || result == UNCANCELLABLE) {
340 return null;
341 }
342 Throwable cause = cause0(result);
343 if (cause == null) {
344 return (V) result;
345 }
346 if (cause instanceof CancellationException) {
347 throw (CancellationException) cause;
348 }
349 throw new ExecutionException(cause);
350 }
351
352 @SuppressWarnings("unchecked")
353 @Override
354 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
355 Object result = this.result;
356 if (!isDone0(result)) {
357 if (!await(timeout, unit)) {
358 throw new TimeoutException();
359 }
360 result = this.result;
361 }
362 if (result == SUCCESS || result == UNCANCELLABLE) {
363 return null;
364 }
365 Throwable cause = cause0(result);
366 if (cause == null) {
367 return (V) result;
368 }
369 if (cause instanceof CancellationException) {
370 throw (CancellationException) cause;
371 }
372 throw new ExecutionException(cause);
373 }
374
375
380 @Override
381 public boolean cancel(boolean mayInterruptIfRunning) {
382 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
383 if (checkNotifyWaiters()) {
384 notifyListeners();
385 }
386 return true;
387 }
388 return false;
389 }
390
391 @Override
392 public boolean isCancelled() {
393 return isCancelled0(result);
394 }
395
396 @Override
397 public boolean isDone() {
398 return isDone0(result);
399 }
400
401 @Override
402 public Promise<V> sync() throws InterruptedException {
403 await();
404 rethrowIfFailed();
405 return this;
406 }
407
408 @Override
409 public Promise<V> syncUninterruptibly() {
410 awaitUninterruptibly();
411 rethrowIfFailed();
412 return this;
413 }
414
415 @Override
416 public String toString() {
417 return toStringBuilder().toString();
418 }
419
420 protected StringBuilder toStringBuilder() {
421 StringBuilder buf = new StringBuilder(64)
422 .append(StringUtil.simpleClassName(this))
423 .append('@')
424 .append(Integer.toHexString(hashCode()));
425
426 Object result = this.result;
427 if (result == SUCCESS) {
428 buf.append("(success)");
429 } else if (result == UNCANCELLABLE) {
430 buf.append("(uncancellable)");
431 } else if (result instanceof CauseHolder) {
432 buf.append("(failure: ")
433 .append(((CauseHolder) result).cause)
434 .append(')');
435 } else if (result != null) {
436 buf.append("(success: ")
437 .append(result)
438 .append(')');
439 } else {
440 buf.append("(incomplete)");
441 }
442
443 return buf;
444 }
445
446
454 protected EventExecutor executor() {
455 return executor;
456 }
457
458 protected void checkDeadLock() {
459 EventExecutor e = executor();
460 if (e != null && e.inEventLoop()) {
461 throw new BlockingOperationException(toString());
462 }
463 }
464
465
474 protected static void notifyListener(
475 EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
476 notifyListenerWithStackOverFlowProtection(
477 checkNotNull(eventExecutor, "eventExecutor"),
478 checkNotNull(future, "future"),
479 checkNotNull(listener, "listener"));
480 }
481
482 private void notifyListeners() {
483 EventExecutor executor = executor();
484 if (executor.inEventLoop()) {
485 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
486 final int stackDepth = threadLocals.futureListenerStackDepth();
487 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
488 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
489 try {
490 notifyListenersNow();
491 } finally {
492 threadLocals.setFutureListenerStackDepth(stackDepth);
493 }
494 return;
495 }
496 }
497
498 safeExecute(executor, new Runnable() {
499 @Override
500 public void run() {
501 notifyListenersNow();
502 }
503 });
504 }
505
506
511 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
512 final Future<?> future,
513 final GenericFutureListener<?> listener) {
514 if (executor.inEventLoop()) {
515 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
516 final int stackDepth = threadLocals.futureListenerStackDepth();
517 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
518 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
519 try {
520 notifyListener0(future, listener);
521 } finally {
522 threadLocals.setFutureListenerStackDepth(stackDepth);
523 }
524 return;
525 }
526 }
527
528 safeExecute(executor, new Runnable() {
529 @Override
530 public void run() {
531 notifyListener0(future, listener);
532 }
533 });
534 }
535
536 private void notifyListenersNow() {
537 Object listeners;
538 synchronized (this) {
539
540 if (notifyingListeners || this.listeners == null) {
541 return;
542 }
543 notifyingListeners = true;
544 listeners = this.listeners;
545 this.listeners = null;
546 }
547 for (;;) {
548 if (listeners instanceof DefaultFutureListeners) {
549 notifyListeners0((DefaultFutureListeners) listeners);
550 } else {
551 notifyListener0(this, (GenericFutureListener<?>) listeners);
552 }
553 synchronized (this) {
554 if (this.listeners == null) {
555
556
557 notifyingListeners = false;
558 return;
559 }
560 listeners = this.listeners;
561 this.listeners = null;
562 }
563 }
564 }
565
566 private void notifyListeners0(DefaultFutureListeners listeners) {
567 GenericFutureListener<?>[] a = listeners.listeners();
568 int size = listeners.size();
569 for (int i = 0; i < size; i ++) {
570 notifyListener0(this, a[i]);
571 }
572 }
573
574 @SuppressWarnings({ "unchecked", "rawtypes" })
575 private static void notifyListener0(Future future, GenericFutureListener l) {
576 try {
577 l.operationComplete(future);
578 } catch (Throwable t) {
579 if (logger.isWarnEnabled()) {
580 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
581 }
582 }
583 }
584
585 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
586 if (listeners == null) {
587 listeners = listener;
588 } else if (listeners instanceof DefaultFutureListeners) {
589 ((DefaultFutureListeners) listeners).add(listener);
590 } else {
591 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
592 }
593 }
594
595 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
596 if (listeners instanceof DefaultFutureListeners) {
597 ((DefaultFutureListeners) listeners).remove(listener);
598 } else if (listeners == listener) {
599 listeners = null;
600 }
601 }
602
603 private boolean setSuccess0(V result) {
604 return setValue0(result == null ? SUCCESS : result);
605 }
606
607 private boolean setFailure0(Throwable cause) {
608 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
609 }
610
611 private boolean setValue0(Object objResult) {
612 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
613 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
614 if (checkNotifyWaiters()) {
615 notifyListeners();
616 }
617 return true;
618 }
619 return false;
620 }
621
622
626 private synchronized boolean checkNotifyWaiters() {
627 if (waiters > 0) {
628 notifyAll();
629 }
630 return listeners != null;
631 }
632
633 private void incWaiters() {
634 if (waiters == Short.MAX_VALUE) {
635 throw new IllegalStateException("too many waiters: " + this);
636 }
637 ++waiters;
638 }
639
640 private void decWaiters() {
641 --waiters;
642 }
643
644 private void rethrowIfFailed() {
645 Throwable cause = cause();
646 if (cause == null) {
647 return;
648 }
649
650 PlatformDependent.throwException(cause);
651 }
652
653 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
654 if (isDone()) {
655 return true;
656 }
657
658 if (timeoutNanos <= 0) {
659 return isDone();
660 }
661
662 if (interruptable && Thread.interrupted()) {
663 throw new InterruptedException(toString());
664 }
665
666 checkDeadLock();
667
668 long startTime = System.nanoTime();
669 long waitTime = timeoutNanos;
670 boolean interrupted = false;
671 try {
672 for (;;) {
673 synchronized (this) {
674 if (isDone()) {
675 return true;
676 }
677 incWaiters();
678 try {
679 wait(waitTime / 1000000, (int) (waitTime % 1000000));
680 } catch (InterruptedException e) {
681 if (interruptable) {
682 throw e;
683 } else {
684 interrupted = true;
685 }
686 } finally {
687 decWaiters();
688 }
689 }
690 if (isDone()) {
691 return true;
692 } else {
693 waitTime = timeoutNanos - (System.nanoTime() - startTime);
694 if (waitTime <= 0) {
695 return isDone();
696 }
697 }
698 }
699 } finally {
700 if (interrupted) {
701 Thread.currentThread().interrupt();
702 }
703 }
704 }
705
706
716 @SuppressWarnings("unchecked")
717 void notifyProgressiveListeners(final long progress, final long total) {
718 final Object listeners = progressiveListeners();
719 if (listeners == null) {
720 return;
721 }
722
723 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
724
725 EventExecutor executor = executor();
726 if (executor.inEventLoop()) {
727 if (listeners instanceof GenericProgressiveFutureListener[]) {
728 notifyProgressiveListeners0(
729 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
730 } else {
731 notifyProgressiveListener0(
732 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
733 }
734 } else {
735 if (listeners instanceof GenericProgressiveFutureListener[]) {
736 final GenericProgressiveFutureListener<?>[] array =
737 (GenericProgressiveFutureListener<?>[]) listeners;
738 safeExecute(executor, new Runnable() {
739 @Override
740 public void run() {
741 notifyProgressiveListeners0(self, array, progress, total);
742 }
743 });
744 } else {
745 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
746 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
747 safeExecute(executor, new Runnable() {
748 @Override
749 public void run() {
750 notifyProgressiveListener0(self, l, progress, total);
751 }
752 });
753 }
754 }
755 }
756
757
761 private synchronized Object progressiveListeners() {
762 Object listeners = this.listeners;
763 if (listeners == null) {
764
765 return null;
766 }
767
768 if (listeners instanceof DefaultFutureListeners) {
769
770 DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
771 int progressiveSize = dfl.progressiveSize();
772 switch (progressiveSize) {
773 case 0:
774 return null;
775 case 1:
776 for (GenericFutureListener<?> l: dfl.listeners()) {
777 if (l instanceof GenericProgressiveFutureListener) {
778 return l;
779 }
780 }
781 return null;
782 }
783
784 GenericFutureListener<?>[] array = dfl.listeners();
785 GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
786 for (int i = 0, j = 0; j < progressiveSize; i ++) {
787 GenericFutureListener<?> l = array[i];
788 if (l instanceof GenericProgressiveFutureListener) {
789 copy[j ++] = (GenericProgressiveFutureListener<?>) l;
790 }
791 }
792
793 return copy;
794 } else if (listeners instanceof GenericProgressiveFutureListener) {
795 return listeners;
796 } else {
797
798 return null;
799 }
800 }
801
802 private static void notifyProgressiveListeners0(
803 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
804 for (GenericProgressiveFutureListener<?> l: listeners) {
805 if (l == null) {
806 break;
807 }
808 notifyProgressiveListener0(future, l, progress, total);
809 }
810 }
811
812 @SuppressWarnings({ "unchecked", "rawtypes" })
813 private static void notifyProgressiveListener0(
814 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
815 try {
816 l.operationProgressed(future, progress, total);
817 } catch (Throwable t) {
818 if (logger.isWarnEnabled()) {
819 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
820 }
821 }
822 }
823
824 private static boolean isCancelled0(Object result) {
825 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
826 }
827
828 private static boolean isDone0(Object result) {
829 return result != null && result != UNCANCELLABLE;
830 }
831
832 private static final class CauseHolder {
833 final Throwable cause;
834 CauseHolder(Throwable cause) {
835 this.cause = cause;
836 }
837 }
838
839 private static void safeExecute(EventExecutor executor, Runnable task) {
840 try {
841 executor.execute(task);
842 } catch (Throwable t) {
843 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
844 }
845 }
846 }
847