Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14071002: Added new version of reduce. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 * Chains this stream as the input of the provided [StreamTransformer]. 246 * Chains this stream as the input of the provided [StreamTransformer].
247 * 247 *
248 * Returns the result of [:streamTransformer.bind:] itself. 248 * Returns the result of [:streamTransformer.bind:] itself.
249 */ 249 */
250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
251 return streamTransformer.bind(this); 251 return streamTransformer.bind(this);
252 } 252 }
253 253
254 /** 254 /**
255 * Reduces a sequence of values by repeatedly applying [combine]. 255 * Reduces a sequence of values by repeatedly applying [combine].
256 *
257 * *WARNING UPCOMING API-CHANGE*: This method will be changed so that
258 * it doesn't take an initial value. Use [fold] instead.
259 */ 256 */
260 Future reduce(var initialValue, combine(var previous, T element)) { 257 Future<T> reduce(T combine(T previous, T element)) {
261 return fold(initialValue, combine); 258 _FutureImpl<T> result = new _FutureImpl<T>();
259 bool seenFirst = false;
260 T value;
261 StreamSubscription subscription = this.listen(
262 // TODO(ahe): Restore type when feature is implemented in dart2js
263 // checked mode. http://dartbug.com/7733
264 (/* T */ element) {
265 if (seenFirst) {
266 _runUserCode(() => combine(value, element),
267 (T newValue) { value = newValue; },
268 _cancelAndError(subscription, result));
269 } else {
270 value = element;
271 seenFirst = true;
272 }
273 },
274 onError: result._setError,
275 onDone: () {
276 if (!seenFirst) {
277 result._setError(new StateError("No elements"));
278 } else {
279 result._setValue(value);
280 }
281 },
282 unsubscribeOnError: true;
283 );
284 return result;
262 } 285 }
263 286
264 /** Reduces a sequence of values by repeatedly applying [combine]. */ 287 /** Reduces a sequence of values by repeatedly applying [combine]. */
265 Future fold(var initialValue, combine(var previous, T element)) { 288 Future fold(var initialValue, combine(var previous, T element)) {
266 _FutureImpl result = new _FutureImpl(); 289 _FutureImpl result = new _FutureImpl();
267 var value = initialValue; 290 var value = initialValue;
268 StreamSubscription subscription; 291 StreamSubscription subscription;
269 subscription = this.listen( 292 subscription = this.listen(
270 // TODO(ahe): Restore type when feature is implemented in dart2js 293 // TODO(ahe): Restore type when feature is implemented in dart2js
271 // checked mode. http://dartbug.com/7733 294 // checked mode. http://dartbug.com/7733
272 (/*T*/ element) { 295 (/*T*/ element) {
273 _runUserCode( 296 _runUserCode(
274 () => combine(value, element), 297 () => combine(value, element),
275 (result) { value = result; }, 298 (newValue) { value = newValue; },
276 _cancelAndError(subscription, result) 299 _cancelAndError(subscription, result)
277 ); 300 );
278 }, 301 },
279 onError: (AsyncError e) { 302 onError: (AsyncError e) {
280 result._setError(e); 303 result._setError(e);
281 }, 304 },
282 onDone: () { 305 onDone: () {
283 result._setValue(value); 306 result._setValue(value);
284 }, 307 },
285 unsubscribeOnError: true); 308 unsubscribeOnError: true);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
407 this.listen( 430 this.listen(
408 (_) { count++; }, 431 (_) { count++; },
409 onError: future._setError, 432 onError: future._setError,
410 onDone: () { 433 onDone: () {
411 future._setValue(count); 434 future._setValue(count);
412 }, 435 },
413 unsubscribeOnError: true); 436 unsubscribeOnError: true);
414 return future; 437 return future;
415 } 438 }
416 439
417 /**
418 * Finds the least element in the stream.
419 *
420 * If the stream is empty, the result is [:null:].
421 * Otherwise the result is a value from the stream that is not greater
422 * than any other value from the stream (according to [compare], which must
423 * be a [Comparator]).
424 *
425 * If [compare] is omitted, it defaults to [Comparable.compare].
426 *
427 * *Deprecated*. Use [reduce] with a binary min method if needed.
428 */
429 Future<T> min([int compare(T a, T b)]) {
430 if (compare == null) {
431 var defaultCompare = Comparable.compare;
432 compare = defaultCompare;
433 }
434 _FutureImpl<T> future = new _FutureImpl<T>();
435 StreamSubscription subscription;
436 T min = null;
437 subscription = this.listen(
438 // TODO(ahe): Restore type when feature is implemented in dart2js
439 // checked mode. http://dartbug.com/7733
440 (/*T*/ value) {
441 min = value;
442 subscription.onData((T value) {
443 _runUserCode(
444 () => compare(min, value) > 0,
445 (bool foundSmaller) {
446 if (foundSmaller) {
447 min = value;
448 }
449 },
450 _cancelAndError(subscription, future)
451 );
452 });
453 },
454 onError: future._setError,
455 onDone: () {
456 future._setValue(min);
457 },
458 unsubscribeOnError: true
459 );
460 return future;
461 }
462
463 /**
464 * Finds the largest element in the stream.
465 *
466 * If the stream is empty, the result is [:null:].
467 * Otherwise the result is an value from the stream that is not smaller
468 * than any other value from the stream (according to [compare], which must
469 * be a [Comparator]).
470 *
471 * If [compare] is omitted, it defaults to [Comparable.compare].
472 *
473 * *Deprecated*. Use [reduce] with a binary max method if needed.
474 */
475 Future<T> max([int compare(T a, T b)]) {
476 if (compare == null) {
477 var defaultCompare = Comparable.compare;
478 compare = defaultCompare;
479 }
480 _FutureImpl<T> future = new _FutureImpl<T>();
481 StreamSubscription subscription;
482 T max = null;
483 subscription = this.listen(
484 // TODO(ahe): Restore type when feature is implemented in dart2js
485 // checked mode. http://dartbug.com/7733
486 (/*T*/ value) {
487 max = value;
488 subscription.onData((T value) {
489 _runUserCode(
490 () => compare(max, value) < 0,
491 (bool foundGreater) {
492 if (foundGreater) {
493 max = value;
494 }
495 },
496 _cancelAndError(subscription, future)
497 );
498 });
499 },
500 onError: future._setError,
501 onDone: () {
502 future._setValue(max);
503 },
504 unsubscribeOnError: true
505 );
506 return future;
507 }
508
509 /** Reports whether this stream contains any elements. */ 440 /** Reports whether this stream contains any elements. */
510 Future<bool> get isEmpty { 441 Future<bool> get isEmpty {
511 _FutureImpl<bool> future = new _FutureImpl<bool>(); 442 _FutureImpl<bool> future = new _FutureImpl<bool>();
512 StreamSubscription subscription; 443 StreamSubscription subscription;
513 subscription = this.listen( 444 subscription = this.listen(
514 (_) { 445 (_) {
515 subscription.cancel(); 446 subscription.cancel();
516 future._setValue(false); 447 future._setValue(false);
517 }, 448 },
518 onError: future._setError, 449 onError: future._setError,
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after
1227 1158
1228 /* TODO(8997): Implement EventSink instead, */ 1159 /* TODO(8997): Implement EventSink instead, */
1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { 1160 class _EventOutputSinkWrapper<T> extends StreamSink<T> {
1230 _EventOutputSink _sink; 1161 _EventOutputSink _sink;
1231 _EventOutputSinkWrapper(this._sink); 1162 _EventOutputSinkWrapper(this._sink);
1232 1163
1233 void add(T data) { _sink._sendData(data); } 1164 void add(T data) { _sink._sendData(data); }
1234 void addError(AsyncError error) { _sink._sendError(error); } 1165 void addError(AsyncError error) { _sink._sendError(error); }
1235 void close() { _sink._sendDone(); } 1166 void close() { _sink._sendDone(); }
1236 } 1167 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698