Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(139)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2899273003: Update stream documentation. (Closed)
Patch Set: Address comments. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
11 typedef void _TimerCallback(); 11 typedef void _TimerCallback();
12 12
13 /** 13 /**
14 * A source of asynchronous data events. 14 * A source of asynchronous data events.
15 * 15 *
16 * A Stream provides a way to receive a sequence of events. 16 * A Stream provides a way to receive a sequence of events.
17 * Each event is either a data event or an error event, 17 * Each event is either a data event, also called an *element* of the stream,
18 * representing the result of a single computation. 18 * or an error event, which is a notification that something has failed.
19 * When the events provided by a Stream have all been sent, 19 * When a stream has emitted all its event,
20 * a single "done" event will mark the end. 20 * a single "done" event will notify the listener that the end has been reached.
21 * 21 *
22 * You can [listen] on a stream to make it start generating events, 22 * You [listen] on a stream to make it start generating events,
23 * and to set up listeners that receive the events. 23 * and to set up listeners that receive the events.
24 * When you listen, you receive a [StreamSubscription] object 24 * When you listen, you receive a [StreamSubscription] object
25 * which is the active object providing the events, 25 * which is the active object providing the events,
26 * and which can be used to stop listening again, 26 * and which can be used to stop listening again,
27 * or to temporarily pause events from the subscription. 27 * or to temporarily pause events from the subscription.
28 * 28 *
29 * There are two kinds of streams: "Single-subscription" streams and 29 * There are two kinds of streams: "Single-subscription" streams and
30 * "broadcast" streams. 30 * "broadcast" streams.
31 * 31 *
32 * *A single-subscription stream* allows only a single listener during the whole 32 * *A single-subscription stream* allows only a single listener during the whole
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
111 controller._closeUnchecked(); 111 controller._closeUnchecked();
112 }); 112 });
113 return controller.stream; 113 return controller.stream;
114 } 114 }
115 115
116 /** 116 /**
117 * Create a stream from a group of futures. 117 * Create a stream from a group of futures.
118 * 118 *
119 * The stream reports the results of the futures on the stream in the order 119 * The stream reports the results of the futures on the stream in the order
120 * in which the futures complete. 120 * in which the futures complete.
121 * Each future provides either a data event or an error event,
122 * depending on how the future completes.
121 * 123 *
122 * If some futures have completed before calling `Stream.fromFutures`, 124 * If some futures have already completed when `Stream.fromFutures` is called,
123 * their result will be output on the created stream in some unspecified 125 * their results will be emitted in some unspecified order.
124 * order.
125 * 126 *
126 * When all futures have completed, the stream is closed. 127 * When all futures have completed, the stream is closed.
127 * 128 *
128 * If no future is passed, the stream closes as soon as possible. 129 * If [futures] is empty, the stream closes as soon as possible.
129 */ 130 */
130 factory Stream.fromFutures(Iterable<Future<T>> futures) { 131 factory Stream.fromFutures(Iterable<Future<T>> futures) {
131 _StreamController<T> controller = new StreamController<T>(sync: true); 132 _StreamController<T> controller = new StreamController<T>(sync: true);
132 int count = 0; 133 int count = 0;
134 // Declare these as variables holding closures instead of as
135 // function declarations.
136 // This avoids creating a new closure from the functions for each future.
133 var onValue = (T value) { 137 var onValue = (T value) {
134 if (!controller.isClosed) { 138 if (!controller.isClosed) {
135 controller._add(value); 139 controller._add(value);
136 if (--count == 0) controller._closeUnchecked(); 140 if (--count == 0) controller._closeUnchecked();
137 } 141 }
138 }; 142 };
139 var onError = (error, stack) { 143 var onError = (error, stack) {
140 if (!controller.isClosed) { 144 if (!controller.isClosed) {
141 controller._addError(error, stack); 145 controller._addError(error, stack);
142 if (--count == 0) controller._closeUnchecked(); 146 if (--count == 0) controller._closeUnchecked();
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 * stack trace). 339 * stack trace).
336 * Otherwise it is called with just the error object. 340 * Otherwise it is called with just the error object.
337 * If [onError] is omitted, any errors on the stream are considered unhandled, 341 * If [onError] is omitted, any errors on the stream are considered unhandled,
338 * and will be passed to the current [Zone]'s error handler. 342 * and will be passed to the current [Zone]'s error handler.
339 * By default unhandled async errors are treated 343 * By default unhandled async errors are treated
340 * as if they were uncaught top-level errors. 344 * as if they were uncaught top-level errors.
341 * 345 *
342 * If this stream closes and sends a done event, the [onDone] handler is 346 * If this stream closes and sends a done event, the [onDone] handler is
343 * called. If [onDone] is `null`, nothing happens. 347 * called. If [onDone] is `null`, nothing happens.
344 * 348 *
345 * If [cancelOnError] is true, the subscription is automatically cancelled 349 * If [cancelOnError] is true, the subscription is automatically canceled
346 * when the first error event is delivered. The default is `false`. 350 * when the first error event is delivered. The default is `false`.
347 * 351 *
348 * While a subscription is paused, or when it has been cancelled, 352 * While a subscription is paused, or when it has been canceled,
349 * the subscription doesn't receive events and none of the 353 * the subscription doesn't receive events and none of the
350 * event handler functions are called. 354 * event handler functions are called.
351 */ 355 */
352 StreamSubscription<T> listen(void onData(T event), 356 StreamSubscription<T> listen(void onData(T event),
353 {Function onError, void onDone(), bool cancelOnError}); 357 {Function onError, void onDone(), bool cancelOnError});
354 358
355 /** 359 /**
356 * Creates a new stream from this stream that discards some data events. 360 * Creates a new stream from this stream that discards some elements.
357 * 361 *
358 * The new stream sends the same error and done events as this stream, 362 * The new stream sends the same error and done events as this stream,
359 * but it only sends the data events that satisfy the [test]. 363 * but it only sends the data events that satisfy the [test].
360 * 364 *
365 * If the [test] function throws, the data event is dropped and the
366 * error is emitted on the returned stream instead.
367 *
361 * The returned stream is a broadcast stream if this stream is. 368 * The returned stream is a broadcast stream if this stream is.
362 * If a broadcast stream is listened to more than once, each subscription 369 * If a broadcast stream is listened to more than once, each subscription
363 * will individually perform the `test`. 370 * will individually perform the `test`.
364 */ 371 */
365 Stream<T> where(bool test(T event)) { 372 Stream<T> where(bool test(T event)) {
366 return new _WhereStream<T>(this, test); 373 return new _WhereStream<T>(this, test);
367 } 374 }
368 375
369 /** 376 /**
377 * Transforms each element of this stream into a new stream event.
378 *
370 * Creates a new stream that converts each element of this stream 379 * Creates a new stream that converts each element of this stream
371 * to a new value using the [convert] function. 380 * to a new value using the [convert] function, and emits the result.
372 * 381 *
373 * For each data event, `o`, in this stream, the returned stream 382 * For each data event, `o`, in this stream, the returned stream
374 * provides a data event with the value `convert(o)`. 383 * provides a data event with the value `convert(o)`.
375 * If [convert] throws, the returned stream reports the exception as an error 384 * If [convert] throws, the returned stream reports it as an error
376 * event instead. 385 * event instead.
377 * 386 *
378 * Error and done events are passed through unchanged to the returned stream. 387 * Error and done events are passed through unchanged to the returned stream.
379 * 388 *
380 * The returned stream is a broadcast stream if this stream is. 389 * The returned stream is a broadcast stream if this stream is.
381 * The [convert] function is called once per data event per listener. 390 * The [convert] function is called once per data event per listener.
382 * If a broadcast stream is listened to more than once, each subscription 391 * If a broadcast stream is listened to more than once, each subscription
383 * will individually call [convert] on each data event. 392 * will individually call [convert] on each data event.
384 */ 393 */
385 Stream<S> map<S>(S convert(T event)) { 394 Stream<S> map<S>(S convert(T event)) {
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
511 onResume: () { 520 onResume: () {
512 subscription.resume(); 521 subscription.resume();
513 }, 522 },
514 onCancel: () => subscription.cancel(), 523 onCancel: () => subscription.cancel(),
515 sync: true); 524 sync: true);
516 } 525 }
517 return controller.stream; 526 return controller.stream;
518 } 527 }
519 528
520 /** 529 /**
521 * Creates a new stream with the events of a stream per original event. 530 * Transforms each element into a sequence of asynchronous events.
522 * 531 *
523 * This acts like [expand], except that [convert] returns a [Stream] 532 * Returns a new stream and for each event of this stream, do the following:
524 * instead of an [Iterable].
525 * The events of the returned stream becomes the events of the returned
526 * stream, in the order they are produced.
527 * 533 *
528 * If [convert] returns `null`, no value is put on the output stream, 534 * * If the event is an error event or a done event, it is emitted directly
529 * just as if it returned an empty stream. 535 * by the returned stream.
536 * * Otherwise it is an element. Then the [convert] function is called
537 * with the element as argument to produce a convert-stream for the element.
538 * * If that call throws, the error is emitted on the returned stream.
539 * * If the call returnes `null`, no further action is taken for the elements.
540 * * Otherwise, this stream is paused and convert-stream is listened to.
541 * Every data and error event of the convert-stream is emitted on the returned
542 * stream in the order it is produced.
543 * When the convert-stream ends, this stream is resumed.
530 * 544 *
531 * The returned stream is a broadcast stream if this stream is. 545 * The returned stream is a broadcast stream if this stream is.
532 */ 546 */
533 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { 547 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
534 StreamController<E> controller; 548 StreamController<E> controller;
535 StreamSubscription<T> subscription; 549 StreamSubscription<T> subscription;
536 void onListen() { 550 void onListen() {
537 assert(controller is _StreamController || 551 assert(controller is _StreamController ||
538 controller is _BroadcastStreamController); 552 controller is _BroadcastStreamController);
539 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; 553 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
588 * trace. The stack trace argument might be `null` if the stream itself 602 * trace. The stack trace argument might be `null` if the stream itself
589 * received an error without stack trace. 603 * received an error without stack trace.
590 * 604 *
591 * An asynchronous error `error` is matched by a test function if 605 * An asynchronous error `error` is matched by a test function if
592 *`test(error)` returns true. If [test] is omitted, every error is considered 606 *`test(error)` returns true. If [test] is omitted, every error is considered
593 * matching. 607 * matching.
594 * 608 *
595 * If the error is intercepted, the [onError] function can decide what to do 609 * If the error is intercepted, the [onError] function can decide what to do
596 * with it. It can throw if it wants to raise a new (or the same) error, 610 * with it. It can throw if it wants to raise a new (or the same) error,
597 * or simply return to make the stream forget the error. 611 * or simply return to make the stream forget the error.
612 * If the received `error` value is thrown again by the [onError] function,
613 * it acts like a `rethrow` and it is emitted along with its original
614 * stack trace, not the stack trace of the `throw` inside [onError].
598 * 615 *
599 * If you need to transform an error into a data event, use the more generic 616 * If you need to transform an error into a data event, use the more generic
600 * [Stream.transform] to handle the event by writing a data event to 617 * [Stream.transform] to handle the event by writing a data event to
601 * the output sink. 618 * the output sink.
602 * 619 *
603 * The returned stream is a broadcast stream if this stream is. 620 * The returned stream is a broadcast stream if this stream is.
604 * If a broadcast stream is listened to more than once, each subscription 621 * If a broadcast stream is listened to more than once, each subscription
605 * will individually perform the `test` and handle the error. 622 * will individually perform the `test` and handle the error.
606 */ 623 */
607 Stream<T> handleError(Function onError, {bool test(error)}) { 624 Stream<T> handleError(Function onError, {bool test(error)}) {
608 return new _HandleErrorStream<T>(this, onError, test); 625 return new _HandleErrorStream<T>(this, onError, test);
609 } 626 }
610 627
611 /** 628 /**
612 * Creates a new stream from this stream that converts each element 629 * Transforms each element of this stream into a sequence of elements.
613 * into zero or more events.
614 * 630 *
615 * Each incoming event is converted to an [Iterable] of new events, 631 * Returns a new stream where each element of this stream is replaced
616 * and each of these new events are then sent by the returned stream 632 * by zero or more data events.
617 * in order. 633 * The event values are proveded as an [Iterable] by a call to [convert]
floitsch 2017/05/30 17:52:47 provided
634 * with the element as argument, and the elements of that iterable is
635 * emitted in iteration order.
636 * If calling [convert] throws, or if iteration of the returned values throws,
floitsch 2017/05/30 17:52:47 if the iteration
637 * the error is emitted on the returned stream and iteration ends for that
638 * element of this stream.
639 *
640 * Error events and the done event of this stream are forwarded directly
641 * to the returned stream.
618 * 642 *
619 * The returned stream is a broadcast stream if this stream is. 643 * The returned stream is a broadcast stream if this stream is.
620 * If a broadcast stream is listened to more than once, each subscription 644 * If a broadcast stream is listened to more than once, each subscription
621 * will individually call `convert` and expand the events. 645 * will individually call `convert` and expand the events.
622 */ 646 */
623 Stream<S> expand<S>(Iterable<S> convert(T value)) { 647 Stream<S> expand<S>(Iterable<S> convert(T element)) {
624 return new _ExpandStream<T, S>(this, convert); 648 return new _ExpandStream<T, S>(this, convert);
625 } 649 }
626 650
627 /** 651 /**
628 * Pipe the events of this stream into [streamConsumer]. 652 * Pipes the events of this stream into [streamConsumer].
629 * 653 *
630 * The events of this stream are added to `streamConsumer` using 654 * All events of this stream are added to `streamConsumer` using
631 * [StreamConsumer.addStream]. 655 * [StreamConsumer.addStream].
632 * The `streamConsumer` is closed when this stream has been successfully added 656 * The `streamConsumer` is closed when this stream has been successfully added
633 * to it - when the future returned by `addStream` completes without an error. 657 * to it - when the future returned by `addStream` completes without an error.
634 * 658 *
635 * Returns a future which completes when the stream has been consumed 659 * Returns a future which completes when the stream has been consumed
636 * and the consumer has been closed. 660 * and the consumer has been closed.
637 * 661 *
638 * The returned future completes with the same result as the future returned 662 * The returned future completes with the same result as the future returned
639 * by [StreamConsumer.close]. 663 * by [StreamConsumer.close].
640 * If the adding of the stream itself fails in some way, 664 * If the call to [StreamConsumer.addStream] fails in some way, this
641 * then the consumer is expected to be closed, and won't be closed again. 665 * method fails in the same way.
642 * In that case the returned future completes with the error from calling
643 * `addStream`.
644 */ 666 */
645 Future pipe(StreamConsumer<T> streamConsumer) { 667 Future pipe(StreamConsumer<T> streamConsumer) {
646 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); 668 return streamConsumer.addStream(this).then((_) => streamConsumer.close());
647 } 669 }
648 670
649 /** 671 /**
650 * Chains this stream as the input of the provided [StreamTransformer]. 672 * Applies a [StreamTransformer] to the current stream.
651 * 673 *
652 * Returns the result of [:streamTransformer.bind:] itself. 674 * Returns the result of the stream transformation,
675 * that is, the result of `streamTransformer.bind(this)`.
676 * This method simply allows writing the call to `streamTransformer.bind`
677 * in a chained fashion, like
678 * ```
679 * stream.map(mapping).transform(transformation).toList()
680 * ```
681 * which can be more convenient than calling `bind` directly.
653 * 682 *
654 * The `streamTransformer` can decide whether it wants to return a 683 * The [streamTransformer] can return any stream.
655 * broadcast stream or not. 684 * Whether the returned stream is a broadcast stream or not,
685 * and which elements it will contain,
686 * is entirely up to the transformation.
656 */ 687 */
657 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { 688 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
658 return streamTransformer.bind(this); 689 return streamTransformer.bind(this);
659 } 690 }
660 691
661 /** 692 /**
662 * Reduces a sequence of values by repeatedly applying [combine]. 693 * Combines a sequence of values by repeatedly applying [combine].
694 *
695 * Similar to [Iterable.reduce], this function maintains a value,
696 * starting with the first element of the stream
697 * and updated for each further element of this stream.
698 * For each element after the first,
699 * the value is updated to the result of calling [combine]
700 * with the previous value and the element.
701 *
702 * When this stream is done, the returned future is completed with
703 * the value at that time.
704 *
705 * If the stream is empty, the returned future is completed with
706 * an error.
707 * If this stream emits an error, or the call to [combine] throws,
708 * the returned future is completed with that error,
709 * and processing is stopped.
663 */ 710 */
664 Future<T> reduce(T combine(T previous, T element)) { 711 Future<T> reduce(T combine(T previous, T element)) {
665 _Future<T> result = new _Future<T>(); 712 _Future<T> result = new _Future<T>();
666 bool seenFirst = false; 713 bool seenFirst = false;
667 T value; 714 T value;
668 StreamSubscription subscription; 715 StreamSubscription subscription;
669 subscription = this.listen( 716 subscription = this.listen(
670 (T element) { 717 (T element) {
671 if (seenFirst) { 718 if (seenFirst) {
672 _runUserCode(() => combine(value, element), (T newValue) { 719 _runUserCode(() => combine(value, element), (T newValue) {
673 value = newValue; 720 value = newValue;
674 }, _cancelAndErrorClosure(subscription, result)); 721 }, _cancelAndErrorClosure(subscription, result));
675 } else { 722 } else {
676 value = element; 723 value = element;
677 seenFirst = true; 724 seenFirst = true;
678 } 725 }
679 }, 726 },
680 onError: result._completeError, 727 onError: result._completeError,
681 onDone: () { 728 onDone: () {
682 if (!seenFirst) { 729 if (!seenFirst) {
683 try { 730 try {
731 // Throw and recatch, instead of just doing
732 // _completeWithErrorCallback, e, theError, StackTrace.current),
733 // to ensure that the stackTrace is set on the error.
684 throw IterableElementError.noElement(); 734 throw IterableElementError.noElement();
685 } catch (e, s) { 735 } catch (e, s) {
686 _completeWithErrorCallback(result, e, s); 736 _completeWithErrorCallback(result, e, s);
687 } 737 }
688 } else { 738 } else {
689 result._complete(value); 739 result._complete(value);
690 } 740 }
691 }, 741 },
692 cancelOnError: true); 742 cancelOnError: true);
693 return result; 743 return result;
694 } 744 }
695 745
696 /** Reduces a sequence of values by repeatedly applying [combine]. */ 746 /**
747 * Combines a sequence of values by repeatedly applying [combine].
748 *
749 * Similar to [Iterable.fold], this function maintains a value,
750 * starting with [initialValue] and updated for each element of
751 * this stream.
752 * For each element, the value is updated to the result of calling
753 * [combine] with the previous value and the element.
754 *
755 * When this stream is done, the returned future is completed with
756 * the value at that time.
757 * For an empty stream, the future is completed with [initialValue].
758 *
759 * If this stream emits an error, or the call to [combine] throws,
760 * the returned future is completed with that error,
761 * and processing is stopped.
762 */
697 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { 763 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
698 _Future<S> result = new _Future<S>(); 764 _Future<S> result = new _Future<S>();
699 S value = initialValue; 765 S value = initialValue;
700 StreamSubscription subscription; 766 StreamSubscription subscription;
701 subscription = this.listen((T element) { 767 subscription = this.listen(
702 _runUserCode(() => combine(value, element), (S newValue) { 768 (T element) {
703 value = newValue; 769 _runUserCode(() => combine(value, element), (S newValue) {
704 }, _cancelAndErrorClosure(subscription, result)); 770 value = newValue;
705 }, onError: (e, st) { 771 }, _cancelAndErrorClosure(subscription, result));
706 result._completeError(e, st); 772 },
707 }, onDone: () { 773 onError: result._completeError,
708 result._complete(value); 774 onDone: () {
709 }, cancelOnError: true); 775 result._complete(value);
776 },
777 cancelOnError: true);
710 return result; 778 return result;
711 } 779 }
712 780
713 /** 781 /**
714 * Collects string of data events' string representations. 782 * Combines the string representation of elements into a single string.
715 * 783 *
716 * If [separator] is provided, it is inserted between any two 784 * Each element is converted to a string using its [Object.toString] method.
717 * elements. 785 * If [separator] is provided, it is inserted between element string
786 * representations.
718 * 787 *
719 * Any error in the stream causes the future to complete with that 788 * The returned future is completed with the combined string when the stream
720 * error. Otherwise it completes with the collected string when 789 * is done.
721 * the "done" event arrives. 790 *
791 * If the stream contains an error, or if the call to [Object.toString]
792 * throws, the returned future is completed with that error,
793 * and processing stops.
722 */ 794 */
723 Future<String> join([String separator = ""]) { 795 Future<String> join([String separator = ""]) {
724 _Future<String> result = new _Future<String>(); 796 _Future<String> result = new _Future<String>();
725 StringBuffer buffer = new StringBuffer(); 797 StringBuffer buffer = new StringBuffer();
726 StreamSubscription subscription; 798 StreamSubscription subscription;
727 bool first = true; 799 bool first = true;
728 subscription = this.listen((T element) { 800 subscription = this.listen((T element) {
729 if (!first) { 801 if (!first) {
730 buffer.write(separator); 802 buffer.write(separator);
731 } 803 }
732 first = false; 804 first = false;
733 try { 805 try {
734 buffer.write(element); 806 buffer.write(element);
735 } catch (e, s) { 807 } catch (e, s) {
736 _cancelAndErrorWithReplacement(subscription, result, e, s); 808 _cancelAndErrorWithReplacement(subscription, result, e, s);
737 } 809 }
738 }, onError: (e) { 810 }, onError: (e) {
739 result._completeError(e); 811 result._completeError(e);
740 }, onDone: () { 812 }, onDone: () {
741 result._complete(buffer.toString()); 813 result._complete(buffer.toString());
742 }, cancelOnError: true); 814 }, cancelOnError: true);
743 return result; 815 return result;
744 } 816 }
745 817
746 /** 818 /**
747 * Checks whether [needle] occurs in the elements provided by this stream. 819 * Returns whether [needle] occurs in the elements provided by this stream.
748 * 820 *
749 * Completes the [Future] when the answer is known. 821 * Compares each element of this stream to [needle] using [Object.==].
750 * If this stream reports an error, the [Future] will report that error. 822 * If an equal element is found, the returned future is completed with `true`.
823 * If the stream ends without finding a match, the future is completed with
824 * `false`.
825 *
826 * If the stream contains an error, or the call to `Object.==` throws,
827 * the returned future is completed with that error, and processing stops.
751 */ 828 */
752 Future<bool> contains(Object needle) { 829 Future<bool> contains(Object needle) {
753 _Future<bool> future = new _Future<bool>(); 830 _Future<bool> future = new _Future<bool>();
754 StreamSubscription subscription; 831 StreamSubscription subscription;
755 subscription = this.listen( 832 subscription = this.listen(
756 (T element) { 833 (T element) {
757 _runUserCode(() => (element == needle), (bool isMatch) { 834 _runUserCode(() => (element == needle), (bool isMatch) {
758 if (isMatch) { 835 if (isMatch) {
759 _cancelAndValue(subscription, future, true); 836 _cancelAndValue(subscription, future, true);
760 } 837 }
761 }, _cancelAndErrorClosure(subscription, future)); 838 }, _cancelAndErrorClosure(subscription, future));
762 }, 839 },
763 onError: future._completeError, 840 onError: future._completeError,
764 onDone: () { 841 onDone: () {
765 future._complete(false); 842 future._complete(false);
766 }, 843 },
767 cancelOnError: true); 844 cancelOnError: true);
768 return future; 845 return future;
769 } 846 }
770 847
771 /** 848 /**
772 * Executes [action] on each data event of the stream. 849 * Executes [action] on each element of the stream.
773 * 850 *
774 * Completes the returned [Future] when all events of the stream 851 * Completes the returned [Future] when all elements of the stream
775 * have been processed. Completes the future with an error if the 852 * have been processed.
776 * stream has an error event, or if [action] throws. 853 *
854 * If the stream contains an error, or if the call to [action] throws,
855 * the returne future completes with that error, and processing stops.
777 */ 856 */
778 Future forEach(void action(T element)) { 857 Future forEach(void action(T element)) {
779 _Future future = new _Future(); 858 _Future future = new _Future();
780 StreamSubscription subscription; 859 StreamSubscription subscription;
781 subscription = this.listen( 860 subscription = this.listen(
782 (T element) { 861 (T element) {
783 // TODO(floitsch): the type should be 'void' and inferred. 862 // TODO(floitsch): the type should be 'void' and inferred.
784 _runUserCode<dynamic>(() => action(element), (_) {}, 863 _runUserCode<dynamic>(() => action(element), (_) {},
785 _cancelAndErrorClosure(subscription, future)); 864 _cancelAndErrorClosure(subscription, future));
786 }, 865 },
787 onError: future._completeError, 866 onError: future._completeError,
788 onDone: () { 867 onDone: () {
789 future._complete(null); 868 future._complete(null);
790 }, 869 },
791 cancelOnError: true); 870 cancelOnError: true);
792 return future; 871 return future;
793 } 872 }
794 873
795 /** 874 /**
796 * Checks whether [test] accepts all elements provided by this stream. 875 * Checks whether [test] accepts all elements provided by this stream.
797 * 876 *
798 * Completes the [Future] when the answer is known. 877 * Calls [test] on each element of the stream.
799 * If this stream reports an error, the [Future] will report that error. 878 * If the call returns `false`, the returned future is completed with `false`
879 * and processing stops.
880 *
881 * If the stream ends without finding an element that [test] rejects,
882 * the returned future is completed with `true`.
883 *
884 * If this stream contains an error, or if the call to [test] throws,
885 * the returned future is completed with that error, and processing stops.
800 */ 886 */
801 Future<bool> every(bool test(T element)) { 887 Future<bool> every(bool test(T element)) {
802 _Future<bool> future = new _Future<bool>(); 888 _Future<bool> future = new _Future<bool>();
803 StreamSubscription subscription; 889 StreamSubscription subscription;
804 subscription = this.listen( 890 subscription = this.listen(
805 (T element) { 891 (T element) {
806 _runUserCode(() => test(element), (bool isMatch) { 892 _runUserCode(() => test(element), (bool isMatch) {
807 if (!isMatch) { 893 if (!isMatch) {
808 _cancelAndValue(subscription, future, false); 894 _cancelAndValue(subscription, future, false);
809 } 895 }
810 }, _cancelAndErrorClosure(subscription, future)); 896 }, _cancelAndErrorClosure(subscription, future));
811 }, 897 },
812 onError: future._completeError, 898 onError: future._completeError,
813 onDone: () { 899 onDone: () {
814 future._complete(true); 900 future._complete(true);
815 }, 901 },
816 cancelOnError: true); 902 cancelOnError: true);
817 return future; 903 return future;
818 } 904 }
819 905
820 /** 906 /**
821 * Checks whether [test] accepts any element provided by this stream. 907 * Checks whether [test] accepts any element provided by this stream.
822 * 908 *
823 * Completes the [Future] when the answer is known. 909 * Calls [test] on each element of the stream.
910 * If the call returns `true`, the returned future is completed with `true`
911 * and processing stops.
824 * 912 *
825 * If this stream reports an error, the [Future] reports that error. 913 * If the stream ends without finding an element that [test] accepts,
914 * the returned future is completed with `false`.
826 * 915 *
827 * Stops listening to the stream after the first matching element has been 916 * If this stream contains an error, or if the call to [test] throws,
828 * found. 917 * the returned future is completed with that error, and processing stops.
829 *
830 * Internally the method cancels its subscription after this element. This
831 * means that single-subscription (non-broadcast) streams are closed and
832 * cannot be reused after a call to this method.
833 */ 918 */
834 Future<bool> any(bool test(T element)) { 919 Future<bool> any(bool test(T element)) {
835 _Future<bool> future = new _Future<bool>(); 920 _Future<bool> future = new _Future<bool>();
836 StreamSubscription subscription; 921 StreamSubscription subscription;
837 subscription = this.listen( 922 subscription = this.listen(
838 (T element) { 923 (T element) {
839 _runUserCode(() => test(element), (bool isMatch) { 924 _runUserCode(() => test(element), (bool isMatch) {
840 if (isMatch) { 925 if (isMatch) {
841 _cancelAndValue(subscription, future, true); 926 _cancelAndValue(subscription, future, true);
842 } 927 }
843 }, _cancelAndErrorClosure(subscription, future)); 928 }, _cancelAndErrorClosure(subscription, future));
844 }, 929 },
845 onError: future._completeError, 930 onError: future._completeError,
846 onDone: () { 931 onDone: () {
847 future._complete(false); 932 future._complete(false);
848 }, 933 },
849 cancelOnError: true); 934 cancelOnError: true);
850 return future; 935 return future;
851 } 936 }
852 937
853 /** Counts the elements in the stream. */ 938 /**
939 * The number of elements in this stream.
940 *
941 * Waits for all elements of this stream. When the stream ends,
942 * the returned future is completed with the number of elements.
943 *
944 * If the stream contains an error, the returned future is completed with
945 * that error, and processing stops.
946 *
947 * This operation listens to the stream, and a non-broadcast stream cannot
948 * be reused after finding its length.
949 */
854 Future<int> get length { 950 Future<int> get length {
855 _Future<int> future = new _Future<int>(); 951 _Future<int> future = new _Future<int>();
856 int count = 0; 952 int count = 0;
857 this.listen( 953 this.listen(
858 (_) { 954 (_) {
859 count++; 955 count++;
860 }, 956 },
861 onError: future._completeError, 957 onError: future._completeError,
862 onDone: () { 958 onDone: () {
863 future._complete(count); 959 future._complete(count);
864 }, 960 },
865 cancelOnError: true); 961 cancelOnError: true);
866 return future; 962 return future;
867 } 963 }
868 964
869 /** 965 /**
870 * Reports whether this stream contains any elements. 966 * Whether this stream contains any elements.
871 * 967 *
872 * Stops listening to the stream after the first element has been received. 968 * Waits for the first element of this stream, then completes the returned
969 * future with `true`.
970 * If the stream ends without emitting any elements, the returned future is
971 * completed with `false`.
873 * 972 *
874 * Internally the method cancels its subscription after the first element. 973 * If the first event is an error, the returned future is completed with that
875 * This means that single-subscription (non-broadcast) streams are closed and 974 * error.
876 * cannot be reused after a call to this getter. 975 *
976 * This operation listens to the stream, and a non-broadcast stream cannot
977 * be reused after checking whether it is empty.
877 */ 978 */
878 Future<bool> get isEmpty { 979 Future<bool> get isEmpty {
879 _Future<bool> future = new _Future<bool>(); 980 _Future<bool> future = new _Future<bool>();
880 StreamSubscription subscription; 981 StreamSubscription subscription;
881 subscription = this.listen( 982 subscription = this.listen(
882 (_) { 983 (_) {
883 _cancelAndValue(subscription, future, false); 984 _cancelAndValue(subscription, future, false);
884 }, 985 },
885 onError: future._completeError, 986 onError: future._completeError,
886 onDone: () { 987 onDone: () {
887 future._complete(true); 988 future._complete(true);
888 }, 989 },
889 cancelOnError: true); 990 cancelOnError: true);
890 return future; 991 return future;
891 } 992 }
892 993
893 /** Collects the data of this stream in a [List]. */ 994 /**
995 * Collects all elements of this stream in a [List].
996 *
997 * Creates a `List<T>` and adds all elements of the stream to the list
998 * in the order they arrive.
999 * When the stream ends, the returned future is completed with that list.
1000 *
1001 * If the stream contains an error, the returned future is completed
1002 * with that error, and processing stops.
1003 */
894 Future<List<T>> toList() { 1004 Future<List<T>> toList() {
895 List<T> result = <T>[]; 1005 List<T> result = <T>[];
896 _Future<List<T>> future = new _Future<List<T>>(); 1006 _Future<List<T>> future = new _Future<List<T>>();
897 this.listen( 1007 this.listen(
898 (T data) { 1008 (T data) {
899 result.add(data); 1009 result.add(data);
900 }, 1010 },
901 onError: future._completeError, 1011 onError: future._completeError,
902 onDone: () { 1012 onDone: () {
903 future._complete(result); 1013 future._complete(result);
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
1035 * 1145 *
1036 * The returned stream is a broadcast stream if this stream is. 1146 * The returned stream is a broadcast stream if this stream is.
1037 * If a broadcast stream is listened to more than once, each subscription 1147 * If a broadcast stream is listened to more than once, each subscription
1038 * will individually perform the `equals` test. 1148 * will individually perform the `equals` test.
1039 */ 1149 */
1040 Stream<T> distinct([bool equals(T previous, T next)]) { 1150 Stream<T> distinct([bool equals(T previous, T next)]) {
1041 return new _DistinctStream<T>(this, equals); 1151 return new _DistinctStream<T>(this, equals);
1042 } 1152 }
1043 1153
1044 /** 1154 /**
1045 * Returns the first element of the stream. 1155 * The first element of the stream.
1046 * 1156 *
1047 * Stops listening to the stream after the first element has been received. 1157 * Stops listening to the stream after the first element has been received.
1048 * 1158 *
1049 * Internally the method cancels its subscription after the first element. 1159 * Internally the method cancels its subscription after the first element.
1050 * This means that single-subscription (non-broadcast) streams are closed 1160 * This means that single-subscription (non-broadcast) streams are closed
1051 * and cannot be reused after a call to this getter. 1161 * and cannot be reused after a call to this getter.
1052 * 1162 *
1053 * If an error event occurs before the first data event, the resulting future 1163 * If an error event occurs before the first data event, the returned future
1054 * is completed with that error. 1164 * is completed with that error.
1055 * 1165 *
1056 * If this stream is empty (a done event occurs before the first data event), 1166 * If this stream is empty (a done event occurs before the first data event),
1057 * the resulting future completes with a [StateError]. 1167 * the returned future completes with an error.
1058 * 1168 *
1059 * Except for the type of the error, this method is equivalent to 1169 * Except for the type of the error, this method is equivalent to
1060 * [:this.elementAt(0):]. 1170 * `this.elementAt(0)`.
1061 */ 1171 */
1062 Future<T> get first { 1172 Future<T> get first {
1063 _Future<T> future = new _Future<T>(); 1173 _Future<T> future = new _Future<T>();
1064 StreamSubscription subscription; 1174 StreamSubscription subscription;
1065 subscription = this.listen( 1175 subscription = this.listen(
1066 (T value) { 1176 (T value) {
1067 _cancelAndValue(subscription, future, value); 1177 _cancelAndValue(subscription, future, value);
1068 }, 1178 },
1069 onError: future._completeError, 1179 onError: future._completeError,
1070 onDone: () { 1180 onDone: () {
1071 try { 1181 try {
1072 throw IterableElementError.noElement(); 1182 throw IterableElementError.noElement();
1073 } catch (e, s) { 1183 } catch (e, s) {
1074 _completeWithErrorCallback(future, e, s); 1184 _completeWithErrorCallback(future, e, s);
1075 } 1185 }
1076 }, 1186 },
1077 cancelOnError: true); 1187 cancelOnError: true);
1078 return future; 1188 return future;
1079 } 1189 }
1080 1190
1081 /** 1191 /**
1082 * Returns the last element of the stream. 1192 * The last element of this stream.
1083 * 1193 *
1084 * If an error event occurs before the first data event, the resulting future 1194 * If this stream emits an error event,
1085 * is completed with that error. 1195 * the returned future is completed with that error
1196 * and processing stops.
1086 * 1197 *
1087 * If this stream is empty (a done event occurs before the first data event), 1198 * If this stream is empty (the done event is the first event),
1088 * the resulting future completes with a [StateError]. 1199 * the returned future completes with an error.
1089 */ 1200 */
1090 Future<T> get last { 1201 Future<T> get last {
1091 _Future<T> future = new _Future<T>(); 1202 _Future<T> future = new _Future<T>();
1092 T result = null; 1203 T result = null;
1093 bool foundResult = false; 1204 bool foundResult = false;
1094 listen( 1205 listen(
1095 (T value) { 1206 (T value) {
1096 foundResult = true; 1207 foundResult = true;
1097 result = value; 1208 result = value;
1098 }, 1209 },
1099 onError: future._completeError, 1210 onError: future._completeError,
1100 onDone: () { 1211 onDone: () {
1101 if (foundResult) { 1212 if (foundResult) {
1102 future._complete(result); 1213 future._complete(result);
1103 return; 1214 return;
1104 } 1215 }
1105 try { 1216 try {
1106 throw IterableElementError.noElement(); 1217 throw IterableElementError.noElement();
1107 } catch (e, s) { 1218 } catch (e, s) {
1108 _completeWithErrorCallback(future, e, s); 1219 _completeWithErrorCallback(future, e, s);
1109 } 1220 }
1110 }, 1221 },
1111 cancelOnError: true); 1222 cancelOnError: true);
1112 return future; 1223 return future;
1113 } 1224 }
1114 1225
1115 /** 1226 /**
1116 * Returns the single element. 1227 * The single element of this stream.
1117 * 1228 *
1118 * If an error event occurs before or after the first data event, the 1229 * If this stream emits an error event,
1119 * resulting future is completed with that error. 1230 * the returned future is completed with that error
1231 * and processing stops.
1120 * 1232 *
1121 * If [this] is empty or has more than one element throws a [StateError]. 1233 * If [this] is empty or has more than one element,
1234 * the returned future completes with an error.
1122 */ 1235 */
1123 Future<T> get single { 1236 Future<T> get single {
1124 _Future<T> future = new _Future<T>(); 1237 _Future<T> future = new _Future<T>();
1125 T result = null; 1238 T result = null;
1126 bool foundResult = false; 1239 bool foundResult = false;
1127 StreamSubscription subscription; 1240 StreamSubscription subscription;
1128 subscription = this.listen( 1241 subscription = this.listen(
1129 (T value) { 1242 (T value) {
1130 if (foundResult) { 1243 if (foundResult) {
1131 // This is the second element we get. 1244 // This is the second element we get.
(...skipping 19 matching lines...) Expand all
1151 _completeWithErrorCallback(future, e, s); 1264 _completeWithErrorCallback(future, e, s);
1152 } 1265 }
1153 }, 1266 },
1154 cancelOnError: true); 1267 cancelOnError: true);
1155 return future; 1268 return future;
1156 } 1269 }
1157 1270
1158 /** 1271 /**
1159 * Finds the first element of this stream matching [test]. 1272 * Finds the first element of this stream matching [test].
1160 * 1273 *
1161 * Returns a future that is filled with the first element of this stream 1274 * Returns a future that is completed with the first element of this stream
1162 * that [test] returns true for. 1275 * that [test] returns `true` for.
1163 * 1276 *
1164 * If no such element is found before this stream is done, and a 1277 * If no such element is found before this stream is done, and a
1165 * [defaultValue] function is provided, the result of calling [defaultValue] 1278 * [defaultValue] function is provided, the result of calling [defaultValue]
1166 * becomes the value of the future. 1279 * becomes the value of the future. If [defaultValue] throws, the returned
1280 * future is completed with that error.
1167 * 1281 *
1168 * Stops listening to the stream after the first matching element has been 1282 * If this stream emits an error before the first matching element,
1169 * received. 1283 * the returned future is completed with that error, and processing stops.
1284 *
1285 * Stops listening to the stream after the first matching element or error
1286 * has been received.
1170 * 1287 *
1171 * Internally the method cancels its subscription after the first element that 1288 * Internally the method cancels its subscription after the first element that
1172 * matches the predicate. This means that single-subscription (non-broadcast) 1289 * matches the predicate. This means that single-subscription (non-broadcast)
1173 * streams are closed and cannot be reused after a call to this method. 1290 * streams are closed and cannot be reused after a call to this method.
1174 * 1291 *
1175 * If an error occurs, or if this stream ends without finding a match and 1292 * If an error occurs, or if this stream ends without finding a match and
1176 * with no [defaultValue] function provided, the future will receive an 1293 * with no [defaultValue] function provided,
1177 * error. 1294 * the returned future is completed with an error.
1178 */ 1295 */
1179 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 1296 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
1180 _Future<dynamic> future = new _Future(); 1297 _Future<dynamic> future = new _Future();
1181 StreamSubscription subscription; 1298 StreamSubscription subscription;
1182 subscription = this.listen( 1299 subscription = this.listen(
1183 (T value) { 1300 (T value) {
1184 _runUserCode(() => test(value), (bool isMatch) { 1301 _runUserCode(() => test(value), (bool isMatch) {
1185 if (isMatch) { 1302 if (isMatch) {
1186 _cancelAndValue(subscription, future, value); 1303 _cancelAndValue(subscription, future, value);
1187 } 1304 }
(...skipping 11 matching lines...) Expand all
1199 _completeWithErrorCallback(future, e, s); 1316 _completeWithErrorCallback(future, e, s);
1200 } 1317 }
1201 }, 1318 },
1202 cancelOnError: true); 1319 cancelOnError: true);
1203 return future; 1320 return future;
1204 } 1321 }
1205 1322
1206 /** 1323 /**
1207 * Finds the last element in this stream matching [test]. 1324 * Finds the last element in this stream matching [test].
1208 * 1325 *
1209 * As [firstWhere], except that the last matching element is found. 1326 * If this stream emits an error, the returned future is completed with that
1210 * That means that the result cannot be provided before this stream 1327 * error, and processing stops.
1328 *
1329 * Otherwise as [firstWhere], except that the last matching element is found
1330 * instead of the first.
1331 * That means that a non-error result cannot be provided before this stream
1211 * is done. 1332 * is done.
1212 */ 1333 */
1213 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { 1334 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) {
1214 _Future<dynamic> future = new _Future(); 1335 _Future<dynamic> future = new _Future();
1215 T result = null; 1336 T result = null;
1216 bool foundResult = false; 1337 bool foundResult = false;
1217 StreamSubscription subscription; 1338 StreamSubscription subscription;
1218 subscription = this.listen( 1339 subscription = this.listen(
1219 (T value) { 1340 (T value) {
1220 _runUserCode(() => true == test(value), (bool isMatch) { 1341 _runUserCode(() => true == test(value), (bool isMatch) {
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after
1456 * operation). If the listener wants to delete the file after having 1577 * operation). If the listener wants to delete the file after having
1457 * canceled the subscription, it must wait for the cleanup future to complete. 1578 * canceled the subscription, it must wait for the cleanup future to complete.
1458 * 1579 *
1459 * A returned future completes with a `null` value. 1580 * A returned future completes with a `null` value.
1460 * If the cleanup throws, which it really shouldn't, the returned future 1581 * If the cleanup throws, which it really shouldn't, the returned future
1461 * completes with that error. 1582 * completes with that error.
1462 */ 1583 */
1463 Future cancel(); 1584 Future cancel();
1464 1585
1465 /** 1586 /**
1466 * Set or override the data event handler of this subscription. 1587 * Replaces the data event handler of this subscription.
1467 * 1588 *
1468 * This method overrides the handler that has been set at the invocation of 1589 * The [handleData] function is called for each element of the stream
1469 * [Stream.listen]. 1590 * after this function is called.
1591 * If [handleData] is `null`, further elements are ignored.
1592 *
1593 * This method replaces the current handler set by the invocation of
1594 * [Stream.listen] or by a previous call to [onData].
1470 */ 1595 */
1471 void onData(void handleData(T data)); 1596 void onData(void handleData(T data));
1472 1597
1473 /** 1598 /**
1474 * Set or override the error event handler of this subscription. 1599 * Replaces the error event handler of this subscription.
1475 * 1600 *
1476 * This method overrides the handler that has been set at the invocation of 1601 * The [handleError] function must be able to be called with either
1477 * [Stream.listen] or by calling [asFuture]. 1602 * one positional argument, or with two positional arguments
1603 * where the seconds is always a [StackTrace].
1604 *
1605 * The [handleError] argument may be `null`, in which case further
1606 * error events are considered unhandled, and will be reported to
1607 * [Zone.handleUncaughtError].
1608 *
1609 * The provided function is called for all error events from the
1610 * stream subscription.
1611 *
1612 * This method replaces the current handler set by the invocation of
1613 * [Stream.listen], by calling [asFuture], or by a previous call to [onError].
1478 */ 1614 */
1479 void onError(Function handleError); 1615 void onError(Function handleError);
1480 1616
1481 /** 1617 /**
1482 * Set or override the done event handler of this subscription. 1618 * Replaces the done event handler of this subscription.
1483 * 1619 *
1484 * This method overrides the handler that has been set at the invocation of 1620 * The [handleDone] function is called when the stream closes.
1485 * [Stream.listen] or by calling [asFuture]. 1621 * The value may be `null`, in which case no function is called.
1622 *
1623 * This method reaplces the current handler set by the invocation of
1624 * [Stream.listen], by calling [asFuture], or by a previous call to [onDone].
1486 */ 1625 */
1487 void onDone(void handleDone()); 1626 void onDone(void handleDone());
1488 1627
1489 /** 1628 /**
1490 * Request that the stream pauses events until further notice. 1629 * Request that the stream pauses events until further notice.
1491 * 1630 *
1492 * While paused, the subscription will not fire any events. 1631 * While paused, the subscription will not fire any events.
1493 * If it receives events from its source, they will be buffered until 1632 * If it receives events from its source, they will be buffered until
1494 * the subscription is resumed. 1633 * the subscription is resumed.
1495 * The underlying source is usually informed about the pause, 1634 * For non-broadcast streams, the underlying source is usually informed
1635 * about the pause,
1496 * so it can stop generating events until the subscription is resumed. 1636 * so it can stop generating events until the subscription is resumed.
1497 * 1637 *
1498 * To avoid buffering events on a broadcast stream, it is better to 1638 * To avoid buffering events on a broadcast stream, it is better to
1499 * cancel this subscription, and start to listen again when events 1639 * cancel this subscription, and start to listen again when events
1500 * are needed. 1640 * are needed, if the intermediate events are not important.
1501 * 1641 *
1502 * If [resumeSignal] is provided, the stream will undo the pause 1642 * If [resumeSignal] is provided, the stream subscription will undo the pause
1503 * when the future completes. If the future completes with an error, 1643 * when the future completes, as if by a call to [resume].
1504 * the stream will resume, but the error will not be handled! 1644 * If the future completes with an error,
1645 * the stream will still resume, but the error will be considered unhandled
1646 * and is passed to [Zone.handleUncaughtError].
1505 * 1647 *
1506 * A call to [resume] will also undo a pause. 1648 * A call to [resume] will also undo a pause.
1507 * 1649 *
1508 * If the subscription is paused more than once, an equal number 1650 * If the subscription is paused more than once, an equal number
1509 * of resumes must be performed to resume the stream. 1651 * of resumes must be performed to resume the stream.
1510 * 1652 *
1511 * Currently DOM streams silently drop events when the stream is paused. This 1653 * Currently DOM streams silently drop events when the stream is paused. This
1512 * is a bug and will be fixed. 1654 * is a bug and will be fixed.
1513 */ 1655 */
1514 void pause([Future resumeSignal]); 1656 void pause([Future resumeSignal]);
1515 1657
1516 /** 1658 /**
1517 * Resume after a pause. 1659 * Resume after a pause.
1660 *
1661 * This undoes one previous call to [pause].
1662 * When all previously calls to [pause] have been matched by a calls to
1663 * [resume], possibly through a `resumeSignal` passed to [pause],
1664 * the stream subscription may emit events again.
1518 */ 1665 */
1519 void resume(); 1666 void resume();
1520 1667
1521 /** 1668 /**
1522 * Returns true if the [StreamSubscription] is paused. 1669 * Whether the [StreamSubscription] is currently paused.
1670 *
1671 * If there have been more calls to [pause] than to [resume] on this
1672 * stream subscription, the subscription is paused, and this getter
1673 * returns `true`.
1674 *
1675 * Returns `false` if the stream can currently emit events, or if
1676 * the subscription has completed or been cancelled.
1523 */ 1677 */
1524 bool get isPaused; 1678 bool get isPaused;
1525 1679
1526 /** 1680 /**
1527 * Returns a future that handles the [onDone] and [onError] callbacks. 1681 * Returns a future that handles the [onDone] and [onError] callbacks.
1528 * 1682 *
1529 * This method *overwrites* the existing [onDone] and [onError] callbacks 1683 * This method *overwrites* the existing [onDone] and [onError] callbacks
1530 * with new ones that complete the returned future. 1684 * with new ones that complete the returned future.
1531 * 1685 *
1532 * In case of an error the subscription will automatically cancel (even 1686 * In case of an error the subscription will automatically cancel (even
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
1685 * Return a future which is completed when the [StreamSink] is finished. 1839 * Return a future which is completed when the [StreamSink] is finished.
1686 * 1840 *
1687 * If the `StreamSink` fails with an error, 1841 * If the `StreamSink` fails with an error,
1688 * perhaps in response to adding events using [add], [addError] or [close], 1842 * perhaps in response to adding events using [add], [addError] or [close],
1689 * the [done] future will complete with that error. 1843 * the [done] future will complete with that error.
1690 * 1844 *
1691 * Otherwise, the returned future will complete when either: 1845 * Otherwise, the returned future will complete when either:
1692 * 1846 *
1693 * * all events have been processed and the sink has been closed, or 1847 * * all events have been processed and the sink has been closed, or
1694 * * the sink has otherwise been stopped from handling more events 1848 * * the sink has otherwise been stopped from handling more events
1695 * (for example by cancelling a stream subscription). 1849 * (for example by canceling a stream subscription).
1696 */ 1850 */
1697 Future get done; 1851 Future get done;
1698 } 1852 }
1699 1853
1700 /** 1854 /**
1701 * Transforms a Stream. 1855 * Transforms a Stream.
1702 * 1856 *
1703 * When a stream's [Stream.transform] method is invoked with a 1857 * When a stream's [Stream.transform] method is invoked with a
1704 * [StreamTransformer], the stream calls the [bind] method on the provided 1858 * [StreamTransformer], the stream calls the [bind] method on the provided
1705 * transformer. The resulting stream is then returned from the 1859 * transformer. The resulting stream is then returned from the
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1907 2061
1908 void addError(error, [StackTrace stackTrace]) { 2062 void addError(error, [StackTrace stackTrace]) {
1909 _sink.addError(error, stackTrace); 2063 _sink.addError(error, stackTrace);
1910 } 2064 }
1911 2065
1912 void close() { 2066 void close() {
1913 _sink.close(); 2067 _sink.close();
1914 } 2068 }
1915 } 2069 }
1916 2070
1917 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. 2071 /// A group created by [Stream.groupBy].
1918 /// 2072 ///
1919 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y 2073 /// The stream created by `groupBy` emits a `GroupedEvents`
1920 /// it encounters. 2074 /// for each distinct key it encounters.
1921 /// This group contains the [key] itself, along with a stream of the [values] 2075 /// This group contains the [key] itself, along with a stream of the [values]
1922 /// associated with that key. 2076 /// associated with that key.
1923 class GroupedEvents<K, V> { 2077 class GroupedEvents<K, V> {
1924 /// The key that identifiers the values emitted by [values]. 2078 /// The key that identifiers the values emitted by [values].
1925 final K key; 2079 final K key;
1926 2080
1927 /// The [values] that [GroupBy] have grouped by the common [key]. 2081 /// The [values] that [GroupBy] have grouped by the common [key].
1928 final Stream<V> values; 2082 final Stream<V> values;
1929 2083
1930 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; 2084 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
1931 2085
1932 // Don't expose a generative constructor. 2086 // Don't expose a generative constructor.
1933 // This class is not intended for subclassing, so we don't want to promise 2087 // This class is not intended for subclassing, so we don't want to promise
1934 // it. We can change that in the future. 2088 // it. We can change that in the future.
1935 GroupedEvents._(this.key, this.values); 2089 GroupedEvents._(this.key, this.values);
1936 2090
1937 /// Tells [values] to discard values instead of retaining them. 2091 /// Tells [values] to discard values instead of retaining them.
1938 /// 2092 ///
1939 /// Must only be used instead of listening to the [values] stream. 2093 /// Must only be used instead of listening to the [values] stream.
1940 /// If the stream has been listened to, this call fails. 2094 /// If the stream has been listened to, this call fails.
1941 /// After calling this method, listening on the [values] stream fails. 2095 /// After calling this method, listening on the [values] stream fails.
1942 Future cancel() { 2096 Future cancel() {
1943 // If values has been listened to, 2097 // If values has been listened to,
1944 // this throws a StateError saying that stream has already been listened to, 2098 // this throws a StateError saying that stream has already been listened to,
1945 // which is a correct error message for this call too. 2099 // which is a correct error message for this call too.
1946 return values.listen(null).cancel(); 2100 return values.listen(null).cancel();
1947 } 2101 }
1948 } 2102 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698