1 /*
2  * Copyright 2013 The Netty Project
3  *
4  * The Netty Project licenses this file to you under the Apache License,
5  * version 2.0 (the "License"); you may not use this file except in compliance
6  * with the License. You may obtain a copy of the License at:
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  */

16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.InternalThreadLocalMap;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
31
32 import static io.netty.util.internal.ObjectUtil.checkNotNull;
33 import static java.util.concurrent.TimeUnit.MILLISECONDS;
34
35 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
36     private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
37     private static final InternalLogger rejectedExecutionLogger =
38             InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
39     private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
40             SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
41     @SuppressWarnings("rawtypes")
42     private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
43             AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class"result");
44     private static final Object SUCCESS = new Object();
45     private static final Object UNCANCELLABLE = new Object();
46     private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
47             new CancellationException(), DefaultPromise.class"cancel(...)"));
48     private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
49
50     private volatile Object result;
51     private final EventExecutor executor;
52     /**
53      * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
54      * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
55      *
56      * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
57      */

58     private Object listeners;
59     /**
60      * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
61      */

62     private short waiters;
63
64     /**
65      * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
66      * executor changes.
67      */

68     private boolean notifyingListeners;
69
70     /**
71      * Creates a new instance.
72      *
73      * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
74      *
75      * @param executor
76      *        the {@link EventExecutor} which is used to notify the promise once it is complete.
77      *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
78      *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
79      *        depth exceeds a threshold.
80      *
81      */

82     public DefaultPromise(EventExecutor executor) {
83         this.executor = checkNotNull(executor, "executor");
84     }
85
86     /**
87      * See {@link #executor()} for expectations of the executor.
88      */

