Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index c1699f0d636b1a22025551fcd36f9d5ccc4f3674..5224acf6abde146789edae26512cc4914b5af399 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -253,12 +253,35 @@ abstract class Stream<T> { |
/** |
* Reduces a sequence of values by repeatedly applying [combine]. |
- * |
- * *WARNING UPCOMING API-CHANGE*: This method will be changed so that |
- * it doesn't take an initial value. Use [fold] instead. |
*/ |
- Future reduce(var initialValue, combine(var previous, T element)) { |
- return fold(initialValue, combine); |
+ Future<T> reduce(T combine(T previous, T element)) { |
+ _FutureImpl<T> result = new _FutureImpl<T>(); |
+ bool seenFirst = false; |
+ T value; |
+ StreamSubscription subscription = this.listen( |
+ // TODO(ahe): Restore type when feature is implemented in dart2js |
+ // checked mode. http://dartbug.com/7733 |
+ (/* T */ element) { |
+ if (seenFirst) { |
+ _runUserCode(() => combine(value, element), |
+ (T newValue) { value = newValue; }, |
+ _cancelAndError(subscription, result)); |
+ } else { |
+ value = element; |
+ seenFirst = true; |
+ } |
+ }, |
+ onError: result._setError, |
+ onDone: () { |
+ if (!seenFirst) { |
+ result._setError(new StateError("No elements")); |
+ } else { |
+ result._setValue(value); |
+ } |
+ }, |
+ unsubscribeOnError: true; |
+ ); |
+ return result; |
} |
/** Reduces a sequence of values by repeatedly applying [combine]. */ |
@@ -272,7 +295,7 @@ abstract class Stream<T> { |
(/*T*/ element) { |
_runUserCode( |
() => combine(value, element), |
- (result) { value = result; }, |
+ (newValue) { value = newValue; }, |
_cancelAndError(subscription, result) |
); |
}, |
@@ -414,98 +437,6 @@ abstract class Stream<T> { |
return future; |
} |
- /** |
- * Finds the least element in the stream. |
- * |
- * If the stream is empty, the result is [:null:]. |
- * Otherwise the result is a value from the stream that is not greater |
- * than any other value from the stream (according to [compare], which must |
- * be a [Comparator]). |
- * |
- * If [compare] is omitted, it defaults to [Comparable.compare]. |
- * |
- * *Deprecated*. Use [reduce] with a binary min method if needed. |
- */ |
- Future<T> min([int compare(T a, T b)]) { |
- if (compare == null) { |
- var defaultCompare = Comparable.compare; |
- compare = defaultCompare; |
- } |
- _FutureImpl<T> future = new _FutureImpl<T>(); |
- StreamSubscription subscription; |
- T min = null; |
- subscription = this.listen( |
- // TODO(ahe): Restore type when feature is implemented in dart2js |
- // checked mode. http://dartbug.com/7733 |
- (/*T*/ value) { |
- min = value; |
- subscription.onData((T value) { |
- _runUserCode( |
- () => compare(min, value) > 0, |
- (bool foundSmaller) { |
- if (foundSmaller) { |
- min = value; |
- } |
- }, |
- _cancelAndError(subscription, future) |
- ); |
- }); |
- }, |
- onError: future._setError, |
- onDone: () { |
- future._setValue(min); |
- }, |
- unsubscribeOnError: true |
- ); |
- return future; |
- } |
- |
- /** |
- * Finds the largest element in the stream. |
- * |
- * If the stream is empty, the result is [:null:]. |
- * Otherwise the result is an value from the stream that is not smaller |
- * than any other value from the stream (according to [compare], which must |
- * be a [Comparator]). |
- * |
- * If [compare] is omitted, it defaults to [Comparable.compare]. |
- * |
- * *Deprecated*. Use [reduce] with a binary max method if needed. |
- */ |
- Future<T> max([int compare(T a, T b)]) { |
- if (compare == null) { |
- var defaultCompare = Comparable.compare; |
- compare = defaultCompare; |
- } |
- _FutureImpl<T> future = new _FutureImpl<T>(); |
- StreamSubscription subscription; |
- T max = null; |
- subscription = this.listen( |
- // TODO(ahe): Restore type when feature is implemented in dart2js |
- // checked mode. http://dartbug.com/7733 |
- (/*T*/ value) { |
- max = value; |
- subscription.onData((T value) { |
- _runUserCode( |
- () => compare(max, value) < 0, |
- (bool foundGreater) { |
- if (foundGreater) { |
- max = value; |
- } |
- }, |
- _cancelAndError(subscription, future) |
- ); |
- }); |
- }, |
- onError: future._setError, |
- onDone: () { |
- future._setValue(max); |
- }, |
- unsubscribeOnError: true |
- ); |
- return future; |
- } |
- |
/** Reports whether this stream contains any elements. */ |
Future<bool> get isEmpty { |
_FutureImpl<bool> future = new _FutureImpl<bool>(); |