Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2899273003: Update stream documentation. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
11 typedef void _TimerCallback(); 11 typedef void _TimerCallback();
12 12
13 /** 13 /**
14 * A source of asynchronous data events. 14 * A source of asynchronous data events.
15 * 15 *
16 * A Stream provides a way to receive a sequence of events. 16 * A Stream provides a way to receive a sequence of events.
17 * Each event is either a data event or an error event, 17 * Each event is either a data event, also called an *element* of the stream,
floitsch 2017/05/24 13:37:51 isn't `_element_` the better way for this?
Lasse Reichstein Nielsen 2017/05/24 15:09:54 It's equivalent in commonmark. http://spec.commonm
18 * representing the result of a single computation. 18 * or an error event, which is a notification that something has failed.
19 * When the events provided by a Stream have all been sent, 19 * When a stream has emitted all its event,
20 * a single "done" event will mark the end. 20 * a single "done" event will notify the listener that the end has been reached.
21 * 21 *
22 * You can [listen] on a stream to make it start generating events, 22 * You [listen] on a stream to make it start generating events,
23 * and to set up listeners that receive the events. 23 * and to set up listeners that receive the events.
24 * When you listen, you receive a [StreamSubscription] object 24 * When you listen, you receive a [StreamSubscription] object
25 * which is the active object providing the events, 25 * which is the active object providing the events,
26 * and which can be used to stop listening again, 26 * and which can be used to stop listening again,
27 * or to temporarily pause events from the subscription. 27 * or to temporarily pause events from the subscription.
28 * 28 *
29 * There are two kinds of streams: "Single-subscription" streams and 29 * There are two kinds of streams: "Single-subscription" streams and
30 * "broadcast" streams. 30 * "broadcast" streams.
31 * 31 *
32 * *A single-subscription stream* allows only a single listener during the whole 32 * *A single-subscription stream* allows only a single listener during the whole
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
111 controller._closeUnchecked(); 111 controller._closeUnchecked();
112 }); 112 });
113 return controller.stream; 113 return controller.stream;
114 } 114 }
115 115
116 /** 116 /**
117 * Create a stream from a group of futures. 117 * Create a stream from a group of futures.
118 * 118 *
119 * The stream reports the results of the futures on the stream in the order 119 * The stream reports the results of the futures on the stream in the order
120 * in which the futures complete. 120 * in which the futures complete.
121 * Each future provides either a data event or an error event,
122 * depending on how the future completes.
121 * 123 *
122 * If some futures have completed before calling `Stream.fromFutures`, 124 * If some futures have already completed when `Stream.fromFutures` is called,
123 * their result will be output on the created stream in some unspecified 125 * their results will be emitted in some unspecified order.
124 * order.
125 * 126 *
126 * When all futures have completed, the stream is closed. 127 * When all futures have completed, the stream is closed.
127 * 128 *
128 * If no future is passed, the stream closes as soon as possible. 129 * If [futures] is empty, the stream closes as soon as possible.
129 */ 130 */
130 factory Stream.fromFutures(Iterable<Future<T>> futures) { 131 factory Stream.fromFutures(Iterable<Future<T>> futures) {
131 _StreamController<T> controller = new StreamController<T>(sync: true); 132 _StreamController<T> controller = new StreamController<T>(sync: true);
132 int count = 0; 133 int count = 0;
134 // Declare as variable holding closure instead of as a function declaration.
floitsch 2017/05/24 13:37:51 holding a closure
Lasse Reichstein Nielsen 2017/05/24 15:09:54 And more rewrite of that broken sentence.
135 // This avoids creating a new closure for each future.
133 var onValue = (T value) { 136 var onValue = (T value) {
134 if (!controller.isClosed) { 137 if (!controller.isClosed) {
135 controller._add(value); 138 controller._add(value);
136 if (--count == 0) controller._closeUnchecked(); 139 if (--count == 0) controller._closeUnchecked();
137 } 140 }
138 }; 141 };
139 var onError = (error, stack) { 142 var onError = (error, stack) {
140 if (!controller.isClosed) { 143 if (!controller.isClosed) {
141 controller._addError(error, stack); 144 controller._addError(error, stack);
142 if (--count == 0) controller._closeUnchecked(); 145 if (--count == 0) controller._closeUnchecked();
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 * stack trace). 338 * stack trace).
336 * Otherwise it is called with just the error object. 339 * Otherwise it is called with just the error object.
337 * If [onError] is omitted, any errors on the stream are considered unhandled, 340 * If [onError] is omitted, any errors on the stream are considered unhandled,
338 * and will be passed to the current [Zone]'s error handler. 341 * and will be passed to the current [Zone]'s error handler.
339 * By default unhandled async errors are treated 342 * By default unhandled async errors are treated
340 * as if they were uncaught top-level errors. 343 * as if they were uncaught top-level errors.
341 * 344 *
342 * If this stream closes and sends a done event, the [onDone] handler is 345 * If this stream closes and sends a done event, the [onDone] handler is
343 * called. If [onDone] is `null`, nothing happens. 346 * called. If [onDone] is `null`, nothing happens.
344 * 347 *
345 * If [cancelOnError] is true, the subscription is automatically cancelled 348 * If [cancelOnError] is true, the subscription is automatically canceled
346 * when the first error event is delivered. The default is `false`. 349 * when the first error event is delivered. The default is `false`.
347 * 350 *
348 * While a subscription is paused, or when it has been cancelled, 351 * While a subscription is paused, or when it has been canceled,
349 * the subscription doesn't receive events and none of the 352 * the subscription doesn't receive events and none of the
350 * event handler functions are called. 353 * event handler functions are called.
351 */ 354 */
352 StreamSubscription<T> listen(void onData(T event), 355 StreamSubscription<T> listen(void onData(T event),
353 {Function onError, void onDone(), bool cancelOnError}); 356 {Function onError, void onDone(), bool cancelOnError});
354 357
355 /** 358 /**
356 * Creates a new stream from this stream that discards some data events. 359 * Creates a new stream from this stream that discards some elements.
357 * 360 *
358 * The new stream sends the same error and done events as this stream, 361 * The new stream sends the same error and done events as this stream,
359 * but it only sends the data events that satisfy the [test]. 362 * but it only sends the data events that satisfy the [test].
360 * 363 *
364 * If the [test] function throws, the data event is dropped and the
365 * error emitted on the returned stream instead.
floitsch 2017/05/24 13:37:51 error is emitted
Lasse Reichstein Nielsen 2017/05/24 15:09:54 I think it can read without the "is", but it's pro
366 *
361 * The returned stream is a broadcast stream if this stream is. 367 * The returned stream is a broadcast stream if this stream is.
362 * If a broadcast stream is listened to more than once, each subscription 368 * If a broadcast stream is listened to more than once, each subscription
363 * will individually perform the `test`. 369 * will individually perform the `test`.
364 */ 370 */
365 Stream<T> where(bool test(T event)) { 371 Stream<T> where(bool test(T event)) {
366 return new _WhereStream<T>(this, test); 372 return new _WhereStream<T>(this, test);
367 } 373 }
368 374
369 /** 375 /**
376 * Transform each element of this stream into a new stream event.
floitsch 2017/05/24 13:37:50 Transforms
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
377 *
370 * Creates a new stream that converts each element of this stream 378 * Creates a new stream that converts each element of this stream
371 * to a new value using the [convert] function. 379 * to a new value using the [convert] function, and emits the result.
372 * 380 *
373 * For each data event, `o`, in this stream, the returned stream 381 * For each data event, `o`, in this stream, the returned stream
374 * provides a data event with the value `convert(o)`. 382 * provides a data event with the value `convert(o)`.
375 * If [convert] throws, the returned stream reports the exception as an error 383 * If [convert] throws, the returned stream reports it as an error
376 * event instead. 384 * event instead.
377 * 385 *
378 * Error and done events are passed through unchanged to the returned stream. 386 * Error and done events are passed through unchanged to the returned stream.
379 * 387 *
380 * The returned stream is a broadcast stream if this stream is. 388 * The returned stream is a broadcast stream if this stream is.
381 * The [convert] function is called once per data event per listener. 389 * The [convert] function is called once per data event per listener.
382 * If a broadcast stream is listened to more than once, each subscription 390 * If a broadcast stream is listened to more than once, each subscription
383 * will individually call [convert] on each data event. 391 * will individually call [convert] on each data event.
384 */ 392 */
385 Stream<S> map<S>(S convert(T event)) { 393 Stream<S> map<S>(S convert(T event)) {
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
511 onResume: () { 519 onResume: () {
512 subscription.resume(); 520 subscription.resume();
513 }, 521 },
514 onCancel: () => subscription.cancel(), 522 onCancel: () => subscription.cancel(),
515 sync: true); 523 sync: true);
516 } 524 }
517 return controller.stream; 525 return controller.stream;
518 } 526 }
519 527
520 /** 528 /**
521 * Creates a new stream with the events of a stream per original event. 529 * Transform each element into a sequence of asynchronous events.
floitsch 2017/05/24 13:37:51 Transforms
530 *
531 * Returns a new stream that emits the error events and done event
floitsch 2017/05/24 13:37:51 I think we should start with what happens to the e
Lasse Reichstein Nielsen 2017/05/24 15:09:54 I've tried doing a rewording, but I'm not complete
532 * of this stream, and for each element of this stream, the returned
533 * stream emits a sequence of events, provided as a stream by the
534 * [convert] function called on the element.
522 * 535 *
523 * This acts like [expand], except that [convert] returns a [Stream] 536 * This acts like [expand], except that [convert] returns a [Stream]
524 * instead of an [Iterable]. 537 * instead of an [Iterable].
525 * The events of the returned stream becomes the events of the returned 538 * The events of the stream returned by [convert] are emitted on
526 * stream, in the order they are produced. 539 * the stream returned by [asyncExpand] in the order they are produced.
527 * 540 *
528 * If [convert] returns `null`, no value is put on the output stream, 541 * Further events from this stream are delayed until the stream
529 * just as if it returned an empty stream. 542 * returned by [convert] has ended.
543 *
544 * If [convert] returns `null`, no value is put on the output stream
545 * for that element, just as if it had returned an empty stream.
530 * 546 *
531 * The returned stream is a broadcast stream if this stream is. 547 * The returned stream is a broadcast stream if this stream is.
532 */ 548 */
533 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { 549 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
534 StreamController<E> controller; 550 StreamController<E> controller;
535 StreamSubscription<T> subscription; 551 StreamSubscription<T> subscription;
536 void onListen() { 552 void onListen() {
537 assert(controller is _StreamController || 553 assert(controller is _StreamController ||
538 controller is _BroadcastStreamController); 554 controller is _BroadcastStreamController);
539 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; 555 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
588 * trace. The stack trace argument might be `null` if the stream itself 604 * trace. The stack trace argument might be `null` if the stream itself
589 * received an error without stack trace. 605 * received an error without stack trace.
590 * 606 *
591 * An asynchronous error `error` is matched by a test function if 607 * An asynchronous error `error` is matched by a test function if
592 *`test(error)` returns true. If [test] is omitted, every error is considered 608 *`test(error)` returns true. If [test] is omitted, every error is considered
593 * matching. 609 * matching.
594 * 610 *
595 * If the error is intercepted, the [onError] function can decide what to do 611 * If the error is intercepted, the [onError] function can decide what to do
596 * with it. It can throw if it wants to raise a new (or the same) error, 612 * with it. It can throw if it wants to raise a new (or the same) error,
597 * or simply return to make the stream forget the error. 613 * or simply return to make the stream forget the error.
614 * If the received `error` value is thrown again by the [onError] function,
615 * it acts like a `rethrow` and it is emitted along with its original
616 * stack trace, not the stack trace of the `throw` inside [onError].
598 * 617 *
599 * If you need to transform an error into a data event, use the more generic 618 * If you need to transform an error into a data event, use the more generic
600 * [Stream.transform] to handle the event by writing a data event to 619 * [Stream.transform] to handle the event by writing a data event to
601 * the output sink. 620 * the output sink.
602 * 621 *
603 * The returned stream is a broadcast stream if this stream is. 622 * The returned stream is a broadcast stream if this stream is.
604 * If a broadcast stream is listened to more than once, each subscription 623 * If a broadcast stream is listened to more than once, each subscription
605 * will individually perform the `test` and handle the error. 624 * will individually perform the `test` and handle the error.
606 */ 625 */
607 Stream<T> handleError(Function onError, {bool test(error)}) { 626 Stream<T> handleError(Function onError, {bool test(error)}) {
608 return new _HandleErrorStream<T>(this, onError, test); 627 return new _HandleErrorStream<T>(this, onError, test);
609 } 628 }
610 629
611 /** 630 /**
612 * Creates a new stream from this stream that converts each element 631 * Transform each element of this stream into a sqeuence of elements.
floitsch 2017/05/24 13:37:50 Transforms ... sequence ...
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
613 * into zero or more events.
614 * 632 *
615 * Each incoming event is converted to an [Iterable] of new events, 633 * Returns a new stream which emits the same error and done events as
floitsch 2017/05/24 13:37:51 See above.
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Reworded. It's simpler than the async version.
616 * and each of these new events are then sent by the returned stream 634 * this stream, and for each element of this stream,
617 * in order. 635 * it calls [convert] with the element to get an iterable.
636 * Then each element of the iterable is emitted as an element
637 * of the returned stream, in iteration order.
638 *
639 * If iteration throws, that error is emitted as an error on the returned
640 * stream and iteration ends for that element of this stream.
618 * 641 *
619 * The returned stream is a broadcast stream if this stream is. 642 * The returned stream is a broadcast stream if this stream is.
620 * If a broadcast stream is listened to more than once, each subscription 643 * If a broadcast stream is listened to more than once, each subscription
621 * will individually call `convert` and expand the events. 644 * will individually call `convert` and expand the events.
622 */ 645 */
623 Stream<S> expand<S>(Iterable<S> convert(T value)) { 646 Stream<S> expand<S>(Iterable<S> convert(T element)) {
624 return new _ExpandStream<T, S>(this, convert); 647 return new _ExpandStream<T, S>(this, convert);
625 } 648 }
626 649
627 /** 650 /**
628 * Pipe the events of this stream into [streamConsumer]. 651 * Pipe the events of this stream into [streamConsumer].
floitsch 2017/05/24 13:37:50 Pipes
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
629 * 652 *
630 * The events of this stream are added to `streamConsumer` using 653 * All events of this stream are added to `streamConsumer` using
631 * [StreamConsumer.addStream]. 654 * [StreamConsumer.addStream].
632 * The `streamConsumer` is closed when this stream has been successfully added 655 * The `streamConsumer` is closed when this stream has been successfully added
633 * to it - when the future returned by `addStream` completes without an error. 656 * to it - when the future returned by `addStream` completes without an error.
634 * 657 *
635 * Returns a future which completes when the stream has been consumed 658 * Returns a future which completes when the stream has been consumed
636 * and the consumer has been closed. 659 * and the consumer has been closed.
637 * 660 *
638 * The returned future completes with the same result as the future returned 661 * The returned future completes with the same result as the future returned
639 * by [StreamConsumer.close]. 662 * by [StreamConsumer.close].
640 * If the adding of the stream itself fails in some way, 663 * If the call to [StreamConsumer.addStream] fails in some way, this
floitsch 2017/05/24 13:37:50 "fails" -> "throws"
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Actually, I meant the completely vague thing here,
641 * then the consumer is expected to be closed, and won't be closed again. 664 * method fails in the same way.
642 * In that case the returned future completes with the error from calling
643 * `addStream`.
644 */ 665 */
645 Future pipe(StreamConsumer<T> streamConsumer) { 666 Future pipe(StreamConsumer<T> streamConsumer) {
646 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); 667 return streamConsumer.addStream(this).then((_) => streamConsumer.close());
647 } 668 }
648 669
649 /** 670 /**
650 * Chains this stream as the input of the provided [StreamTransformer]. 671 * Applies a [StreamTransformer] to the current stream.
651 * 672 *
652 * Returns the result of [:streamTransformer.bind:] itself. 673 * Returns the result of the stream transformation,
674 * that is, the result of `streamTransformer.bind(this)`.
675 * This method simply allows writing the call to `streamTransformer.bind`
676 * in a chained fashion, like
677 * ```
678 * stream.map(mapping).transform(transformation).toList()
679 * ```
680 * which can be more convenient than calling `bind` directly.
653 * 681 *
654 * The `streamTransformer` can decide whether it wants to return a 682 * The [streamTransformer] can return any stream,
floitsch 2017/05/24 13:37:50 any stream. Whether the ... Maybe mention, that t
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
655 * broadcast stream or not. 683 * so whether the returned stream is a broadcast stream or not,
684 * and which elements it will contain,
685 * is entirely up to the transformation.
656 */ 686 */
657 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { 687 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
658 return streamTransformer.bind(this); 688 return streamTransformer.bind(this);
659 } 689 }
660 690
661 /** 691 /**
662 * Reduces a sequence of values by repeatedly applying [combine]. 692 * Combines a sequence of values by repeatedly applying [combine].
693 *
694 * Similar to [Iterable.reduce], this function maintains a value,
695 * starting with the first element of the stream
696 * and updated for each further element of this stream.
697 * For each element after the first,
698 * the value is updated to the result of calling [combine]
699 * with the previous value and the element.
700 *
701 * When this stream is done, the returned future is completed with
702 * the value at that time.
703 *
704 * If the stream is empty, the returned future is completed with
705 * an error.
706 * If this stream emits an error, or the call to [combine] throws,
707 * the returned future is completed with that error,
708 * and processing is stopped.
663 */ 709 */
664 Future<T> reduce(T combine(T previous, T element)) { 710 Future<T> reduce(T combine(T previous, T element)) {
665 _Future<T> result = new _Future<T>(); 711 _Future<T> result = new _Future<T>();
666 bool seenFirst = false; 712 bool seenFirst = false;
667 T value; 713 T value;
668 StreamSubscription subscription; 714 StreamSubscription subscription;
669 subscription = this.listen( 715 subscription = this.listen(
670 (T element) { 716 (T element) {
671 if (seenFirst) { 717 if (seenFirst) {
672 _runUserCode(() => combine(value, element), (T newValue) { 718 _runUserCode(() => combine(value, element), (T newValue) {
673 value = newValue; 719 value = newValue;
674 }, _cancelAndErrorClosure(subscription, result)); 720 }, _cancelAndErrorClosure(subscription, result));
675 } else { 721 } else {
676 value = element; 722 value = element;
677 seenFirst = true; 723 seenFirst = true;
678 } 724 }
679 }, 725 },
680 onError: result._completeError, 726 onError: result._completeError,
681 onDone: () { 727 onDone: () {
682 if (!seenFirst) { 728 if (!seenFirst) {
683 try { 729 try {
730 // Throw and recatch, instead of just doing
731 // _completeWithErrorCallback, e, theError, StackTrace.current),
732 // to ensure that the stackTrace is set on the error.
684 throw IterableElementError.noElement(); 733 throw IterableElementError.noElement();
685 } catch (e, s) { 734 } catch (e, s) {
686 _completeWithErrorCallback(result, e, s); 735 _completeWithErrorCallback(result, e, s);
687 } 736 }
688 } else { 737 } else {
689 result._complete(value); 738 result._complete(value);
690 } 739 }
691 }, 740 },
692 cancelOnError: true); 741 cancelOnError: true);
693 return result; 742 return result;
694 } 743 }
695 744
696 /** Reduces a sequence of values by repeatedly applying [combine]. */ 745 /**
746 * Combines a sequence of values by repeatedly applying [combine].
747 *
748 * Similar to [Iterable.fold], this function maintains a value,
749 * starting with [initialValue] and updated for each element of
750 * this stream.
751 * For each element, the value is updated to the result of calling
752 * [combine] with the previous value and the element.
753 *
754 * When this stream is done, the returned future is completed with
755 * the value at that time.
756 * For an empty stream, the future is completed with [initialValue].
757 *
758 * If this stream emits an error, or the call to [combine] throws,
759 * the returned future is completed with that error,
760 * and processing is stopped.
761 */
697 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { 762 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
698 _Future<S> result = new _Future<S>(); 763 _Future<S> result = new _Future<S>();
699 S value = initialValue; 764 S value = initialValue;
700 StreamSubscription subscription; 765 StreamSubscription subscription;
701 subscription = this.listen((T element) { 766 subscription = this.listen(
702 _runUserCode(() => combine(value, element), (S newValue) { 767 (T element) {
703 value = newValue; 768 _runUserCode(() => combine(value, element), (S newValue) {
704 }, _cancelAndErrorClosure(subscription, result)); 769 value = newValue;
705 }, onError: (e, st) { 770 }, _cancelAndErrorClosure(subscription, result));
706 result._completeError(e, st); 771 },
707 }, onDone: () { 772 onError: result._completeError,
708 result._complete(value); 773 onDone: () {
709 }, cancelOnError: true); 774 result._complete(value);
775 },
776 cancelOnError: true);
710 return result; 777 return result;
711 } 778 }
712 779
713 /** 780 /**
714 * Collects string of data events' string representations. 781 * Combines the string representation of elements into a single string.
715 * 782 *
716 * If [separator] is provided, it is inserted between any two 783 * Each element is converted to a string using its [Object.toString] method.
717 * elements. 784 * If [separator] is provided, it is inserted between element string
785 * representations.
718 * 786 *
719 * Any error in the stream causes the future to complete with that 787 * The returned future is completed with the combined string when the stream
720 * error. Otherwise it completes with the collected string when 788 * is done.
721 * the "done" event arrives. 789 *
790 * If the stream contains an error, or if the call to [Object.toString]
791 * throws, the returned future is completed with that error,
792 * and procssing stops.
floitsch 2017/05/24 13:37:51 processing
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
722 */ 793 */
723 Future<String> join([String separator = ""]) { 794 Future<String> join([String separator = ""]) {
724 _Future<String> result = new _Future<String>(); 795 _Future<String> result = new _Future<String>();
725 StringBuffer buffer = new StringBuffer(); 796 StringBuffer buffer = new StringBuffer();
726 StreamSubscription subscription; 797 StreamSubscription subscription;
727 bool first = true; 798 bool first = true;
728 subscription = this.listen((T element) { 799 subscription = this.listen((T element) {
729 if (!first) { 800 if (!first) {
730 buffer.write(separator); 801 buffer.write(separator);
731 } 802 }
732 first = false; 803 first = false;
733 try { 804 try {
734 buffer.write(element); 805 buffer.write(element);
735 } catch (e, s) { 806 } catch (e, s) {
736 _cancelAndErrorWithReplacement(subscription, result, e, s); 807 _cancelAndErrorWithReplacement(subscription, result, e, s);
737 } 808 }
738 }, onError: (e) { 809 }, onError: (e) {
739 result._completeError(e); 810 result._completeError(e);
740 }, onDone: () { 811 }, onDone: () {
741 result._complete(buffer.toString()); 812 result._complete(buffer.toString());
742 }, cancelOnError: true); 813 }, cancelOnError: true);
743 return result; 814 return result;
744 } 815 }
745 816
746 /** 817 /**
747 * Checks whether [needle] occurs in the elements provided by this stream. 818 * Checks whether [needle] occurs in the elements provided by this stream.
floitsch 2017/05/24 13:37:50 Returns
748 * 819 *
749 * Completes the [Future] when the answer is known. 820 * Compares each element of this stream to [needle] using [Object.==].
750 * If this stream reports an error, the [Future] will report that error. 821 * If an equal element is found, the returned future is completed with `true`.
822 * If the stream ends without finding a match, the future is completed with
823 * `false`.
824 *
825 * If the stream contains an error, or the call to `Object.==` throws,
826 * the returned future is completed with that error, and processing stops.
751 */ 827 */
752 Future<bool> contains(Object needle) { 828 Future<bool> contains(Object needle) {
753 _Future<bool> future = new _Future<bool>(); 829 _Future<bool> future = new _Future<bool>();
754 StreamSubscription subscription; 830 StreamSubscription subscription;
755 subscription = this.listen( 831 subscription = this.listen(
756 (T element) { 832 (T element) {
757 _runUserCode(() => (element == needle), (bool isMatch) { 833 _runUserCode(() => (element == needle), (bool isMatch) {
758 if (isMatch) { 834 if (isMatch) {
759 _cancelAndValue(subscription, future, true); 835 _cancelAndValue(subscription, future, true);
760 } 836 }
761 }, _cancelAndErrorClosure(subscription, future)); 837 }, _cancelAndErrorClosure(subscription, future));
762 }, 838 },
763 onError: future._completeError, 839 onError: future._completeError,
764 onDone: () { 840 onDone: () {
765 future._complete(false); 841 future._complete(false);
766 }, 842 },
767 cancelOnError: true); 843 cancelOnError: true);
768 return future; 844 return future;
769 } 845 }
770 846
771 /** 847 /**
772 * Executes [action] on each data event of the stream. 848 * Executes [action] on each element of the stream.
773 * 849 *
774 * Completes the returned [Future] when all events of the stream 850 * Completes the returned [Future] when all elements of the stream
775 * have been processed. Completes the future with an error if the 851 * have been processed.
776 * stream has an error event, or if [action] throws. 852 *
853 * If the stream contains an error, or if the call to [action] throws,
854 * the returne future completes with that error, and processing stops.
777 */ 855 */
778 Future forEach(void action(T element)) { 856 Future forEach(void action(T element)) {
779 _Future future = new _Future(); 857 _Future future = new _Future();
780 StreamSubscription subscription; 858 StreamSubscription subscription;
781 subscription = this.listen( 859 subscription = this.listen(
782 (T element) { 860 (T element) {
783 // TODO(floitsch): the type should be 'void' and inferred. 861 // TODO(floitsch): the type should be 'void' and inferred.
784 _runUserCode<dynamic>(() => action(element), (_) {}, 862 _runUserCode<dynamic>(() => action(element), (_) {},
785 _cancelAndErrorClosure(subscription, future)); 863 _cancelAndErrorClosure(subscription, future));
786 }, 864 },
787 onError: future._completeError, 865 onError: future._completeError,
788 onDone: () { 866 onDone: () {
789 future._complete(null); 867 future._complete(null);
790 }, 868 },
791 cancelOnError: true); 869 cancelOnError: true);
792 return future; 870 return future;
793 } 871 }
794 872
795 /** 873 /**
796 * Checks whether [test] accepts all elements provided by this stream. 874 * Checks whether [test] accepts all elements provided by this stream.
797 * 875 *
798 * Completes the [Future] when the answer is known. 876 * Calls [test] on each element of the stream.
799 * If this stream reports an error, the [Future] will report that error. 877 * If the call returns `false`, the returned future is completed with `false`
878 * and processing stops.
879 *
880 * If the stream ends without finding an element that [test] rejects,
881 * the returned future is completed with `true`.
882 *
883 * If this stream contains an error, or if the call to [test] throws,
884 * the returned future is completed with that error, and processing stops.
800 */ 885 */
801 Future<bool> every(bool test(T element)) { 886 Future<bool> every(bool test(T element)) {
802 _Future<bool> future = new _Future<bool>(); 887 _Future<bool> future = new _Future<bool>();
803 StreamSubscription subscription; 888 StreamSubscription subscription;
804 subscription = this.listen( 889 subscription = this.listen(
805 (T element) { 890 (T element) {
806 _runUserCode(() => test(element), (bool isMatch) { 891 _runUserCode(() => test(element), (bool isMatch) {
807 if (!isMatch) { 892 if (!isMatch) {
808 _cancelAndValue(subscription, future, false); 893 _cancelAndValue(subscription, future, false);
809 } 894 }
810 }, _cancelAndErrorClosure(subscription, future)); 895 }, _cancelAndErrorClosure(subscription, future));
811 }, 896 },
812 onError: future._completeError, 897 onError: future._completeError,
813 onDone: () { 898 onDone: () {
814 future._complete(true); 899 future._complete(true);
815 }, 900 },
816 cancelOnError: true); 901 cancelOnError: true);
817 return future; 902 return future;
818 } 903 }
819 904
820 /** 905 /**
821 * Checks whether [test] accepts any element provided by this stream. 906 * Checks whether [test] accepts any element provided by this stream.
822 * 907 *
823 * Completes the [Future] when the answer is known. 908 * Calls [test] on each element of the stream.
909 * If the call returns `true`, the returned future is completed with `true`
910 * and processing stops.
824 * 911 *
825 * If this stream reports an error, the [Future] reports that error. 912 * If the stream ends without finding an element that [test] accepts,
913 * the returned future is completed with `false`.
826 * 914 *
827 * Stops listening to the stream after the first matching element has been 915 * If this stream contains an error, or if the call to [test] throws,
828 * found. 916 * the returned future is completed with that error, and processing stops.
829 *
830 * Internally the method cancels its subscription after this element. This
831 * means that single-subscription (non-broadcast) streams are closed and
832 * cannot be reused after a call to this method.
833 */ 917 */
834 Future<bool> any(bool test(T element)) { 918 Future<bool> any(bool test(T element)) {
835 _Future<bool> future = new _Future<bool>(); 919 _Future<bool> future = new _Future<bool>();
836 StreamSubscription subscription; 920 StreamSubscription subscription;
837 subscription = this.listen( 921 subscription = this.listen(
838 (T element) { 922 (T element) {
839 _runUserCode(() => test(element), (bool isMatch) { 923 _runUserCode(() => test(element), (bool isMatch) {
840 if (isMatch) { 924 if (isMatch) {
841 _cancelAndValue(subscription, future, true); 925 _cancelAndValue(subscription, future, true);
842 } 926 }
843 }, _cancelAndErrorClosure(subscription, future)); 927 }, _cancelAndErrorClosure(subscription, future));
844 }, 928 },
845 onError: future._completeError, 929 onError: future._completeError,
846 onDone: () { 930 onDone: () {
847 future._complete(false); 931 future._complete(false);
848 }, 932 },
849 cancelOnError: true); 933 cancelOnError: true);
850 return future; 934 return future;
851 } 935 }
852 936
853 /** Counts the elements in the stream. */ 937 /**
938 * The number of elements in this stream.
939 *
940 * Waits for all elements of this stream. When the stream ends,
941 * the returned future is completed with the number of elements.
942 *
943 * If the stream contains an error, the returned future is completed with
944 * that error, and processing stops.
945 *
946 * This operation listens to the stream, and a non-broadcast stream cannot
947 * be reused after finding its length.
948 */
854 Future<int> get length { 949 Future<int> get length {
855 _Future<int> future = new _Future<int>(); 950 _Future<int> future = new _Future<int>();
856 int count = 0; 951 int count = 0;
857 this.listen( 952 this.listen(
858 (_) { 953 (_) {
859 count++; 954 count++;
860 }, 955 },
861 onError: future._completeError, 956 onError: future._completeError,
862 onDone: () { 957 onDone: () {
863 future._complete(count); 958 future._complete(count);
864 }, 959 },
865 cancelOnError: true); 960 cancelOnError: true);
866 return future; 961 return future;
867 } 962 }
868 963
869 /** 964 /**
870 * Reports whether this stream contains any elements. 965 * Whether this stream contains any elements.
871 * 966 *
872 * Stops listening to the stream after the first element has been received. 967 * Waits for the first element of this stream, then completes the returned
968 * future with `true`.
969 * If the stream ends without emitting any elements, the returned future is
970 * completed with `false`.
873 * 971 *
874 * Internally the method cancels its subscription after the first element. 972 * If the first event is an error, the returned future is completed with that
875 * This means that single-subscription (non-broadcast) streams are closed and 973 * error.
876 * cannot be reused after a call to this getter. 974 *
975 * This operation listens to the stream, and a non-broadcast stream cannot
976 * be reused after checking whether it is empty.
877 */ 977 */
878 Future<bool> get isEmpty { 978 Future<bool> get isEmpty {
879 _Future<bool> future = new _Future<bool>(); 979 _Future<bool> future = new _Future<bool>();
880 StreamSubscription subscription; 980 StreamSubscription subscription;
881 subscription = this.listen( 981 subscription = this.listen(
882 (_) { 982 (_) {
883 _cancelAndValue(subscription, future, false); 983 _cancelAndValue(subscription, future, false);
884 }, 984 },
885 onError: future._completeError, 985 onError: future._completeError,
886 onDone: () { 986 onDone: () {
887 future._complete(true); 987 future._complete(true);
888 }, 988 },
889 cancelOnError: true); 989 cancelOnError: true);
890 return future; 990 return future;
891 } 991 }
892 992
893 /** Collects the data of this stream in a [List]. */ 993 /**
994 * Collects all elements of this stream in a [List].
995 *
996 * Creates a `List<T>` and adds all elements of the stream to the list
997 * in the order they arrive.
998 * When the stream ends, the returned future is completed with that list.
999 *
1000 * If the stream contains an error, the returned future is completed
1001 * with that error, and processing stops.
1002 */
894 Future<List<T>> toList() { 1003 Future<List<T>> toList() {
895 List<T> result = <T>[]; 1004 List<T> result = <T>[];
896 _Future<List<T>> future = new _Future<List<T>>(); 1005 _Future<List<T>> future = new _Future<List<T>>();
897 this.listen( 1006 this.listen(
898 (T data) { 1007 (T data) {
899 result.add(data); 1008 result.add(data);
900 }, 1009 },
901 onError: future._completeError, 1010 onError: future._completeError,
902 onDone: () { 1011 onDone: () {
903 future._complete(result); 1012 future._complete(result);
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
1035 * 1144 *
1036 * The returned stream is a broadcast stream if this stream is. 1145 * The returned stream is a broadcast stream if this stream is.
1037 * If a broadcast stream is listened to more than once, each subscription 1146 * If a broadcast stream is listened to more than once, each subscription
1038 * will individually perform the `equals` test. 1147 * will individually perform the `equals` test.
1039 */ 1148 */
1040 Stream<T> distinct([bool equals(T previous, T next)]) { 1149 Stream<T> distinct([bool equals(T previous, T next)]) {
1041 return new _DistinctStream<T>(this, equals); 1150 return new _DistinctStream<T>(this, equals);
1042 } 1151 }
1043 1152
1044 /** 1153 /**
1045 * Returns the first element of the stream. 1154 * The first element of the stream.
1046 * 1155 *
1047 * Stops listening to the stream after the first element has been received. 1156 * Stops listening to the stream after the first element has been received.
1048 * 1157 *
1049 * Internally the method cancels its subscription after the first element. 1158 * Internally the method cancels its subscription after the first element.
1050 * This means that single-subscription (non-broadcast) streams are closed 1159 * This means that single-subscription (non-broadcast) streams are closed
1051 * and cannot be reused after a call to this getter. 1160 * and cannot be reused after a call to this getter.
1052 * 1161 *
1053 * If an error event occurs before the first data event, the resulting future 1162 * If an error event occurs before the first data event, the returned future
1054 * is completed with that error. 1163 * is completed with that error.
1055 * 1164 *
1056 * If this stream is empty (a done event occurs before the first data event), 1165 * If this stream is empty (a done event occurs before the first data event),
1057 * the resulting future completes with a [StateError]. 1166 * the returned future completes with an error.
1058 * 1167 *
1059 * Except for the type of the error, this method is equivalent to 1168 * Except for the type of the error, this method is equivalent to
1060 * [:this.elementAt(0):]. 1169 * `this.elementAt(0)`.
1061 */ 1170 */
1062 Future<T> get first { 1171 Future<T> get first {
1063 _Future<T> future = new _Future<T>(); 1172 _Future<T> future = new _Future<T>();
1064 StreamSubscription subscription; 1173 StreamSubscription subscription;
1065 subscription = this.listen( 1174 subscription = this.listen(
1066 (T value) { 1175 (T value) {
1067 _cancelAndValue(subscription, future, value); 1176 _cancelAndValue(subscription, future, value);
1068 }, 1177 },
1069 onError: future._completeError, 1178 onError: future._completeError,
1070 onDone: () { 1179 onDone: () {
1071 try { 1180 try {
1072 throw IterableElementError.noElement(); 1181 throw IterableElementError.noElement();
1073 } catch (e, s) { 1182 } catch (e, s) {
1074 _completeWithErrorCallback(future, e, s); 1183 _completeWithErrorCallback(future, e, s);
1075 } 1184 }
1076 }, 1185 },
1077 cancelOnError: true); 1186 cancelOnError: true);
1078 return future; 1187 return future;
1079 } 1188 }
1080 1189
1081 /** 1190 /**
1082 * Returns the last element of the stream. 1191 * The last element of this stream.
1083 * 1192 *
1084 * If an error event occurs before the first data event, the resulting future 1193 * If this stream emits an error event,
1085 * is completed with that error. 1194 * the returned future is completed with that error
1195 * and processing stops.
1086 * 1196 *
1087 * If this stream is empty (a done event occurs before the first data event), 1197 * If this stream is empty (the done event is the first event),
1088 * the resulting future completes with a [StateError]. 1198 * the returned future completes with an error.
1089 */ 1199 */
1090 Future<T> get last { 1200 Future<T> get last {
1091 _Future<T> future = new _Future<T>(); 1201 _Future<T> future = new _Future<T>();
1092 T result = null; 1202 T result = null;
1093 bool foundResult = false; 1203 bool foundResult = false;
1094 listen( 1204 listen(
1095 (T value) { 1205 (T value) {
1096 foundResult = true; 1206 foundResult = true;
1097 result = value; 1207 result = value;
1098 }, 1208 },
1099 onError: future._completeError, 1209 onError: future._completeError,
1100 onDone: () { 1210 onDone: () {
1101 if (foundResult) { 1211 if (foundResult) {
1102 future._complete(result); 1212 future._complete(result);
1103 return; 1213 return;
1104 } 1214 }
1105 try { 1215 try {
1106 throw IterableElementError.noElement(); 1216 throw IterableElementError.noElement();
1107 } catch (e, s) { 1217 } catch (e, s) {
1108 _completeWithErrorCallback(future, e, s); 1218 _completeWithErrorCallback(future, e, s);
1109 } 1219 }
1110 }, 1220 },
1111 cancelOnError: true); 1221 cancelOnError: true);
1112 return future; 1222 return future;
1113 } 1223 }
1114 1224
1115 /** 1225 /**
1116 * Returns the single element. 1226 * The single element of this stream.
1117 * 1227 *
1118 * If an error event occurs before or after the first data event, the 1228 * If this stream emits an error event,
1119 * resulting future is completed with that error. 1229 * the returned future is completed with that error
1230 * and processing stops.
1120 * 1231 *
1121 * If [this] is empty or has more than one element throws a [StateError]. 1232 * If [this] is empty or has more than one element,
1233 * the returned future completes with an error.
1122 */ 1234 */
1123 Future<T> get single { 1235 Future<T> get single {
1124 _Future<T> future = new _Future<T>(); 1236 _Future<T> future = new _Future<T>();
1125 T result = null; 1237 T result = null;
1126 bool foundResult = false; 1238 bool foundResult = false;
1127 StreamSubscription subscription; 1239 StreamSubscription subscription;
1128 subscription = this.listen( 1240 subscription = this.listen(
1129 (T value) { 1241 (T value) {
1130 if (foundResult) { 1242 if (foundResult) {
1131 // This is the second element we get. 1243 // This is the second element we get.
(...skipping 19 matching lines...) Expand all
1151 _completeWithErrorCallback(future, e, s); 1263 _completeWithErrorCallback(future, e, s);
1152 } 1264 }
1153 }, 1265 },
1154 cancelOnError: true); 1266 cancelOnError: true);
1155 return future; 1267 return future;
1156 } 1268 }
1157 1269
1158 /** 1270 /**
1159 * Finds the first element of this stream matching [test]. 1271 * Finds the first element of this stream matching [test].
1160 * 1272 *
1161 * Returns a future that is filled with the first element of this stream 1273 * Returns a future that is completed with the first element of this stream
1162 * that [test] returns true for. 1274 * that [test] returns `true` for.
1163 * 1275 *
1164 * If no such element is found before this stream is done, and a 1276 * If no such element is found before this stream is done, and a
1165 * [defaultValue] function is provided, the result of calling [defaultValue] 1277 * [defaultValue] function is provided, the result of calling [defaultValue]
1166 * becomes the value of the future. 1278 * becomes the value of the future. If [defaultValue] throws, the returned
1279 * future is completed with that error.
1167 * 1280 *
1168 * Stops listening to the stream after the first matching element has been 1281 * If this stream emits an error before the first matching element,
1169 * received. 1282 * the returned future is completed with that error, and processing stops.
1283 *
1284 * Stops listening to the stream after the first matching element or error
1285 * has been received.
1170 * 1286 *
1171 * Internally the method cancels its subscription after the first element that 1287 * Internally the method cancels its subscription after the first element that
1172 * matches the predicate. This means that single-subscription (non-broadcast) 1288 * matches the predicate. This means that single-subscription (non-broadcast)
1173 * streams are closed and cannot be reused after a call to this method. 1289 * streams are closed and cannot be reused after a call to this method.
1174 * 1290 *
1175 * If an error occurs, or if this stream ends without finding a match and 1291 * If an error occurs, or if this stream ends without finding a match and
1176 * with no [defaultValue] function provided, the future will receive an 1292 * with no [defaultValue] function provided,
1177 * error. 1293 * the returned future is completed with an error.
1178 */ 1294 */
1179 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 1295 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
1180 _Future<dynamic> future = new _Future(); 1296 _Future<dynamic> future = new _Future();
1181 StreamSubscription subscription; 1297 StreamSubscription subscription;
1182 subscription = this.listen( 1298 subscription = this.listen(
1183 (T value) { 1299 (T value) {
1184 _runUserCode(() => test(value), (bool isMatch) { 1300 _runUserCode(() => test(value), (bool isMatch) {
1185 if (isMatch) { 1301 if (isMatch) {
1186 _cancelAndValue(subscription, future, value); 1302 _cancelAndValue(subscription, future, value);
1187 } 1303 }
(...skipping 11 matching lines...) Expand all
1199 _completeWithErrorCallback(future, e, s); 1315 _completeWithErrorCallback(future, e, s);
1200 } 1316 }
1201 }, 1317 },
1202 cancelOnError: true); 1318 cancelOnError: true);
1203 return future; 1319 return future;
1204 } 1320 }
1205 1321
1206 /** 1322 /**
1207 * Finds the last element in this stream matching [test]. 1323 * Finds the last element in this stream matching [test].
1208 * 1324 *
1209 * As [firstWhere], except that the last matching element is found. 1325 * If this stream emits an error, the returned future is completed with that
1210 * That means that the result cannot be provided before this stream 1326 * error, and processing stops.
1327 *
1328 * Otherwise as [firstWhere], except that the last matching element is found
1329 * instead of the first.
1330 * That means that a non-error result cannot be provided before this stream
1211 * is done. 1331 * is done.
1212 */ 1332 */
1213 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { 1333 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) {
1214 _Future<dynamic> future = new _Future(); 1334 _Future<dynamic> future = new _Future();
1215 T result = null; 1335 T result = null;
1216 bool foundResult = false; 1336 bool foundResult = false;
1217 StreamSubscription subscription; 1337 StreamSubscription subscription;
1218 subscription = this.listen( 1338 subscription = this.listen(
1219 (T value) { 1339 (T value) {
1220 _runUserCode(() => true == test(value), (bool isMatch) { 1340 _runUserCode(() => true == test(value), (bool isMatch) {
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after
1456 * operation). If the listener wants to delete the file after having 1576 * operation). If the listener wants to delete the file after having
1457 * canceled the subscription, it must wait for the cleanup future to complete. 1577 * canceled the subscription, it must wait for the cleanup future to complete.
1458 * 1578 *
1459 * A returned future completes with a `null` value. 1579 * A returned future completes with a `null` value.
1460 * If the cleanup throws, which it really shouldn't, the returned future 1580 * If the cleanup throws, which it really shouldn't, the returned future
1461 * completes with that error. 1581 * completes with that error.
1462 */ 1582 */
1463 Future cancel(); 1583 Future cancel();
1464 1584
1465 /** 1585 /**
1466 * Set or override the data event handler of this subscription. 1586 * Set or override the data event handler of this subscription.
floitsch 2017/05/24 13:37:51 Set*s* or replaces ? Maybe just "replaces". After
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Just "replaces", it's cleaner!
1467 * 1587 *
1468 * This method overrides the handler that has been set at the invocation of 1588 * The [handleData] function is called for each element of the stream
1469 * [Stream.listen]. 1589 * after this function is called.
1590 * If [handleData] is `null`, furhter elements are ignored.
floitsch 2017/05/24 13:37:51 further
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
1591 *
1592 * This method overrides the current handler set by the invocation of
1593 * [Stream.listen] or by a previous call to [onData].
1470 */ 1594 */
1471 void onData(void handleData(T data)); 1595 void onData(void handleData(T data));
1472 1596
1473 /** 1597 /**
1474 * Set or override the error event handler of this subscription. 1598 * Set or override the error event handler of this subscription.
1475 * 1599 *
1476 * This method overrides the handler that has been set at the invocation of 1600 * The [handleError] function must be able to be called with either
1477 * [Stream.listen] or by calling [asFuture]. 1601 * one positional argument, or with two positional arguments
1602 * where the seconds is always a [StackTrace].
1603 *
1604 * The [handleError] argument may be `null`, in which case further
1605 * error events are considered unhandled, and will be reported to
1606 * [Zone.handleUncaughtError].
1607 *
1608 * The provided function is called for all error events from the
1609 * stream subscription.
1610 *
1611 * This method overrides the current handler set by the invocation of
1612 * [Stream.listen], by calling [asFuture], or by a previous call to [onError].
1478 */ 1613 */
1479 void onError(Function handleError); 1614 void onError(Function handleError);
1480 1615
1481 /** 1616 /**
1482 * Set or override the done event handler of this subscription. 1617 * Set or override the done event handler of this subscription.
1483 * 1618 *
1484 * This method overrides the handler that has been set at the invocation of 1619 * The [handleDone] function is called when the stream closes.
1485 * [Stream.listen] or by calling [asFuture]. 1620 * The value may be `null`, in which case no function is called.
1621 *
1622 * This method overrides the current handler set by the invocation of
1623 * [Stream.listen], by calling [asFuture], or by a previous call to [onDone].
1486 */ 1624 */
1487 void onDone(void handleDone()); 1625 void onDone(void handleDone());
1488 1626
1489 /** 1627 /**
1490 * Request that the stream pauses events until further notice. 1628 * Request that the stream pauses events until further notice.
1491 * 1629 *
1492 * While paused, the subscription will not fire any events. 1630 * While paused, the subscription will not fire any events.
1493 * If it receives events from its source, they will be buffered until 1631 * If it receives events from its source, they will be buffered until
1494 * the subscription is resumed. 1632 * the subscription is resumed.
1495 * The underlying source is usually informed about the pause, 1633 * For non-broadcast streams, the underlying source is usually informed
1634 * about the pause,
1496 * so it can stop generating events until the subscription is resumed. 1635 * so it can stop generating events until the subscription is resumed.
1497 * 1636 *
1498 * To avoid buffering events on a broadcast stream, it is better to 1637 * To avoid buffering events on a broadcast stream, it is better to
1499 * cancel this subscription, and start to listen again when events 1638 * cancel this subscription, and start to listen again when events
1500 * are needed. 1639 * are needed, if the intermediate events are not important.
1501 * 1640 *
1502 * If [resumeSignal] is provided, the stream will undo the pause 1641 * If [resumeSignal] is provided, the stream subscription will undo the pause
1503 * when the future completes. If the future completes with an error, 1642 * when the future completes, as if by a call to [resume].
1504 * the stream will resume, but the error will not be handled! 1643 * If the future completes with an error,
1644 * the stream will still resume, but the error will be considered unhandled
1645 * and is passed to [Zone.handleUncaughtError].
1505 * 1646 *
1506 * A call to [resume] will also undo a pause. 1647 * A call to [resume] will also undo a pause.
1507 * 1648 *
1508 * If the subscription is paused more than once, an equal number 1649 * If the subscription is paused more than once, an equal number
1509 * of resumes must be performed to resume the stream. 1650 * of resumes must be performed to resume the stream.
1510 * 1651 *
1511 * Currently DOM streams silently drop events when the stream is paused. This 1652 * Currently DOM streams silently drop events when the stream is paused. This
1512 * is a bug and will be fixed. 1653 * is a bug and will be fixed.
1513 */ 1654 */
1514 void pause([Future resumeSignal]); 1655 void pause([Future resumeSignal]);
1515 1656
1516 /** 1657 /**
1517 * Resume after a pause. 1658 * Resume after a pause.
1659 *
1660 * This undoes one previous call to [pause].
1661 * When all previously calls to [pause] has been matched by a calls to
floitsch 2017/05/24 13:37:51 have been matched
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
1662 * [resume], possibly through a `resumeSignal` passed to [pause],
1663 * the stream subscription will again emit events.
floitsch 2017/05/24 13:37:50 subscription emits again events. (No need for futu
Lasse Reichstein Nielsen 2017/05/24 15:09:54 There is no guarantee that it will emit events imm
1518 */ 1664 */
1519 void resume(); 1665 void resume();
1520 1666
1521 /** 1667 /**
1522 * Returns true if the [StreamSubscription] is paused. 1668 * Returns true if the [StreamSubscription] is currently paused.
floitsch 2017/05/24 13:37:51 Whether the ...
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Done.
1669 *
1670 * If there have been more calls to [pause] than to [resume] on this
1671 * stream subscription, the subscription is paused, and this getter
1672 * returns `true`.
1673 *
1674 * Returns `false` if the stream can currently emit events, or if
1675 * the subscription has completed or been cancelled.
1523 */ 1676 */
1524 bool get isPaused; 1677 bool get isPaused;
1525 1678
1526 /** 1679 /**
1527 * Returns a future that handles the [onDone] and [onError] callbacks. 1680 * Returns a future that handles the [onDone] and [onError] callbacks.
1528 * 1681 *
1529 * This method *overwrites* the existing [onDone] and [onError] callbacks 1682 * This method *overwrites* the existing [onDone] and [onError] callbacks
1530 * with new ones that complete the returned future. 1683 * with new ones that complete the returned future.
1531 * 1684 *
1532 * In case of an error the subscription will automatically cancel (even 1685 * In case of an error the subscription will automatically cancel (even
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
1685 * Return a future which is completed when the [StreamSink] is finished. 1838 * Return a future which is completed when the [StreamSink] is finished.
1686 * 1839 *
1687 * If the `StreamSink` fails with an error, 1840 * If the `StreamSink` fails with an error,
1688 * perhaps in response to adding events using [add], [addError] or [close], 1841 * perhaps in response to adding events using [add], [addError] or [close],
1689 * the [done] future will complete with that error. 1842 * the [done] future will complete with that error.
1690 * 1843 *
1691 * Otherwise, the returned future will complete when either: 1844 * Otherwise, the returned future will complete when either:
1692 * 1845 *
1693 * * all events have been processed and the sink has been closed, or 1846 * * all events have been processed and the sink has been closed, or
1694 * * the sink has otherwise been stopped from handling more events 1847 * * the sink has otherwise been stopped from handling more events
1695 * (for example by cancelling a stream subscription). 1848 * (for example by canceling a stream subscription).
1696 */ 1849 */
1697 Future get done; 1850 Future get done;
1698 } 1851 }
1699 1852
1700 /** 1853 /**
1701 * Transforms a Stream. 1854 * Transforms a Stream.
1702 * 1855 *
1703 * When a stream's [Stream.transform] method is invoked with a 1856 * When a stream's [Stream.transform] method is invoked with a
1704 * [StreamTransformer], the stream calls the [bind] method on the provided 1857 * [StreamTransformer], the stream calls the [bind] method on the provided
1705 * transformer. The resulting stream is then returned from the 1858 * transformer. The resulting stream is then returned from the
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1907 2060
1908 void addError(error, [StackTrace stackTrace]) { 2061 void addError(error, [StackTrace stackTrace]) {
1909 _sink.addError(error, stackTrace); 2062 _sink.addError(error, stackTrace);
1910 } 2063 }
1911 2064
1912 void close() { 2065 void close() {
1913 _sink.close(); 2066 _sink.close();
1914 } 2067 }
1915 } 2068 }
1916 2069
1917 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. 2070 /// A group created by [Stream.groupBy].
1918 /// 2071 ///
1919 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y 2072 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y
floitsch 2017/05/24 13:37:51 long line
Lasse Reichstein Nielsen 2017/05/24 15:09:54 Ack, happened with the renaming :(
1920 /// it encounters. 2073 /// it encounters.
1921 /// This group contains the [key] itself, along with a stream of the [values] 2074 /// This group contains the [key] itself, along with a stream of the [values]
1922 /// associated with that key. 2075 /// associated with that key.
1923 class GroupedEvents<K, V> { 2076 class GroupedEvents<K, V> {
1924 /// The key that identifiers the values emitted by [values]. 2077 /// The key that identifiers the values emitted by [values].
1925 final K key; 2078 final K key;
1926 2079
1927 /// The [values] that [GroupBy] have grouped by the common [key]. 2080 /// The [values] that [GroupBy] have grouped by the common [key].
1928 final Stream<V> values; 2081 final Stream<V> values;
1929 2082
1930 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; 2083 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
1931 2084
1932 // Don't expose a generative constructor. 2085 // Don't expose a generative constructor.
1933 // This class is not intended for subclassing, so we don't want to promise 2086 // This class is not intended for subclassing, so we don't want to promise
1934 // it. We can change that in the future. 2087 // it. We can change that in the future.
1935 GroupedEvents._(this.key, this.values); 2088 GroupedEvents._(this.key, this.values);
1936 2089
1937 /// Tells [values] to discard values instead of retaining them. 2090 /// Tells [values] to discard values instead of retaining them.
1938 /// 2091 ///
1939 /// Must only be used instead of listening to the [values] stream. 2092 /// Must only be used instead of listening to the [values] stream.
1940 /// If the stream has been listened to, this call fails. 2093 /// If the stream has been listened to, this call fails.
1941 /// After calling this method, listening on the [values] stream fails. 2094 /// After calling this method, listening on the [values] stream fails.
1942 Future cancel() { 2095 Future cancel() {
1943 // If values has been listened to, 2096 // If values has been listened to,
1944 // this throws a StateError saying that stream has already been listened to, 2097 // this throws a StateError saying that stream has already been listened to,
1945 // which is a correct error message for this call too. 2098 // which is a correct error message for this call too.
1946 return values.listen(null).cancel(); 2099 return values.listen(null).cancel();
1947 } 2100 }
1948 } 2101 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698