Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index cc0127ee50c9c36d892fd68744ba43005198e275..692deea8b396f054c50b4c3500e261360b8340a2 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -26,12 +26,35 @@ _runUserCode(userCode(), |
} |
} |
-/** Helper function to make an onError argument to [_runUserCode]. */ |
-_cancelAndError(StreamSubscription subscription, _Future future) => |
- (error, StackTrace stackTrace) { |
- subscription.cancel(); |
+/** Helper function to cancel a subscription and wait for the potential future, |
+ before completing with an error. */ |
+void _cancelAndError(StreamSubscription subscription, |
+ _Future future, |
+ error, |
+ StackTrace stackTrace) { |
+ var cancelFuture = subscription.cancel(); |
+ if (cancelFuture is Future) { |
+ cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); |
+ } else { |
future._completeError(error, stackTrace); |
- }; |
+ } |
+} |
+ |
+/** Helper function to make an onError argument to [_runUserCode]. */ |
+_cancelAndErrorClosure(StreamSubscription subscription, _Future future) => |
+ ((error, StackTrace stackTrace) => _cancelAndError( |
+ subscription, future, error, stackTrace)); |
+ |
+/** Helper function to cancel a subscription and wait for the potential future, |
+ before completing with a value. */ |
+void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
+ var cancelFuture = subscription.cancel(); |
+ if (cancelFuture is Future) { |
+ cancelFuture.whenComplete(() => future._complete(value)); |
+ } else { |
+ future._complete(value); |
+ } |
+} |
/** |