OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
246 * Chains this stream as the input of the provided [StreamTransformer]. | 246 * Chains this stream as the input of the provided [StreamTransformer]. |
247 * | 247 * |
248 * Returns the result of [:streamTransformer.bind:] itself. | 248 * Returns the result of [:streamTransformer.bind:] itself. |
249 */ | 249 */ |
250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { |
251 return streamTransformer.bind(this); | 251 return streamTransformer.bind(this); |
252 } | 252 } |
253 | 253 |
254 /** | 254 /** |
255 * Reduces a sequence of values by repeatedly applying [combine]. | 255 * Reduces a sequence of values by repeatedly applying [combine]. |
256 * | |
257 * *WARNING UPCOMING API-CHANGE*: This method will be changed so that | |
258 * it doesn't take an initial value. Use [fold] instead. | |
259 */ | 256 */ |
260 Future reduce(var initialValue, combine(var previous, T element)) { | 257 Future<T> reduce(T combine(T previous, T element)) { |
261 return fold(initialValue, combine); | 258 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 259 bool seenFirst = false; |
| 260 T value; |
| 261 StreamSubscription subscription; |
| 262 subscription = this.listen( |
| 263 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 264 // checked mode. http://dartbug.com/7733 |
| 265 (/* T */ element) { |
| 266 if (seenFirst) { |
| 267 _runUserCode(() => combine(value, element), |
| 268 (T newValue) { value = newValue; }, |
| 269 _cancelAndError(subscription, result)); |
| 270 } else { |
| 271 value = element; |
| 272 seenFirst = true; |
| 273 } |
| 274 }, |
| 275 onError: result._setError, |
| 276 onDone: () { |
| 277 if (!seenFirst) { |
| 278 result._setError(new StateError("No elements")); |
| 279 } else { |
| 280 result._setValue(value); |
| 281 } |
| 282 }, |
| 283 unsubscribeOnError: true |
| 284 ); |
| 285 return result; |
262 } | 286 } |
263 | 287 |
264 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 288 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
265 Future fold(var initialValue, combine(var previous, T element)) { | 289 Future fold(var initialValue, combine(var previous, T element)) { |
266 _FutureImpl result = new _FutureImpl(); | 290 _FutureImpl result = new _FutureImpl(); |
267 var value = initialValue; | 291 var value = initialValue; |
268 StreamSubscription subscription; | 292 StreamSubscription subscription; |
269 subscription = this.listen( | 293 subscription = this.listen( |
270 // TODO(ahe): Restore type when feature is implemented in dart2js | 294 // TODO(ahe): Restore type when feature is implemented in dart2js |
271 // checked mode. http://dartbug.com/7733 | 295 // checked mode. http://dartbug.com/7733 |
272 (/*T*/ element) { | 296 (/*T*/ element) { |
273 _runUserCode( | 297 _runUserCode( |
274 () => combine(value, element), | 298 () => combine(value, element), |
275 (result) { value = result; }, | 299 (newValue) { value = newValue; }, |
276 _cancelAndError(subscription, result) | 300 _cancelAndError(subscription, result) |
277 ); | 301 ); |
278 }, | 302 }, |
279 onError: (AsyncError e) { | 303 onError: (AsyncError e) { |
280 result._setError(e); | 304 result._setError(e); |
281 }, | 305 }, |
282 onDone: () { | 306 onDone: () { |
283 result._setValue(value); | 307 result._setValue(value); |
284 }, | 308 }, |
285 unsubscribeOnError: true); | 309 unsubscribeOnError: true); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
407 this.listen( | 431 this.listen( |
408 (_) { count++; }, | 432 (_) { count++; }, |
409 onError: future._setError, | 433 onError: future._setError, |
410 onDone: () { | 434 onDone: () { |
411 future._setValue(count); | 435 future._setValue(count); |
412 }, | 436 }, |
413 unsubscribeOnError: true); | 437 unsubscribeOnError: true); |
414 return future; | 438 return future; |
415 } | 439 } |
416 | 440 |
417 /** | |
418 * Finds the least element in the stream. | |
419 * | |
420 * If the stream is empty, the result is [:null:]. | |
421 * Otherwise the result is a value from the stream that is not greater | |
422 * than any other value from the stream (according to [compare], which must | |
423 * be a [Comparator]). | |
424 * | |
425 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
426 * | |
427 * *Deprecated*. Use [reduce] with a binary min method if needed. | |
428 */ | |
429 Future<T> min([int compare(T a, T b)]) { | |
430 if (compare == null) { | |
431 var defaultCompare = Comparable.compare; | |
432 compare = defaultCompare; | |
433 } | |
434 _FutureImpl<T> future = new _FutureImpl<T>(); | |
435 StreamSubscription subscription; | |
436 T min = null; | |
437 subscription = this.listen( | |
438 // TODO(ahe): Restore type when feature is implemented in dart2js | |
439 // checked mode. http://dartbug.com/7733 | |
440 (/*T*/ value) { | |
441 min = value; | |
442 subscription.onData((T value) { | |
443 _runUserCode( | |
444 () => compare(min, value) > 0, | |
445 (bool foundSmaller) { | |
446 if (foundSmaller) { | |
447 min = value; | |
448 } | |
449 }, | |
450 _cancelAndError(subscription, future) | |
451 ); | |
452 }); | |
453 }, | |
454 onError: future._setError, | |
455 onDone: () { | |
456 future._setValue(min); | |
457 }, | |
458 unsubscribeOnError: true | |
459 ); | |
460 return future; | |
461 } | |
462 | |
463 /** | |
464 * Finds the largest element in the stream. | |
465 * | |
466 * If the stream is empty, the result is [:null:]. | |
467 * Otherwise the result is an value from the stream that is not smaller | |
468 * than any other value from the stream (according to [compare], which must | |
469 * be a [Comparator]). | |
470 * | |
471 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
472 * | |
473 * *Deprecated*. Use [reduce] with a binary max method if needed. | |
474 */ | |
475 Future<T> max([int compare(T a, T b)]) { | |
476 if (compare == null) { | |
477 var defaultCompare = Comparable.compare; | |
478 compare = defaultCompare; | |
479 } | |
480 _FutureImpl<T> future = new _FutureImpl<T>(); | |
481 StreamSubscription subscription; | |
482 T max = null; | |
483 subscription = this.listen( | |
484 // TODO(ahe): Restore type when feature is implemented in dart2js | |
485 // checked mode. http://dartbug.com/7733 | |
486 (/*T*/ value) { | |
487 max = value; | |
488 subscription.onData((T value) { | |
489 _runUserCode( | |
490 () => compare(max, value) < 0, | |
491 (bool foundGreater) { | |
492 if (foundGreater) { | |
493 max = value; | |
494 } | |
495 }, | |
496 _cancelAndError(subscription, future) | |
497 ); | |
498 }); | |
499 }, | |
500 onError: future._setError, | |
501 onDone: () { | |
502 future._setValue(max); | |
503 }, | |
504 unsubscribeOnError: true | |
505 ); | |
506 return future; | |
507 } | |
508 | |
509 /** Reports whether this stream contains any elements. */ | 441 /** Reports whether this stream contains any elements. */ |
510 Future<bool> get isEmpty { | 442 Future<bool> get isEmpty { |
511 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 443 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
512 StreamSubscription subscription; | 444 StreamSubscription subscription; |
513 subscription = this.listen( | 445 subscription = this.listen( |
514 (_) { | 446 (_) { |
515 subscription.cancel(); | 447 subscription.cancel(); |
516 future._setValue(false); | 448 future._setValue(false); |
517 }, | 449 }, |
518 onError: future._setError, | 450 onError: future._setError, |
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1227 | 1159 |
1228 /* TODO(8997): Implement EventSink instead, */ | 1160 /* TODO(8997): Implement EventSink instead, */ |
1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1161 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
1230 _EventOutputSink _sink; | 1162 _EventOutputSink _sink; |
1231 _EventOutputSinkWrapper(this._sink); | 1163 _EventOutputSinkWrapper(this._sink); |
1232 | 1164 |
1233 void add(T data) { _sink._sendData(data); } | 1165 void add(T data) { _sink._sendData(data); } |
1234 void addError(AsyncError error) { _sink._sendError(error); } | 1166 void addError(AsyncError error) { _sink._sendError(error); } |
1235 void close() { _sink._sendDone(); } | 1167 void close() { _sink._sendDone(); } |
1236 } | 1168 } |
OLD | NEW |