89     protected DefaultPromise() {
90         // only for subclasses
91         executor = null;
92     }
93
94     @Override
95     public Promise<V> setSuccess(V result) {
96         if (setSuccess0(result)) {
97             return this;
98         }
99         throw new IllegalStateException("complete already: " + this);
100     }
101
102     @Override
103     public boolean trySuccess(V result) {
104         return setSuccess0(result);
105     }
106
107     @Override
108     public Promise<V> setFailure(Throwable cause) {
109         if (setFailure0(cause)) {
110             return this;
111         }
112         throw new IllegalStateException("complete already: " + this, cause);
113     }
114
115     @Override
116     public boolean tryFailure(Throwable cause) {
117         return setFailure0(cause);
118     }
119
120     @Override
121     public boolean setUncancellable() {
122         if (RESULT_UPDATER.compareAndSet(thisnull, UNCANCELLABLE)) {
123             return true;
124         }
125         Object result = this.result;
126         return !isDone0(result) || !isCancelled0(result);
127     }
128
129     @Override
130     public boolean isSuccess() {
131         Object result = this.result;
132         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
133     }
134
135     @Override
136     public boolean isCancellable() {
137         return result == null;
138     }
139
140     private static final class LeanCancellationException extends CancellationException {
141         private static final long serialVersionUID = 2794674970981187807L;
142
143         @Override
144         public Throwable fillInStackTrace() {
145             setStackTrace(CANCELLATION_STACK);
146             return this;
147         }
148
149         @Override
150         public String toString() {
151             return CancellationException.class.getName();
152         }
153     }
154
155     @Override
156     public Throwable cause() {
157         return cause0(result);
158     }
159
160     private Throwable cause0(Object result) {
161         if (!(result instanceof CauseHolder)) {
162             return null;
163         }
164         if (result == CANCELLATION_CAUSE_HOLDER) {
165             CancellationException ce = new LeanCancellationException();
166             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
167                 return ce;
168             }
169             result = this.result;
170         }
171         return ((CauseHolder) result).cause;
172     }
173
174     @Override
175     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
176         checkNotNull(listener, "listener");
177
178         synchronized (this) {
179             addListener0(listener);
180         }
181
182         if (isDone()) {
183             notifyListeners();
184         }
185
186         return this;
187     }
188
189     @Override
190     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
191         checkNotNull(listeners, "listeners");
192
193         synchronized (this) {
194             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
195                 if (listener == null) {
196                     break;
197                 }
198                 addListener0(listener);
199             }
200         }
201
202         if (isDone()) {
203             notifyListeners();
204         }
205
206         return this;
207     }
208
209     @Override
210     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
211         checkNotNull(listener, "listener");
212
213         synchronized (this) {
214             removeListener0(listener);
215         }
216
217         return this;
218     }
219
220     @Override
221     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
222         checkNotNull(listeners, "listeners");
223
224         synchronized (this) {
225             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
226                 if (listener == null) {
227                     break;
228                 }
229                 removeListener0(listener);
230             }
231         }
232
233         return this;
234     }
235
236     @Override
237     public Promise<V> await() throws InterruptedException {
238         if (isDone()) {
239             return this;
240         }
241
242         if (Thread.interrupted()) {
243             throw new InterruptedException(toString());
244         }
245
246         checkDeadLock();
247
248         synchronized (this) {
249             while (!isDone()) {
250                 incWaiters();
251                 try {
252                     wait();
253                 } finally {
254                     decWaiters();
255                 }
256             }
257         }
258         return this;
259     }
260
261     @Override
262     public Promise<V> awaitUninterruptibly() {
263         if (isDone()) {
264             return this;
265         }
266
267         checkDeadLock();
268
269         boolean interrupted = false;
270         synchronized (this) {
271             while (!isDone()) {
272                 incWaiters();
273                 try {
274                     wait();
275                 } catch (InterruptedException e) {
276                     // Interrupted while waiting.
277                     interrupted = true;
278                 } finally {
279                     decWaiters();
280                 }
281             }
282         }
283
284         if (interrupted) {
285             Thread.currentThread().interrupt();
286         }
287
288         return this;
289     }
290
291     @Override
292     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
293         return await0(unit.toNanos(timeout), true);
294     }
295
296     @Override
297     public boolean await(long timeoutMillis) throws InterruptedException {
298         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
299     }
300
301     @Override
302     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
303         try {
304             return await0(unit.toNanos(timeout), false);
305         } catch (InterruptedException e) {
306             // Should not be raised at all.
307             throw new InternalError();
308         }
309     }
310
311     @Override
312     public boolean awaitUninterruptibly(long timeoutMillis) {
313         try {
314             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
315         } catch (InterruptedException e) {
316             // Should not be raised at all.
317             throw new InternalError();
318         }
319     }
320
321     @SuppressWarnings("unchecked")
322     @Override
323     public V getNow() {
324         Object result = this.result;
325         if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
326             return null;
327         }
328         return (V) result;
329     }
330
331     @SuppressWarnings("unchecked")
332     @Override
333     public V get() throws InterruptedException, ExecutionException {
334         Object result = this.result;
335         if (!isDone0(result)) {
336             await();
337             result = this.result;
338         }
339         if (result == SUCCESS || result == UNCANCELLABLE) {
340             return null;
341         }
342         Throwable cause = cause0(result);
343         if (cause == null) {
344             return (V) result;
345         }
346         if (cause instanceof CancellationException) {
347             throw (CancellationException) cause;
348         }
349         throw new ExecutionException(cause);
350     }
351
352     @SuppressWarnings("unchecked")
353     @Override
354     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
355         Object result = this.result;
356         if (!isDone0(result)) {
357             if (!await(timeout, unit)) {
358                 throw new TimeoutException();
359             }
360             result = this.result;
361         }
362         if (result == SUCCESS || result == UNCANCELLABLE) {
363             return null;
364         }
365         Throwable cause = cause0(result);
366         if (cause == null) {
367             return (V) result;
368         }
369         if (cause instanceof CancellationException) {
370             throw (CancellationException) cause;
371         }
372         throw new ExecutionException(cause);
373     }
374
375     /**
376      * {@inheritDoc}
377      *
378      * @param mayInterruptIfRunning this value has no effect in this implementation.
379      */

