Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(322)

Unified Diff: sdk/lib/async/stream.dart

Issue 14071002: Added new version of reduce. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Removed more uses of max, and a few bugs. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index c1699f0d636b1a22025551fcd36f9d5ccc4f3674..04cecc61c17cca0edbe68286354c50b42b9b2890 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -253,12 +253,36 @@ abstract class Stream<T> {
/**
* Reduces a sequence of values by repeatedly applying [combine].
- *
- * *WARNING UPCOMING API-CHANGE*: This method will be changed so that
- * it doesn't take an initial value. Use [fold] instead.
*/
- Future reduce(var initialValue, combine(var previous, T element)) {
- return fold(initialValue, combine);
+ Future<T> reduce(T combine(T previous, T element)) {
+ _FutureImpl<T> result = new _FutureImpl<T>();
+ bool seenFirst = false;
+ T value;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ // TODO(ahe): Restore type when feature is implemented in dart2js
+ // checked mode. http://dartbug.com/7733
+ (/* T */ element) {
+ if (seenFirst) {
+ _runUserCode(() => combine(value, element),
+ (T newValue) { value = newValue; },
+ _cancelAndError(subscription, result));
+ } else {
+ value = element;
+ seenFirst = true;
+ }
+ },
+ onError: result._setError,
+ onDone: () {
+ if (!seenFirst) {
+ result._setError(new StateError("No elements"));
+ } else {
+ result._setValue(value);
+ }
+ },
+ unsubscribeOnError: true
+ );
+ return result;
}
/** Reduces a sequence of values by repeatedly applying [combine]. */
@@ -272,7 +296,7 @@ abstract class Stream<T> {
(/*T*/ element) {
_runUserCode(
() => combine(value, element),
- (result) { value = result; },
+ (newValue) { value = newValue; },
_cancelAndError(subscription, result)
);
},
@@ -414,98 +438,6 @@ abstract class Stream<T> {
return future;
}
- /**
- * Finds the least element in the stream.
- *
- * If the stream is empty, the result is [:null:].
- * Otherwise the result is a value from the stream that is not greater
- * than any other value from the stream (according to [compare], which must
- * be a [Comparator]).
- *
- * If [compare] is omitted, it defaults to [Comparable.compare].
- *
- * *Deprecated*. Use [reduce] with a binary min method if needed.
- */
- Future<T> min([int compare(T a, T b)]) {
- if (compare == null) {
- var defaultCompare = Comparable.compare;
- compare = defaultCompare;
- }
- _FutureImpl<T> future = new _FutureImpl<T>();
- StreamSubscription subscription;
- T min = null;
- subscription = this.listen(
- // TODO(ahe): Restore type when feature is implemented in dart2js
- // checked mode. http://dartbug.com/7733
- (/*T*/ value) {
- min = value;
- subscription.onData((T value) {
- _runUserCode(
- () => compare(min, value) > 0,
- (bool foundSmaller) {
- if (foundSmaller) {
- min = value;
- }
- },
- _cancelAndError(subscription, future)
- );
- });
- },
- onError: future._setError,
- onDone: () {
- future._setValue(min);
- },
- unsubscribeOnError: true
- );
- return future;
- }
-
- /**
- * Finds the largest element in the stream.
- *
- * If the stream is empty, the result is [:null:].
- * Otherwise the result is an value from the stream that is not smaller
- * than any other value from the stream (according to [compare], which must
- * be a [Comparator]).
- *
- * If [compare] is omitted, it defaults to [Comparable.compare].
- *
- * *Deprecated*. Use [reduce] with a binary max method if needed.
- */
- Future<T> max([int compare(T a, T b)]) {
- if (compare == null) {
- var defaultCompare = Comparable.compare;
- compare = defaultCompare;
- }
- _FutureImpl<T> future = new _FutureImpl<T>();
- StreamSubscription subscription;
- T max = null;
- subscription = this.listen(
- // TODO(ahe): Restore type when feature is implemented in dart2js
- // checked mode. http://dartbug.com/7733
- (/*T*/ value) {
- max = value;
- subscription.onData((T value) {
- _runUserCode(
- () => compare(max, value) < 0,
- (bool foundGreater) {
- if (foundGreater) {
- max = value;
- }
- },
- _cancelAndError(subscription, future)
- );
- });
- },
- onError: future._setError,
- onDone: () {
- future._setValue(max);
- },
- unsubscribeOnError: true
- );
- return future;
- }
-
/** Reports whether this stream contains any elements. */
Future<bool> get isEmpty {
_FutureImpl<bool> future = new _FutureImpl<bool>();

Powered by Google App Engine
This is Rietveld 408576698