OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
246 * Chains this stream as the input of the provided [StreamTransformer]. | 246 * Chains this stream as the input of the provided [StreamTransformer]. |
247 * | 247 * |
248 * Returns the result of [:streamTransformer.bind:] itself. | 248 * Returns the result of [:streamTransformer.bind:] itself. |
249 */ | 249 */ |
250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { |
251 return streamTransformer.bind(this); | 251 return streamTransformer.bind(this); |
252 } | 252 } |
253 | 253 |
254 /** | 254 /** |
255 * Reduces a sequence of values by repeatedly applying [combine]. | 255 * Reduces a sequence of values by repeatedly applying [combine]. |
256 * | |
257 * *WARNING UPCOMING API-CHANGE*: This method will be changed so that | |
258 * it doesn't take an initial value. Use [fold] instead. | |
259 */ | 256 */ |
260 Future reduce(var initialValue, combine(var previous, T element)) { | 257 Future<T> reduce(T combine(T previous, T element)) { |
261 return fold(initialValue, combine); | 258 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 259 bool seenFirst = false; |
| 260 T value; |
| 261 StreamSubscription subscription = this.listen( |
| 262 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 263 // checked mode. http://dartbug.com/7733 |
| 264 (/* T */ element) { |
| 265 if (seenFirst) { |
| 266 _runUserCode(() => combine(value, element), |
| 267 (T newValue) { value = newValue; }, |
| 268 _cancelAndError(subscription, result)); |
| 269 } else { |
| 270 value = element; |
| 271 seenFirst = true; |
| 272 } |
| 273 }, |
| 274 onError: result._setError, |
| 275 onDone: () { |
| 276 if (!seenFirst) { |
| 277 result._setError(new StateError("No elements")); |
| 278 } else { |
| 279 result._setValue(value); |
| 280 } |
| 281 }, |
| 282 unsubscribeOnError: true; |
| 283 ); |
| 284 return result; |
262 } | 285 } |
263 | 286 |
264 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 287 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
265 Future fold(var initialValue, combine(var previous, T element)) { | 288 Future fold(var initialValue, combine(var previous, T element)) { |
266 _FutureImpl result = new _FutureImpl(); | 289 _FutureImpl result = new _FutureImpl(); |
267 var value = initialValue; | 290 var value = initialValue; |
268 StreamSubscription subscription; | 291 StreamSubscription subscription; |
269 subscription = this.listen( | 292 subscription = this.listen( |
270 // TODO(ahe): Restore type when feature is implemented in dart2js | 293 // TODO(ahe): Restore type when feature is implemented in dart2js |
271 // checked mode. http://dartbug.com/7733 | 294 // checked mode. http://dartbug.com/7733 |
272 (/*T*/ element) { | 295 (/*T*/ element) { |
273 _runUserCode( | 296 _runUserCode( |
274 () => combine(value, element), | 297 () => combine(value, element), |
275 (result) { value = result; }, | 298 (newValue) { value = newValue; }, |
276 _cancelAndError(subscription, result) | 299 _cancelAndError(subscription, result) |
277 ); | 300 ); |
278 }, | 301 }, |
279 onError: (AsyncError e) { | 302 onError: (AsyncError e) { |
280 result._setError(e); | 303 result._setError(e); |
281 }, | 304 }, |
282 onDone: () { | 305 onDone: () { |
283 result._setValue(value); | 306 result._setValue(value); |
284 }, | 307 }, |
285 unsubscribeOnError: true); | 308 unsubscribeOnError: true); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
407 this.listen( | 430 this.listen( |
408 (_) { count++; }, | 431 (_) { count++; }, |
409 onError: future._setError, | 432 onError: future._setError, |
410 onDone: () { | 433 onDone: () { |
411 future._setValue(count); | 434 future._setValue(count); |
412 }, | 435 }, |
413 unsubscribeOnError: true); | 436 unsubscribeOnError: true); |
414 return future; | 437 return future; |
415 } | 438 } |
416 | 439 |
417 /** | |
418 * Finds the least element in the stream. | |
419 * | |
420 * If the stream is empty, the result is [:null:]. | |
421 * Otherwise the result is a value from the stream that is not greater | |
422 * than any other value from the stream (according to [compare], which must | |
423 * be a [Comparator]). | |
424 * | |
425 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
426 * | |
427 * *Deprecated*. Use [reduce] with a binary min method if needed. | |
428 */ | |
429 Future<T> min([int compare(T a, T b)]) { | |
430 if (compare == null) { | |
431 var defaultCompare = Comparable.compare; | |
432 compare = defaultCompare; | |
433 } | |
434 _FutureImpl<T> future = new _FutureImpl<T>(); | |
435 StreamSubscription subscription; | |
436 T min = null; | |
437 subscription = this.listen( | |
438 // TODO(ahe): Restore type when feature is implemented in dart2js | |
439 // checked mode. http://dartbug.com/7733 | |
440 (/*T*/ value) { | |
441 min = value; | |
442 subscription.onData((T value) { | |
443 _runUserCode( | |
444 () => compare(min, value) > 0, | |
445 (bool foundSmaller) { | |
446 if (foundSmaller) { | |
447 min = value; | |
448 } | |
449 }, | |
450 _cancelAndError(subscription, future) | |
451 ); | |
452 }); | |
453 }, | |
454 onError: future._setError, | |
455 onDone: () { | |
456 future._setValue(min); | |
457 }, | |
458 unsubscribeOnError: true | |
459 ); | |
460 return future; | |
461 } | |
462 | |
463 /** | |
464 * Finds the largest element in the stream. | |
465 * | |
466 * If the stream is empty, the result is [:null:]. | |
467 * Otherwise the result is an value from the stream that is not smaller | |
468 * than any other value from the stream (according to [compare], which must | |
469 * be a [Comparator]). | |
470 * | |
471 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
472 * | |
473 * *Deprecated*. Use [reduce] with a binary max method if needed. | |
474 */ | |
475 Future<T> max([int compare(T a, T b)]) { | |
476 if (compare == null) { | |
477 var defaultCompare = Comparable.compare; | |
478 compare = defaultCompare; | |
479 } | |
480 _FutureImpl<T> future = new _FutureImpl<T>(); | |
481 StreamSubscription subscription; | |
482 T max = null; | |
483 subscription = this.listen( | |
484 // TODO(ahe): Restore type when feature is implemented in dart2js | |
485 // checked mode. http://dartbug.com/7733 | |
486 (/*T*/ value) { | |
487 max = value; | |
488 subscription.onData((T value) { | |
489 _runUserCode( | |
490 () => compare(max, value) < 0, | |
491 (bool foundGreater) { | |
492 if (foundGreater) { | |
493 max = value; | |
494 } | |
495 }, | |
496 _cancelAndError(subscription, future) | |
497 ); | |
498 }); | |
499 }, | |
500 onError: future._setError, | |
501 onDone: () { | |
502 future._setValue(max); | |
503 }, | |
504 unsubscribeOnError: true | |
505 ); | |
506 return future; | |
507 } | |
508 | |
509 /** Reports whether this stream contains any elements. */ | 440 /** Reports whether this stream contains any elements. */ |
510 Future<bool> get isEmpty { | 441 Future<bool> get isEmpty { |
511 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 442 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
512 StreamSubscription subscription; | 443 StreamSubscription subscription; |
513 subscription = this.listen( | 444 subscription = this.listen( |
514 (_) { | 445 (_) { |
515 subscription.cancel(); | 446 subscription.cancel(); |
516 future._setValue(false); | 447 future._setValue(false); |
517 }, | 448 }, |
518 onError: future._setError, | 449 onError: future._setError, |
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1227 | 1158 |
1228 /* TODO(8997): Implement EventSink instead, */ | 1159 /* TODO(8997): Implement EventSink instead, */ |
1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1160 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
1230 _EventOutputSink _sink; | 1161 _EventOutputSink _sink; |
1231 _EventOutputSinkWrapper(this._sink); | 1162 _EventOutputSinkWrapper(this._sink); |
1232 | 1163 |
1233 void add(T data) { _sink._sendData(data); } | 1164 void add(T data) { _sink._sendData(data); } |
1234 void addError(AsyncError error) { _sink._sendError(error); } | 1165 void addError(AsyncError error) { _sink._sendError(error); } |
1235 void close() { _sink._sendDone(); } | 1166 void close() { _sink._sendDone(); } |
1236 } | 1167 } |
OLD | NEW |