OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 366 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
377 * If [convert] throws, the returned stream reports the exception as an error | 377 * If [convert] throws, the returned stream reports the exception as an error |
378 * event instead. | 378 * event instead. |
379 * | 379 * |
380 * Error and done events are passed through unchanged to the returned stream. | 380 * Error and done events are passed through unchanged to the returned stream. |
381 * | 381 * |
382 * The returned stream is a broadcast stream if this stream is. | 382 * The returned stream is a broadcast stream if this stream is. |
383 * The [convert] function is called once per data event per listener. | 383 * The [convert] function is called once per data event per listener. |
384 * If a broadcast stream is listened to more than once, each subscription | 384 * If a broadcast stream is listened to more than once, each subscription |
385 * will individually call [convert] on each data event. | 385 * will individually call [convert] on each data event. |
386 */ | 386 */ |
387 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { | 387 Stream<S> map<S>(S convert(T event)) { |
388 return new _MapStream<T, dynamic/*=S*/>(this, convert); | 388 return new _MapStream<T, S>(this, convert); |
389 } | 389 } |
390 | 390 |
391 /** | 391 /** |
392 * Creates a new stream with each data event of this stream asynchronously | 392 * Creates a new stream with each data event of this stream asynchronously |
393 * mapped to a new event. | 393 * mapped to a new event. |
394 * | 394 * |
395 * This acts like [map], except that [convert] may return a [Future], | 395 * This acts like [map], except that [convert] may return a [Future], |
396 * and in that case, the stream waits for that future to complete before | 396 * and in that case, the stream waits for that future to complete before |
397 * continuing with its result. | 397 * continuing with its result. |
398 * | 398 * |
399 * The returned stream is a broadcast stream if this stream is. | 399 * The returned stream is a broadcast stream if this stream is. |
400 */ | 400 */ |
401 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { | 401 Stream<E> asyncMap<E>(convert(T event)) { |
402 StreamController/*<E>*/ controller; | 402 StreamController<E> controller; |
403 StreamSubscription/*<T>*/ subscription; | 403 StreamSubscription<T> subscription; |
404 | 404 |
405 void onListen() { | 405 void onListen() { |
406 final add = controller.add; | 406 final add = controller.add; |
407 assert(controller is _StreamController || | 407 assert(controller is _StreamController || |
408 controller is _BroadcastStreamController); | 408 controller is _BroadcastStreamController); |
409 final _EventSink/*<E>*/ eventSink = | 409 final _EventSink<E> eventSink = |
410 controller as Object /*=_EventSink<E>*/; | 410 controller as Object /*=_EventSink<E>*/; |
411 final addError = eventSink._addError; | 411 final addError = eventSink._addError; |
412 subscription = this.listen( | 412 subscription = this.listen( |
413 (T event) { | 413 (T event) { |
414 dynamic newValue; | 414 dynamic newValue; |
415 try { | 415 try { |
416 newValue = convert(event); | 416 newValue = convert(event); |
417 } catch (e, s) { | 417 } catch (e, s) { |
418 controller.addError(e, s); | 418 controller.addError(e, s); |
419 return; | 419 return; |
420 } | 420 } |
421 if (newValue is Future) { | 421 if (newValue is Future) { |
422 subscription.pause(); | 422 subscription.pause(); |
423 newValue.then(add, onError: addError) | 423 newValue.then(add, onError: addError) |
424 .whenComplete(subscription.resume); | 424 .whenComplete(subscription.resume); |
425 } else { | 425 } else { |
426 controller.add(newValue as Object/*=E*/); | 426 controller.add(newValue as Object/*=E*/); |
427 } | 427 } |
428 }, | 428 }, |
429 onError: addError, | 429 onError: addError, |
430 onDone: controller.close | 430 onDone: controller.close |
431 ); | 431 ); |
432 } | 432 } |
433 | 433 |
434 if (this.isBroadcast) { | 434 if (this.isBroadcast) { |
435 controller = new StreamController/*<E>*/.broadcast( | 435 controller = new StreamController<E>.broadcast( |
436 onListen: onListen, | 436 onListen: onListen, |
437 onCancel: () { subscription.cancel(); }, | 437 onCancel: () { subscription.cancel(); }, |
438 sync: true | 438 sync: true |
439 ); | 439 ); |
440 } else { | 440 } else { |
441 controller = new StreamController/*<E>*/( | 441 controller = new StreamController<E>( |
442 onListen: onListen, | 442 onListen: onListen, |
443 onPause: () { subscription.pause(); }, | 443 onPause: () { subscription.pause(); }, |
444 onResume: () { subscription.resume(); }, | 444 onResume: () { subscription.resume(); }, |
445 onCancel: () => subscription.cancel(), | 445 onCancel: () => subscription.cancel(), |
446 sync: true | 446 sync: true |
447 ); | 447 ); |
448 } | 448 } |
449 return controller.stream; | 449 return controller.stream; |
450 } | 450 } |
451 | 451 |
452 /** | 452 /** |
453 * Creates a new stream with the events of a stream per original event. | 453 * Creates a new stream with the events of a stream per original event. |
454 * | 454 * |
455 * This acts like [expand], except that [convert] returns a [Stream] | 455 * This acts like [expand], except that [convert] returns a [Stream] |
456 * instead of an [Iterable]. | 456 * instead of an [Iterable]. |
457 * The events of the returned stream becomes the events of the returned | 457 * The events of the returned stream becomes the events of the returned |
458 * stream, in the order they are produced. | 458 * stream, in the order they are produced. |
459 * | 459 * |
460 * If [convert] returns `null`, no value is put on the output stream, | 460 * If [convert] returns `null`, no value is put on the output stream, |
461 * just as if it returned an empty stream. | 461 * just as if it returned an empty stream. |
462 * | 462 * |
463 * The returned stream is a broadcast stream if this stream is. | 463 * The returned stream is a broadcast stream if this stream is. |
464 */ | 464 */ |
465 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { | 465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
466 StreamController/*<E>*/ controller; | 466 StreamController<E> controller; |
467 StreamSubscription<T> subscription; | 467 StreamSubscription<T> subscription; |
468 void onListen() { | 468 void onListen() { |
469 assert(controller is _StreamController || | 469 assert(controller is _StreamController || |
470 controller is _BroadcastStreamController); | 470 controller is _BroadcastStreamController); |
471 final _EventSink/*<E>*/ eventSink = | 471 final _EventSink<E> eventSink = |
472 controller as Object /*=_EventSink<E>*/; | 472 controller as Object /*=_EventSink<E>*/; |
473 subscription = this.listen( | 473 subscription = this.listen( |
474 (T event) { | 474 (T event) { |
475 Stream/*<E>*/ newStream; | 475 Stream<E> newStream; |
476 try { | 476 try { |
477 newStream = convert(event); | 477 newStream = convert(event); |
478 } catch (e, s) { | 478 } catch (e, s) { |
479 controller.addError(e, s); | 479 controller.addError(e, s); |
480 return; | 480 return; |
481 } | 481 } |
482 if (newStream != null) { | 482 if (newStream != null) { |
483 subscription.pause(); | 483 subscription.pause(); |
484 controller.addStream(newStream) | 484 controller.addStream(newStream) |
485 .whenComplete(subscription.resume); | 485 .whenComplete(subscription.resume); |
486 } | 486 } |
487 }, | 487 }, |
488 onError: eventSink._addError, // Avoid Zone error replacement. | 488 onError: eventSink._addError, // Avoid Zone error replacement. |
489 onDone: controller.close | 489 onDone: controller.close |
490 ); | 490 ); |
491 } | 491 } |
492 if (this.isBroadcast) { | 492 if (this.isBroadcast) { |
493 controller = new StreamController/*<E>*/.broadcast( | 493 controller = new StreamController<E>.broadcast( |
494 onListen: onListen, | 494 onListen: onListen, |
495 onCancel: () { subscription.cancel(); }, | 495 onCancel: () { subscription.cancel(); }, |
496 sync: true | 496 sync: true |
497 ); | 497 ); |
498 } else { | 498 } else { |
499 controller = new StreamController/*<E>*/( | 499 controller = new StreamController<E>( |
500 onListen: onListen, | 500 onListen: onListen, |
501 onPause: () { subscription.pause(); }, | 501 onPause: () { subscription.pause(); }, |
502 onResume: () { subscription.resume(); }, | 502 onResume: () { subscription.resume(); }, |
503 onCancel: () => subscription.cancel(), | 503 onCancel: () => subscription.cancel(), |
504 sync: true | 504 sync: true |
505 ); | 505 ); |
506 } | 506 } |
507 return controller.stream; | 507 return controller.stream; |
508 } | 508 } |
509 | 509 |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
544 * into zero or more events. | 544 * into zero or more events. |
545 * | 545 * |
546 * Each incoming event is converted to an [Iterable] of new events, | 546 * Each incoming event is converted to an [Iterable] of new events, |
547 * and each of these new events are then sent by the returned stream | 547 * and each of these new events are then sent by the returned stream |
548 * in order. | 548 * in order. |
549 * | 549 * |
550 * The returned stream is a broadcast stream if this stream is. | 550 * The returned stream is a broadcast stream if this stream is. |
551 * If a broadcast stream is listened to more than once, each subscription | 551 * If a broadcast stream is listened to more than once, each subscription |
552 * will individually call `convert` and expand the events. | 552 * will individually call `convert` and expand the events. |
553 */ | 553 */ |
554 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { | 554 Stream<S> expand<S>(Iterable<S> convert(T value)) { |
555 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | 555 return new _ExpandStream<T, S>(this, convert); |
556 } | 556 } |
557 | 557 |
558 /** | 558 /** |
559 * Pipe the events of this stream into [streamConsumer]. | 559 * Pipe the events of this stream into [streamConsumer]. |
560 * | 560 * |
561 * The events of this stream are added to `streamConsumer` using | 561 * The events of this stream are added to `streamConsumer` using |
562 * [StreamConsumer.addStream]. | 562 * [StreamConsumer.addStream]. |
563 * The `streamConsumer` is closed when this stream has been successfully added | 563 * The `streamConsumer` is closed when this stream has been successfully added |
564 * to it - when the future returned by `addStream` completes without an error. | 564 * to it - when the future returned by `addStream` completes without an error. |
565 * | 565 * |
(...skipping 12 matching lines...) Expand all Loading... |
578 } | 578 } |
579 | 579 |
580 /** | 580 /** |
581 * Chains this stream as the input of the provided [StreamTransformer]. | 581 * Chains this stream as the input of the provided [StreamTransformer]. |
582 * | 582 * |
583 * Returns the result of [:streamTransformer.bind:] itself. | 583 * Returns the result of [:streamTransformer.bind:] itself. |
584 * | 584 * |
585 * The `streamTransformer` can decide whether it wants to return a | 585 * The `streamTransformer` can decide whether it wants to return a |
586 * broadcast stream or not. | 586 * broadcast stream or not. |
587 */ | 587 */ |
588 Stream/*<S>*/ transform/*<S>*/( | 588 Stream<S> transform<S>( |
589 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { | 589 StreamTransformer<T, S > streamTransformer) { |
590 return streamTransformer.bind(this); | 590 return streamTransformer.bind(this); |
591 } | 591 } |
592 | 592 |
593 /** | 593 /** |
594 * Reduces a sequence of values by repeatedly applying [combine]. | 594 * Reduces a sequence of values by repeatedly applying [combine]. |
595 */ | 595 */ |
596 Future<T> reduce(T combine(T previous, T element)) { | 596 Future<T> reduce(T combine(T previous, T element)) { |
597 _Future<T> result = new _Future<T>(); | 597 _Future<T> result = new _Future<T>(); |
598 bool seenFirst = false; | 598 bool seenFirst = false; |
599 T value; | 599 T value; |
(...skipping 20 matching lines...) Expand all Loading... |
620 } else { | 620 } else { |
621 result._complete(value); | 621 result._complete(value); |
622 } | 622 } |
623 }, | 623 }, |
624 cancelOnError: true | 624 cancelOnError: true |
625 ); | 625 ); |
626 return result; | 626 return result; |
627 } | 627 } |
628 | 628 |
629 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 629 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
630 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, | 630 Future<S> fold<S>(S initialValue, |
631 /*=S*/ combine(var/*=S*/ previous, T element)) { | 631 S combine(S previous, T element)) { |
632 | 632 |
633 _Future/*<S>*/ result = new _Future/*<S>*/(); | 633 _Future<S> result = new _Future<S>(); |
634 var/*=S*/ value = initialValue; | 634 S value = initialValue; |
635 StreamSubscription subscription; | 635 StreamSubscription subscription; |
636 subscription = this.listen( | 636 subscription = this.listen( |
637 (T element) { | 637 (T element) { |
638 _runUserCode( | 638 _runUserCode( |
639 () => combine(value, element), | 639 () => combine(value, element), |
640 (/*=S*/ newValue) { value = newValue; }, | 640 (S newValue) { value = newValue; }, |
641 _cancelAndErrorClosure(subscription, result) | 641 _cancelAndErrorClosure(subscription, result) |
642 ); | 642 ); |
643 }, | 643 }, |
644 onError: (e, st) { | 644 onError: (e, st) { |
645 result._completeError(e, st); | 645 result._completeError(e, st); |
646 }, | 646 }, |
647 onDone: () { | 647 onDone: () { |
648 result._complete(value); | 648 result._complete(value); |
649 }, | 649 }, |
650 cancelOnError: true); | 650 cancelOnError: true); |
(...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
892 * Discards all data on the stream, but signals when it's done or an error | 892 * Discards all data on the stream, but signals when it's done or an error |
893 * occured. | 893 * occured. |
894 * | 894 * |
895 * When subscribing using [drain], cancelOnError will be true. This means | 895 * When subscribing using [drain], cancelOnError will be true. This means |
896 * that the future will complete with the first error on the stream and then | 896 * that the future will complete with the first error on the stream and then |
897 * cancel the subscription. | 897 * cancel the subscription. |
898 * | 898 * |
899 * In case of a `done` event the future completes with the given | 899 * In case of a `done` event the future completes with the given |
900 * [futureValue]. | 900 * [futureValue]. |
901 */ | 901 */ |
902 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) | 902 Future<E> drain<E>([E futureValue]) |
903 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); | 903 => listen(null, cancelOnError: true).asFuture<E>(futureValue); |
904 | 904 |
905 /** | 905 /** |
906 * Provides at most the first [count] data events of this stream. | 906 * Provides at most the first [count] data events of this stream. |
907 * | 907 * |
908 * Forwards all events of this stream to the returned stream | 908 * Forwards all events of this stream to the returned stream |
909 * until [count] data events have been forwarded or this stream ends, | 909 * until [count] data events have been forwarded or this stream ends, |
910 * then ends the returned stream with a done event. | 910 * then ends the returned stream with a done event. |
911 * | 911 * |
912 * If this stream produces fewer than [count] data events before it's done, | 912 * If this stream produces fewer than [count] data events before it's done, |
913 * so will the returned stream. | 913 * so will the returned stream. |
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1344 zone = Zone.current; | 1344 zone = Zone.current; |
1345 if (onTimeout == null) { | 1345 if (onTimeout == null) { |
1346 timeout = () { | 1346 timeout = () { |
1347 controller.addError(new TimeoutException("No stream event", | 1347 controller.addError(new TimeoutException("No stream event", |
1348 timeLimit), null); | 1348 timeLimit), null); |
1349 }; | 1349 }; |
1350 } else { | 1350 } else { |
1351 // TODO(floitsch): the return type should be 'void', and the type | 1351 // TODO(floitsch): the return type should be 'void', and the type |
1352 // should be inferred. | 1352 // should be inferred. |
1353 var registeredOnTimeout = | 1353 var registeredOnTimeout = |
1354 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); | 1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); |
1355 _ControllerEventSinkWrapper wrapper = | 1355 _ControllerEventSinkWrapper wrapper = |
1356 new _ControllerEventSinkWrapper(null); | 1356 new _ControllerEventSinkWrapper(null); |
1357 timeout = () { | 1357 timeout = () { |
1358 wrapper._sink = controller; // Only valid during call. | 1358 wrapper._sink = controller; // Only valid during call. |
1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | 1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
1360 wrapper._sink = null; | 1360 wrapper._sink = null; |
1361 }; | 1361 }; |
1362 } | 1362 } |
1363 | 1363 |
1364 subscription = this.listen(onData, onError: onError, onDone: onDone); | 1364 subscription = this.listen(onData, onError: onError, onDone: onDone); |
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1491 * | 1491 * |
1492 * This method *overwrites* the existing [onDone] and [onError] callbacks | 1492 * This method *overwrites* the existing [onDone] and [onError] callbacks |
1493 * with new ones that complete the returned future. | 1493 * with new ones that complete the returned future. |
1494 * | 1494 * |
1495 * In case of an error the subscription will automatically cancel (even | 1495 * In case of an error the subscription will automatically cancel (even |
1496 * when it was listening with `cancelOnError` set to `false`). | 1496 * when it was listening with `cancelOnError` set to `false`). |
1497 * | 1497 * |
1498 * In case of a `done` event the future completes with the given | 1498 * In case of a `done` event the future completes with the given |
1499 * [futureValue]. | 1499 * [futureValue]. |
1500 */ | 1500 */ |
1501 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); | 1501 Future<E> asFuture<E>([E futureValue]); |
1502 } | 1502 } |
1503 | 1503 |
1504 | 1504 |
1505 /** | 1505 /** |
1506 * An interface that abstracts creation or handling of [Stream] events. | 1506 * An interface that abstracts creation or handling of [Stream] events. |
1507 */ | 1507 */ |
1508 abstract class EventSink<T> implements Sink<T> { | 1508 abstract class EventSink<T> implements Sink<T> { |
1509 /** Send a data event to a stream. */ | 1509 /** Send a data event to a stream. */ |
1510 void add(T event); | 1510 void add(T event); |
1511 | 1511 |
(...skipping 315 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1828 EventSink _sink; | 1828 EventSink _sink; |
1829 _ControllerEventSinkWrapper(this._sink); | 1829 _ControllerEventSinkWrapper(this._sink); |
1830 | 1830 |
1831 void add(T data) { _sink.add(data); } | 1831 void add(T data) { _sink.add(data); } |
1832 void addError(error, [StackTrace stackTrace]) { | 1832 void addError(error, [StackTrace stackTrace]) { |
1833 _sink.addError(error, stackTrace); | 1833 _sink.addError(error, stackTrace); |
1834 } | 1834 } |
1835 void close() { _sink.close(); } | 1835 void close() { _sink.close(); } |
1836 } | 1836 } |
OLD | NEW |