Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| 11 typedef void _TimerCallback(); | 11 typedef void _TimerCallback(); |
| 12 | 12 |
| 13 /** | 13 /** |
| 14 * A source of asynchronous data events. | 14 * A source of asynchronous data events. |
| 15 * | 15 * |
| 16 * A Stream provides a way to receive a sequence of events. | 16 * A Stream provides a way to receive a sequence of events. |
| 17 * Each event is either a data event or an error event, | 17 * Each event is either a data event, also called an *element* of the stream, |
| 18 * representing the result of a single computation. | 18 * or an error event, which is a notification that something has failed. |
| 19 * When the events provided by a Stream have all been sent, | 19 * When a stream has emitted all its event, |
| 20 * a single "done" event will mark the end. | 20 * a single "done" event will notify the listener that the end has been reached. |
| 21 * | 21 * |
| 22 * You can [listen] on a stream to make it start generating events, | 22 * You [listen] on a stream to make it start generating events, |
| 23 * and to set up listeners that receive the events. | 23 * and to set up listeners that receive the events. |
| 24 * When you listen, you receive a [StreamSubscription] object | 24 * When you listen, you receive a [StreamSubscription] object |
| 25 * which is the active object providing the events, | 25 * which is the active object providing the events, |
| 26 * and which can be used to stop listening again, | 26 * and which can be used to stop listening again, |
| 27 * or to temporarily pause events from the subscription. | 27 * or to temporarily pause events from the subscription. |
| 28 * | 28 * |
| 29 * There are two kinds of streams: "Single-subscription" streams and | 29 * There are two kinds of streams: "Single-subscription" streams and |
| 30 * "broadcast" streams. | 30 * "broadcast" streams. |
| 31 * | 31 * |
| 32 * *A single-subscription stream* allows only a single listener during the whole | 32 * *A single-subscription stream* allows only a single listener during the whole |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 111 controller._closeUnchecked(); | 111 controller._closeUnchecked(); |
| 112 }); | 112 }); |
| 113 return controller.stream; | 113 return controller.stream; |
| 114 } | 114 } |
| 115 | 115 |
| 116 /** | 116 /** |
| 117 * Create a stream from a group of futures. | 117 * Create a stream from a group of futures. |
| 118 * | 118 * |
| 119 * The stream reports the results of the futures on the stream in the order | 119 * The stream reports the results of the futures on the stream in the order |
| 120 * in which the futures complete. | 120 * in which the futures complete. |
| 121 * Each future provides either a data event or an error event, | |
| 122 * depending on how the future completes. | |
| 121 * | 123 * |
| 122 * If some futures have completed before calling `Stream.fromFutures`, | 124 * If some futures have already completed when `Stream.fromFutures` is called, |
| 123 * their result will be output on the created stream in some unspecified | 125 * their results will be emitted in some unspecified order. |
| 124 * order. | |
| 125 * | 126 * |
| 126 * When all futures have completed, the stream is closed. | 127 * When all futures have completed, the stream is closed. |
| 127 * | 128 * |
| 128 * If no future is passed, the stream closes as soon as possible. | 129 * If [futures] is empty, the stream closes as soon as possible. |
| 129 */ | 130 */ |
| 130 factory Stream.fromFutures(Iterable<Future<T>> futures) { | 131 factory Stream.fromFutures(Iterable<Future<T>> futures) { |
| 131 _StreamController<T> controller = new StreamController<T>(sync: true); | 132 _StreamController<T> controller = new StreamController<T>(sync: true); |
| 132 int count = 0; | 133 int count = 0; |
| 134 // Declare these as variables holding closures instead of as | |
| 135 // function declarations. | |
| 136 // This avoids creating a new closure from the functions for each future. | |
| 133 var onValue = (T value) { | 137 var onValue = (T value) { |
| 134 if (!controller.isClosed) { | 138 if (!controller.isClosed) { |
| 135 controller._add(value); | 139 controller._add(value); |
| 136 if (--count == 0) controller._closeUnchecked(); | 140 if (--count == 0) controller._closeUnchecked(); |
| 137 } | 141 } |
| 138 }; | 142 }; |
| 139 var onError = (error, stack) { | 143 var onError = (error, stack) { |
| 140 if (!controller.isClosed) { | 144 if (!controller.isClosed) { |
| 141 controller._addError(error, stack); | 145 controller._addError(error, stack); |
| 142 if (--count == 0) controller._closeUnchecked(); | 146 if (--count == 0) controller._closeUnchecked(); |
| (...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 335 * stack trace). | 339 * stack trace). |
| 336 * Otherwise it is called with just the error object. | 340 * Otherwise it is called with just the error object. |
| 337 * If [onError] is omitted, any errors on the stream are considered unhandled, | 341 * If [onError] is omitted, any errors on the stream are considered unhandled, |
| 338 * and will be passed to the current [Zone]'s error handler. | 342 * and will be passed to the current [Zone]'s error handler. |
| 339 * By default unhandled async errors are treated | 343 * By default unhandled async errors are treated |
| 340 * as if they were uncaught top-level errors. | 344 * as if they were uncaught top-level errors. |
| 341 * | 345 * |
| 342 * If this stream closes and sends a done event, the [onDone] handler is | 346 * If this stream closes and sends a done event, the [onDone] handler is |
| 343 * called. If [onDone] is `null`, nothing happens. | 347 * called. If [onDone] is `null`, nothing happens. |
| 344 * | 348 * |
| 345 * If [cancelOnError] is true, the subscription is automatically cancelled | 349 * If [cancelOnError] is true, the subscription is automatically canceled |
| 346 * when the first error event is delivered. The default is `false`. | 350 * when the first error event is delivered. The default is `false`. |
| 347 * | 351 * |
| 348 * While a subscription is paused, or when it has been cancelled, | 352 * While a subscription is paused, or when it has been canceled, |
| 349 * the subscription doesn't receive events and none of the | 353 * the subscription doesn't receive events and none of the |
| 350 * event handler functions are called. | 354 * event handler functions are called. |
| 351 */ | 355 */ |
| 352 StreamSubscription<T> listen(void onData(T event), | 356 StreamSubscription<T> listen(void onData(T event), |
| 353 {Function onError, void onDone(), bool cancelOnError}); | 357 {Function onError, void onDone(), bool cancelOnError}); |
| 354 | 358 |
| 355 /** | 359 /** |
| 356 * Creates a new stream from this stream that discards some data events. | 360 * Creates a new stream from this stream that discards some elements. |
| 357 * | 361 * |
| 358 * The new stream sends the same error and done events as this stream, | 362 * The new stream sends the same error and done events as this stream, |
| 359 * but it only sends the data events that satisfy the [test]. | 363 * but it only sends the data events that satisfy the [test]. |
| 360 * | 364 * |
| 365 * If the [test] function throws, the data event is dropped and the | |
| 366 * error is emitted on the returned stream instead. | |
| 367 * | |
| 361 * The returned stream is a broadcast stream if this stream is. | 368 * The returned stream is a broadcast stream if this stream is. |
| 362 * If a broadcast stream is listened to more than once, each subscription | 369 * If a broadcast stream is listened to more than once, each subscription |
| 363 * will individually perform the `test`. | 370 * will individually perform the `test`. |
| 364 */ | 371 */ |
| 365 Stream<T> where(bool test(T event)) { | 372 Stream<T> where(bool test(T event)) { |
| 366 return new _WhereStream<T>(this, test); | 373 return new _WhereStream<T>(this, test); |
| 367 } | 374 } |
| 368 | 375 |
| 369 /** | 376 /** |
| 377 * Transforms each element of this stream into a new stream event. | |
| 378 * | |
| 370 * Creates a new stream that converts each element of this stream | 379 * Creates a new stream that converts each element of this stream |
| 371 * to a new value using the [convert] function. | 380 * to a new value using the [convert] function, and emits the result. |
| 372 * | 381 * |
| 373 * For each data event, `o`, in this stream, the returned stream | 382 * For each data event, `o`, in this stream, the returned stream |
| 374 * provides a data event with the value `convert(o)`. | 383 * provides a data event with the value `convert(o)`. |
| 375 * If [convert] throws, the returned stream reports the exception as an error | 384 * If [convert] throws, the returned stream reports it as an error |
| 376 * event instead. | 385 * event instead. |
| 377 * | 386 * |
| 378 * Error and done events are passed through unchanged to the returned stream. | 387 * Error and done events are passed through unchanged to the returned stream. |
| 379 * | 388 * |
| 380 * The returned stream is a broadcast stream if this stream is. | 389 * The returned stream is a broadcast stream if this stream is. |
| 381 * The [convert] function is called once per data event per listener. | 390 * The [convert] function is called once per data event per listener. |
| 382 * If a broadcast stream is listened to more than once, each subscription | 391 * If a broadcast stream is listened to more than once, each subscription |
| 383 * will individually call [convert] on each data event. | 392 * will individually call [convert] on each data event. |
| 384 */ | 393 */ |
| 385 Stream<S> map<S>(S convert(T event)) { | 394 Stream<S> map<S>(S convert(T event)) { |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 511 onResume: () { | 520 onResume: () { |
| 512 subscription.resume(); | 521 subscription.resume(); |
| 513 }, | 522 }, |
| 514 onCancel: () => subscription.cancel(), | 523 onCancel: () => subscription.cancel(), |
| 515 sync: true); | 524 sync: true); |
| 516 } | 525 } |
| 517 return controller.stream; | 526 return controller.stream; |
| 518 } | 527 } |
| 519 | 528 |
| 520 /** | 529 /** |
| 521 * Creates a new stream with the events of a stream per original event. | 530 * Transforms each element into a sequence of asynchronous events. |
| 522 * | 531 * |
| 523 * This acts like [expand], except that [convert] returns a [Stream] | 532 * Returns a new stream and for each event of this stream, do the following: |
| 524 * instead of an [Iterable]. | |
| 525 * The events of the returned stream becomes the events of the returned | |
| 526 * stream, in the order they are produced. | |
| 527 * | 533 * |
| 528 * If [convert] returns `null`, no value is put on the output stream, | 534 * * If the event is an error event or a done event, it is emitted directly |
| 529 * just as if it returned an empty stream. | 535 * by the returned stream. |
| 536 * * Otherwise it is an element. Then the [convert] function is called | |
| 537 * with the element as argument to produce a convert-stream for the element. | |
| 538 * * If that call throws, the error is emitted on the returned stream. | |
| 539 * * If the call returnes `null`, no further action is taken for the elements. | |
| 540 * * Otherwise, this stream is paused and convert-stream is listened to. | |
| 541 * Every data and error event of the convert-stream is emitted on the returned | |
| 542 * stream in the order it is produced. | |
| 543 * When the convert-stream ends, this stream is resumed. | |
| 530 * | 544 * |
| 531 * The returned stream is a broadcast stream if this stream is. | 545 * The returned stream is a broadcast stream if this stream is. |
| 532 */ | 546 */ |
| 533 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { | 547 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
| 534 StreamController<E> controller; | 548 StreamController<E> controller; |
| 535 StreamSubscription<T> subscription; | 549 StreamSubscription<T> subscription; |
| 536 void onListen() { | 550 void onListen() { |
| 537 assert(controller is _StreamController || | 551 assert(controller is _StreamController || |
| 538 controller is _BroadcastStreamController); | 552 controller is _BroadcastStreamController); |
| 539 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; | 553 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 588 * trace. The stack trace argument might be `null` if the stream itself | 602 * trace. The stack trace argument might be `null` if the stream itself |
| 589 * received an error without stack trace. | 603 * received an error without stack trace. |
| 590 * | 604 * |
| 591 * An asynchronous error `error` is matched by a test function if | 605 * An asynchronous error `error` is matched by a test function if |
| 592 *`test(error)` returns true. If [test] is omitted, every error is considered | 606 *`test(error)` returns true. If [test] is omitted, every error is considered |
| 593 * matching. | 607 * matching. |
| 594 * | 608 * |
| 595 * If the error is intercepted, the [onError] function can decide what to do | 609 * If the error is intercepted, the [onError] function can decide what to do |
| 596 * with it. It can throw if it wants to raise a new (or the same) error, | 610 * with it. It can throw if it wants to raise a new (or the same) error, |
| 597 * or simply return to make the stream forget the error. | 611 * or simply return to make the stream forget the error. |
| 612 * If the received `error` value is thrown again by the [onError] function, | |
| 613 * it acts like a `rethrow` and it is emitted along with its original | |
| 614 * stack trace, not the stack trace of the `throw` inside [onError]. | |
| 598 * | 615 * |
| 599 * If you need to transform an error into a data event, use the more generic | 616 * If you need to transform an error into a data event, use the more generic |
| 600 * [Stream.transform] to handle the event by writing a data event to | 617 * [Stream.transform] to handle the event by writing a data event to |
| 601 * the output sink. | 618 * the output sink. |
| 602 * | 619 * |
| 603 * The returned stream is a broadcast stream if this stream is. | 620 * The returned stream is a broadcast stream if this stream is. |
| 604 * If a broadcast stream is listened to more than once, each subscription | 621 * If a broadcast stream is listened to more than once, each subscription |
| 605 * will individually perform the `test` and handle the error. | 622 * will individually perform the `test` and handle the error. |
| 606 */ | 623 */ |
| 607 Stream<T> handleError(Function onError, {bool test(error)}) { | 624 Stream<T> handleError(Function onError, {bool test(error)}) { |
| 608 return new _HandleErrorStream<T>(this, onError, test); | 625 return new _HandleErrorStream<T>(this, onError, test); |
| 609 } | 626 } |
| 610 | 627 |
| 611 /** | 628 /** |
| 612 * Creates a new stream from this stream that converts each element | 629 * Transforms each element of this stream into a sequence of elements. |
| 613 * into zero or more events. | |
| 614 * | 630 * |
| 615 * Each incoming event is converted to an [Iterable] of new events, | 631 * Returns a new stream where each element of this stream is replaced |
| 616 * and each of these new events are then sent by the returned stream | 632 * by zero or more data events. |
| 617 * in order. | 633 * The event values are proveded as an [Iterable] by a call to [convert] |
|
floitsch
2017/05/30 17:52:47
provided
| |
| 634 * with the element as argument, and the elements of that iterable is | |
| 635 * emitted in iteration order. | |
| 636 * If calling [convert] throws, or if iteration of the returned values throws, | |
|
floitsch
2017/05/30 17:52:47
if the iteration
| |
| 637 * the error is emitted on the returned stream and iteration ends for that | |
| 638 * element of this stream. | |
| 639 * | |
| 640 * Error events and the done event of this stream are forwarded directly | |
| 641 * to the returned stream. | |
| 618 * | 642 * |
| 619 * The returned stream is a broadcast stream if this stream is. | 643 * The returned stream is a broadcast stream if this stream is. |
| 620 * If a broadcast stream is listened to more than once, each subscription | 644 * If a broadcast stream is listened to more than once, each subscription |
| 621 * will individually call `convert` and expand the events. | 645 * will individually call `convert` and expand the events. |
| 622 */ | 646 */ |
| 623 Stream<S> expand<S>(Iterable<S> convert(T value)) { | 647 Stream<S> expand<S>(Iterable<S> convert(T element)) { |
| 624 return new _ExpandStream<T, S>(this, convert); | 648 return new _ExpandStream<T, S>(this, convert); |
| 625 } | 649 } |
| 626 | 650 |
| 627 /** | 651 /** |
| 628 * Pipe the events of this stream into [streamConsumer]. | 652 * Pipes the events of this stream into [streamConsumer]. |
| 629 * | 653 * |
| 630 * The events of this stream are added to `streamConsumer` using | 654 * All events of this stream are added to `streamConsumer` using |
| 631 * [StreamConsumer.addStream]. | 655 * [StreamConsumer.addStream]. |
| 632 * The `streamConsumer` is closed when this stream has been successfully added | 656 * The `streamConsumer` is closed when this stream has been successfully added |
| 633 * to it - when the future returned by `addStream` completes without an error. | 657 * to it - when the future returned by `addStream` completes without an error. |
| 634 * | 658 * |
| 635 * Returns a future which completes when the stream has been consumed | 659 * Returns a future which completes when the stream has been consumed |
| 636 * and the consumer has been closed. | 660 * and the consumer has been closed. |
| 637 * | 661 * |
| 638 * The returned future completes with the same result as the future returned | 662 * The returned future completes with the same result as the future returned |
| 639 * by [StreamConsumer.close]. | 663 * by [StreamConsumer.close]. |
| 640 * If the adding of the stream itself fails in some way, | 664 * If the call to [StreamConsumer.addStream] fails in some way, this |
| 641 * then the consumer is expected to be closed, and won't be closed again. | 665 * method fails in the same way. |
| 642 * In that case the returned future completes with the error from calling | |
| 643 * `addStream`. | |
| 644 */ | 666 */ |
| 645 Future pipe(StreamConsumer<T> streamConsumer) { | 667 Future pipe(StreamConsumer<T> streamConsumer) { |
| 646 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); | 668 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
| 647 } | 669 } |
| 648 | 670 |
| 649 /** | 671 /** |
| 650 * Chains this stream as the input of the provided [StreamTransformer]. | 672 * Applies a [StreamTransformer] to the current stream. |
| 651 * | 673 * |
| 652 * Returns the result of [:streamTransformer.bind:] itself. | 674 * Returns the result of the stream transformation, |
| 675 * that is, the result of `streamTransformer.bind(this)`. | |
| 676 * This method simply allows writing the call to `streamTransformer.bind` | |
| 677 * in a chained fashion, like | |
| 678 * ``` | |
| 679 * stream.map(mapping).transform(transformation).toList() | |
| 680 * ``` | |
| 681 * which can be more convenient than calling `bind` directly. | |
| 653 * | 682 * |
| 654 * The `streamTransformer` can decide whether it wants to return a | 683 * The [streamTransformer] can return any stream. |
| 655 * broadcast stream or not. | 684 * Whether the returned stream is a broadcast stream or not, |
| 685 * and which elements it will contain, | |
| 686 * is entirely up to the transformation. | |
| 656 */ | 687 */ |
| 657 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { | 688 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
| 658 return streamTransformer.bind(this); | 689 return streamTransformer.bind(this); |
| 659 } | 690 } |
| 660 | 691 |
| 661 /** | 692 /** |
| 662 * Reduces a sequence of values by repeatedly applying [combine]. | 693 * Combines a sequence of values by repeatedly applying [combine]. |
| 694 * | |
| 695 * Similar to [Iterable.reduce], this function maintains a value, | |
| 696 * starting with the first element of the stream | |
| 697 * and updated for each further element of this stream. | |
| 698 * For each element after the first, | |
| 699 * the value is updated to the result of calling [combine] | |
| 700 * with the previous value and the element. | |
| 701 * | |
| 702 * When this stream is done, the returned future is completed with | |
| 703 * the value at that time. | |
| 704 * | |
| 705 * If the stream is empty, the returned future is completed with | |
| 706 * an error. | |
| 707 * If this stream emits an error, or the call to [combine] throws, | |
| 708 * the returned future is completed with that error, | |
| 709 * and processing is stopped. | |
| 663 */ | 710 */ |
| 664 Future<T> reduce(T combine(T previous, T element)) { | 711 Future<T> reduce(T combine(T previous, T element)) { |
| 665 _Future<T> result = new _Future<T>(); | 712 _Future<T> result = new _Future<T>(); |
| 666 bool seenFirst = false; | 713 bool seenFirst = false; |
| 667 T value; | 714 T value; |
| 668 StreamSubscription subscription; | 715 StreamSubscription subscription; |
| 669 subscription = this.listen( | 716 subscription = this.listen( |
| 670 (T element) { | 717 (T element) { |
| 671 if (seenFirst) { | 718 if (seenFirst) { |
| 672 _runUserCode(() => combine(value, element), (T newValue) { | 719 _runUserCode(() => combine(value, element), (T newValue) { |
| 673 value = newValue; | 720 value = newValue; |
| 674 }, _cancelAndErrorClosure(subscription, result)); | 721 }, _cancelAndErrorClosure(subscription, result)); |
| 675 } else { | 722 } else { |
| 676 value = element; | 723 value = element; |
| 677 seenFirst = true; | 724 seenFirst = true; |
| 678 } | 725 } |
| 679 }, | 726 }, |
| 680 onError: result._completeError, | 727 onError: result._completeError, |
| 681 onDone: () { | 728 onDone: () { |
| 682 if (!seenFirst) { | 729 if (!seenFirst) { |
| 683 try { | 730 try { |
| 731 // Throw and recatch, instead of just doing | |
| 732 // _completeWithErrorCallback, e, theError, StackTrace.current), | |
| 733 // to ensure that the stackTrace is set on the error. | |
| 684 throw IterableElementError.noElement(); | 734 throw IterableElementError.noElement(); |
| 685 } catch (e, s) { | 735 } catch (e, s) { |
| 686 _completeWithErrorCallback(result, e, s); | 736 _completeWithErrorCallback(result, e, s); |
| 687 } | 737 } |
| 688 } else { | 738 } else { |
| 689 result._complete(value); | 739 result._complete(value); |
| 690 } | 740 } |
| 691 }, | 741 }, |
| 692 cancelOnError: true); | 742 cancelOnError: true); |
| 693 return result; | 743 return result; |
| 694 } | 744 } |
| 695 | 745 |
| 696 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 746 /** |
| 747 * Combines a sequence of values by repeatedly applying [combine]. | |
| 748 * | |
| 749 * Similar to [Iterable.fold], this function maintains a value, | |
| 750 * starting with [initialValue] and updated for each element of | |
| 751 * this stream. | |
| 752 * For each element, the value is updated to the result of calling | |
| 753 * [combine] with the previous value and the element. | |
| 754 * | |
| 755 * When this stream is done, the returned future is completed with | |
| 756 * the value at that time. | |
| 757 * For an empty stream, the future is completed with [initialValue]. | |
| 758 * | |
| 759 * If this stream emits an error, or the call to [combine] throws, | |
| 760 * the returned future is completed with that error, | |
| 761 * and processing is stopped. | |
| 762 */ | |
| 697 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { | 763 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
| 698 _Future<S> result = new _Future<S>(); | 764 _Future<S> result = new _Future<S>(); |
| 699 S value = initialValue; | 765 S value = initialValue; |
| 700 StreamSubscription subscription; | 766 StreamSubscription subscription; |
| 701 subscription = this.listen((T element) { | 767 subscription = this.listen( |
| 702 _runUserCode(() => combine(value, element), (S newValue) { | 768 (T element) { |
| 703 value = newValue; | 769 _runUserCode(() => combine(value, element), (S newValue) { |
| 704 }, _cancelAndErrorClosure(subscription, result)); | 770 value = newValue; |
| 705 }, onError: (e, st) { | 771 }, _cancelAndErrorClosure(subscription, result)); |
| 706 result._completeError(e, st); | 772 }, |
| 707 }, onDone: () { | 773 onError: result._completeError, |
| 708 result._complete(value); | 774 onDone: () { |
| 709 }, cancelOnError: true); | 775 result._complete(value); |
| 776 }, | |
| 777 cancelOnError: true); | |
| 710 return result; | 778 return result; |
| 711 } | 779 } |
| 712 | 780 |
| 713 /** | 781 /** |
| 714 * Collects string of data events' string representations. | 782 * Combines the string representation of elements into a single string. |
| 715 * | 783 * |
| 716 * If [separator] is provided, it is inserted between any two | 784 * Each element is converted to a string using its [Object.toString] method. |
| 717 * elements. | 785 * If [separator] is provided, it is inserted between element string |
| 786 * representations. | |
| 718 * | 787 * |
| 719 * Any error in the stream causes the future to complete with that | 788 * The returned future is completed with the combined string when the stream |
| 720 * error. Otherwise it completes with the collected string when | 789 * is done. |
| 721 * the "done" event arrives. | 790 * |
| 791 * If the stream contains an error, or if the call to [Object.toString] | |
| 792 * throws, the returned future is completed with that error, | |
| 793 * and processing stops. | |
| 722 */ | 794 */ |
| 723 Future<String> join([String separator = ""]) { | 795 Future<String> join([String separator = ""]) { |
| 724 _Future<String> result = new _Future<String>(); | 796 _Future<String> result = new _Future<String>(); |
| 725 StringBuffer buffer = new StringBuffer(); | 797 StringBuffer buffer = new StringBuffer(); |
| 726 StreamSubscription subscription; | 798 StreamSubscription subscription; |
| 727 bool first = true; | 799 bool first = true; |
| 728 subscription = this.listen((T element) { | 800 subscription = this.listen((T element) { |
| 729 if (!first) { | 801 if (!first) { |
| 730 buffer.write(separator); | 802 buffer.write(separator); |
| 731 } | 803 } |
| 732 first = false; | 804 first = false; |
| 733 try { | 805 try { |
| 734 buffer.write(element); | 806 buffer.write(element); |
| 735 } catch (e, s) { | 807 } catch (e, s) { |
| 736 _cancelAndErrorWithReplacement(subscription, result, e, s); | 808 _cancelAndErrorWithReplacement(subscription, result, e, s); |
| 737 } | 809 } |
| 738 }, onError: (e) { | 810 }, onError: (e) { |
| 739 result._completeError(e); | 811 result._completeError(e); |
| 740 }, onDone: () { | 812 }, onDone: () { |
| 741 result._complete(buffer.toString()); | 813 result._complete(buffer.toString()); |
| 742 }, cancelOnError: true); | 814 }, cancelOnError: true); |
| 743 return result; | 815 return result; |
| 744 } | 816 } |
| 745 | 817 |
| 746 /** | 818 /** |
| 747 * Checks whether [needle] occurs in the elements provided by this stream. | 819 * Returns whether [needle] occurs in the elements provided by this stream. |
| 748 * | 820 * |
| 749 * Completes the [Future] when the answer is known. | 821 * Compares each element of this stream to [needle] using [Object.==]. |
| 750 * If this stream reports an error, the [Future] will report that error. | 822 * If an equal element is found, the returned future is completed with `true`. |
| 823 * If the stream ends without finding a match, the future is completed with | |
| 824 * `false`. | |
| 825 * | |
| 826 * If the stream contains an error, or the call to `Object.==` throws, | |
| 827 * the returned future is completed with that error, and processing stops. | |
| 751 */ | 828 */ |
| 752 Future<bool> contains(Object needle) { | 829 Future<bool> contains(Object needle) { |
| 753 _Future<bool> future = new _Future<bool>(); | 830 _Future<bool> future = new _Future<bool>(); |
| 754 StreamSubscription subscription; | 831 StreamSubscription subscription; |
| 755 subscription = this.listen( | 832 subscription = this.listen( |
| 756 (T element) { | 833 (T element) { |
| 757 _runUserCode(() => (element == needle), (bool isMatch) { | 834 _runUserCode(() => (element == needle), (bool isMatch) { |
| 758 if (isMatch) { | 835 if (isMatch) { |
| 759 _cancelAndValue(subscription, future, true); | 836 _cancelAndValue(subscription, future, true); |
| 760 } | 837 } |
| 761 }, _cancelAndErrorClosure(subscription, future)); | 838 }, _cancelAndErrorClosure(subscription, future)); |
| 762 }, | 839 }, |
| 763 onError: future._completeError, | 840 onError: future._completeError, |
| 764 onDone: () { | 841 onDone: () { |
| 765 future._complete(false); | 842 future._complete(false); |
| 766 }, | 843 }, |
| 767 cancelOnError: true); | 844 cancelOnError: true); |
| 768 return future; | 845 return future; |
| 769 } | 846 } |
| 770 | 847 |
| 771 /** | 848 /** |
| 772 * Executes [action] on each data event of the stream. | 849 * Executes [action] on each element of the stream. |
| 773 * | 850 * |
| 774 * Completes the returned [Future] when all events of the stream | 851 * Completes the returned [Future] when all elements of the stream |
| 775 * have been processed. Completes the future with an error if the | 852 * have been processed. |
| 776 * stream has an error event, or if [action] throws. | 853 * |
| 854 * If the stream contains an error, or if the call to [action] throws, | |
| 855 * the returne future completes with that error, and processing stops. | |
| 777 */ | 856 */ |
| 778 Future forEach(void action(T element)) { | 857 Future forEach(void action(T element)) { |
| 779 _Future future = new _Future(); | 858 _Future future = new _Future(); |
| 780 StreamSubscription subscription; | 859 StreamSubscription subscription; |
| 781 subscription = this.listen( | 860 subscription = this.listen( |
| 782 (T element) { | 861 (T element) { |
| 783 // TODO(floitsch): the type should be 'void' and inferred. | 862 // TODO(floitsch): the type should be 'void' and inferred. |
| 784 _runUserCode<dynamic>(() => action(element), (_) {}, | 863 _runUserCode<dynamic>(() => action(element), (_) {}, |
| 785 _cancelAndErrorClosure(subscription, future)); | 864 _cancelAndErrorClosure(subscription, future)); |
| 786 }, | 865 }, |
| 787 onError: future._completeError, | 866 onError: future._completeError, |
| 788 onDone: () { | 867 onDone: () { |
| 789 future._complete(null); | 868 future._complete(null); |
| 790 }, | 869 }, |
| 791 cancelOnError: true); | 870 cancelOnError: true); |
| 792 return future; | 871 return future; |
| 793 } | 872 } |
| 794 | 873 |
| 795 /** | 874 /** |
| 796 * Checks whether [test] accepts all elements provided by this stream. | 875 * Checks whether [test] accepts all elements provided by this stream. |
| 797 * | 876 * |
| 798 * Completes the [Future] when the answer is known. | 877 * Calls [test] on each element of the stream. |
| 799 * If this stream reports an error, the [Future] will report that error. | 878 * If the call returns `false`, the returned future is completed with `false` |
| 879 * and processing stops. | |
| 880 * | |
| 881 * If the stream ends without finding an element that [test] rejects, | |
| 882 * the returned future is completed with `true`. | |
| 883 * | |
| 884 * If this stream contains an error, or if the call to [test] throws, | |
| 885 * the returned future is completed with that error, and processing stops. | |
| 800 */ | 886 */ |
| 801 Future<bool> every(bool test(T element)) { | 887 Future<bool> every(bool test(T element)) { |
| 802 _Future<bool> future = new _Future<bool>(); | 888 _Future<bool> future = new _Future<bool>(); |
| 803 StreamSubscription subscription; | 889 StreamSubscription subscription; |
| 804 subscription = this.listen( | 890 subscription = this.listen( |
| 805 (T element) { | 891 (T element) { |
| 806 _runUserCode(() => test(element), (bool isMatch) { | 892 _runUserCode(() => test(element), (bool isMatch) { |
| 807 if (!isMatch) { | 893 if (!isMatch) { |
| 808 _cancelAndValue(subscription, future, false); | 894 _cancelAndValue(subscription, future, false); |
| 809 } | 895 } |
| 810 }, _cancelAndErrorClosure(subscription, future)); | 896 }, _cancelAndErrorClosure(subscription, future)); |
| 811 }, | 897 }, |
| 812 onError: future._completeError, | 898 onError: future._completeError, |
| 813 onDone: () { | 899 onDone: () { |
| 814 future._complete(true); | 900 future._complete(true); |
| 815 }, | 901 }, |
| 816 cancelOnError: true); | 902 cancelOnError: true); |
| 817 return future; | 903 return future; |
| 818 } | 904 } |
| 819 | 905 |
| 820 /** | 906 /** |
| 821 * Checks whether [test] accepts any element provided by this stream. | 907 * Checks whether [test] accepts any element provided by this stream. |
| 822 * | 908 * |
| 823 * Completes the [Future] when the answer is known. | 909 * Calls [test] on each element of the stream. |
| 910 * If the call returns `true`, the returned future is completed with `true` | |
| 911 * and processing stops. | |
| 824 * | 912 * |
| 825 * If this stream reports an error, the [Future] reports that error. | 913 * If the stream ends without finding an element that [test] accepts, |
| 914 * the returned future is completed with `false`. | |
| 826 * | 915 * |
| 827 * Stops listening to the stream after the first matching element has been | 916 * If this stream contains an error, or if the call to [test] throws, |
| 828 * found. | 917 * the returned future is completed with that error, and processing stops. |
| 829 * | |
| 830 * Internally the method cancels its subscription after this element. This | |
| 831 * means that single-subscription (non-broadcast) streams are closed and | |
| 832 * cannot be reused after a call to this method. | |
| 833 */ | 918 */ |
| 834 Future<bool> any(bool test(T element)) { | 919 Future<bool> any(bool test(T element)) { |
| 835 _Future<bool> future = new _Future<bool>(); | 920 _Future<bool> future = new _Future<bool>(); |
| 836 StreamSubscription subscription; | 921 StreamSubscription subscription; |
| 837 subscription = this.listen( | 922 subscription = this.listen( |
| 838 (T element) { | 923 (T element) { |
| 839 _runUserCode(() => test(element), (bool isMatch) { | 924 _runUserCode(() => test(element), (bool isMatch) { |
| 840 if (isMatch) { | 925 if (isMatch) { |
| 841 _cancelAndValue(subscription, future, true); | 926 _cancelAndValue(subscription, future, true); |
| 842 } | 927 } |
| 843 }, _cancelAndErrorClosure(subscription, future)); | 928 }, _cancelAndErrorClosure(subscription, future)); |
| 844 }, | 929 }, |
| 845 onError: future._completeError, | 930 onError: future._completeError, |
| 846 onDone: () { | 931 onDone: () { |
| 847 future._complete(false); | 932 future._complete(false); |
| 848 }, | 933 }, |
| 849 cancelOnError: true); | 934 cancelOnError: true); |
| 850 return future; | 935 return future; |
| 851 } | 936 } |
| 852 | 937 |
| 853 /** Counts the elements in the stream. */ | 938 /** |
| 939 * The number of elements in this stream. | |
| 940 * | |
| 941 * Waits for all elements of this stream. When the stream ends, | |
| 942 * the returned future is completed with the number of elements. | |
| 943 * | |
| 944 * If the stream contains an error, the returned future is completed with | |
| 945 * that error, and processing stops. | |
| 946 * | |
| 947 * This operation listens to the stream, and a non-broadcast stream cannot | |
| 948 * be reused after finding its length. | |
| 949 */ | |
| 854 Future<int> get length { | 950 Future<int> get length { |
| 855 _Future<int> future = new _Future<int>(); | 951 _Future<int> future = new _Future<int>(); |
| 856 int count = 0; | 952 int count = 0; |
| 857 this.listen( | 953 this.listen( |
| 858 (_) { | 954 (_) { |
| 859 count++; | 955 count++; |
| 860 }, | 956 }, |
| 861 onError: future._completeError, | 957 onError: future._completeError, |
| 862 onDone: () { | 958 onDone: () { |
| 863 future._complete(count); | 959 future._complete(count); |
| 864 }, | 960 }, |
| 865 cancelOnError: true); | 961 cancelOnError: true); |
| 866 return future; | 962 return future; |
| 867 } | 963 } |
| 868 | 964 |
| 869 /** | 965 /** |
| 870 * Reports whether this stream contains any elements. | 966 * Whether this stream contains any elements. |
| 871 * | 967 * |
| 872 * Stops listening to the stream after the first element has been received. | 968 * Waits for the first element of this stream, then completes the returned |
| 969 * future with `true`. | |
| 970 * If the stream ends without emitting any elements, the returned future is | |
| 971 * completed with `false`. | |
| 873 * | 972 * |
| 874 * Internally the method cancels its subscription after the first element. | 973 * If the first event is an error, the returned future is completed with that |
| 875 * This means that single-subscription (non-broadcast) streams are closed and | 974 * error. |
| 876 * cannot be reused after a call to this getter. | 975 * |
| 976 * This operation listens to the stream, and a non-broadcast stream cannot | |
| 977 * be reused after checking whether it is empty. | |
| 877 */ | 978 */ |
| 878 Future<bool> get isEmpty { | 979 Future<bool> get isEmpty { |
| 879 _Future<bool> future = new _Future<bool>(); | 980 _Future<bool> future = new _Future<bool>(); |
| 880 StreamSubscription subscription; | 981 StreamSubscription subscription; |
| 881 subscription = this.listen( | 982 subscription = this.listen( |
| 882 (_) { | 983 (_) { |
| 883 _cancelAndValue(subscription, future, false); | 984 _cancelAndValue(subscription, future, false); |
| 884 }, | 985 }, |
| 885 onError: future._completeError, | 986 onError: future._completeError, |
| 886 onDone: () { | 987 onDone: () { |
| 887 future._complete(true); | 988 future._complete(true); |
| 888 }, | 989 }, |
| 889 cancelOnError: true); | 990 cancelOnError: true); |
| 890 return future; | 991 return future; |
| 891 } | 992 } |
| 892 | 993 |
| 893 /** Collects the data of this stream in a [List]. */ | 994 /** |
| 995 * Collects all elements of this stream in a [List]. | |
| 996 * | |
| 997 * Creates a `List<T>` and adds all elements of the stream to the list | |
| 998 * in the order they arrive. | |
| 999 * When the stream ends, the returned future is completed with that list. | |
| 1000 * | |
| 1001 * If the stream contains an error, the returned future is completed | |
| 1002 * with that error, and processing stops. | |
| 1003 */ | |
| 894 Future<List<T>> toList() { | 1004 Future<List<T>> toList() { |
| 895 List<T> result = <T>[]; | 1005 List<T> result = <T>[]; |
| 896 _Future<List<T>> future = new _Future<List<T>>(); | 1006 _Future<List<T>> future = new _Future<List<T>>(); |
| 897 this.listen( | 1007 this.listen( |
| 898 (T data) { | 1008 (T data) { |
| 899 result.add(data); | 1009 result.add(data); |
| 900 }, | 1010 }, |
| 901 onError: future._completeError, | 1011 onError: future._completeError, |
| 902 onDone: () { | 1012 onDone: () { |
| 903 future._complete(result); | 1013 future._complete(result); |
| (...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1035 * | 1145 * |
| 1036 * The returned stream is a broadcast stream if this stream is. | 1146 * The returned stream is a broadcast stream if this stream is. |
| 1037 * If a broadcast stream is listened to more than once, each subscription | 1147 * If a broadcast stream is listened to more than once, each subscription |
| 1038 * will individually perform the `equals` test. | 1148 * will individually perform the `equals` test. |
| 1039 */ | 1149 */ |
| 1040 Stream<T> distinct([bool equals(T previous, T next)]) { | 1150 Stream<T> distinct([bool equals(T previous, T next)]) { |
| 1041 return new _DistinctStream<T>(this, equals); | 1151 return new _DistinctStream<T>(this, equals); |
| 1042 } | 1152 } |
| 1043 | 1153 |
| 1044 /** | 1154 /** |
| 1045 * Returns the first element of the stream. | 1155 * The first element of the stream. |
| 1046 * | 1156 * |
| 1047 * Stops listening to the stream after the first element has been received. | 1157 * Stops listening to the stream after the first element has been received. |
| 1048 * | 1158 * |
| 1049 * Internally the method cancels its subscription after the first element. | 1159 * Internally the method cancels its subscription after the first element. |
| 1050 * This means that single-subscription (non-broadcast) streams are closed | 1160 * This means that single-subscription (non-broadcast) streams are closed |
| 1051 * and cannot be reused after a call to this getter. | 1161 * and cannot be reused after a call to this getter. |
| 1052 * | 1162 * |
| 1053 * If an error event occurs before the first data event, the resulting future | 1163 * If an error event occurs before the first data event, the returned future |
| 1054 * is completed with that error. | 1164 * is completed with that error. |
| 1055 * | 1165 * |
| 1056 * If this stream is empty (a done event occurs before the first data event), | 1166 * If this stream is empty (a done event occurs before the first data event), |
| 1057 * the resulting future completes with a [StateError]. | 1167 * the returned future completes with an error. |
| 1058 * | 1168 * |
| 1059 * Except for the type of the error, this method is equivalent to | 1169 * Except for the type of the error, this method is equivalent to |
| 1060 * [:this.elementAt(0):]. | 1170 * `this.elementAt(0)`. |
| 1061 */ | 1171 */ |
| 1062 Future<T> get first { | 1172 Future<T> get first { |
| 1063 _Future<T> future = new _Future<T>(); | 1173 _Future<T> future = new _Future<T>(); |
| 1064 StreamSubscription subscription; | 1174 StreamSubscription subscription; |
| 1065 subscription = this.listen( | 1175 subscription = this.listen( |
| 1066 (T value) { | 1176 (T value) { |
| 1067 _cancelAndValue(subscription, future, value); | 1177 _cancelAndValue(subscription, future, value); |
| 1068 }, | 1178 }, |
| 1069 onError: future._completeError, | 1179 onError: future._completeError, |
| 1070 onDone: () { | 1180 onDone: () { |
| 1071 try { | 1181 try { |
| 1072 throw IterableElementError.noElement(); | 1182 throw IterableElementError.noElement(); |
| 1073 } catch (e, s) { | 1183 } catch (e, s) { |
| 1074 _completeWithErrorCallback(future, e, s); | 1184 _completeWithErrorCallback(future, e, s); |
| 1075 } | 1185 } |
| 1076 }, | 1186 }, |
| 1077 cancelOnError: true); | 1187 cancelOnError: true); |
| 1078 return future; | 1188 return future; |
| 1079 } | 1189 } |
| 1080 | 1190 |
| 1081 /** | 1191 /** |
| 1082 * Returns the last element of the stream. | 1192 * The last element of this stream. |
| 1083 * | 1193 * |
| 1084 * If an error event occurs before the first data event, the resulting future | 1194 * If this stream emits an error event, |
| 1085 * is completed with that error. | 1195 * the returned future is completed with that error |
| 1196 * and processing stops. | |
| 1086 * | 1197 * |
| 1087 * If this stream is empty (a done event occurs before the first data event), | 1198 * If this stream is empty (the done event is the first event), |
| 1088 * the resulting future completes with a [StateError]. | 1199 * the returned future completes with an error. |
| 1089 */ | 1200 */ |
| 1090 Future<T> get last { | 1201 Future<T> get last { |
| 1091 _Future<T> future = new _Future<T>(); | 1202 _Future<T> future = new _Future<T>(); |
| 1092 T result = null; | 1203 T result = null; |
| 1093 bool foundResult = false; | 1204 bool foundResult = false; |
| 1094 listen( | 1205 listen( |
| 1095 (T value) { | 1206 (T value) { |
| 1096 foundResult = true; | 1207 foundResult = true; |
| 1097 result = value; | 1208 result = value; |
| 1098 }, | 1209 }, |
| 1099 onError: future._completeError, | 1210 onError: future._completeError, |
| 1100 onDone: () { | 1211 onDone: () { |
| 1101 if (foundResult) { | 1212 if (foundResult) { |
| 1102 future._complete(result); | 1213 future._complete(result); |
| 1103 return; | 1214 return; |
| 1104 } | 1215 } |
| 1105 try { | 1216 try { |
| 1106 throw IterableElementError.noElement(); | 1217 throw IterableElementError.noElement(); |
| 1107 } catch (e, s) { | 1218 } catch (e, s) { |
| 1108 _completeWithErrorCallback(future, e, s); | 1219 _completeWithErrorCallback(future, e, s); |
| 1109 } | 1220 } |
| 1110 }, | 1221 }, |
| 1111 cancelOnError: true); | 1222 cancelOnError: true); |
| 1112 return future; | 1223 return future; |
| 1113 } | 1224 } |
| 1114 | 1225 |
| 1115 /** | 1226 /** |
| 1116 * Returns the single element. | 1227 * The single element of this stream. |
| 1117 * | 1228 * |
| 1118 * If an error event occurs before or after the first data event, the | 1229 * If this stream emits an error event, |
| 1119 * resulting future is completed with that error. | 1230 * the returned future is completed with that error |
| 1231 * and processing stops. | |
| 1120 * | 1232 * |
| 1121 * If [this] is empty or has more than one element throws a [StateError]. | 1233 * If [this] is empty or has more than one element, |
| 1234 * the returned future completes with an error. | |
| 1122 */ | 1235 */ |
| 1123 Future<T> get single { | 1236 Future<T> get single { |
| 1124 _Future<T> future = new _Future<T>(); | 1237 _Future<T> future = new _Future<T>(); |
| 1125 T result = null; | 1238 T result = null; |
| 1126 bool foundResult = false; | 1239 bool foundResult = false; |
| 1127 StreamSubscription subscription; | 1240 StreamSubscription subscription; |
| 1128 subscription = this.listen( | 1241 subscription = this.listen( |
| 1129 (T value) { | 1242 (T value) { |
| 1130 if (foundResult) { | 1243 if (foundResult) { |
| 1131 // This is the second element we get. | 1244 // This is the second element we get. |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 1151 _completeWithErrorCallback(future, e, s); | 1264 _completeWithErrorCallback(future, e, s); |
| 1152 } | 1265 } |
| 1153 }, | 1266 }, |
| 1154 cancelOnError: true); | 1267 cancelOnError: true); |
| 1155 return future; | 1268 return future; |
| 1156 } | 1269 } |
| 1157 | 1270 |
| 1158 /** | 1271 /** |
| 1159 * Finds the first element of this stream matching [test]. | 1272 * Finds the first element of this stream matching [test]. |
| 1160 * | 1273 * |
| 1161 * Returns a future that is filled with the first element of this stream | 1274 * Returns a future that is completed with the first element of this stream |
| 1162 * that [test] returns true for. | 1275 * that [test] returns `true` for. |
| 1163 * | 1276 * |
| 1164 * If no such element is found before this stream is done, and a | 1277 * If no such element is found before this stream is done, and a |
| 1165 * [defaultValue] function is provided, the result of calling [defaultValue] | 1278 * [defaultValue] function is provided, the result of calling [defaultValue] |
| 1166 * becomes the value of the future. | 1279 * becomes the value of the future. If [defaultValue] throws, the returned |
| 1280 * future is completed with that error. | |
| 1167 * | 1281 * |
| 1168 * Stops listening to the stream after the first matching element has been | 1282 * If this stream emits an error before the first matching element, |
| 1169 * received. | 1283 * the returned future is completed with that error, and processing stops. |
| 1284 * | |
| 1285 * Stops listening to the stream after the first matching element or error | |
| 1286 * has been received. | |
| 1170 * | 1287 * |
| 1171 * Internally the method cancels its subscription after the first element that | 1288 * Internally the method cancels its subscription after the first element that |
| 1172 * matches the predicate. This means that single-subscription (non-broadcast) | 1289 * matches the predicate. This means that single-subscription (non-broadcast) |
| 1173 * streams are closed and cannot be reused after a call to this method. | 1290 * streams are closed and cannot be reused after a call to this method. |
| 1174 * | 1291 * |
| 1175 * If an error occurs, or if this stream ends without finding a match and | 1292 * If an error occurs, or if this stream ends without finding a match and |
| 1176 * with no [defaultValue] function provided, the future will receive an | 1293 * with no [defaultValue] function provided, |
| 1177 * error. | 1294 * the returned future is completed with an error. |
| 1178 */ | 1295 */ |
| 1179 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 1296 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
| 1180 _Future<dynamic> future = new _Future(); | 1297 _Future<dynamic> future = new _Future(); |
| 1181 StreamSubscription subscription; | 1298 StreamSubscription subscription; |
| 1182 subscription = this.listen( | 1299 subscription = this.listen( |
| 1183 (T value) { | 1300 (T value) { |
| 1184 _runUserCode(() => test(value), (bool isMatch) { | 1301 _runUserCode(() => test(value), (bool isMatch) { |
| 1185 if (isMatch) { | 1302 if (isMatch) { |
| 1186 _cancelAndValue(subscription, future, value); | 1303 _cancelAndValue(subscription, future, value); |
| 1187 } | 1304 } |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 1199 _completeWithErrorCallback(future, e, s); | 1316 _completeWithErrorCallback(future, e, s); |
| 1200 } | 1317 } |
| 1201 }, | 1318 }, |
| 1202 cancelOnError: true); | 1319 cancelOnError: true); |
| 1203 return future; | 1320 return future; |
| 1204 } | 1321 } |
| 1205 | 1322 |
| 1206 /** | 1323 /** |
| 1207 * Finds the last element in this stream matching [test]. | 1324 * Finds the last element in this stream matching [test]. |
| 1208 * | 1325 * |
| 1209 * As [firstWhere], except that the last matching element is found. | 1326 * If this stream emits an error, the returned future is completed with that |
| 1210 * That means that the result cannot be provided before this stream | 1327 * error, and processing stops. |
| 1328 * | |
| 1329 * Otherwise as [firstWhere], except that the last matching element is found | |
| 1330 * instead of the first. | |
| 1331 * That means that a non-error result cannot be provided before this stream | |
| 1211 * is done. | 1332 * is done. |
| 1212 */ | 1333 */ |
| 1213 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | 1334 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { |
| 1214 _Future<dynamic> future = new _Future(); | 1335 _Future<dynamic> future = new _Future(); |
| 1215 T result = null; | 1336 T result = null; |
| 1216 bool foundResult = false; | 1337 bool foundResult = false; |
| 1217 StreamSubscription subscription; | 1338 StreamSubscription subscription; |
| 1218 subscription = this.listen( | 1339 subscription = this.listen( |
| 1219 (T value) { | 1340 (T value) { |
| 1220 _runUserCode(() => true == test(value), (bool isMatch) { | 1341 _runUserCode(() => true == test(value), (bool isMatch) { |
| (...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1456 * operation). If the listener wants to delete the file after having | 1577 * operation). If the listener wants to delete the file after having |
| 1457 * canceled the subscription, it must wait for the cleanup future to complete. | 1578 * canceled the subscription, it must wait for the cleanup future to complete. |
| 1458 * | 1579 * |
| 1459 * A returned future completes with a `null` value. | 1580 * A returned future completes with a `null` value. |
| 1460 * If the cleanup throws, which it really shouldn't, the returned future | 1581 * If the cleanup throws, which it really shouldn't, the returned future |
| 1461 * completes with that error. | 1582 * completes with that error. |
| 1462 */ | 1583 */ |
| 1463 Future cancel(); | 1584 Future cancel(); |
| 1464 | 1585 |
| 1465 /** | 1586 /** |
| 1466 * Set or override the data event handler of this subscription. | 1587 * Replaces the data event handler of this subscription. |
| 1467 * | 1588 * |
| 1468 * This method overrides the handler that has been set at the invocation of | 1589 * The [handleData] function is called for each element of the stream |
| 1469 * [Stream.listen]. | 1590 * after this function is called. |
| 1591 * If [handleData] is `null`, further elements are ignored. | |
| 1592 * | |
| 1593 * This method replaces the current handler set by the invocation of | |
| 1594 * [Stream.listen] or by a previous call to [onData]. | |
| 1470 */ | 1595 */ |
| 1471 void onData(void handleData(T data)); | 1596 void onData(void handleData(T data)); |
| 1472 | 1597 |
| 1473 /** | 1598 /** |
| 1474 * Set or override the error event handler of this subscription. | 1599 * Replaces the error event handler of this subscription. |
| 1475 * | 1600 * |
| 1476 * This method overrides the handler that has been set at the invocation of | 1601 * The [handleError] function must be able to be called with either |
| 1477 * [Stream.listen] or by calling [asFuture]. | 1602 * one positional argument, or with two positional arguments |
| 1603 * where the seconds is always a [StackTrace]. | |
| 1604 * | |
| 1605 * The [handleError] argument may be `null`, in which case further | |
| 1606 * error events are considered unhandled, and will be reported to | |
| 1607 * [Zone.handleUncaughtError]. | |
| 1608 * | |
| 1609 * The provided function is called for all error events from the | |
| 1610 * stream subscription. | |
| 1611 * | |
| 1612 * This method replaces the current handler set by the invocation of | |
| 1613 * [Stream.listen], by calling [asFuture], or by a previous call to [onError]. | |
| 1478 */ | 1614 */ |
| 1479 void onError(Function handleError); | 1615 void onError(Function handleError); |
| 1480 | 1616 |
| 1481 /** | 1617 /** |
| 1482 * Set or override the done event handler of this subscription. | 1618 * Replaces the done event handler of this subscription. |
| 1483 * | 1619 * |
| 1484 * This method overrides the handler that has been set at the invocation of | 1620 * The [handleDone] function is called when the stream closes. |
| 1485 * [Stream.listen] or by calling [asFuture]. | 1621 * The value may be `null`, in which case no function is called. |
| 1622 * | |
| 1623 * This method reaplces the current handler set by the invocation of | |
| 1624 * [Stream.listen], by calling [asFuture], or by a previous call to [onDone]. | |
| 1486 */ | 1625 */ |
| 1487 void onDone(void handleDone()); | 1626 void onDone(void handleDone()); |
| 1488 | 1627 |
| 1489 /** | 1628 /** |
| 1490 * Request that the stream pauses events until further notice. | 1629 * Request that the stream pauses events until further notice. |
| 1491 * | 1630 * |
| 1492 * While paused, the subscription will not fire any events. | 1631 * While paused, the subscription will not fire any events. |
| 1493 * If it receives events from its source, they will be buffered until | 1632 * If it receives events from its source, they will be buffered until |
| 1494 * the subscription is resumed. | 1633 * the subscription is resumed. |
| 1495 * The underlying source is usually informed about the pause, | 1634 * For non-broadcast streams, the underlying source is usually informed |
| 1635 * about the pause, | |
| 1496 * so it can stop generating events until the subscription is resumed. | 1636 * so it can stop generating events until the subscription is resumed. |
| 1497 * | 1637 * |
| 1498 * To avoid buffering events on a broadcast stream, it is better to | 1638 * To avoid buffering events on a broadcast stream, it is better to |
| 1499 * cancel this subscription, and start to listen again when events | 1639 * cancel this subscription, and start to listen again when events |
| 1500 * are needed. | 1640 * are needed, if the intermediate events are not important. |
| 1501 * | 1641 * |
| 1502 * If [resumeSignal] is provided, the stream will undo the pause | 1642 * If [resumeSignal] is provided, the stream subscription will undo the pause |
| 1503 * when the future completes. If the future completes with an error, | 1643 * when the future completes, as if by a call to [resume]. |
| 1504 * the stream will resume, but the error will not be handled! | 1644 * If the future completes with an error, |
| 1645 * the stream will still resume, but the error will be considered unhandled | |
| 1646 * and is passed to [Zone.handleUncaughtError]. | |
| 1505 * | 1647 * |
| 1506 * A call to [resume] will also undo a pause. | 1648 * A call to [resume] will also undo a pause. |
| 1507 * | 1649 * |
| 1508 * If the subscription is paused more than once, an equal number | 1650 * If the subscription is paused more than once, an equal number |
| 1509 * of resumes must be performed to resume the stream. | 1651 * of resumes must be performed to resume the stream. |
| 1510 * | 1652 * |
| 1511 * Currently DOM streams silently drop events when the stream is paused. This | 1653 * Currently DOM streams silently drop events when the stream is paused. This |
| 1512 * is a bug and will be fixed. | 1654 * is a bug and will be fixed. |
| 1513 */ | 1655 */ |
| 1514 void pause([Future resumeSignal]); | 1656 void pause([Future resumeSignal]); |
| 1515 | 1657 |
| 1516 /** | 1658 /** |
| 1517 * Resume after a pause. | 1659 * Resume after a pause. |
| 1660 * | |
| 1661 * This undoes one previous call to [pause]. | |
| 1662 * When all previously calls to [pause] have been matched by a calls to | |
| 1663 * [resume], possibly through a `resumeSignal` passed to [pause], | |
| 1664 * the stream subscription may emit events again. | |
| 1518 */ | 1665 */ |
| 1519 void resume(); | 1666 void resume(); |
| 1520 | 1667 |
| 1521 /** | 1668 /** |
| 1522 * Returns true if the [StreamSubscription] is paused. | 1669 * Whether the [StreamSubscription] is currently paused. |
| 1670 * | |
| 1671 * If there have been more calls to [pause] than to [resume] on this | |
| 1672 * stream subscription, the subscription is paused, and this getter | |
| 1673 * returns `true`. | |
| 1674 * | |
| 1675 * Returns `false` if the stream can currently emit events, or if | |
| 1676 * the subscription has completed or been cancelled. | |
| 1523 */ | 1677 */ |
| 1524 bool get isPaused; | 1678 bool get isPaused; |
| 1525 | 1679 |
| 1526 /** | 1680 /** |
| 1527 * Returns a future that handles the [onDone] and [onError] callbacks. | 1681 * Returns a future that handles the [onDone] and [onError] callbacks. |
| 1528 * | 1682 * |
| 1529 * This method *overwrites* the existing [onDone] and [onError] callbacks | 1683 * This method *overwrites* the existing [onDone] and [onError] callbacks |
| 1530 * with new ones that complete the returned future. | 1684 * with new ones that complete the returned future. |
| 1531 * | 1685 * |
| 1532 * In case of an error the subscription will automatically cancel (even | 1686 * In case of an error the subscription will automatically cancel (even |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1685 * Return a future which is completed when the [StreamSink] is finished. | 1839 * Return a future which is completed when the [StreamSink] is finished. |
| 1686 * | 1840 * |
| 1687 * If the `StreamSink` fails with an error, | 1841 * If the `StreamSink` fails with an error, |
| 1688 * perhaps in response to adding events using [add], [addError] or [close], | 1842 * perhaps in response to adding events using [add], [addError] or [close], |
| 1689 * the [done] future will complete with that error. | 1843 * the [done] future will complete with that error. |
| 1690 * | 1844 * |
| 1691 * Otherwise, the returned future will complete when either: | 1845 * Otherwise, the returned future will complete when either: |
| 1692 * | 1846 * |
| 1693 * * all events have been processed and the sink has been closed, or | 1847 * * all events have been processed and the sink has been closed, or |
| 1694 * * the sink has otherwise been stopped from handling more events | 1848 * * the sink has otherwise been stopped from handling more events |
| 1695 * (for example by cancelling a stream subscription). | 1849 * (for example by canceling a stream subscription). |
| 1696 */ | 1850 */ |
| 1697 Future get done; | 1851 Future get done; |
| 1698 } | 1852 } |
| 1699 | 1853 |
| 1700 /** | 1854 /** |
| 1701 * Transforms a Stream. | 1855 * Transforms a Stream. |
| 1702 * | 1856 * |
| 1703 * When a stream's [Stream.transform] method is invoked with a | 1857 * When a stream's [Stream.transform] method is invoked with a |
| 1704 * [StreamTransformer], the stream calls the [bind] method on the provided | 1858 * [StreamTransformer], the stream calls the [bind] method on the provided |
| 1705 * transformer. The resulting stream is then returned from the | 1859 * transformer. The resulting stream is then returned from the |
| (...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1907 | 2061 |
| 1908 void addError(error, [StackTrace stackTrace]) { | 2062 void addError(error, [StackTrace stackTrace]) { |
| 1909 _sink.addError(error, stackTrace); | 2063 _sink.addError(error, stackTrace); |
| 1910 } | 2064 } |
| 1911 | 2065 |
| 1912 void close() { | 2066 void close() { |
| 1913 _sink.close(); | 2067 _sink.close(); |
| 1914 } | 2068 } |
| 1915 } | 2069 } |
| 1916 | 2070 |
| 1917 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | 2071 /// A group created by [Stream.groupBy]. |
| 1918 /// | 2072 /// |
| 1919 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y | 2073 /// The stream created by `groupBy` emits a `GroupedEvents` |
| 1920 /// it encounters. | 2074 /// for each distinct key it encounters. |
| 1921 /// This group contains the [key] itself, along with a stream of the [values] | 2075 /// This group contains the [key] itself, along with a stream of the [values] |
| 1922 /// associated with that key. | 2076 /// associated with that key. |
| 1923 class GroupedEvents<K, V> { | 2077 class GroupedEvents<K, V> { |
| 1924 /// The key that identifiers the values emitted by [values]. | 2078 /// The key that identifiers the values emitted by [values]. |
| 1925 final K key; | 2079 final K key; |
| 1926 | 2080 |
| 1927 /// The [values] that [GroupBy] have grouped by the common [key]. | 2081 /// The [values] that [GroupBy] have grouped by the common [key]. |
| 1928 final Stream<V> values; | 2082 final Stream<V> values; |
| 1929 | 2083 |
| 1930 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; | 2084 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; |
| 1931 | 2085 |
| 1932 // Don't expose a generative constructor. | 2086 // Don't expose a generative constructor. |
| 1933 // This class is not intended for subclassing, so we don't want to promise | 2087 // This class is not intended for subclassing, so we don't want to promise |
| 1934 // it. We can change that in the future. | 2088 // it. We can change that in the future. |
| 1935 GroupedEvents._(this.key, this.values); | 2089 GroupedEvents._(this.key, this.values); |
| 1936 | 2090 |
| 1937 /// Tells [values] to discard values instead of retaining them. | 2091 /// Tells [values] to discard values instead of retaining them. |
| 1938 /// | 2092 /// |
| 1939 /// Must only be used instead of listening to the [values] stream. | 2093 /// Must only be used instead of listening to the [values] stream. |
| 1940 /// If the stream has been listened to, this call fails. | 2094 /// If the stream has been listened to, this call fails. |
| 1941 /// After calling this method, listening on the [values] stream fails. | 2095 /// After calling this method, listening on the [values] stream fails. |
| 1942 Future cancel() { | 2096 Future cancel() { |
| 1943 // If values has been listened to, | 2097 // If values has been listened to, |
| 1944 // this throws a StateError saying that stream has already been listened to, | 2098 // this throws a StateError saying that stream has already been listened to, |
| 1945 // which is a correct error message for this call too. | 2099 // which is a correct error message for this call too. |
| 1946 return values.listen(null).cancel(); | 2100 return values.listen(null).cancel(); |
| 1947 } | 2101 } |
| 1948 } | 2102 } |
| OLD | NEW |