380     @Override
381     public boolean cancel(boolean mayInterruptIfRunning) {
382         if (RESULT_UPDATER.compareAndSet(thisnull, CANCELLATION_CAUSE_HOLDER)) {
383             if (checkNotifyWaiters()) {
384                 notifyListeners();
385             }
386             return true;
387         }
388         return false;
389     }
390
391     @Override
392     public boolean isCancelled() {
393         return isCancelled0(result);
394     }
395
396     @Override
397     public boolean isDone() {
398         return isDone0(result);
399     }
400
401     @Override
402     public Promise<V> sync() throws InterruptedException {
403         await();
404         rethrowIfFailed();
405         return this;
406     }
407
408     @Override
409     public Promise<V> syncUninterruptibly() {
410         awaitUninterruptibly();
411         rethrowIfFailed();
412         return this;
413     }
414
415     @Override
416     public String toString() {
417         return toStringBuilder().toString();
418     }
419
420     protected StringBuilder toStringBuilder() {
421         StringBuilder buf = new StringBuilder(64)
422                 .append(StringUtil.simpleClassName(this))
423                 .append('@')
424                 .append(Integer.toHexString(hashCode()));
425
426         Object result = this.result;
427         if (result == SUCCESS) {
428             buf.append("(success)");
429         } else if (result == UNCANCELLABLE) {
430             buf.append("(uncancellable)");
431         } else if (result instanceof CauseHolder) {
432             buf.append("(failure: ")
433                     .append(((CauseHolder) result).cause)
434                     .append(')');
435         } else if (result != null) {
436             buf.append("(success: ")
437                     .append(result)
438                     .append(')');
439         } else {
440             buf.append("(incomplete)");
441         }
442
443         return buf;
444     }
445
446     /**
447      * Get the executor used to notify listeners when this promise is complete.
448      * <p>
449      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
450      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
451      * depth exceeds a threshold.
452      * @return The executor used to notify listeners when this promise is complete.
453      */

454     protected EventExecutor executor() {
455         return executor;
456     }
457
458     protected void checkDeadLock() {
459         EventExecutor e = executor();
460         if (e != null && e.inEventLoop()) {
461             throw new BlockingOperationException(toString());
462         }
463     }
464
465     /**
466      * Notify a listener that a future has completed.
467      * <p>
468      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
469      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
470      * @param eventExecutor the executor to use to notify the listener {@code listener}.
471      * @param future the future that is complete.
472      * @param listener the listener to notify.
473      */

474     protected static void notifyListener(
475             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
476         notifyListenerWithStackOverFlowProtection(
477                 checkNotNull(eventExecutor, "eventExecutor"),
478                 checkNotNull(future, "future"),
479                 checkNotNull(listener, "listener"));
480     }
481
482     private void notifyListeners() {
483         EventExecutor executor = executor();
484         if (executor.inEventLoop()) {
485             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
486             final int stackDepth = threadLocals.futureListenerStackDepth();
487             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
488                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
489                 try {
490                     notifyListenersNow();
491                 } finally {
492                     threadLocals.setFutureListenerStackDepth(stackDepth);
493                 }
494                 return;
495             }
496         }
497
498         safeExecute(executor, new Runnable() {
499             @Override
500             public void run() {
501                 notifyListenersNow();
502             }
503         });
504     }
505
506     /**
507      * The logic in this method should be identical to {@link #notifyListeners()} but
508      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
509      * listener(s) may be changed and is protected by a synchronized operation.
510      */

