OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
11 typedef void _TimerCallback(); | 11 typedef void _TimerCallback(); |
12 | 12 |
13 /** | 13 /** |
14 * A source of asynchronous data events. | 14 * A source of asynchronous data events. |
15 * | 15 * |
16 * A Stream provides a way to receive a sequence of events. | 16 * A Stream provides a way to receive a sequence of events. |
17 * Each event is either a data event or an error event, | 17 * Each event is either a data event, also called an *element* of the stream, |
18 * representing the result of a single computation. | 18 * or an error event, which is a notification that something has failed. |
19 * When the events provided by a Stream have all been sent, | 19 * When a stream has emitted all its event, |
20 * a single "done" event will mark the end. | 20 * a single "done" event will notify the listener that the end has been reached. |
21 * | 21 * |
22 * You can [listen] on a stream to make it start generating events, | 22 * You [listen] on a stream to make it start generating events, |
23 * and to set up listeners that receive the events. | 23 * and to set up listeners that receive the events. |
24 * When you listen, you receive a [StreamSubscription] object | 24 * When you listen, you receive a [StreamSubscription] object |
25 * which is the active object providing the events, | 25 * which is the active object providing the events, |
26 * and which can be used to stop listening again, | 26 * and which can be used to stop listening again, |
27 * or to temporarily pause events from the subscription. | 27 * or to temporarily pause events from the subscription. |
28 * | 28 * |
29 * There are two kinds of streams: "Single-subscription" streams and | 29 * There are two kinds of streams: "Single-subscription" streams and |
30 * "broadcast" streams. | 30 * "broadcast" streams. |
31 * | 31 * |
32 * *A single-subscription stream* allows only a single listener during the whole | 32 * *A single-subscription stream* allows only a single listener during the whole |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
111 controller._closeUnchecked(); | 111 controller._closeUnchecked(); |
112 }); | 112 }); |
113 return controller.stream; | 113 return controller.stream; |
114 } | 114 } |
115 | 115 |
116 /** | 116 /** |
117 * Create a stream from a group of futures. | 117 * Create a stream from a group of futures. |
118 * | 118 * |
119 * The stream reports the results of the futures on the stream in the order | 119 * The stream reports the results of the futures on the stream in the order |
120 * in which the futures complete. | 120 * in which the futures complete. |
121 * Each future provides either a data event or an error event, | |
122 * depending on how the future completes. | |
121 * | 123 * |
122 * If some futures have completed before calling `Stream.fromFutures`, | 124 * If some futures have already completed when `Stream.fromFutures` is called, |
123 * their result will be output on the created stream in some unspecified | 125 * their results will be emitted in some unspecified order. |
124 * order. | |
125 * | 126 * |
126 * When all futures have completed, the stream is closed. | 127 * When all futures have completed, the stream is closed. |
127 * | 128 * |
128 * If no future is passed, the stream closes as soon as possible. | 129 * If [futures] is empty, the stream closes as soon as possible. |
129 */ | 130 */ |
130 factory Stream.fromFutures(Iterable<Future<T>> futures) { | 131 factory Stream.fromFutures(Iterable<Future<T>> futures) { |
131 _StreamController<T> controller = new StreamController<T>(sync: true); | 132 _StreamController<T> controller = new StreamController<T>(sync: true); |
132 int count = 0; | 133 int count = 0; |
134 // Declare these as variables holding closures instead of as | |
135 // function declarations. | |
136 // This avoids creating a new closure from the functions for each future. | |
133 var onValue = (T value) { | 137 var onValue = (T value) { |
134 if (!controller.isClosed) { | 138 if (!controller.isClosed) { |
135 controller._add(value); | 139 controller._add(value); |
136 if (--count == 0) controller._closeUnchecked(); | 140 if (--count == 0) controller._closeUnchecked(); |
137 } | 141 } |
138 }; | 142 }; |
139 var onError = (error, stack) { | 143 var onError = (error, stack) { |
140 if (!controller.isClosed) { | 144 if (!controller.isClosed) { |
141 controller._addError(error, stack); | 145 controller._addError(error, stack); |
142 if (--count == 0) controller._closeUnchecked(); | 146 if (--count == 0) controller._closeUnchecked(); |
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
335 * stack trace). | 339 * stack trace). |
336 * Otherwise it is called with just the error object. | 340 * Otherwise it is called with just the error object. |
337 * If [onError] is omitted, any errors on the stream are considered unhandled, | 341 * If [onError] is omitted, any errors on the stream are considered unhandled, |
338 * and will be passed to the current [Zone]'s error handler. | 342 * and will be passed to the current [Zone]'s error handler. |
339 * By default unhandled async errors are treated | 343 * By default unhandled async errors are treated |
340 * as if they were uncaught top-level errors. | 344 * as if they were uncaught top-level errors. |
341 * | 345 * |
342 * If this stream closes and sends a done event, the [onDone] handler is | 346 * If this stream closes and sends a done event, the [onDone] handler is |
343 * called. If [onDone] is `null`, nothing happens. | 347 * called. If [onDone] is `null`, nothing happens. |
344 * | 348 * |
345 * If [cancelOnError] is true, the subscription is automatically cancelled | 349 * If [cancelOnError] is true, the subscription is automatically canceled |
346 * when the first error event is delivered. The default is `false`. | 350 * when the first error event is delivered. The default is `false`. |
347 * | 351 * |
348 * While a subscription is paused, or when it has been cancelled, | 352 * While a subscription is paused, or when it has been canceled, |
349 * the subscription doesn't receive events and none of the | 353 * the subscription doesn't receive events and none of the |
350 * event handler functions are called. | 354 * event handler functions are called. |
351 */ | 355 */ |
352 StreamSubscription<T> listen(void onData(T event), | 356 StreamSubscription<T> listen(void onData(T event), |
353 {Function onError, void onDone(), bool cancelOnError}); | 357 {Function onError, void onDone(), bool cancelOnError}); |
354 | 358 |
355 /** | 359 /** |
356 * Creates a new stream from this stream that discards some data events. | 360 * Creates a new stream from this stream that discards some elements. |
357 * | 361 * |
358 * The new stream sends the same error and done events as this stream, | 362 * The new stream sends the same error and done events as this stream, |
359 * but it only sends the data events that satisfy the [test]. | 363 * but it only sends the data events that satisfy the [test]. |
360 * | 364 * |
365 * If the [test] function throws, the data event is dropped and the | |
366 * error is emitted on the returned stream instead. | |
367 * | |
361 * The returned stream is a broadcast stream if this stream is. | 368 * The returned stream is a broadcast stream if this stream is. |
362 * If a broadcast stream is listened to more than once, each subscription | 369 * If a broadcast stream is listened to more than once, each subscription |
363 * will individually perform the `test`. | 370 * will individually perform the `test`. |
364 */ | 371 */ |
365 Stream<T> where(bool test(T event)) { | 372 Stream<T> where(bool test(T event)) { |
366 return new _WhereStream<T>(this, test); | 373 return new _WhereStream<T>(this, test); |
367 } | 374 } |
368 | 375 |
369 /** | 376 /** |
377 * Transforms each element of this stream into a new stream event. | |
378 * | |
370 * Creates a new stream that converts each element of this stream | 379 * Creates a new stream that converts each element of this stream |
371 * to a new value using the [convert] function. | 380 * to a new value using the [convert] function, and emits the result. |
372 * | 381 * |
373 * For each data event, `o`, in this stream, the returned stream | 382 * For each data event, `o`, in this stream, the returned stream |
374 * provides a data event with the value `convert(o)`. | 383 * provides a data event with the value `convert(o)`. |
375 * If [convert] throws, the returned stream reports the exception as an error | 384 * If [convert] throws, the returned stream reports it as an error |
376 * event instead. | 385 * event instead. |
377 * | 386 * |
378 * Error and done events are passed through unchanged to the returned stream. | 387 * Error and done events are passed through unchanged to the returned stream. |
379 * | 388 * |
380 * The returned stream is a broadcast stream if this stream is. | 389 * The returned stream is a broadcast stream if this stream is. |
381 * The [convert] function is called once per data event per listener. | 390 * The [convert] function is called once per data event per listener. |
382 * If a broadcast stream is listened to more than once, each subscription | 391 * If a broadcast stream is listened to more than once, each subscription |
383 * will individually call [convert] on each data event. | 392 * will individually call [convert] on each data event. |
384 */ | 393 */ |
385 Stream<S> map<S>(S convert(T event)) { | 394 Stream<S> map<S>(S convert(T event)) { |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
511 onResume: () { | 520 onResume: () { |
512 subscription.resume(); | 521 subscription.resume(); |
513 }, | 522 }, |
514 onCancel: () => subscription.cancel(), | 523 onCancel: () => subscription.cancel(), |
515 sync: true); | 524 sync: true); |
516 } | 525 } |
517 return controller.stream; | 526 return controller.stream; |
518 } | 527 } |
519 | 528 |
520 /** | 529 /** |
521 * Creates a new stream with the events of a stream per original event. | 530 * Transforms each element into a sequence of asynchronous events. |
522 * | 531 * |
523 * This acts like [expand], except that [convert] returns a [Stream] | 532 * Returns a new stream and for each event of this stream, do the following: |
524 * instead of an [Iterable]. | |
525 * The events of the returned stream becomes the events of the returned | |
526 * stream, in the order they are produced. | |
527 * | 533 * |
528 * If [convert] returns `null`, no value is put on the output stream, | 534 * * If the event is an error event or a done event, it is emitted directly |
529 * just as if it returned an empty stream. | 535 * by the returned stream. |
536 * * Otherwise it is an element. Then the [convert] function is called | |
537 * with the element as argument to produce a convert-stream for the element. | |
538 * * If that call throws, the error is emitted on the returned stream. | |
539 * * If the call returnes `null`, no further action is taken for the elements. | |
540 * * Otherwise, this stream is paused and convert-stream is listened to. | |
541 * Every data and error event of the convert-stream is emitted on the returned | |
542 * stream in the order it is produced. | |
543 * When the convert-stream ends, this stream is resumed. | |
530 * | 544 * |
531 * The returned stream is a broadcast stream if this stream is. | 545 * The returned stream is a broadcast stream if this stream is. |
532 */ | 546 */ |
533 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { | 547 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
534 StreamController<E> controller; | 548 StreamController<E> controller; |
535 StreamSubscription<T> subscription; | 549 StreamSubscription<T> subscription; |
536 void onListen() { | 550 void onListen() { |
537 assert(controller is _StreamController || | 551 assert(controller is _StreamController || |
538 controller is _BroadcastStreamController); | 552 controller is _BroadcastStreamController); |
539 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; | 553 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
588 * trace. The stack trace argument might be `null` if the stream itself | 602 * trace. The stack trace argument might be `null` if the stream itself |
589 * received an error without stack trace. | 603 * received an error without stack trace. |
590 * | 604 * |
591 * An asynchronous error `error` is matched by a test function if | 605 * An asynchronous error `error` is matched by a test function if |
592 *`test(error)` returns true. If [test] is omitted, every error is considered | 606 *`test(error)` returns true. If [test] is omitted, every error is considered |
593 * matching. | 607 * matching. |
594 * | 608 * |
595 * If the error is intercepted, the [onError] function can decide what to do | 609 * If the error is intercepted, the [onError] function can decide what to do |
596 * with it. It can throw if it wants to raise a new (or the same) error, | 610 * with it. It can throw if it wants to raise a new (or the same) error, |
597 * or simply return to make the stream forget the error. | 611 * or simply return to make the stream forget the error. |
612 * If the received `error` value is thrown again by the [onError] function, | |
613 * it acts like a `rethrow` and it is emitted along with its original | |
614 * stack trace, not the stack trace of the `throw` inside [onError]. | |
598 * | 615 * |
599 * If you need to transform an error into a data event, use the more generic | 616 * If you need to transform an error into a data event, use the more generic |
600 * [Stream.transform] to handle the event by writing a data event to | 617 * [Stream.transform] to handle the event by writing a data event to |
601 * the output sink. | 618 * the output sink. |
602 * | 619 * |
603 * The returned stream is a broadcast stream if this stream is. | 620 * The returned stream is a broadcast stream if this stream is. |
604 * If a broadcast stream is listened to more than once, each subscription | 621 * If a broadcast stream is listened to more than once, each subscription |
605 * will individually perform the `test` and handle the error. | 622 * will individually perform the `test` and handle the error. |
606 */ | 623 */ |
607 Stream<T> handleError(Function onError, {bool test(error)}) { | 624 Stream<T> handleError(Function onError, {bool test(error)}) { |
608 return new _HandleErrorStream<T>(this, onError, test); | 625 return new _HandleErrorStream<T>(this, onError, test); |
609 } | 626 } |
610 | 627 |
611 /** | 628 /** |
612 * Creates a new stream from this stream that converts each element | 629 * Transforms each element of this stream into a sequence of elements. |
613 * into zero or more events. | |
614 * | 630 * |
615 * Each incoming event is converted to an [Iterable] of new events, | 631 * Returns a new stream where each element of this stream is replaced |
616 * and each of these new events are then sent by the returned stream | 632 * by zero or more data events. |
617 * in order. | 633 * The event values are proveded as an [Iterable] by a call to [convert] |
floitsch
2017/05/30 17:52:47
provided
| |
634 * with the element as argument, and the elements of that iterable is | |
635 * emitted in iteration order. | |
636 * If calling [convert] throws, or if iteration of the returned values throws, | |
floitsch
2017/05/30 17:52:47
if the iteration
| |
637 * the error is emitted on the returned stream and iteration ends for that | |
638 * element of this stream. | |
639 * | |
640 * Error events and the done event of this stream are forwarded directly | |
641 * to the returned stream. | |
618 * | 642 * |
619 * The returned stream is a broadcast stream if this stream is. | 643 * The returned stream is a broadcast stream if this stream is. |
620 * If a broadcast stream is listened to more than once, each subscription | 644 * If a broadcast stream is listened to more than once, each subscription |
621 * will individually call `convert` and expand the events. | 645 * will individually call `convert` and expand the events. |
622 */ | 646 */ |
623 Stream<S> expand<S>(Iterable<S> convert(T value)) { | 647 Stream<S> expand<S>(Iterable<S> convert(T element)) { |
624 return new _ExpandStream<T, S>(this, convert); | 648 return new _ExpandStream<T, S>(this, convert); |
625 } | 649 } |
626 | 650 |
627 /** | 651 /** |
628 * Pipe the events of this stream into [streamConsumer]. | 652 * Pipes the events of this stream into [streamConsumer]. |
629 * | 653 * |
630 * The events of this stream are added to `streamConsumer` using | 654 * All events of this stream are added to `streamConsumer` using |
631 * [StreamConsumer.addStream]. | 655 * [StreamConsumer.addStream]. |
632 * The `streamConsumer` is closed when this stream has been successfully added | 656 * The `streamConsumer` is closed when this stream has been successfully added |
633 * to it - when the future returned by `addStream` completes without an error. | 657 * to it - when the future returned by `addStream` completes without an error. |
634 * | 658 * |
635 * Returns a future which completes when the stream has been consumed | 659 * Returns a future which completes when the stream has been consumed |
636 * and the consumer has been closed. | 660 * and the consumer has been closed. |
637 * | 661 * |
638 * The returned future completes with the same result as the future returned | 662 * The returned future completes with the same result as the future returned |
639 * by [StreamConsumer.close]. | 663 * by [StreamConsumer.close]. |
640 * If the adding of the stream itself fails in some way, | 664 * If the call to [StreamConsumer.addStream] fails in some way, this |
641 * then the consumer is expected to be closed, and won't be closed again. | 665 * method fails in the same way. |
642 * In that case the returned future completes with the error from calling | |
643 * `addStream`. | |
644 */ | 666 */ |
645 Future pipe(StreamConsumer<T> streamConsumer) { | 667 Future pipe(StreamConsumer<T> streamConsumer) { |
646 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); | 668 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
647 } | 669 } |
648 | 670 |
649 /** | 671 /** |
650 * Chains this stream as the input of the provided [StreamTransformer]. | 672 * Applies a [StreamTransformer] to the current stream. |
651 * | 673 * |
652 * Returns the result of [:streamTransformer.bind:] itself. | 674 * Returns the result of the stream transformation, |
675 * that is, the result of `streamTransformer.bind(this)`. | |
676 * This method simply allows writing the call to `streamTransformer.bind` | |
677 * in a chained fashion, like | |
678 * ``` | |
679 * stream.map(mapping).transform(transformation).toList() | |
680 * ``` | |
681 * which can be more convenient than calling `bind` directly. | |
653 * | 682 * |
654 * The `streamTransformer` can decide whether it wants to return a | 683 * The [streamTransformer] can return any stream. |
655 * broadcast stream or not. | 684 * Whether the returned stream is a broadcast stream or not, |
685 * and which elements it will contain, | |
686 * is entirely up to the transformation. | |
656 */ | 687 */ |
657 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { | 688 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
658 return streamTransformer.bind(this); | 689 return streamTransformer.bind(this); |
659 } | 690 } |
660 | 691 |
661 /** | 692 /** |
662 * Reduces a sequence of values by repeatedly applying [combine]. | 693 * Combines a sequence of values by repeatedly applying [combine]. |
694 * | |
695 * Similar to [Iterable.reduce], this function maintains a value, | |
696 * starting with the first element of the stream | |
697 * and updated for each further element of this stream. | |
698 * For each element after the first, | |
699 * the value is updated to the result of calling [combine] | |
700 * with the previous value and the element. | |
701 * | |
702 * When this stream is done, the returned future is completed with | |
703 * the value at that time. | |
704 * | |
705 * If the stream is empty, the returned future is completed with | |
706 * an error. | |
707 * If this stream emits an error, or the call to [combine] throws, | |
708 * the returned future is completed with that error, | |
709 * and processing is stopped. | |
663 */ | 710 */ |
664 Future<T> reduce(T combine(T previous, T element)) { | 711 Future<T> reduce(T combine(T previous, T element)) { |
665 _Future<T> result = new _Future<T>(); | 712 _Future<T> result = new _Future<T>(); |
666 bool seenFirst = false; | 713 bool seenFirst = false; |
667 T value; | 714 T value; |
668 StreamSubscription subscription; | 715 StreamSubscription subscription; |
669 subscription = this.listen( | 716 subscription = this.listen( |
670 (T element) { | 717 (T element) { |
671 if (seenFirst) { | 718 if (seenFirst) { |
672 _runUserCode(() => combine(value, element), (T newValue) { | 719 _runUserCode(() => combine(value, element), (T newValue) { |
673 value = newValue; | 720 value = newValue; |
674 }, _cancelAndErrorClosure(subscription, result)); | 721 }, _cancelAndErrorClosure(subscription, result)); |
675 } else { | 722 } else { |
676 value = element; | 723 value = element; |
677 seenFirst = true; | 724 seenFirst = true; |
678 } | 725 } |
679 }, | 726 }, |
680 onError: result._completeError, | 727 onError: result._completeError, |
681 onDone: () { | 728 onDone: () { |
682 if (!seenFirst) { | 729 if (!seenFirst) { |
683 try { | 730 try { |
731 // Throw and recatch, instead of just doing | |
732 // _completeWithErrorCallback, e, theError, StackTrace.current), | |
733 // to ensure that the stackTrace is set on the error. | |
684 throw IterableElementError.noElement(); | 734 throw IterableElementError.noElement(); |
685 } catch (e, s) { | 735 } catch (e, s) { |
686 _completeWithErrorCallback(result, e, s); | 736 _completeWithErrorCallback(result, e, s); |
687 } | 737 } |
688 } else { | 738 } else { |
689 result._complete(value); | 739 result._complete(value); |
690 } | 740 } |
691 }, | 741 }, |
692 cancelOnError: true); | 742 cancelOnError: true); |
693 return result; | 743 return result; |
694 } | 744 } |
695 | 745 |
696 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 746 /** |
747 * Combines a sequence of values by repeatedly applying [combine]. | |
748 * | |
749 * Similar to [Iterable.fold], this function maintains a value, | |
750 * starting with [initialValue] and updated for each element of | |
751 * this stream. | |
752 * For each element, the value is updated to the result of calling | |
753 * [combine] with the previous value and the element. | |
754 * | |
755 * When this stream is done, the returned future is completed with | |
756 * the value at that time. | |
757 * For an empty stream, the future is completed with [initialValue]. | |
758 * | |
759 * If this stream emits an error, or the call to [combine] throws, | |
760 * the returned future is completed with that error, | |
761 * and processing is stopped. | |
762 */ | |
697 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { | 763 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
698 _Future<S> result = new _Future<S>(); | 764 _Future<S> result = new _Future<S>(); |
699 S value = initialValue; | 765 S value = initialValue; |
700 StreamSubscription subscription; | 766 StreamSubscription subscription; |
701 subscription = this.listen((T element) { | 767 subscription = this.listen( |
702 _runUserCode(() => combine(value, element), (S newValue) { | 768 (T element) { |
703 value = newValue; | 769 _runUserCode(() => combine(value, element), (S newValue) { |
704 }, _cancelAndErrorClosure(subscription, result)); | 770 value = newValue; |
705 }, onError: (e, st) { | 771 }, _cancelAndErrorClosure(subscription, result)); |
706 result._completeError(e, st); | 772 }, |
707 }, onDone: () { | 773 onError: result._completeError, |
708 result._complete(value); | 774 onDone: () { |
709 }, cancelOnError: true); | 775 result._complete(value); |
776 }, | |
777 cancelOnError: true); | |
710 return result; | 778 return result; |
711 } | 779 } |
712 | 780 |
713 /** | 781 /** |
714 * Collects string of data events' string representations. | 782 * Combines the string representation of elements into a single string. |
715 * | 783 * |
716 * If [separator] is provided, it is inserted between any two | 784 * Each element is converted to a string using its [Object.toString] method. |
717 * elements. | 785 * If [separator] is provided, it is inserted between element string |
786 * representations. | |
718 * | 787 * |
719 * Any error in the stream causes the future to complete with that | 788 * The returned future is completed with the combined string when the stream |
720 * error. Otherwise it completes with the collected string when | 789 * is done. |
721 * the "done" event arrives. | 790 * |
791 * If the stream contains an error, or if the call to [Object.toString] | |
792 * throws, the returned future is completed with that error, | |
793 * and processing stops. | |
722 */ | 794 */ |
723 Future<String> join([String separator = ""]) { | 795 Future<String> join([String separator = ""]) { |
724 _Future<String> result = new _Future<String>(); | 796 _Future<String> result = new _Future<String>(); |
725 StringBuffer buffer = new StringBuffer(); | 797 StringBuffer buffer = new StringBuffer(); |
726 StreamSubscription subscription; | 798 StreamSubscription subscription; |
727 bool first = true; | 799 bool first = true; |
728 subscription = this.listen((T element) { | 800 subscription = this.listen((T element) { |
729 if (!first) { | 801 if (!first) { |
730 buffer.write(separator); | 802 buffer.write(separator); |
731 } | 803 } |
732 first = false; | 804 first = false; |
733 try { | 805 try { |
734 buffer.write(element); | 806 buffer.write(element); |
735 } catch (e, s) { | 807 } catch (e, s) { |
736 _cancelAndErrorWithReplacement(subscription, result, e, s); | 808 _cancelAndErrorWithReplacement(subscription, result, e, s); |
737 } | 809 } |
738 }, onError: (e) { | 810 }, onError: (e) { |
739 result._completeError(e); | 811 result._completeError(e); |
740 }, onDone: () { | 812 }, onDone: () { |
741 result._complete(buffer.toString()); | 813 result._complete(buffer.toString()); |
742 }, cancelOnError: true); | 814 }, cancelOnError: true); |
743 return result; | 815 return result; |
744 } | 816 } |
745 | 817 |
746 /** | 818 /** |
747 * Checks whether [needle] occurs in the elements provided by this stream. | 819 * Returns whether [needle] occurs in the elements provided by this stream. |
748 * | 820 * |
749 * Completes the [Future] when the answer is known. | 821 * Compares each element of this stream to [needle] using [Object.==]. |
750 * If this stream reports an error, the [Future] will report that error. | 822 * If an equal element is found, the returned future is completed with `true`. |
823 * If the stream ends without finding a match, the future is completed with | |
824 * `false`. | |
825 * | |
826 * If the stream contains an error, or the call to `Object.==` throws, | |
827 * the returned future is completed with that error, and processing stops. | |
751 */ | 828 */ |
752 Future<bool> contains(Object needle) { | 829 Future<bool> contains(Object needle) { |
753 _Future<bool> future = new _Future<bool>(); | 830 _Future<bool> future = new _Future<bool>(); |
754 StreamSubscription subscription; | 831 StreamSubscription subscription; |
755 subscription = this.listen( | 832 subscription = this.listen( |
756 (T element) { | 833 (T element) { |
757 _runUserCode(() => (element == needle), (bool isMatch) { | 834 _runUserCode(() => (element == needle), (bool isMatch) { |
758 if (isMatch) { | 835 if (isMatch) { |
759 _cancelAndValue(subscription, future, true); | 836 _cancelAndValue(subscription, future, true); |
760 } | 837 } |
761 }, _cancelAndErrorClosure(subscription, future)); | 838 }, _cancelAndErrorClosure(subscription, future)); |
762 }, | 839 }, |
763 onError: future._completeError, | 840 onError: future._completeError, |
764 onDone: () { | 841 onDone: () { |
765 future._complete(false); | 842 future._complete(false); |
766 }, | 843 }, |
767 cancelOnError: true); | 844 cancelOnError: true); |
768 return future; | 845 return future; |
769 } | 846 } |
770 | 847 |
771 /** | 848 /** |
772 * Executes [action] on each data event of the stream. | 849 * Executes [action] on each element of the stream. |
773 * | 850 * |
774 * Completes the returned [Future] when all events of the stream | 851 * Completes the returned [Future] when all elements of the stream |
775 * have been processed. Completes the future with an error if the | 852 * have been processed. |
776 * stream has an error event, or if [action] throws. | 853 * |
854 * If the stream contains an error, or if the call to [action] throws, | |
855 * the returne future completes with that error, and processing stops. | |
777 */ | 856 */ |
778 Future forEach(void action(T element)) { | 857 Future forEach(void action(T element)) { |
779 _Future future = new _Future(); | 858 _Future future = new _Future(); |
780 StreamSubscription subscription; | 859 StreamSubscription subscription; |
781 subscription = this.listen( | 860 subscription = this.listen( |
782 (T element) { | 861 (T element) { |
783 // TODO(floitsch): the type should be 'void' and inferred. | 862 // TODO(floitsch): the type should be 'void' and inferred. |
784 _runUserCode<dynamic>(() => action(element), (_) {}, | 863 _runUserCode<dynamic>(() => action(element), (_) {}, |
785 _cancelAndErrorClosure(subscription, future)); | 864 _cancelAndErrorClosure(subscription, future)); |
786 }, | 865 }, |
787 onError: future._completeError, | 866 onError: future._completeError, |
788 onDone: () { | 867 onDone: () { |
789 future._complete(null); | 868 future._complete(null); |
790 }, | 869 }, |
791 cancelOnError: true); | 870 cancelOnError: true); |
792 return future; | 871 return future; |
793 } | 872 } |
794 | 873 |
795 /** | 874 /** |
796 * Checks whether [test] accepts all elements provided by this stream. | 875 * Checks whether [test] accepts all elements provided by this stream. |
797 * | 876 * |
798 * Completes the [Future] when the answer is known. | 877 * Calls [test] on each element of the stream. |
799 * If this stream reports an error, the [Future] will report that error. | 878 * If the call returns `false`, the returned future is completed with `false` |
879 * and processing stops. | |
880 * | |
881 * If the stream ends without finding an element that [test] rejects, | |
882 * the returned future is completed with `true`. | |
883 * | |
884 * If this stream contains an error, or if the call to [test] throws, | |
885 * the returned future is completed with that error, and processing stops. | |
800 */ | 886 */ |
801 Future<bool> every(bool test(T element)) { | 887 Future<bool> every(bool test(T element)) { |
802 _Future<bool> future = new _Future<bool>(); | 888 _Future<bool> future = new _Future<bool>(); |
803 StreamSubscription subscription; | 889 StreamSubscription subscription; |
804 subscription = this.listen( | 890 subscription = this.listen( |
805 (T element) { | 891 (T element) { |
806 _runUserCode(() => test(element), (bool isMatch) { | 892 _runUserCode(() => test(element), (bool isMatch) { |
807 if (!isMatch) { | 893 if (!isMatch) { |
808 _cancelAndValue(subscription, future, false); | 894 _cancelAndValue(subscription, future, false); |
809 } | 895 } |
810 }, _cancelAndErrorClosure(subscription, future)); | 896 }, _cancelAndErrorClosure(subscription, future)); |
811 }, | 897 }, |
812 onError: future._completeError, | 898 onError: future._completeError, |
813 onDone: () { | 899 onDone: () { |
814 future._complete(true); | 900 future._complete(true); |
815 }, | 901 }, |
816 cancelOnError: true); | 902 cancelOnError: true); |
817 return future; | 903 return future; |
818 } | 904 } |
819 | 905 |
820 /** | 906 /** |
821 * Checks whether [test] accepts any element provided by this stream. | 907 * Checks whether [test] accepts any element provided by this stream. |
822 * | 908 * |
823 * Completes the [Future] when the answer is known. | 909 * Calls [test] on each element of the stream. |
910 * If the call returns `true`, the returned future is completed with `true` | |
911 * and processing stops. | |
824 * | 912 * |
825 * If this stream reports an error, the [Future] reports that error. | 913 * If the stream ends without finding an element that [test] accepts, |
914 * the returned future is completed with `false`. | |
826 * | 915 * |
827 * Stops listening to the stream after the first matching element has been | 916 * If this stream contains an error, or if the call to [test] throws, |
828 * found. | 917 * the returned future is completed with that error, and processing stops. |
829 * | |
830 * Internally the method cancels its subscription after this element. This | |
831 * means that single-subscription (non-broadcast) streams are closed and | |
832 * cannot be reused after a call to this method. | |
833 */ | 918 */ |
834 Future<bool> any(bool test(T element)) { | 919 Future<bool> any(bool test(T element)) { |
835 _Future<bool> future = new _Future<bool>(); | 920 _Future<bool> future = new _Future<bool>(); |
836 StreamSubscription subscription; | 921 StreamSubscription subscription; |
837 subscription = this.listen( | 922 subscription = this.listen( |
838 (T element) { | 923 (T element) { |
839 _runUserCode(() => test(element), (bool isMatch) { | 924 _runUserCode(() => test(element), (bool isMatch) { |
840 if (isMatch) { | 925 if (isMatch) { |
841 _cancelAndValue(subscription, future, true); | 926 _cancelAndValue(subscription, future, true); |
842 } | 927 } |
843 }, _cancelAndErrorClosure(subscription, future)); | 928 }, _cancelAndErrorClosure(subscription, future)); |
844 }, | 929 }, |
845 onError: future._completeError, | 930 onError: future._completeError, |
846 onDone: () { | 931 onDone: () { |
847 future._complete(false); | 932 future._complete(false); |
848 }, | 933 }, |
849 cancelOnError: true); | 934 cancelOnError: true); |
850 return future; | 935 return future; |
851 } | 936 } |
852 | 937 |
853 /** Counts the elements in the stream. */ | 938 /** |
939 * The number of elements in this stream. | |
940 * | |
941 * Waits for all elements of this stream. When the stream ends, | |
942 * the returned future is completed with the number of elements. | |
943 * | |
944 * If the stream contains an error, the returned future is completed with | |
945 * that error, and processing stops. | |
946 * | |
947 * This operation listens to the stream, and a non-broadcast stream cannot | |
948 * be reused after finding its length. | |
949 */ | |
854 Future<int> get length { | 950 Future<int> get length { |
855 _Future<int> future = new _Future<int>(); | 951 _Future<int> future = new _Future<int>(); |
856 int count = 0; | 952 int count = 0; |
857 this.listen( | 953 this.listen( |
858 (_) { | 954 (_) { |
859 count++; | 955 count++; |
860 }, | 956 }, |
861 onError: future._completeError, | 957 onError: future._completeError, |
862 onDone: () { | 958 onDone: () { |
863 future._complete(count); | 959 future._complete(count); |
864 }, | 960 }, |
865 cancelOnError: true); | 961 cancelOnError: true); |
866 return future; | 962 return future; |
867 } | 963 } |
868 | 964 |
869 /** | 965 /** |
870 * Reports whether this stream contains any elements. | 966 * Whether this stream contains any elements. |
871 * | 967 * |
872 * Stops listening to the stream after the first element has been received. | 968 * Waits for the first element of this stream, then completes the returned |
969 * future with `true`. | |
970 * If the stream ends without emitting any elements, the returned future is | |
971 * completed with `false`. | |
873 * | 972 * |
874 * Internally the method cancels its subscription after the first element. | 973 * If the first event is an error, the returned future is completed with that |
875 * This means that single-subscription (non-broadcast) streams are closed and | 974 * error. |
876 * cannot be reused after a call to this getter. | 975 * |
976 * This operation listens to the stream, and a non-broadcast stream cannot | |
977 * be reused after checking whether it is empty. | |
877 */ | 978 */ |
878 Future<bool> get isEmpty { | 979 Future<bool> get isEmpty { |
879 _Future<bool> future = new _Future<bool>(); | 980 _Future<bool> future = new _Future<bool>(); |
880 StreamSubscription subscription; | 981 StreamSubscription subscription; |
881 subscription = this.listen( | 982 subscription = this.listen( |
882 (_) { | 983 (_) { |
883 _cancelAndValue(subscription, future, false); | 984 _cancelAndValue(subscription, future, false); |
884 }, | 985 }, |
885 onError: future._completeError, | 986 onError: future._completeError, |
886 onDone: () { | 987 onDone: () { |
887 future._complete(true); | 988 future._complete(true); |
888 }, | 989 }, |
889 cancelOnError: true); | 990 cancelOnError: true); |
890 return future; | 991 return future; |
891 } | 992 } |
892 | 993 |
893 /** Collects the data of this stream in a [List]. */ | 994 /** |
995 * Collects all elements of this stream in a [List]. | |
996 * | |
997 * Creates a `List<T>` and adds all elements of the stream to the list | |
998 * in the order they arrive. | |
999 * When the stream ends, the returned future is completed with that list. | |
1000 * | |
1001 * If the stream contains an error, the returned future is completed | |
1002 * with that error, and processing stops. | |
1003 */ | |
894 Future<List<T>> toList() { | 1004 Future<List<T>> toList() { |
895 List<T> result = <T>[]; | 1005 List<T> result = <T>[]; |
896 _Future<List<T>> future = new _Future<List<T>>(); | 1006 _Future<List<T>> future = new _Future<List<T>>(); |
897 this.listen( | 1007 this.listen( |
898 (T data) { | 1008 (T data) { |
899 result.add(data); | 1009 result.add(data); |
900 }, | 1010 }, |
901 onError: future._completeError, | 1011 onError: future._completeError, |
902 onDone: () { | 1012 onDone: () { |
903 future._complete(result); | 1013 future._complete(result); |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1035 * | 1145 * |
1036 * The returned stream is a broadcast stream if this stream is. | 1146 * The returned stream is a broadcast stream if this stream is. |
1037 * If a broadcast stream is listened to more than once, each subscription | 1147 * If a broadcast stream is listened to more than once, each subscription |
1038 * will individually perform the `equals` test. | 1148 * will individually perform the `equals` test. |
1039 */ | 1149 */ |
1040 Stream<T> distinct([bool equals(T previous, T next)]) { | 1150 Stream<T> distinct([bool equals(T previous, T next)]) { |
1041 return new _DistinctStream<T>(this, equals); | 1151 return new _DistinctStream<T>(this, equals); |
1042 } | 1152 } |
1043 | 1153 |
1044 /** | 1154 /** |
1045 * Returns the first element of the stream. | 1155 * The first element of the stream. |
1046 * | 1156 * |
1047 * Stops listening to the stream after the first element has been received. | 1157 * Stops listening to the stream after the first element has been received. |
1048 * | 1158 * |
1049 * Internally the method cancels its subscription after the first element. | 1159 * Internally the method cancels its subscription after the first element. |
1050 * This means that single-subscription (non-broadcast) streams are closed | 1160 * This means that single-subscription (non-broadcast) streams are closed |
1051 * and cannot be reused after a call to this getter. | 1161 * and cannot be reused after a call to this getter. |
1052 * | 1162 * |
1053 * If an error event occurs before the first data event, the resulting future | 1163 * If an error event occurs before the first data event, the returned future |
1054 * is completed with that error. | 1164 * is completed with that error. |
1055 * | 1165 * |
1056 * If this stream is empty (a done event occurs before the first data event), | 1166 * If this stream is empty (a done event occurs before the first data event), |
1057 * the resulting future completes with a [StateError]. | 1167 * the returned future completes with an error. |
1058 * | 1168 * |
1059 * Except for the type of the error, this method is equivalent to | 1169 * Except for the type of the error, this method is equivalent to |
1060 * [:this.elementAt(0):]. | 1170 * `this.elementAt(0)`. |
1061 */ | 1171 */ |
1062 Future<T> get first { | 1172 Future<T> get first { |
1063 _Future<T> future = new _Future<T>(); | 1173 _Future<T> future = new _Future<T>(); |
1064 StreamSubscription subscription; | 1174 StreamSubscription subscription; |
1065 subscription = this.listen( | 1175 subscription = this.listen( |
1066 (T value) { | 1176 (T value) { |
1067 _cancelAndValue(subscription, future, value); | 1177 _cancelAndValue(subscription, future, value); |
1068 }, | 1178 }, |
1069 onError: future._completeError, | 1179 onError: future._completeError, |
1070 onDone: () { | 1180 onDone: () { |
1071 try { | 1181 try { |
1072 throw IterableElementError.noElement(); | 1182 throw IterableElementError.noElement(); |
1073 } catch (e, s) { | 1183 } catch (e, s) { |
1074 _completeWithErrorCallback(future, e, s); | 1184 _completeWithErrorCallback(future, e, s); |
1075 } | 1185 } |
1076 }, | 1186 }, |
1077 cancelOnError: true); | 1187 cancelOnError: true); |
1078 return future; | 1188 return future; |
1079 } | 1189 } |
1080 | 1190 |
1081 /** | 1191 /** |
1082 * Returns the last element of the stream. | 1192 * The last element of this stream. |
1083 * | 1193 * |
1084 * If an error event occurs before the first data event, the resulting future | 1194 * If this stream emits an error event, |
1085 * is completed with that error. | 1195 * the returned future is completed with that error |
1196 * and processing stops. | |
1086 * | 1197 * |
1087 * If this stream is empty (a done event occurs before the first data event), | 1198 * If this stream is empty (the done event is the first event), |
1088 * the resulting future completes with a [StateError]. | 1199 * the returned future completes with an error. |
1089 */ | 1200 */ |
1090 Future<T> get last { | 1201 Future<T> get last { |
1091 _Future<T> future = new _Future<T>(); | 1202 _Future<T> future = new _Future<T>(); |
1092 T result = null; | 1203 T result = null; |
1093 bool foundResult = false; | 1204 bool foundResult = false; |
1094 listen( | 1205 listen( |
1095 (T value) { | 1206 (T value) { |
1096 foundResult = true; | 1207 foundResult = true; |
1097 result = value; | 1208 result = value; |
1098 }, | 1209 }, |
1099 onError: future._completeError, | 1210 onError: future._completeError, |
1100 onDone: () { | 1211 onDone: () { |
1101 if (foundResult) { | 1212 if (foundResult) { |
1102 future._complete(result); | 1213 future._complete(result); |
1103 return; | 1214 return; |
1104 } | 1215 } |
1105 try { | 1216 try { |
1106 throw IterableElementError.noElement(); | 1217 throw IterableElementError.noElement(); |
1107 } catch (e, s) { | 1218 } catch (e, s) { |
1108 _completeWithErrorCallback(future, e, s); | 1219 _completeWithErrorCallback(future, e, s); |
1109 } | 1220 } |
1110 }, | 1221 }, |
1111 cancelOnError: true); | 1222 cancelOnError: true); |
1112 return future; | 1223 return future; |
1113 } | 1224 } |
1114 | 1225 |
1115 /** | 1226 /** |
1116 * Returns the single element. | 1227 * The single element of this stream. |
1117 * | 1228 * |
1118 * If an error event occurs before or after the first data event, the | 1229 * If this stream emits an error event, |
1119 * resulting future is completed with that error. | 1230 * the returned future is completed with that error |
1231 * and processing stops. | |
1120 * | 1232 * |
1121 * If [this] is empty or has more than one element throws a [StateError]. | 1233 * If [this] is empty or has more than one element, |
1234 * the returned future completes with an error. | |
1122 */ | 1235 */ |
1123 Future<T> get single { | 1236 Future<T> get single { |
1124 _Future<T> future = new _Future<T>(); | 1237 _Future<T> future = new _Future<T>(); |
1125 T result = null; | 1238 T result = null; |
1126 bool foundResult = false; | 1239 bool foundResult = false; |
1127 StreamSubscription subscription; | 1240 StreamSubscription subscription; |
1128 subscription = this.listen( | 1241 subscription = this.listen( |
1129 (T value) { | 1242 (T value) { |
1130 if (foundResult) { | 1243 if (foundResult) { |
1131 // This is the second element we get. | 1244 // This is the second element we get. |
(...skipping 19 matching lines...) Expand all Loading... | |
1151 _completeWithErrorCallback(future, e, s); | 1264 _completeWithErrorCallback(future, e, s); |
1152 } | 1265 } |
1153 }, | 1266 }, |
1154 cancelOnError: true); | 1267 cancelOnError: true); |
1155 return future; | 1268 return future; |
1156 } | 1269 } |
1157 | 1270 |
1158 /** | 1271 /** |
1159 * Finds the first element of this stream matching [test]. | 1272 * Finds the first element of this stream matching [test]. |
1160 * | 1273 * |
1161 * Returns a future that is filled with the first element of this stream | 1274 * Returns a future that is completed with the first element of this stream |
1162 * that [test] returns true for. | 1275 * that [test] returns `true` for. |
1163 * | 1276 * |
1164 * If no such element is found before this stream is done, and a | 1277 * If no such element is found before this stream is done, and a |
1165 * [defaultValue] function is provided, the result of calling [defaultValue] | 1278 * [defaultValue] function is provided, the result of calling [defaultValue] |
1166 * becomes the value of the future. | 1279 * becomes the value of the future. If [defaultValue] throws, the returned |
1280 * future is completed with that error. | |
1167 * | 1281 * |
1168 * Stops listening to the stream after the first matching element has been | 1282 * If this stream emits an error before the first matching element, |
1169 * received. | 1283 * the returned future is completed with that error, and processing stops. |
1284 * | |
1285 * Stops listening to the stream after the first matching element or error | |
1286 * has been received. | |
1170 * | 1287 * |
1171 * Internally the method cancels its subscription after the first element that | 1288 * Internally the method cancels its subscription after the first element that |
1172 * matches the predicate. This means that single-subscription (non-broadcast) | 1289 * matches the predicate. This means that single-subscription (non-broadcast) |
1173 * streams are closed and cannot be reused after a call to this method. | 1290 * streams are closed and cannot be reused after a call to this method. |
1174 * | 1291 * |
1175 * If an error occurs, or if this stream ends without finding a match and | 1292 * If an error occurs, or if this stream ends without finding a match and |
1176 * with no [defaultValue] function provided, the future will receive an | 1293 * with no [defaultValue] function provided, |
1177 * error. | 1294 * the returned future is completed with an error. |
1178 */ | 1295 */ |
1179 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 1296 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
1180 _Future<dynamic> future = new _Future(); | 1297 _Future<dynamic> future = new _Future(); |
1181 StreamSubscription subscription; | 1298 StreamSubscription subscription; |
1182 subscription = this.listen( | 1299 subscription = this.listen( |
1183 (T value) { | 1300 (T value) { |
1184 _runUserCode(() => test(value), (bool isMatch) { | 1301 _runUserCode(() => test(value), (bool isMatch) { |
1185 if (isMatch) { | 1302 if (isMatch) { |
1186 _cancelAndValue(subscription, future, value); | 1303 _cancelAndValue(subscription, future, value); |
1187 } | 1304 } |
(...skipping 11 matching lines...) Expand all Loading... | |
1199 _completeWithErrorCallback(future, e, s); | 1316 _completeWithErrorCallback(future, e, s); |
1200 } | 1317 } |
1201 }, | 1318 }, |
1202 cancelOnError: true); | 1319 cancelOnError: true); |
1203 return future; | 1320 return future; |
1204 } | 1321 } |
1205 | 1322 |
1206 /** | 1323 /** |
1207 * Finds the last element in this stream matching [test]. | 1324 * Finds the last element in this stream matching [test]. |
1208 * | 1325 * |
1209 * As [firstWhere], except that the last matching element is found. | 1326 * If this stream emits an error, the returned future is completed with that |
1210 * That means that the result cannot be provided before this stream | 1327 * error, and processing stops. |
1328 * | |
1329 * Otherwise as [firstWhere], except that the last matching element is found | |
1330 * instead of the first. | |
1331 * That means that a non-error result cannot be provided before this stream | |
1211 * is done. | 1332 * is done. |
1212 */ | 1333 */ |
1213 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | 1334 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { |
1214 _Future<dynamic> future = new _Future(); | 1335 _Future<dynamic> future = new _Future(); |
1215 T result = null; | 1336 T result = null; |
1216 bool foundResult = false; | 1337 bool foundResult = false; |
1217 StreamSubscription subscription; | 1338 StreamSubscription subscription; |
1218 subscription = this.listen( | 1339 subscription = this.listen( |
1219 (T value) { | 1340 (T value) { |
1220 _runUserCode(() => true == test(value), (bool isMatch) { | 1341 _runUserCode(() => true == test(value), (bool isMatch) { |
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1456 * operation). If the listener wants to delete the file after having | 1577 * operation). If the listener wants to delete the file after having |
1457 * canceled the subscription, it must wait for the cleanup future to complete. | 1578 * canceled the subscription, it must wait for the cleanup future to complete. |
1458 * | 1579 * |
1459 * A returned future completes with a `null` value. | 1580 * A returned future completes with a `null` value. |
1460 * If the cleanup throws, which it really shouldn't, the returned future | 1581 * If the cleanup throws, which it really shouldn't, the returned future |
1461 * completes with that error. | 1582 * completes with that error. |
1462 */ | 1583 */ |
1463 Future cancel(); | 1584 Future cancel(); |
1464 | 1585 |
1465 /** | 1586 /** |
1466 * Set or override the data event handler of this subscription. | 1587 * Replaces the data event handler of this subscription. |
1467 * | 1588 * |
1468 * This method overrides the handler that has been set at the invocation of | 1589 * The [handleData] function is called for each element of the stream |
1469 * [Stream.listen]. | 1590 * after this function is called. |
1591 * If [handleData] is `null`, further elements are ignored. | |
1592 * | |
1593 * This method replaces the current handler set by the invocation of | |
1594 * [Stream.listen] or by a previous call to [onData]. | |
1470 */ | 1595 */ |
1471 void onData(void handleData(T data)); | 1596 void onData(void handleData(T data)); |
1472 | 1597 |
1473 /** | 1598 /** |
1474 * Set or override the error event handler of this subscription. | 1599 * Replaces the error event handler of this subscription. |
1475 * | 1600 * |
1476 * This method overrides the handler that has been set at the invocation of | 1601 * The [handleError] function must be able to be called with either |
1477 * [Stream.listen] or by calling [asFuture]. | 1602 * one positional argument, or with two positional arguments |
1603 * where the seconds is always a [StackTrace]. | |
1604 * | |
1605 * The [handleError] argument may be `null`, in which case further | |
1606 * error events are considered unhandled, and will be reported to | |
1607 * [Zone.handleUncaughtError]. | |
1608 * | |
1609 * The provided function is called for all error events from the | |
1610 * stream subscription. | |
1611 * | |
1612 * This method replaces the current handler set by the invocation of | |
1613 * [Stream.listen], by calling [asFuture], or by a previous call to [onError]. | |
1478 */ | 1614 */ |
1479 void onError(Function handleError); | 1615 void onError(Function handleError); |
1480 | 1616 |
1481 /** | 1617 /** |
1482 * Set or override the done event handler of this subscription. | 1618 * Replaces the done event handler of this subscription. |
1483 * | 1619 * |
1484 * This method overrides the handler that has been set at the invocation of | 1620 * The [handleDone] function is called when the stream closes. |
1485 * [Stream.listen] or by calling [asFuture]. | 1621 * The value may be `null`, in which case no function is called. |
1622 * | |
1623 * This method reaplces the current handler set by the invocation of | |
1624 * [Stream.listen], by calling [asFuture], or by a previous call to [onDone]. | |
1486 */ | 1625 */ |
1487 void onDone(void handleDone()); | 1626 void onDone(void handleDone()); |
1488 | 1627 |
1489 /** | 1628 /** |
1490 * Request that the stream pauses events until further notice. | 1629 * Request that the stream pauses events until further notice. |
1491 * | 1630 * |
1492 * While paused, the subscription will not fire any events. | 1631 * While paused, the subscription will not fire any events. |
1493 * If it receives events from its source, they will be buffered until | 1632 * If it receives events from its source, they will be buffered until |
1494 * the subscription is resumed. | 1633 * the subscription is resumed. |
1495 * The underlying source is usually informed about the pause, | 1634 * For non-broadcast streams, the underlying source is usually informed |
1635 * about the pause, | |
1496 * so it can stop generating events until the subscription is resumed. | 1636 * so it can stop generating events until the subscription is resumed. |
1497 * | 1637 * |
1498 * To avoid buffering events on a broadcast stream, it is better to | 1638 * To avoid buffering events on a broadcast stream, it is better to |
1499 * cancel this subscription, and start to listen again when events | 1639 * cancel this subscription, and start to listen again when events |
1500 * are needed. | 1640 * are needed, if the intermediate events are not important. |
1501 * | 1641 * |
1502 * If [resumeSignal] is provided, the stream will undo the pause | 1642 * If [resumeSignal] is provided, the stream subscription will undo the pause |
1503 * when the future completes. If the future completes with an error, | 1643 * when the future completes, as if by a call to [resume]. |
1504 * the stream will resume, but the error will not be handled! | 1644 * If the future completes with an error, |
1645 * the stream will still resume, but the error will be considered unhandled | |
1646 * and is passed to [Zone.handleUncaughtError]. | |
1505 * | 1647 * |
1506 * A call to [resume] will also undo a pause. | 1648 * A call to [resume] will also undo a pause. |
1507 * | 1649 * |
1508 * If the subscription is paused more than once, an equal number | 1650 * If the subscription is paused more than once, an equal number |
1509 * of resumes must be performed to resume the stream. | 1651 * of resumes must be performed to resume the stream. |
1510 * | 1652 * |
1511 * Currently DOM streams silently drop events when the stream is paused. This | 1653 * Currently DOM streams silently drop events when the stream is paused. This |
1512 * is a bug and will be fixed. | 1654 * is a bug and will be fixed. |
1513 */ | 1655 */ |
1514 void pause([Future resumeSignal]); | 1656 void pause([Future resumeSignal]); |
1515 | 1657 |
1516 /** | 1658 /** |
1517 * Resume after a pause. | 1659 * Resume after a pause. |
1660 * | |
1661 * This undoes one previous call to [pause]. | |
1662 * When all previously calls to [pause] have been matched by a calls to | |
1663 * [resume], possibly through a `resumeSignal` passed to [pause], | |
1664 * the stream subscription may emit events again. | |
1518 */ | 1665 */ |
1519 void resume(); | 1666 void resume(); |
1520 | 1667 |
1521 /** | 1668 /** |
1522 * Returns true if the [StreamSubscription] is paused. | 1669 * Whether the [StreamSubscription] is currently paused. |
1670 * | |
1671 * If there have been more calls to [pause] than to [resume] on this | |
1672 * stream subscription, the subscription is paused, and this getter | |
1673 * returns `true`. | |
1674 * | |
1675 * Returns `false` if the stream can currently emit events, or if | |
1676 * the subscription has completed or been cancelled. | |
1523 */ | 1677 */ |
1524 bool get isPaused; | 1678 bool get isPaused; |
1525 | 1679 |
1526 /** | 1680 /** |
1527 * Returns a future that handles the [onDone] and [onError] callbacks. | 1681 * Returns a future that handles the [onDone] and [onError] callbacks. |
1528 * | 1682 * |
1529 * This method *overwrites* the existing [onDone] and [onError] callbacks | 1683 * This method *overwrites* the existing [onDone] and [onError] callbacks |
1530 * with new ones that complete the returned future. | 1684 * with new ones that complete the returned future. |
1531 * | 1685 * |
1532 * In case of an error the subscription will automatically cancel (even | 1686 * In case of an error the subscription will automatically cancel (even |
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1685 * Return a future which is completed when the [StreamSink] is finished. | 1839 * Return a future which is completed when the [StreamSink] is finished. |
1686 * | 1840 * |
1687 * If the `StreamSink` fails with an error, | 1841 * If the `StreamSink` fails with an error, |
1688 * perhaps in response to adding events using [add], [addError] or [close], | 1842 * perhaps in response to adding events using [add], [addError] or [close], |
1689 * the [done] future will complete with that error. | 1843 * the [done] future will complete with that error. |
1690 * | 1844 * |
1691 * Otherwise, the returned future will complete when either: | 1845 * Otherwise, the returned future will complete when either: |
1692 * | 1846 * |
1693 * * all events have been processed and the sink has been closed, or | 1847 * * all events have been processed and the sink has been closed, or |
1694 * * the sink has otherwise been stopped from handling more events | 1848 * * the sink has otherwise been stopped from handling more events |
1695 * (for example by cancelling a stream subscription). | 1849 * (for example by canceling a stream subscription). |
1696 */ | 1850 */ |
1697 Future get done; | 1851 Future get done; |
1698 } | 1852 } |
1699 | 1853 |
1700 /** | 1854 /** |
1701 * Transforms a Stream. | 1855 * Transforms a Stream. |
1702 * | 1856 * |
1703 * When a stream's [Stream.transform] method is invoked with a | 1857 * When a stream's [Stream.transform] method is invoked with a |
1704 * [StreamTransformer], the stream calls the [bind] method on the provided | 1858 * [StreamTransformer], the stream calls the [bind] method on the provided |
1705 * transformer. The resulting stream is then returned from the | 1859 * transformer. The resulting stream is then returned from the |
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1907 | 2061 |
1908 void addError(error, [StackTrace stackTrace]) { | 2062 void addError(error, [StackTrace stackTrace]) { |
1909 _sink.addError(error, stackTrace); | 2063 _sink.addError(error, stackTrace); |
1910 } | 2064 } |
1911 | 2065 |
1912 void close() { | 2066 void close() { |
1913 _sink.close(); | 2067 _sink.close(); |
1914 } | 2068 } |
1915 } | 2069 } |
1916 | 2070 |
1917 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | 2071 /// A group created by [Stream.groupBy]. |
1918 /// | 2072 /// |
1919 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y | 2073 /// The stream created by `groupBy` emits a `GroupedEvents` |
1920 /// it encounters. | 2074 /// for each distinct key it encounters. |
1921 /// This group contains the [key] itself, along with a stream of the [values] | 2075 /// This group contains the [key] itself, along with a stream of the [values] |
1922 /// associated with that key. | 2076 /// associated with that key. |
1923 class GroupedEvents<K, V> { | 2077 class GroupedEvents<K, V> { |
1924 /// The key that identifiers the values emitted by [values]. | 2078 /// The key that identifiers the values emitted by [values]. |
1925 final K key; | 2079 final K key; |
1926 | 2080 |
1927 /// The [values] that [GroupBy] have grouped by the common [key]. | 2081 /// The [values] that [GroupBy] have grouped by the common [key]. |
1928 final Stream<V> values; | 2082 final Stream<V> values; |
1929 | 2083 |
1930 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; | 2084 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; |
1931 | 2085 |
1932 // Don't expose a generative constructor. | 2086 // Don't expose a generative constructor. |
1933 // This class is not intended for subclassing, so we don't want to promise | 2087 // This class is not intended for subclassing, so we don't want to promise |
1934 // it. We can change that in the future. | 2088 // it. We can change that in the future. |
1935 GroupedEvents._(this.key, this.values); | 2089 GroupedEvents._(this.key, this.values); |
1936 | 2090 |
1937 /// Tells [values] to discard values instead of retaining them. | 2091 /// Tells [values] to discard values instead of retaining them. |
1938 /// | 2092 /// |
1939 /// Must only be used instead of listening to the [values] stream. | 2093 /// Must only be used instead of listening to the [values] stream. |
1940 /// If the stream has been listened to, this call fails. | 2094 /// If the stream has been listened to, this call fails. |
1941 /// After calling this method, listening on the [values] stream fails. | 2095 /// After calling this method, listening on the [values] stream fails. |
1942 Future cancel() { | 2096 Future cancel() { |
1943 // If values has been listened to, | 2097 // If values has been listened to, |
1944 // this throws a StateError saying that stream has already been listened to, | 2098 // this throws a StateError saying that stream has already been listened to, |
1945 // which is a correct error message for this call too. | 2099 // which is a correct error message for this call too. |
1946 return values.listen(null).cancel(); | 2100 return values.listen(null).cancel(); |
1947 } | 2101 } |
1948 } | 2102 } |
OLD | NEW |