Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 24 return stream; | 24 return stream; |
| 25 } | 25 } |
| 26 | 26 |
| 27 /** | 27 /** |
| 28 * Creates a single-subscription stream that gets its data from [data]. | 28 * Creates a single-subscription stream that gets its data from [data]. |
| 29 */ | 29 */ |
| 30 factory Stream.fromIterable(Iterable<T> data) { | 30 factory Stream.fromIterable(Iterable<T> data) { |
| 31 return new _IterableSingleStreamImpl<T>(data); | 31 return new _IterableSingleStreamImpl<T>(data); |
| 32 } | 32 } |
| 33 | 33 |
| 34 /** Whether the stream is a single-subscription stream. */ | |
| 35 bool get isSingleSubscription; | |
|
floitsch
2013/01/14 16:09:09
different CL, but sometimes we have singleSubscrib
| |
| 36 | |
| 34 /** | 37 /** |
| 35 * Returns a multi-subscription stream that produces the same events as this. | 38 * Returns a multi-subscription stream that produces the same events as this. |
| 36 * | 39 * |
| 37 * If this stream is single-subscription, return a new stream that allows | 40 * If this stream is single-subscription, return a new stream that allows |
| 38 * multiple subscribers. It will subscribe to this stream when its first | 41 * multiple subscribers. It will subscribe to this stream when its first |
| 39 * subscriber is added, and unsubscribe again when the last subscription is | 42 * subscriber is added, and unsubscribe again when the last subscription is |
| 40 * cancelled. | 43 * cancelled. |
| 41 * | 44 * |
| 42 * If this stream is already multi-subscriber, it is returned unmodified. | 45 * If this stream is already multi-subscriber, it is returned unmodified. |
| 43 */ | 46 */ |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 89 void onDone(), | 92 void onDone(), |
| 90 bool unsubscribeOnError}); | 93 bool unsubscribeOnError}); |
| 91 | 94 |
| 92 /** | 95 /** |
| 93 * Creates a new stream from this stream that discards some data events. | 96 * Creates a new stream from this stream that discards some data events. |
| 94 * | 97 * |
| 95 * The new stream sends the same error and done events as this stream, | 98 * The new stream sends the same error and done events as this stream, |
| 96 * but it only sends the data events that satisfy the [test]. | 99 * but it only sends the data events that satisfy the [test]. |
| 97 */ | 100 */ |
| 98 Stream<T> where(bool test(T event)) { | 101 Stream<T> where(bool test(T event)) { |
| 99 return this.transform(new WhereStream<T>(test)); | 102 return this.transform(new WhereTransformer<T>(test)); |
| 100 } | 103 } |
| 101 | 104 |
| 102 /** | 105 /** |
| 103 * Create a new stream that converts each element of this stream | 106 * Create a new stream that converts each element of this stream |
| 104 * to a new value using the [convert] function. | 107 * to a new value using the [convert] function. |
| 105 */ | 108 */ |
| 106 Stream mappedBy(convert(T event)) { | 109 Stream mappedBy(convert(T event)) { |
| 107 return this.transform(new MapStream<T, dynamic>(convert)); | 110 return this.transform(new MapTransformer<T, dynamic>(convert)); |
| 108 } | 111 } |
| 109 | 112 |
| 110 /** | 113 /** |
| 111 * Create a wrapper Stream that intercepts some errors from this stream. | 114 * Create a wrapper Stream that intercepts some errors from this stream. |
| 112 * | 115 * |
| 113 * If this stream sends an error that matches [test], then it is intercepted | 116 * If this stream sends an error that matches [test], then it is intercepted |
| 114 * by the [handle] function. | 117 * by the [handle] function. |
| 115 * | 118 * |
| 116 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 119 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
| 117 * true. If [test] is omitted, every error is considered matching. | 120 * true. If [test] is omitted, every error is considered matching. |
| 118 * | 121 * |
| 119 * If the error is intercepted, the [handle] function can decide what to do | 122 * If the error is intercepted, the [handle] function can decide what to do |
| 120 * with it. It can throw if it wants to raise a new (or the same) error, | 123 * with it. It can throw if it wants to raise a new (or the same) error, |
| 121 * or simply return to make the stream forget the error. | 124 * or simply return to make the stream forget the error. |
|
floitsch
2013/01/14 16:09:09
different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
| |
| 122 */ | 125 */ |
| 123 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 126 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
| 124 return this.transform(new HandleErrorStream<T>(handle, test)); | 127 return this.transform(new HandleErrorTransformer<T>(handle, test)); |
| 125 } | 128 } |
| 126 | 129 |
| 127 /** | 130 /** |
| 128 * Create a new stream from this stream that converts each element | 131 * Create a new stream from this stream that converts each element |
| 129 * into zero or more events. | 132 * into zero or more events. |
| 130 * | 133 * |
| 131 * Each incoming event is converted to an [Iterable] of new events, | 134 * Each incoming event is converted to an [Iterable] of new events, |
| 132 * and each of these new events are then sent by the returned stream | 135 * and each of these new events are then sent by the returned stream |
| 133 * in order. | 136 * in order. |
| 134 */ | 137 */ |
| 135 Stream expand(Iterable convert(T value)) { | 138 Stream expand(Iterable convert(T value)) { |
| 136 return this.transform(new ExpandStream<T, dynamic>(convert)); | 139 return this.transform( |
| 140 new ExpandTransformer<T, dynamic>(convert)); | |
| 137 } | 141 } |
| 138 | 142 |
| 139 /** | 143 /** |
| 140 * Bind this stream as the input of the provided [StreamConsumer]. | 144 * Bind this stream as the input of the provided [StreamConsumer]. |
| 141 */ | 145 */ |
| 142 Future pipe(StreamConsumer<dynamic, T> streamConsumer) { | 146 Future pipe(StreamConsumer<dynamic, T> streamConsumer) { |
| 143 return streamConsumer.consume(this); | 147 return streamConsumer.consume(this); |
| 144 } | 148 } |
| 145 | 149 |
| 146 /** | 150 /** |
| (...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 428 /** | 432 /** |
| 429 * Provide at most the first [n] values of this stream. | 433 * Provide at most the first [n] values of this stream. |
| 430 * | 434 * |
| 431 * Forwards the first [n] data events of this stream, and all error | 435 * Forwards the first [n] data events of this stream, and all error |
| 432 * events, to the returned stream, and ends with a done event. | 436 * events, to the returned stream, and ends with a done event. |
| 433 * | 437 * |
| 434 * If this stream produces fewer than [count] values before it's done, | 438 * If this stream produces fewer than [count] values before it's done, |
| 435 * so will the returned stream. | 439 * so will the returned stream. |
| 436 */ | 440 */ |
| 437 Stream<T> take(int count) { | 441 Stream<T> take(int count) { |
| 438 return this.transform(new TakeStream(count)); | 442 return this.transform(new TakeTransformer<T>(count)); |
| 439 } | 443 } |
| 440 | 444 |
| 441 /** | 445 /** |
| 442 * Forwards data events while [test] is successful. | 446 * Forwards data events while [test] is successful. |
| 443 * | 447 * |
| 444 * The returned stream provides the same events as this stream as long | 448 * The returned stream provides the same events as this stream as long |
| 445 * as [test] returns [:true:] for the event data. The stream is done | 449 * as [test] returns [:true:] for the event data. The stream is done |
| 446 * when either this stream is done, or when this stream first provides | 450 * when either this stream is done, or when this stream first provides |
| 447 * a value that [test] doesn't accept. | 451 * a value that [test] doesn't accept. |
| 448 */ | 452 */ |
| 449 Stream<T> takeWhile(bool test(T value)) { | 453 Stream<T> takeWhile(bool test(T value)) { |
| 450 return this.transform(new TakeWhileStream(test)); | 454 return this.transform(new TakeWhileTransformer<T>(test)); |
| 451 } | 455 } |
| 452 | 456 |
| 453 /** | 457 /** |
| 454 * Skips the first [count] data events from this stream. | 458 * Skips the first [count] data events from this stream. |
| 455 */ | 459 */ |
| 456 Stream<T> skip(int count) { | 460 Stream<T> skip(int count) { |
| 457 return this.transform(new SkipStream(count)); | 461 return this.transform(new SkipTransformer<T>(count)); |
| 458 } | 462 } |
| 459 | 463 |
| 460 /** | 464 /** |
| 461 * Skip data events from this stream while they are matched by [test]. | 465 * Skip data events from this stream while they are matched by [test]. |
| 462 * | 466 * |
| 463 * Error and done events are provided by the returned stream unmodified. | 467 * Error and done events are provided by the returned stream unmodified. |
| 464 * | 468 * |
| 465 * Starting with the first data event where [test] returns true for the | 469 * Starting with the first data event where [test] returns true for the |
| 466 * event data, the returned stream will have the same events as this stream. | 470 * event data, the returned stream will have the same events as this stream. |
| 467 */ | 471 */ |
| 468 Stream<T> skipWhile(bool test(T value)) { | 472 Stream<T> skipWhile(bool test(T value)) { |
| 469 return this.transform(new SkipWhileStream(test)); | 473 return this.transform(new SkipWhileTransformer<T>(test)); |
| 470 } | 474 } |
| 471 | 475 |
| 472 /** | 476 /** |
| 473 * Skip data events if they are equal to the previous data event. | 477 * Skip data events if they are equal to the previous data event. |
| 474 * | 478 * |
| 475 * The returned stream provides the same events as this stream, except | 479 * The returned stream provides the same events as this stream, except |
| 476 * that it never provides two consequtive data events that are equal. | 480 * that it never provides two consequtive data events that are equal. |
| 477 * | 481 * |
| 478 * Equality is determined by the provided [equals] method. If that is | 482 * Equality is determined by the provided [equals] method. If that is |
| 479 * omitted, the '==' operator on the last provided data element is used. | 483 * omitted, the '==' operator on the last provided data element is used. |
| 480 */ | 484 */ |
| 481 Stream<T> distinct([bool equals(T previous, T next)]) { | 485 Stream<T> distinct([bool equals(T previous, T next)]) { |
| 482 return this.transform(new DistinctStream(equals)); | 486 return this.transform(new DistinctTransformer<T>(equals)); |
| 483 } | 487 } |
| 484 | 488 |
| 485 /** | 489 /** |
| 486 * Returns the first element. | 490 * Returns the first element. |
| 487 * | 491 * |
| 488 * If [this] is empty throws a [StateError]. Otherwise this method is | 492 * If [this] is empty throws a [StateError]. Otherwise this method is |
| 489 * equivalent to [:this.elementAt(0):] | 493 * equivalent to [:this.elementAt(0):] |
| 490 */ | 494 */ |
| 491 Future<T> get first { | 495 Future<T> get first { |
| 492 _FutureImpl<T> future = new _FutureImpl(); | 496 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 493 StreamSubscription subscription; | 497 StreamSubscription subscription; |
| 494 subscription = this.listen( | 498 subscription = this.listen( |
| 495 (T value) { | 499 (T value) { |
| 496 future._setValue(value); | 500 future._setValue(value); |
| 497 subscription.cancel(); | 501 subscription.cancel(); |
| 498 return; | 502 return; |
| 499 }, | 503 }, |
| 500 onError: future._setError, | 504 onError: future._setError, |
| 501 onDone: () { | 505 onDone: () { |
| 502 future._setError(new AsyncError(new StateError("No elements"))); | 506 future._setError(new AsyncError(new StateError("No elements"))); |
| (...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 697 /** | 701 /** |
| 698 * Returns the value of the [index]th data event of this stream. | 702 * Returns the value of the [index]th data event of this stream. |
| 699 * | 703 * |
| 700 * If an error event occurs, the future will end with this error. | 704 * If an error event occurs, the future will end with this error. |
| 701 * | 705 * |
| 702 * If this stream provides fewer than [index] elements before closing, | 706 * If this stream provides fewer than [index] elements before closing, |
| 703 * an error is reported. | 707 * an error is reported. |
| 704 */ | 708 */ |
| 705 Future<T> elementAt(int index) { | 709 Future<T> elementAt(int index) { |
| 706 if (index is! int || index < 0) throw new ArgumentError(index); | 710 if (index is! int || index < 0) throw new ArgumentError(index); |
| 707 _FutureImpl<T> future = new _FutureImpl(); | 711 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 708 StreamSubscription subscription; | 712 StreamSubscription subscription; |
| 709 subscription = this.listen( | 713 subscription = this.listen( |
| 710 (T value) { | 714 (T value) { |
| 711 if (index == 0) { | 715 if (index == 0) { |
| 712 future._setValue(value); | 716 future._setValue(value); |
| 713 subscription.cancel(); | 717 subscription.cancel(); |
| 714 return; | 718 return; |
| 715 } | 719 } |
| 716 index -= 1; | 720 index -= 1; |
| 717 }, | 721 }, |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 779 void signalError(AsyncError errorEvent); | 783 void signalError(AsyncError errorEvent); |
| 780 void close(); | 784 void close(); |
| 781 } | 785 } |
| 782 | 786 |
| 783 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 787 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 784 class StreamView<T> extends Stream<T> { | 788 class StreamView<T> extends Stream<T> { |
| 785 Stream<T> _stream; | 789 Stream<T> _stream; |
| 786 | 790 |
| 787 StreamView(this._stream); | 791 StreamView(this._stream); |
| 788 | 792 |
| 793 bool get isSingleSubscription => _stream.isSingleSubscription; | |
| 794 | |
| 789 StreamSubscription<T> listen(void onData(T value), | 795 StreamSubscription<T> listen(void onData(T value), |
| 790 { void onError(AsyncError error), | 796 { void onError(AsyncError error), |
| 791 void onDone(), | 797 void onDone(), |
| 792 bool unsubscribeOnError }) { | 798 bool unsubscribeOnError }) { |
| 793 return _stream.listen(onData, onError: onError, onDone: onDone, | 799 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 794 unsubscribeOnError: unsubscribeOnError); | 800 unsubscribeOnError: unsubscribeOnError); |
| 795 } | 801 } |
| 796 } | 802 } |
| 797 | 803 |
| 798 /** | 804 /** |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 831 * Create a [StreamTransformer] that delegates events to the given functions. | 837 * Create a [StreamTransformer] that delegates events to the given functions. |
| 832 * | 838 * |
| 833 * If a parameter is omitted, a default handler is used that forwards the | 839 * If a parameter is omitted, a default handler is used that forwards the |
| 834 * event directly to the sink. | 840 * event directly to the sink. |
| 835 * | 841 * |
| 836 * Pauses on the are forwarded to the input stream as well. | 842 * Pauses on the are forwarded to the input stream as well. |
| 837 */ | 843 */ |
| 838 factory StreamTransformer.from({ | 844 factory StreamTransformer.from({ |
| 839 void onData(S data, StreamSink<T> sink), | 845 void onData(S data, StreamSink<T> sink), |
| 840 void onError(AsyncError error, StreamSink<T> sink), | 846 void onError(AsyncError error, StreamSink<T> sink), |
| 841 void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper; | 847 void onDone(StreamSink<T> sink)}) { |
| 848 return new _StreamTransformerImpl<S, T>(onData, onError, onDone); | |
| 849 } | |
| 842 | 850 |
| 843 Stream<T> bind(Stream<S> stream); | 851 Stream<T> bind(Stream<S> stream); |
| 844 } | 852 } |
| 845 | 853 |
| 846 | 854 |
| 847 // TODO(lrn): Remove this class. | 855 // TODO(lrn): Remove this class. |
| 848 /** | 856 /** |
| 849 * A base class for configuration objects for [TransformStream]. | 857 * A base class for configuration objects for [TransformStream]. |
| 850 * | 858 * |
| 851 * A default implementation forwards all incoming events to the output sink. | 859 * A default implementation forwards all incoming events to the output sink. |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 872 sink.signalError(error); | 880 sink.signalError(error); |
| 873 } | 881 } |
| 874 | 882 |
| 875 /** | 883 /** |
| 876 * Handle an incoming done event. | 884 * Handle an incoming done event. |
| 877 */ | 885 */ |
| 878 void handleDone(StreamSink<T> sink) { | 886 void handleDone(StreamSink<T> sink) { |
| 879 sink.close(); | 887 sink.close(); |
| 880 } | 888 } |
| 881 } | 889 } |
| OLD | NEW |