Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde..6672f475a9373b631318cea00d8d35dc2cd5c100 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -336,7 +336,7 @@ abstract class Stream<T> { |
if (seenFirst) { |
_runUserCode(() => combine(value, element), |
(T newValue) { value = newValue; }, |
- _cancelAndError(subscription, result)); |
+ _cancelAndErrorClosure(subscription, result)); |
} else { |
value = element; |
seenFirst = true; |
@@ -365,7 +365,7 @@ abstract class Stream<T> { |
_runUserCode( |
() => combine(value, element), |
(newValue) { value = newValue; }, |
- _cancelAndError(subscription, result) |
+ _cancelAndErrorClosure(subscription, result) |
); |
}, |
onError: (e, st) { |
@@ -402,8 +402,7 @@ abstract class Stream<T> { |
try { |
buffer.write(element); |
} catch (e, s) { |
- subscription.cancel(); |
- result._completeError(_asyncError(e, s)); |
+ _cancelAndError(subscription, result, _asyncError(e, s), s); |
} |
}, |
onError: (e) { |
@@ -431,11 +430,10 @@ abstract class Stream<T> { |
() => (element == needle), |
(bool isMatch) { |
if (isMatch) { |
- subscription.cancel(); |
- future._complete(true); |
+ _cancelAndValue(subscription, future, true); |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -461,7 +459,7 @@ abstract class Stream<T> { |
_runUserCode( |
() => action(element), |
(_) {}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -487,11 +485,10 @@ abstract class Stream<T> { |
() => test(element), |
(bool isMatch) { |
if (!isMatch) { |
- subscription.cancel(); |
- future._complete(false); |
+ _cancelAndValue(subscription, future, false); |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -517,11 +514,10 @@ abstract class Stream<T> { |
() => test(element), |
(bool isMatch) { |
if (isMatch) { |
- subscription.cancel(); |
- future._complete(true); |
+ _cancelAndValue(subscription, future, true); |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -553,8 +549,7 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(_) { |
- subscription.cancel(); |
- future._complete(false); |
+ _cancelAndValue(subscription, future, false); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -686,9 +681,7 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(T value) { |
- subscription.cancel(); |
- future._complete(value); |
- return; |
+ _cancelAndValue(subscription, future, value); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -742,10 +735,9 @@ abstract class Stream<T> { |
subscription = this.listen( |
(T value) { |
if (foundResult) { |
- subscription.cancel(); |
// This is the second element we get. |
Error error = new StateError("More than one element"); |
- future._completeError(error); |
+ _cancelAndError(subscription, future, error, null); |
return; |
} |
foundResult = true; |
@@ -786,11 +778,10 @@ abstract class Stream<T> { |
() => test(value), |
(bool isMatch) { |
if (isMatch) { |
- subscription.cancel(); |
- future._complete(value); |
+ _cancelAndValue(subscription, future, value); |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -827,7 +818,7 @@ abstract class Stream<T> { |
result = value; |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -864,16 +855,18 @@ abstract class Stream<T> { |
(bool isMatch) { |
if (isMatch) { |
if (foundResult) { |
- subscription.cancel(); |
- future._completeError( |
- new StateError('Multiple matches for "single"')); |
+ _cancelAndError( |
+ subscription, |
+ future, |
+ new StateError('Multiple matches for "single"'), |
+ null); |
return; |
} |
foundResult = true; |
result = value; |
} |
}, |
- _cancelAndError(subscription, future) |
+ _cancelAndErrorClosure(subscription, future) |
); |
}, |
onError: future._completeError, |
@@ -906,8 +899,7 @@ abstract class Stream<T> { |
subscription = this.listen( |
(T value) { |
if (index == 0) { |
- subscription.cancel(); |
- future._complete(value); |
+ _cancelAndValue(subscription, future, value); |
return; |
} |
index -= 1; |
@@ -935,8 +927,11 @@ abstract class StreamSubscription<T> { |
* |
* If an event is currently firing, this unsubscription will only |
* take effect after all subscribers have received the current event. |
+ * |
+ * Returns a future if the cancel-operation is not completed synchronously. |
+ * Otherwise returns `null`. |
*/ |
- void cancel(); |
+ Future cancel(); |
/** Set or override the data event handler of this subscription. */ |
void onData(void handleData(T data)); |
@@ -1220,6 +1215,9 @@ abstract class StreamIterator<T> { |
* If you need to stop listening for values before the stream iterator is |
* automatically closed, you must call [cancel] to ensure that the stream |
* is properly closed. |
+ * |
+ * Returns a future if the cancel-operation is not completed synchronously. |
+ * Otherwise returns `null`. |
*/ |
- void cancel(); |
+ Future cancel(); |
} |