Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index ac9aa44f57100d188513095e9d91567aaa58f2cc..4a342a9d5a687698aab3363d96a88ece6ba3ae0f 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -62,10 +62,10 @@ abstract class Stream<T> { |
| * If [unsubscribeOnError] is true, the subscription is ended when |
| * the first error is reported. The default is false. |
| */ |
| - StreamSubscription<T> subscribe({void onData(T event), |
| - void onError(AsyncError error), |
| - void onDone(), |
| - bool unsubscribeOnError}); |
| + StreamSubscription<T> listen(void onData(T event), |
| + { void onError(AsyncError error), |
| + void onDone(), |
| + bool unsubscribeOnError}); |
| /** |
| * Creates a new stream from this stream that discards some data events. |
| @@ -149,19 +149,23 @@ abstract class Stream<T> { |
| Future reduce(var initialValue, combine(var previous, T element)) { |
| Completer completer = new Completer(); |
| var value = initialValue; |
| - StreamSubscription subscription = this.subscribe(unsubscribeOnError: true); |
| - subscription..onData((T element) { |
| - try { |
| - value = combine(value, element); |
| - } catch (e, s) { |
| - subscription.unsubscribe(); |
| - completer.completeError(e, s); |
| - } |
| - })..onError((AsyncError e) { |
| - completer.completeError(e.error, e.stackTrace); |
| - })..onDone(() { |
| - completer.complete(value); |
| - }); |
| + StreamSubscription subscription; |
| + subscription = this.listen( |
| + (T element) { |
| + try { |
| + value = combine(value, element); |
| + } catch (e, s) { |
| + subscription.cancel(); |
| + completer.completeError(e, s); |
| + } |
| + }, |
| + unsubscribeOnError: true, |
| + onError: (AsyncError e) { |
| + completer.completeError(e.error, e.stackTrace); |
| + }, |
| + onDone: () { |
| + completer.complete(value); |
| + }); |
| return completer.future; |
| } |
| @@ -170,8 +174,8 @@ abstract class Stream<T> { |
| {void onError(AsyncError error), |
| bool unsubscribeOnError}) { |
| SignalCompleter completer = new SignalCompleter(); |
| - this.subscribe( |
| - onData: sink.add, |
| + this.listen( |
| + sink.add, |
| onError: onError, |
| onDone: () { |
| sink.close(); |
| @@ -191,10 +195,10 @@ abstract class Stream<T> { |
| Future<bool> contains(T match) { |
| _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T element) { |
| + subscription = listen( |
|
Lasse Reichstein Nielsen
2013/01/04 08:17:55
Not your code (probably mine), but ...
This uses "
floitsch
2013/01/04 15:51:36
Added this. here at the other use-sites.
|
| + (T element) { |
| if (element == match) { |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| future._setValue(true); |
| } |
| }, |
| @@ -215,10 +219,10 @@ abstract class Stream<T> { |
| Future<bool> every(bool test(T element)) { |
| _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T element) { |
| + subscription = listen( |
| + (T element) { |
| if (!test(element)) { |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| future._setValue(false); |
| } |
| }, |
| @@ -239,10 +243,10 @@ abstract class Stream<T> { |
| Future<bool> any(bool test(T element)) { |
| _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T element) { |
| + subscription = listen( |
| + (T element) { |
| if (test(element)) { |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| future._setValue(true); |
| } |
| }, |
| @@ -259,8 +263,8 @@ abstract class Stream<T> { |
| Future<int> get length { |
| _FutureImpl<int> future = new _FutureImpl<int>(); |
| int count = 0; |
| - subscribe( |
| - onData: (_) { count++; }, |
| + listen( |
| + (_) { count++; }, |
| onError: future._setError, |
| onDone: () { |
| future._setValue(count); |
| @@ -283,8 +287,8 @@ abstract class Stream<T> { |
| _FutureImpl<T> future = new _FutureImpl<T>(); |
| StreamSubscription subscription; |
| T min = null; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| min = value; |
| subscription.onData = (T value) { |
| if (compare(min, value) > 0) min = value; |
| @@ -312,8 +316,8 @@ abstract class Stream<T> { |
| _FutureImpl<T> future = new _FutureImpl<T>(); |
| StreamSubscription subscription; |
| T max = null; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| max = value; |
| subscription.onData = (T value) { |
| if (compare(max, value) < 0) max = value; |
| @@ -331,9 +335,9 @@ abstract class Stream<T> { |
| Future<bool> get isEmpty { |
| _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (_) { |
| - subscription.unsubscribe(); |
| + subscription = listen( |
| + (_) { |
| + subscription.cancel(); |
| future._setValue(false); |
| }, |
| onError: future._setError, |
| @@ -348,8 +352,8 @@ abstract class Stream<T> { |
| Future<List<T>> toList() { |
| List<T> result = <T>[]; |
| _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
| - subscribe( |
| - onData: (T data) { |
| + listen( |
| + (T data) { |
| result.add(data); |
| }, |
| onError: future._setError, |
| @@ -364,8 +368,8 @@ abstract class Stream<T> { |
| Future<Set<T>> toSet() { |
| Set<T> result = new Set<T>(); |
| _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
| - subscribe( |
| - onData: (T data) { |
| + listen( |
| + (T data) { |
| result.add(data); |
| }, |
| onError: future._setError, |
| @@ -442,10 +446,10 @@ abstract class Stream<T> { |
| Future<T> get first { |
| _FutureImpl<T> future = new _FutureImpl(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| future._setValue(value); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| }, |
| onError: future._setError, |
| @@ -466,8 +470,8 @@ abstract class Stream<T> { |
| T result = null; |
| bool foundResult = false; |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| foundResult = true; |
| result = value; |
| }, |
| @@ -493,13 +497,13 @@ abstract class Stream<T> { |
| T result = null; |
| bool foundResult = false; |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| if (foundResult) { |
| // This is the second element we get. |
| Error error = new StateError("More than one element"); |
| future._setError(new AsyncError(error)); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| foundResult = true; |
| @@ -534,19 +538,19 @@ abstract class Stream<T> { |
| Future<T> firstMatching(bool test(T value), {T defaultValue()}) { |
| _FutureImpl<T> future = new _FutureImpl<T>(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| bool matches; |
| try { |
| matches = (true == test(value)); |
| } catch (e, s) { |
| future._setError(new AsyncError(e, s)); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| if (matches) { |
| future._setValue(value); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| } |
| }, |
| onError: future._setError, |
| @@ -581,14 +585,14 @@ abstract class Stream<T> { |
| T result = null; |
| bool foundResult = false; |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| bool matches; |
| try { |
| matches = (true == test(value)); |
| } catch (e, s) { |
| future._setError(new AsyncError(e, s)); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| if (matches) { |
| @@ -631,21 +635,21 @@ abstract class Stream<T> { |
| T result = null; |
| bool foundResult = false; |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| bool matches; |
| try { |
| matches = (true == test(value)); |
| } catch (e, s) { |
| future._setError(new AsyncError(e, s)); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| if (matches) { |
| if (foundResult) { |
| future._setError(new AsyncError( |
| new StateError('Multiple matches for "single"'))); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| foundResult = true; |
| @@ -676,11 +680,11 @@ abstract class Stream<T> { |
| Future<T> elementAt(int index) { |
| _FutureImpl<T> future = new _FutureImpl(); |
| StreamSubscription subscription; |
| - subscription = subscribe( |
| - onData: (T value) { |
| + subscription = listen( |
| + (T value) { |
| if (index == 0) { |
| future._setValue(value); |
| - subscription.unsubscribe(); |
| + subscription.cancel(); |
| return; |
| } |
| index -= 1; |
| @@ -710,7 +714,7 @@ abstract class StreamSubscription<T> { |
| * If an event is currently firing, this unsubscription will only |
| * take effect after all subscribers have received the current event. |
| */ |
| - void unsubscribe(); |
| + void cancel(); |
| /** Set or override the data event handler of this subscription. */ |
| void onData(void handleData(T data)); |
| @@ -756,11 +760,11 @@ class StreamView<T> extends Stream<T> { |
| StreamView(this._stream); |
| - StreamSubscription<T> subscribe({void onData(T value), |
| - void onError(AsyncError error), |
| - void onDone(), |
| - bool unsubscribeOnError}) { |
| - return _stream.subscribe(onData: onData, onError: onError, onDone: onDone, |
| + StreamSubscription<T> listen(void onData(T value), |
| + { void onError(AsyncError error), |
| + void onDone(), |
| + bool unsubscribeOnError}) { |
| + return _stream.listen(onData, onError: onError, onDone: onDone, |
| unsubscribeOnError: unsubscribeOnError); |
| } |
| } |