OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 13 matching lines...) Expand all Loading... | |
24 return stream; | 24 return stream; |
25 } | 25 } |
26 | 26 |
27 /** | 27 /** |
28 * Creates a single-subscription stream that gets its data from [data]. | 28 * Creates a single-subscription stream that gets its data from [data]. |
29 */ | 29 */ |
30 factory Stream.fromIterable(Iterable<T> data) { | 30 factory Stream.fromIterable(Iterable<T> data) { |
31 return new _IterableSingleStreamImpl<T>(data); | 31 return new _IterableSingleStreamImpl<T>(data); |
32 } | 32 } |
33 | 33 |
34 /** Whether the stream is a single-subscription stream. */ | |
35 bool get isSingleSubscription; | |
floitsch
2013/01/14 16:09:09
different CL, but sometimes we have singleSubscrib
| |
36 | |
34 /** | 37 /** |
35 * Returns a multi-subscription stream that produces the same events as this. | 38 * Returns a multi-subscription stream that produces the same events as this. |
36 * | 39 * |
37 * If this stream is single-subscription, return a new stream that allows | 40 * If this stream is single-subscription, return a new stream that allows |
38 * multiple subscribers. It will subscribe to this stream when its first | 41 * multiple subscribers. It will subscribe to this stream when its first |
39 * subscriber is added, and unsubscribe again when the last subscription is | 42 * subscriber is added, and unsubscribe again when the last subscription is |
40 * cancelled. | 43 * cancelled. |
41 * | 44 * |
42 * If this stream is already multi-subscriber, it is returned unmodified. | 45 * If this stream is already multi-subscriber, it is returned unmodified. |
43 */ | 46 */ |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
89 void onDone(), | 92 void onDone(), |
90 bool unsubscribeOnError}); | 93 bool unsubscribeOnError}); |
91 | 94 |
92 /** | 95 /** |
93 * Creates a new stream from this stream that discards some data events. | 96 * Creates a new stream from this stream that discards some data events. |
94 * | 97 * |
95 * The new stream sends the same error and done events as this stream, | 98 * The new stream sends the same error and done events as this stream, |
96 * but it only sends the data events that satisfy the [test]. | 99 * but it only sends the data events that satisfy the [test]. |
97 */ | 100 */ |
98 Stream<T> where(bool test(T event)) { | 101 Stream<T> where(bool test(T event)) { |
99 return this.transform(new WhereStream<T>(test)); | 102 return this.transform(new WhereTransformer<T>(test)); |
100 } | 103 } |
101 | 104 |
102 /** | 105 /** |
103 * Create a new stream that converts each element of this stream | 106 * Create a new stream that converts each element of this stream |
104 * to a new value using the [convert] function. | 107 * to a new value using the [convert] function. |
105 */ | 108 */ |
106 Stream mappedBy(convert(T event)) { | 109 Stream mappedBy(convert(T event)) { |
107 return this.transform(new MapStream<T, dynamic>(convert)); | 110 return this.transform(new MapTransformer<T, dynamic>(convert)); |
108 } | 111 } |
109 | 112 |
110 /** | 113 /** |
111 * Create a wrapper Stream that intercepts some errors from this stream. | 114 * Create a wrapper Stream that intercepts some errors from this stream. |
112 * | 115 * |
113 * If this stream sends an error that matches [test], then it is intercepted | 116 * If this stream sends an error that matches [test], then it is intercepted |
114 * by the [handle] function. | 117 * by the [handle] function. |
115 * | 118 * |
116 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 119 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
117 * true. If [test] is omitted, every error is considered matching. | 120 * true. If [test] is omitted, every error is considered matching. |
118 * | 121 * |
119 * If the error is intercepted, the [handle] function can decide what to do | 122 * If the error is intercepted, the [handle] function can decide what to do |
120 * with it. It can throw if it wants to raise a new (or the same) error, | 123 * with it. It can throw if it wants to raise a new (or the same) error, |
121 * or simply return to make the stream forget the error. | 124 * or simply return to make the stream forget the error. |
floitsch
2013/01/14 16:09:09
different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
| |
122 */ | 125 */ |
123 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 126 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
124 return this.transform(new HandleErrorStream<T>(handle, test)); | 127 return this.transform(new HandleErrorTransformer<T>(handle, test)); |
125 } | 128 } |
126 | 129 |
127 /** | 130 /** |
128 * Create a new stream from this stream that converts each element | 131 * Create a new stream from this stream that converts each element |
129 * into zero or more events. | 132 * into zero or more events. |
130 * | 133 * |
131 * Each incoming event is converted to an [Iterable] of new events, | 134 * Each incoming event is converted to an [Iterable] of new events, |
132 * and each of these new events are then sent by the returned stream | 135 * and each of these new events are then sent by the returned stream |
133 * in order. | 136 * in order. |
134 */ | 137 */ |
135 Stream expand(Iterable convert(T value)) { | 138 Stream expand(Iterable convert(T value)) { |
136 return this.transform(new ExpandStream<T, dynamic>(convert)); | 139 return this.transform( |
140 new ExpandTransformer<T, dynamic>(convert)); | |
137 } | 141 } |
138 | 142 |
139 /** | 143 /** |
140 * Bind this stream as the input of the provided [StreamConsumer]. | 144 * Bind this stream as the input of the provided [StreamConsumer]. |
141 */ | 145 */ |
142 Future pipe(StreamConsumer<dynamic, T> streamConsumer) { | 146 Future pipe(StreamConsumer<dynamic, T> streamConsumer) { |
143 return streamConsumer.consume(this); | 147 return streamConsumer.consume(this); |
144 } | 148 } |
145 | 149 |
146 /** | 150 /** |
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
428 /** | 432 /** |
429 * Provide at most the first [n] values of this stream. | 433 * Provide at most the first [n] values of this stream. |
430 * | 434 * |
431 * Forwards the first [n] data events of this stream, and all error | 435 * Forwards the first [n] data events of this stream, and all error |
432 * events, to the returned stream, and ends with a done event. | 436 * events, to the returned stream, and ends with a done event. |
433 * | 437 * |
434 * If this stream produces fewer than [count] values before it's done, | 438 * If this stream produces fewer than [count] values before it's done, |
435 * so will the returned stream. | 439 * so will the returned stream. |
436 */ | 440 */ |
437 Stream<T> take(int count) { | 441 Stream<T> take(int count) { |
438 return this.transform(new TakeStream(count)); | 442 return this.transform(new TakeTransformer<T>(count)); |
439 } | 443 } |
440 | 444 |
441 /** | 445 /** |
442 * Forwards data events while [test] is successful. | 446 * Forwards data events while [test] is successful. |
443 * | 447 * |
444 * The returned stream provides the same events as this stream as long | 448 * The returned stream provides the same events as this stream as long |
445 * as [test] returns [:true:] for the event data. The stream is done | 449 * as [test] returns [:true:] for the event data. The stream is done |
446 * when either this stream is done, or when this stream first provides | 450 * when either this stream is done, or when this stream first provides |
447 * a value that [test] doesn't accept. | 451 * a value that [test] doesn't accept. |
448 */ | 452 */ |
449 Stream<T> takeWhile(bool test(T value)) { | 453 Stream<T> takeWhile(bool test(T value)) { |
450 return this.transform(new TakeWhileStream(test)); | 454 return this.transform(new TakeWhileTransformer<T>(test)); |
451 } | 455 } |
452 | 456 |
453 /** | 457 /** |
454 * Skips the first [count] data events from this stream. | 458 * Skips the first [count] data events from this stream. |
455 */ | 459 */ |
456 Stream<T> skip(int count) { | 460 Stream<T> skip(int count) { |
457 return this.transform(new SkipStream(count)); | 461 return this.transform(new SkipTransformer<T>(count)); |
458 } | 462 } |
459 | 463 |
460 /** | 464 /** |
461 * Skip data events from this stream while they are matched by [test]. | 465 * Skip data events from this stream while they are matched by [test]. |
462 * | 466 * |
463 * Error and done events are provided by the returned stream unmodified. | 467 * Error and done events are provided by the returned stream unmodified. |
464 * | 468 * |
465 * Starting with the first data event where [test] returns true for the | 469 * Starting with the first data event where [test] returns true for the |
466 * event data, the returned stream will have the same events as this stream. | 470 * event data, the returned stream will have the same events as this stream. |
467 */ | 471 */ |
468 Stream<T> skipWhile(bool test(T value)) { | 472 Stream<T> skipWhile(bool test(T value)) { |
469 return this.transform(new SkipWhileStream(test)); | 473 return this.transform(new SkipWhileTransformer<T>(test)); |
470 } | 474 } |
471 | 475 |
472 /** | 476 /** |
473 * Skip data events if they are equal to the previous data event. | 477 * Skip data events if they are equal to the previous data event. |
474 * | 478 * |
475 * The returned stream provides the same events as this stream, except | 479 * The returned stream provides the same events as this stream, except |
476 * that it never provides two consequtive data events that are equal. | 480 * that it never provides two consequtive data events that are equal. |
477 * | 481 * |
478 * Equality is determined by the provided [equals] method. If that is | 482 * Equality is determined by the provided [equals] method. If that is |
479 * omitted, the '==' operator on the last provided data element is used. | 483 * omitted, the '==' operator on the last provided data element is used. |
480 */ | 484 */ |
481 Stream<T> distinct([bool equals(T previous, T next)]) { | 485 Stream<T> distinct([bool equals(T previous, T next)]) { |
482 return this.transform(new DistinctStream(equals)); | 486 return this.transform(new DistinctTransformer<T>(equals)); |
483 } | 487 } |
484 | 488 |
485 /** | 489 /** |
486 * Returns the first element. | 490 * Returns the first element. |
487 * | 491 * |
488 * If [this] is empty throws a [StateError]. Otherwise this method is | 492 * If [this] is empty throws a [StateError]. Otherwise this method is |
489 * equivalent to [:this.elementAt(0):] | 493 * equivalent to [:this.elementAt(0):] |
490 */ | 494 */ |
491 Future<T> get first { | 495 Future<T> get first { |
492 _FutureImpl<T> future = new _FutureImpl(); | 496 _FutureImpl<T> future = new _FutureImpl<T>(); |
493 StreamSubscription subscription; | 497 StreamSubscription subscription; |
494 subscription = this.listen( | 498 subscription = this.listen( |
495 (T value) { | 499 (T value) { |
496 future._setValue(value); | 500 future._setValue(value); |
497 subscription.cancel(); | 501 subscription.cancel(); |
498 return; | 502 return; |
499 }, | 503 }, |
500 onError: future._setError, | 504 onError: future._setError, |
501 onDone: () { | 505 onDone: () { |
502 future._setError(new AsyncError(new StateError("No elements"))); | 506 future._setError(new AsyncError(new StateError("No elements"))); |
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
697 /** | 701 /** |
698 * Returns the value of the [index]th data event of this stream. | 702 * Returns the value of the [index]th data event of this stream. |
699 * | 703 * |
700 * If an error event occurs, the future will end with this error. | 704 * If an error event occurs, the future will end with this error. |
701 * | 705 * |
702 * If this stream provides fewer than [index] elements before closing, | 706 * If this stream provides fewer than [index] elements before closing, |
703 * an error is reported. | 707 * an error is reported. |
704 */ | 708 */ |
705 Future<T> elementAt(int index) { | 709 Future<T> elementAt(int index) { |
706 if (index is! int || index < 0) throw new ArgumentError(index); | 710 if (index is! int || index < 0) throw new ArgumentError(index); |
707 _FutureImpl<T> future = new _FutureImpl(); | 711 _FutureImpl<T> future = new _FutureImpl<T>(); |
708 StreamSubscription subscription; | 712 StreamSubscription subscription; |
709 subscription = this.listen( | 713 subscription = this.listen( |
710 (T value) { | 714 (T value) { |
711 if (index == 0) { | 715 if (index == 0) { |
712 future._setValue(value); | 716 future._setValue(value); |
713 subscription.cancel(); | 717 subscription.cancel(); |
714 return; | 718 return; |
715 } | 719 } |
716 index -= 1; | 720 index -= 1; |
717 }, | 721 }, |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
779 void signalError(AsyncError errorEvent); | 783 void signalError(AsyncError errorEvent); |
780 void close(); | 784 void close(); |
781 } | 785 } |
782 | 786 |
783 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 787 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
784 class StreamView<T> extends Stream<T> { | 788 class StreamView<T> extends Stream<T> { |
785 Stream<T> _stream; | 789 Stream<T> _stream; |
786 | 790 |
787 StreamView(this._stream); | 791 StreamView(this._stream); |
788 | 792 |
793 bool get isSingleSubscription => _stream.isSingleSubscription; | |
794 | |
789 StreamSubscription<T> listen(void onData(T value), | 795 StreamSubscription<T> listen(void onData(T value), |
790 { void onError(AsyncError error), | 796 { void onError(AsyncError error), |
791 void onDone(), | 797 void onDone(), |
792 bool unsubscribeOnError }) { | 798 bool unsubscribeOnError }) { |
793 return _stream.listen(onData, onError: onError, onDone: onDone, | 799 return _stream.listen(onData, onError: onError, onDone: onDone, |
794 unsubscribeOnError: unsubscribeOnError); | 800 unsubscribeOnError: unsubscribeOnError); |
795 } | 801 } |
796 } | 802 } |
797 | 803 |
798 /** | 804 /** |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
831 * Create a [StreamTransformer] that delegates events to the given functions. | 837 * Create a [StreamTransformer] that delegates events to the given functions. |
832 * | 838 * |
833 * If a parameter is omitted, a default handler is used that forwards the | 839 * If a parameter is omitted, a default handler is used that forwards the |
834 * event directly to the sink. | 840 * event directly to the sink. |
835 * | 841 * |
836 * Pauses on the are forwarded to the input stream as well. | 842 * Pauses on the are forwarded to the input stream as well. |
837 */ | 843 */ |
838 factory StreamTransformer.from({ | 844 factory StreamTransformer.from({ |
839 void onData(S data, StreamSink<T> sink), | 845 void onData(S data, StreamSink<T> sink), |
840 void onError(AsyncError error, StreamSink<T> sink), | 846 void onError(AsyncError error, StreamSink<T> sink), |
841 void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper; | 847 void onDone(StreamSink<T> sink)}) { |
848 return new _StreamTransformerImpl<S, T>(onData, onError, onDone); | |
849 } | |
842 | 850 |
843 Stream<T> bind(Stream<S> stream); | 851 Stream<T> bind(Stream<S> stream); |
844 } | 852 } |
845 | 853 |
846 | 854 |
847 // TODO(lrn): Remove this class. | 855 // TODO(lrn): Remove this class. |
848 /** | 856 /** |
849 * A base class for configuration objects for [TransformStream]. | 857 * A base class for configuration objects for [TransformStream]. |
850 * | 858 * |
851 * A default implementation forwards all incoming events to the output sink. | 859 * A default implementation forwards all incoming events to the output sink. |
(...skipping 20 matching lines...) Expand all Loading... | |
872 sink.signalError(error); | 880 sink.signalError(error); |
873 } | 881 } |
874 | 882 |
875 /** | 883 /** |
876 * Handle an incoming done event. | 884 * Handle an incoming done event. |
877 */ | 885 */ |
878 void handleDone(StreamSink<T> sink) { | 886 void handleDone(StreamSink<T> sink) { |
879 sink.close(); | 887 sink.close(); |
880 } | 888 } |
881 } | 889 } |
OLD | NEW |