Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14071002: Added new version of reduce. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Removed more uses of max, and a few bugs. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 * Chains this stream as the input of the provided [StreamTransformer]. 246 * Chains this stream as the input of the provided [StreamTransformer].
247 * 247 *
248 * Returns the result of [:streamTransformer.bind:] itself. 248 * Returns the result of [:streamTransformer.bind:] itself.
249 */ 249 */
250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
251 return streamTransformer.bind(this); 251 return streamTransformer.bind(this);
252 } 252 }
253 253
254 /** 254 /**
255 * Reduces a sequence of values by repeatedly applying [combine]. 255 * Reduces a sequence of values by repeatedly applying [combine].
256 *
257 * *WARNING UPCOMING API-CHANGE*: This method will be changed so that
258 * it doesn't take an initial value. Use [fold] instead.
259 */ 256 */
260 Future reduce(var initialValue, combine(var previous, T element)) { 257 Future<T> reduce(T combine(T previous, T element)) {
261 return fold(initialValue, combine); 258 _FutureImpl<T> result = new _FutureImpl<T>();
259 bool seenFirst = false;
260 T value;
261 StreamSubscription subscription;
262 subscription = this.listen(
263 // TODO(ahe): Restore type when feature is implemented in dart2js
264 // checked mode. http://dartbug.com/7733
265 (/* T */ element) {
266 if (seenFirst) {
267 _runUserCode(() => combine(value, element),
268 (T newValue) { value = newValue; },
269 _cancelAndError(subscription, result));
270 } else {
271 value = element;
272 seenFirst = true;
273 }
274 },
275 onError: result._setError,
276 onDone: () {
277 if (!seenFirst) {
278 result._setError(new StateError("No elements"));
279 } else {
280 result._setValue(value);
281 }
282 },
283 unsubscribeOnError: true
284 );
285 return result;
262 } 286 }
263 287
264 /** Reduces a sequence of values by repeatedly applying [combine]. */ 288 /** Reduces a sequence of values by repeatedly applying [combine]. */
265 Future fold(var initialValue, combine(var previous, T element)) { 289 Future fold(var initialValue, combine(var previous, T element)) {
266 _FutureImpl result = new _FutureImpl(); 290 _FutureImpl result = new _FutureImpl();
267 var value = initialValue; 291 var value = initialValue;
268 StreamSubscription subscription; 292 StreamSubscription subscription;
269 subscription = this.listen( 293 subscription = this.listen(
270 // TODO(ahe): Restore type when feature is implemented in dart2js 294 // TODO(ahe): Restore type when feature is implemented in dart2js
271 // checked mode. http://dartbug.com/7733 295 // checked mode. http://dartbug.com/7733
272 (/*T*/ element) { 296 (/*T*/ element) {
273 _runUserCode( 297 _runUserCode(
274 () => combine(value, element), 298 () => combine(value, element),
275 (result) { value = result; }, 299 (newValue) { value = newValue; },
276 _cancelAndError(subscription, result) 300 _cancelAndError(subscription, result)
277 ); 301 );
278 }, 302 },
279 onError: (AsyncError e) { 303 onError: (AsyncError e) {
280 result._setError(e); 304 result._setError(e);
281 }, 305 },
282 onDone: () { 306 onDone: () {
283 result._setValue(value); 307 result._setValue(value);
284 }, 308 },
285 unsubscribeOnError: true); 309 unsubscribeOnError: true);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
407 this.listen( 431 this.listen(
408 (_) { count++; }, 432 (_) { count++; },
409 onError: future._setError, 433 onError: future._setError,
410 onDone: () { 434 onDone: () {
411 future._setValue(count); 435 future._setValue(count);
412 }, 436 },
413 unsubscribeOnError: true); 437 unsubscribeOnError: true);
414 return future; 438 return future;
415 } 439 }
416 440
417 /**
418 * Finds the least element in the stream.
419 *
420 * If the stream is empty, the result is [:null:].
421 * Otherwise the result is a value from the stream that is not greater
422 * than any other value from the stream (according to [compare], which must
423 * be a [Comparator]).
424 *
425 * If [compare] is omitted, it defaults to [Comparable.compare].
426 *
427 * *Deprecated*. Use [reduce] with a binary min method if needed.
428 */
429 Future<T> min([int compare(T a, T b)]) {
430 if (compare == null) {
431 var defaultCompare = Comparable.compare;
432 compare = defaultCompare;
433 }
434 _FutureImpl<T> future = new _FutureImpl<T>();
435 StreamSubscription subscription;
436 T min = null;
437 subscription = this.listen(
438 // TODO(ahe): Restore type when feature is implemented in dart2js
439 // checked mode. http://dartbug.com/7733
440 (/*T*/ value) {
441 min = value;
442 subscription.onData((T value) {
443 _runUserCode(
444 () => compare(min, value) > 0,
445 (bool foundSmaller) {
446 if (foundSmaller) {
447 min = value;
448 }
449 },
450 _cancelAndError(subscription, future)
451 );
452 });
453 },
454 onError: future._setError,
455 onDone: () {
456 future._setValue(min);
457 },
458 unsubscribeOnError: true
459 );
460 return future;
461 }
462
463 /**
464 * Finds the largest element in the stream.
465 *
466 * If the stream is empty, the result is [:null:].
467 * Otherwise the result is an value from the stream that is not smaller
468 * than any other value from the stream (according to [compare], which must
469 * be a [Comparator]).
470 *
471 * If [compare] is omitted, it defaults to [Comparable.compare].
472 *
473 * *Deprecated*. Use [reduce] with a binary max method if needed.
474 */
475 Future<T> max([int compare(T a, T b)]) {
476 if (compare == null) {
477 var defaultCompare = Comparable.compare;
478 compare = defaultCompare;
479 }
480 _FutureImpl<T> future = new _FutureImpl<T>();
481 StreamSubscription subscription;
482 T max = null;
483 subscription = this.listen(
484 // TODO(ahe): Restore type when feature is implemented in dart2js
485 // checked mode. http://dartbug.com/7733
486 (/*T*/ value) {
487 max = value;
488 subscription.onData((T value) {
489 _runUserCode(
490 () => compare(max, value) < 0,
491 (bool foundGreater) {
492 if (foundGreater) {
493 max = value;
494 }
495 },
496 _cancelAndError(subscription, future)
497 );
498 });
499 },
500 onError: future._setError,
501 onDone: () {
502 future._setValue(max);
503 },
504 unsubscribeOnError: true
505 );
506 return future;
507 }
508
509 /** Reports whether this stream contains any elements. */ 441 /** Reports whether this stream contains any elements. */
510 Future<bool> get isEmpty { 442 Future<bool> get isEmpty {
511 _FutureImpl<bool> future = new _FutureImpl<bool>(); 443 _FutureImpl<bool> future = new _FutureImpl<bool>();
512 StreamSubscription subscription; 444 StreamSubscription subscription;
513 subscription = this.listen( 445 subscription = this.listen(
514 (_) { 446 (_) {
515 subscription.cancel(); 447 subscription.cancel();
516 future._setValue(false); 448 future._setValue(false);
517 }, 449 },
518 onError: future._setError, 450 onError: future._setError,
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after
1227 1159
1228 /* TODO(8997): Implement EventSink instead, */ 1160 /* TODO(8997): Implement EventSink instead, */
1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { 1161 class _EventOutputSinkWrapper<T> extends StreamSink<T> {
1230 _EventOutputSink _sink; 1162 _EventOutputSink _sink;
1231 _EventOutputSinkWrapper(this._sink); 1163 _EventOutputSinkWrapper(this._sink);
1232 1164
1233 void add(T data) { _sink._sendData(data); } 1165 void add(T data) { _sink._sendData(data); }
1234 void addError(AsyncError error) { _sink._sendError(error); } 1166 void addError(AsyncError error) { _sink._sendError(error); }
1235 void close() { _sink._sendDone(); } 1167 void close() { _sink._sendDone(); }
1236 } 1168 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698