| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde..6672f475a9373b631318cea00d8d35dc2cd5c100 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -336,7 +336,7 @@ abstract class Stream<T> {
|
| if (seenFirst) {
|
| _runUserCode(() => combine(value, element),
|
| (T newValue) { value = newValue; },
|
| - _cancelAndError(subscription, result));
|
| + _cancelAndErrorClosure(subscription, result));
|
| } else {
|
| value = element;
|
| seenFirst = true;
|
| @@ -365,7 +365,7 @@ abstract class Stream<T> {
|
| _runUserCode(
|
| () => combine(value, element),
|
| (newValue) { value = newValue; },
|
| - _cancelAndError(subscription, result)
|
| + _cancelAndErrorClosure(subscription, result)
|
| );
|
| },
|
| onError: (e, st) {
|
| @@ -402,8 +402,7 @@ abstract class Stream<T> {
|
| try {
|
| buffer.write(element);
|
| } catch (e, s) {
|
| - subscription.cancel();
|
| - result._completeError(_asyncError(e, s));
|
| + _cancelAndError(subscription, result, _asyncError(e, s), s);
|
| }
|
| },
|
| onError: (e) {
|
| @@ -431,11 +430,10 @@ abstract class Stream<T> {
|
| () => (element == needle),
|
| (bool isMatch) {
|
| if (isMatch) {
|
| - subscription.cancel();
|
| - future._complete(true);
|
| + _cancelAndValue(subscription, future, true);
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -461,7 +459,7 @@ abstract class Stream<T> {
|
| _runUserCode(
|
| () => action(element),
|
| (_) {},
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -487,11 +485,10 @@ abstract class Stream<T> {
|
| () => test(element),
|
| (bool isMatch) {
|
| if (!isMatch) {
|
| - subscription.cancel();
|
| - future._complete(false);
|
| + _cancelAndValue(subscription, future, false);
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -517,11 +514,10 @@ abstract class Stream<T> {
|
| () => test(element),
|
| (bool isMatch) {
|
| if (isMatch) {
|
| - subscription.cancel();
|
| - future._complete(true);
|
| + _cancelAndValue(subscription, future, true);
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -553,8 +549,7 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (_) {
|
| - subscription.cancel();
|
| - future._complete(false);
|
| + _cancelAndValue(subscription, future, false);
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -686,9 +681,7 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T value) {
|
| - subscription.cancel();
|
| - future._complete(value);
|
| - return;
|
| + _cancelAndValue(subscription, future, value);
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -742,10 +735,9 @@ abstract class Stream<T> {
|
| subscription = this.listen(
|
| (T value) {
|
| if (foundResult) {
|
| - subscription.cancel();
|
| // This is the second element we get.
|
| Error error = new StateError("More than one element");
|
| - future._completeError(error);
|
| + _cancelAndError(subscription, future, error, null);
|
| return;
|
| }
|
| foundResult = true;
|
| @@ -786,11 +778,10 @@ abstract class Stream<T> {
|
| () => test(value),
|
| (bool isMatch) {
|
| if (isMatch) {
|
| - subscription.cancel();
|
| - future._complete(value);
|
| + _cancelAndValue(subscription, future, value);
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -827,7 +818,7 @@ abstract class Stream<T> {
|
| result = value;
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -864,16 +855,18 @@ abstract class Stream<T> {
|
| (bool isMatch) {
|
| if (isMatch) {
|
| if (foundResult) {
|
| - subscription.cancel();
|
| - future._completeError(
|
| - new StateError('Multiple matches for "single"'));
|
| + _cancelAndError(
|
| + subscription,
|
| + future,
|
| + new StateError('Multiple matches for "single"'),
|
| + null);
|
| return;
|
| }
|
| foundResult = true;
|
| result = value;
|
| }
|
| },
|
| - _cancelAndError(subscription, future)
|
| + _cancelAndErrorClosure(subscription, future)
|
| );
|
| },
|
| onError: future._completeError,
|
| @@ -906,8 +899,7 @@ abstract class Stream<T> {
|
| subscription = this.listen(
|
| (T value) {
|
| if (index == 0) {
|
| - subscription.cancel();
|
| - future._complete(value);
|
| + _cancelAndValue(subscription, future, value);
|
| return;
|
| }
|
| index -= 1;
|
| @@ -935,8 +927,11 @@ abstract class StreamSubscription<T> {
|
| *
|
| * If an event is currently firing, this unsubscription will only
|
| * take effect after all subscribers have received the current event.
|
| + *
|
| + * Returns a future if the cancel-operation is not completed synchronously.
|
| + * Otherwise returns `null`.
|
| */
|
| - void cancel();
|
| + Future cancel();
|
|
|
| /** Set or override the data event handler of this subscription. */
|
| void onData(void handleData(T data));
|
| @@ -1220,6 +1215,9 @@ abstract class StreamIterator<T> {
|
| * If you need to stop listening for values before the stream iterator is
|
| * automatically closed, you must call [cancel] to ensure that the stream
|
| * is properly closed.
|
| + *
|
| + * Returns a future if the cancel-operation is not completed synchronously.
|
| + * Otherwise returns `null`.
|
| */
|
| - void cancel();
|
| + Future cancel();
|
| }
|
|
|