511     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
512                                                                   final Future<?> future,
513                                                                   final GenericFutureListener<?> listener) {
514         if (executor.inEventLoop()) {
515             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
516             final int stackDepth = threadLocals.futureListenerStackDepth();
517             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
518                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
519                 try {
520                     notifyListener0(future, listener);
521                 } finally {
522                     threadLocals.setFutureListenerStackDepth(stackDepth);
523                 }
524                 return;
525             }
526         }
527
528         safeExecute(executor, new Runnable() {
529             @Override
530             public void run() {
531                 notifyListener0(future, listener);
532             }
533         });
534     }
535
536     private void notifyListenersNow() {
537         Object listeners;
538         synchronized (this) {
539             // Only proceed if there are listeners to notify and we are not already notifying listeners.
540             if (notifyingListeners || this.listeners == null) {
541                 return;
542             }
543             notifyingListeners = true;
544             listeners = this.listeners;
545             this.listeners = null;
546         }
547         for (;;) {
548             if (listeners instanceof DefaultFutureListeners) {
549                 notifyListeners0((DefaultFutureListeners) listeners);
550             } else {
551                 notifyListener0(this, (GenericFutureListener<?>) listeners);
552             }
553             synchronized (this) {
554                 if (this.listeners == null) {
555                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
556                     // need to be in a finally block.
557                     notifyingListeners = false;
558                     return;
559                 }
560                 listeners = this.listeners;
561                 this.listeners = null;
562             }
563         }
564     }
565
566     private void notifyListeners0(DefaultFutureListeners listeners) {
567         GenericFutureListener<?>[] a = listeners.listeners();
568         int size = listeners.size();
569         for (int i = 0; i < size; i ++) {
570             notifyListener0(this, a[i]);
571         }
572     }
573
574     @SuppressWarnings({ "unchecked""rawtypes" })
575     private static void notifyListener0(Future future, GenericFutureListener l) {
576         try {
577             l.operationComplete(future);
578         } catch (Throwable t) {
579             if (logger.isWarnEnabled()) {
580                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
581             }
582         }
583     }
584
585     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
586         if (listeners == null) {
587             listeners = listener;
588         } else if (listeners instanceof DefaultFutureListeners) {
589             ((DefaultFutureListeners) listeners).add(listener);
590         } else {
591             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
592         }
593     }
594
595     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
596         if (listeners instanceof DefaultFutureListeners) {
597             ((DefaultFutureListeners) listeners).remove(listener);
598         } else if (listeners == listener) {
599             listeners = null;
600         }
601     }
602
603     private boolean setSuccess0(V result) {
604         return setValue0(result == null ? SUCCESS : result);
605     }
606
607     private boolean setFailure0(Throwable cause) {
608         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
609     }
610
611     private boolean setValue0(Object objResult) {
612         if (RESULT_UPDATER.compareAndSet(thisnull, objResult) ||
613             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
614             if (checkNotifyWaiters()) {
615                 notifyListeners();
616             }
617             return true;
618         }
619         return false;
620     }
621
622     /**
623      * Check if there are any waiters and if so notify these.
624      * @return {@code trueif there are any listeners attached to the promise, {@code false} otherwise.
625      */

626     private synchronized boolean checkNotifyWaiters() {
627         if (waiters > 0) {
628             notifyAll();
629         }
630         return listeners != null;
631     }
632
633     private void incWaiters() {
634         if (waiters == Short.MAX_VALUE) {
635             throw new IllegalStateException("too many waiters: " + this);
636         }
637         ++waiters;
638     }
639
640     private void decWaiters() {
641         --waiters;
642     }
643
644     private void rethrowIfFailed() {
645         Throwable cause = cause();
646         if (cause == null) {
647             return;
648         }
649
650         PlatformDependent.throwException(cause);
651     }
652
653     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
654         if (isDone()) {
655             return true;
656         }
657
658         if (timeoutNanos <= 0) {
659             return isDone();
660         }
661
662         if (interruptable && Thread.interrupted()) {
663             throw new InterruptedException(toString());
664         }
665
666         checkDeadLock();
667
668         long startTime = System.nanoTime();
669         long waitTime = timeoutNanos;
670         boolean interrupted = false;
671         try {
672             for (;;) {
673                 synchronized (this) {
674                     if (isDone()) {
675                         return true;
676                     }
677                     incWaiters();
678                     try {
679                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
680                     } catch (InterruptedException e) {
681                         if (interruptable) {
682                             throw e;
683                         } else {
684                             interrupted = true;
685                         }
686                     } finally {
687                         decWaiters();
688                     }
689                 }
690                 if (isDone()) {
691                     return true;
692                 } else {
693                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
694                     if (waitTime <= 0) {
695                         return isDone();
696                     }
697                 }
698             }
699         } finally {
700             if (interrupted) {
701                 Thread.currentThread().interrupt();
702             }
703         }
704     }
705
706     /**
707      * Notify all progressive listeners.
708      * <p>
709      * No attempt is made to ensure notification order if multiple calls are made to this method before
710      * the original invocation completes.
711      * <p>
712      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
713      * @param progress the new progress.
714      * @param total the total progress.
715      */

