Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde..4b5e46fc42e853b3623b87f91d1076dbf1537c1a 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -336,7 +336,7 @@ abstract class Stream<T> { |
| if (seenFirst) { |
| _runUserCode(() => combine(value, element), |
| (T newValue) { value = newValue; }, |
| - _cancelAndError(subscription, result)); |
| + _cancelAndErrorClosure(subscription, result)); |
| } else { |
| value = element; |
| seenFirst = true; |
| @@ -365,7 +365,7 @@ abstract class Stream<T> { |
| _runUserCode( |
| () => combine(value, element), |
| (newValue) { value = newValue; }, |
| - _cancelAndError(subscription, result) |
| + _cancelAndErrorClosure(subscription, result) |
| ); |
| }, |
| onError: (e, st) { |
| @@ -402,8 +402,7 @@ abstract class Stream<T> { |
| try { |
| buffer.write(element); |
| } catch (e, s) { |
| - subscription.cancel(); |
| - result._completeError(_asyncError(e, s)); |
| + _cancelAndError(subscription, result, _asyncError(e, s), s); |
| } |
| }, |
| onError: (e) { |
| @@ -431,11 +430,10 @@ abstract class Stream<T> { |
| () => (element == needle), |
| (bool isMatch) { |
| if (isMatch) { |
| - subscription.cancel(); |
| - future._complete(true); |
| + _cancelAndValue(subscription, future, true); |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -461,7 +459,7 @@ abstract class Stream<T> { |
| _runUserCode( |
| () => action(element), |
| (_) {}, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -487,11 +485,10 @@ abstract class Stream<T> { |
| () => test(element), |
| (bool isMatch) { |
| if (!isMatch) { |
| - subscription.cancel(); |
| - future._complete(false); |
| + _cancelAndValue(subscription, future, false); |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -517,11 +514,10 @@ abstract class Stream<T> { |
| () => test(element), |
| (bool isMatch) { |
| if (isMatch) { |
| - subscription.cancel(); |
| - future._complete(true); |
| + _cancelAndValue(subscription, future, true); |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -553,8 +549,7 @@ abstract class Stream<T> { |
| StreamSubscription subscription; |
| subscription = this.listen( |
| (_) { |
| - subscription.cancel(); |
| - future._complete(false); |
| + _cancelAndValue(subscription, future, false); |
| }, |
| onError: future._completeError, |
| onDone: () { |
| @@ -686,9 +681,7 @@ abstract class Stream<T> { |
| StreamSubscription subscription; |
| subscription = this.listen( |
| (T value) { |
| - subscription.cancel(); |
| - future._complete(value); |
| - return; |
| + _cancelAndValue(subscription, future, value); |
| }, |
| onError: future._completeError, |
| onDone: () { |
| @@ -742,10 +735,9 @@ abstract class Stream<T> { |
| subscription = this.listen( |
| (T value) { |
| if (foundResult) { |
| - subscription.cancel(); |
| // This is the second element we get. |
| Error error = new StateError("More than one element"); |
| - future._completeError(error); |
| + _cancelAndError(subscription, future, error, null); |
| return; |
| } |
| foundResult = true; |
| @@ -786,11 +778,10 @@ abstract class Stream<T> { |
| () => test(value), |
| (bool isMatch) { |
| if (isMatch) { |
| - subscription.cancel(); |
| - future._complete(value); |
| + _cancelAndValue(subscription, future, value); |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -827,7 +818,7 @@ abstract class Stream<T> { |
| result = value; |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -864,16 +855,18 @@ abstract class Stream<T> { |
| (bool isMatch) { |
| if (isMatch) { |
| if (foundResult) { |
| - subscription.cancel(); |
| - future._completeError( |
| - new StateError('Multiple matches for "single"')); |
| + _cancelAndError( |
| + subscription, |
| + future, |
| + new StateError('Multiple matches for "single"'), |
| + null); |
| return; |
| } |
| foundResult = true; |
| result = value; |
| } |
| }, |
| - _cancelAndError(subscription, future) |
| + _cancelAndErrorClosure(subscription, future) |
| ); |
| }, |
| onError: future._completeError, |
| @@ -906,8 +899,7 @@ abstract class Stream<T> { |
| subscription = this.listen( |
| (T value) { |
| if (index == 0) { |
| - subscription.cancel(); |
| - future._complete(value); |
| + _cancelAndValue(subscription, future, value); |
| return; |
| } |
| index -= 1; |
| @@ -935,8 +927,12 @@ abstract class StreamSubscription<T> { |
| * |
| * If an event is currently firing, this unsubscription will only |
| * take effect after all subscribers have received the current event. |
| + * |
| + * In case the cancel is asynchronous, the returned [Future] can be used to |
| + * wait for the operation to complete. If in doubt, wait for it. Note that |
|
floitsch
2013/10/16 14:43:44
Start sentence with "Returns".
===
Returns a futur
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| + * cancel can return `null`, if the operation was immediate. |
| */ |
| - void cancel(); |
| + Future cancel(); |
| /** Set or override the data event handler of this subscription. */ |
| void onData(void handleData(T data)); |
| @@ -1220,6 +1216,10 @@ abstract class StreamIterator<T> { |
| * If you need to stop listening for values before the stream iterator is |
| * automatically closed, you must call [cancel] to ensure that the stream |
| * is properly closed. |
| + * |
|
floitsch
2013/10/16 14:43:44
ditto.
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| + * In case the cancel is asynchronous, the returned [Future] can be used to |
| + * wait for the operation to complete. If in doubt, wait for it. Note that |
| + * cancel can return `null`, if the operation was immediate. |
| */ |
| - void cancel(); |
| + Future cancel(); |
| } |