716     @SuppressWarnings("unchecked")
717     void notifyProgressiveListeners(final long progress, final long total) {
718         final Object listeners = progressiveListeners();
719         if (listeners == null) {
720             return;
721         }
722
723         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
724
725         EventExecutor executor = executor();
726         if (executor.inEventLoop()) {
727             if (listeners instanceof GenericProgressiveFutureListener[]) {
728                 notifyProgressiveListeners0(
729                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
730             } else {
731                 notifyProgressiveListener0(
732                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
733             }
734         } else {
735             if (listeners instanceof GenericProgressiveFutureListener[]) {
736                 final GenericProgressiveFutureListener<?>[] array =
737                         (GenericProgressiveFutureListener<?>[]) listeners;
738                 safeExecute(executor, new Runnable() {
739                     @Override
740                     public void run() {
741                         notifyProgressiveListeners0(self, array, progress, total);
742                     }
743                 });
744             } else {
745                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
746                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
747                 safeExecute(executor, new Runnable() {
748                     @Override
749                     public void run() {
750                         notifyProgressiveListener0(self, l, progress, total);
751                     }
752                 });
753             }
754         }
755     }
756
757     /**
758      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
759      * {@code null}.
760      */

761     private synchronized Object progressiveListeners() {
762         Object listeners = this.listeners;
763         if (listeners == null) {
764             // No listeners added
765             return null;
766         }
767
768         if (listeners instanceof DefaultFutureListeners) {
769             // Copy DefaultFutureListeners into an array of listeners.
770             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
771             int progressiveSize = dfl.progressiveSize();
772             switch (progressiveSize) {
773                 case 0:
774                     return null;
775                 case 1:
776                     for (GenericFutureListener<?> l: dfl.listeners()) {
777                         if (l instanceof GenericProgressiveFutureListener) {
778                             return l;
779                         }
780                     }
781                     return null;
782             }
783
784             GenericFutureListener<?>[] array = dfl.listeners();
785             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
786             for (int i = 0, j = 0; j < progressiveSize; i ++) {
787                 GenericFutureListener<?> l = array[i];
788                 if (l instanceof GenericProgressiveFutureListener) {
789                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
790                 }
791             }
792
793             return copy;
794         } else if (listeners instanceof GenericProgressiveFutureListener) {
795             return listeners;
796         } else {
797             // Only one listener was added and it's not a progressive listener.
798             return null;
799         }
800     }
801
802     private static void notifyProgressiveListeners0(
803             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
804         for (GenericProgressiveFutureListener<?> l: listeners) {
805             if (l == null) {
806                 break;
807             }
808             notifyProgressiveListener0(future, l, progress, total);
809         }
810     }
811
812     @SuppressWarnings({ "unchecked""rawtypes" })
813     private static void notifyProgressiveListener0(
814             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
815         try {
816             l.operationProgressed(future, progress, total);
817         } catch (Throwable t) {
818             if (logger.isWarnEnabled()) {
819                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
820             }
821         }
822     }
823
824     private static boolean isCancelled0(Object result) {
825         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
826     }
827
828     private static boolean isDone0(Object result) {
829         return result != null && result != UNCANCELLABLE;
830     }
831
832     private static final class CauseHolder {
833         final Throwable cause;
834         CauseHolder(Throwable cause) {
835             this.cause = cause;
836         }
837     }
838
839     private static void safeExecute(EventExecutor executor, Runnable task) {
840         try {
841             executor.execute(task);
842         } catch (Throwable t) {
843             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
844         }
845     }
846 }
847