Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_impl.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 13 matching lines...) Expand all
24 return stream; 24 return stream;
25 } 25 }
26 26
27 /** 27 /**
28 * Creates a single-subscription stream that gets its data from [data]. 28 * Creates a single-subscription stream that gets its data from [data].
29 */ 29 */
30 factory Stream.fromIterable(Iterable<T> data) { 30 factory Stream.fromIterable(Iterable<T> data) {
31 return new _IterableSingleStreamImpl<T>(data); 31 return new _IterableSingleStreamImpl<T>(data);
32 } 32 }
33 33
34 /** Whether the stream is a single-subscription stream. */
35 bool get isSingleSubscription;
floitsch 2013/01/14 16:09:09 different CL, but sometimes we have singleSubscrib
36
34 /** 37 /**
35 * Returns a multi-subscription stream that produces the same events as this. 38 * Returns a multi-subscription stream that produces the same events as this.
36 * 39 *
37 * If this stream is single-subscription, return a new stream that allows 40 * If this stream is single-subscription, return a new stream that allows
38 * multiple subscribers. It will subscribe to this stream when its first 41 * multiple subscribers. It will subscribe to this stream when its first
39 * subscriber is added, and unsubscribe again when the last subscription is 42 * subscriber is added, and unsubscribe again when the last subscription is
40 * cancelled. 43 * cancelled.
41 * 44 *
42 * If this stream is already multi-subscriber, it is returned unmodified. 45 * If this stream is already multi-subscriber, it is returned unmodified.
43 */ 46 */
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 void onDone(), 92 void onDone(),
90 bool unsubscribeOnError}); 93 bool unsubscribeOnError});
91 94
92 /** 95 /**
93 * Creates a new stream from this stream that discards some data events. 96 * Creates a new stream from this stream that discards some data events.
94 * 97 *
95 * The new stream sends the same error and done events as this stream, 98 * The new stream sends the same error and done events as this stream,
96 * but it only sends the data events that satisfy the [test]. 99 * but it only sends the data events that satisfy the [test].
97 */ 100 */
98 Stream<T> where(bool test(T event)) { 101 Stream<T> where(bool test(T event)) {
99 return this.transform(new WhereStream<T>(test)); 102 return this.transform(new WhereTransformer<T>(test));
100 } 103 }
101 104
102 /** 105 /**
103 * Create a new stream that converts each element of this stream 106 * Create a new stream that converts each element of this stream
104 * to a new value using the [convert] function. 107 * to a new value using the [convert] function.
105 */ 108 */
106 Stream mappedBy(convert(T event)) { 109 Stream mappedBy(convert(T event)) {
107 return this.transform(new MapStream<T, dynamic>(convert)); 110 return this.transform(new MapTransformer<T, dynamic>(convert));
108 } 111 }
109 112
110 /** 113 /**
111 * Create a wrapper Stream that intercepts some errors from this stream. 114 * Create a wrapper Stream that intercepts some errors from this stream.
112 * 115 *
113 * If this stream sends an error that matches [test], then it is intercepted 116 * If this stream sends an error that matches [test], then it is intercepted
114 * by the [handle] function. 117 * by the [handle] function.
115 * 118 *
116 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns 119 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
117 * true. If [test] is omitted, every error is considered matching. 120 * true. If [test] is omitted, every error is considered matching.
118 * 121 *
119 * If the error is intercepted, the [handle] function can decide what to do 122 * If the error is intercepted, the [handle] function can decide what to do
120 * with it. It can throw if it wants to raise a new (or the same) error, 123 * with it. It can throw if it wants to raise a new (or the same) error,
121 * or simply return to make the stream forget the error. 124 * or simply return to make the stream forget the error.
floitsch 2013/01/14 16:09:09 different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen 2013/01/15 07:19:51 Done.
122 */ 125 */
123 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { 126 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) {
124 return this.transform(new HandleErrorStream<T>(handle, test)); 127 return this.transform(new HandleErrorTransformer<T>(handle, test));
125 } 128 }
126 129
127 /** 130 /**
128 * Create a new stream from this stream that converts each element 131 * Create a new stream from this stream that converts each element
129 * into zero or more events. 132 * into zero or more events.
130 * 133 *
131 * Each incoming event is converted to an [Iterable] of new events, 134 * Each incoming event is converted to an [Iterable] of new events,
132 * and each of these new events are then sent by the returned stream 135 * and each of these new events are then sent by the returned stream
133 * in order. 136 * in order.
134 */ 137 */
135 Stream expand(Iterable convert(T value)) { 138 Stream expand(Iterable convert(T value)) {
136 return this.transform(new ExpandStream<T, dynamic>(convert)); 139 return this.transform(
140 new ExpandTransformer<T, dynamic>(convert));
137 } 141 }
138 142
139 /** 143 /**
140 * Bind this stream as the input of the provided [StreamConsumer]. 144 * Bind this stream as the input of the provided [StreamConsumer].
141 */ 145 */
142 Future pipe(StreamConsumer<dynamic, T> streamConsumer) { 146 Future pipe(StreamConsumer<dynamic, T> streamConsumer) {
143 return streamConsumer.consume(this); 147 return streamConsumer.consume(this);
144 } 148 }
145 149
146 /** 150 /**
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after
428 /** 432 /**
429 * Provide at most the first [n] values of this stream. 433 * Provide at most the first [n] values of this stream.
430 * 434 *
431 * Forwards the first [n] data events of this stream, and all error 435 * Forwards the first [n] data events of this stream, and all error
432 * events, to the returned stream, and ends with a done event. 436 * events, to the returned stream, and ends with a done event.
433 * 437 *
434 * If this stream produces fewer than [count] values before it's done, 438 * If this stream produces fewer than [count] values before it's done,
435 * so will the returned stream. 439 * so will the returned stream.
436 */ 440 */
437 Stream<T> take(int count) { 441 Stream<T> take(int count) {
438 return this.transform(new TakeStream(count)); 442 return this.transform(new TakeTransformer<T>(count));
439 } 443 }
440 444
441 /** 445 /**
442 * Forwards data events while [test] is successful. 446 * Forwards data events while [test] is successful.
443 * 447 *
444 * The returned stream provides the same events as this stream as long 448 * The returned stream provides the same events as this stream as long
445 * as [test] returns [:true:] for the event data. The stream is done 449 * as [test] returns [:true:] for the event data. The stream is done
446 * when either this stream is done, or when this stream first provides 450 * when either this stream is done, or when this stream first provides
447 * a value that [test] doesn't accept. 451 * a value that [test] doesn't accept.
448 */ 452 */
449 Stream<T> takeWhile(bool test(T value)) { 453 Stream<T> takeWhile(bool test(T value)) {
450 return this.transform(new TakeWhileStream(test)); 454 return this.transform(new TakeWhileTransformer<T>(test));
451 } 455 }
452 456
453 /** 457 /**
454 * Skips the first [count] data events from this stream. 458 * Skips the first [count] data events from this stream.
455 */ 459 */
456 Stream<T> skip(int count) { 460 Stream<T> skip(int count) {
457 return this.transform(new SkipStream(count)); 461 return this.transform(new SkipTransformer<T>(count));
458 } 462 }
459 463
460 /** 464 /**
461 * Skip data events from this stream while they are matched by [test]. 465 * Skip data events from this stream while they are matched by [test].
462 * 466 *
463 * Error and done events are provided by the returned stream unmodified. 467 * Error and done events are provided by the returned stream unmodified.
464 * 468 *
465 * Starting with the first data event where [test] returns true for the 469 * Starting with the first data event where [test] returns true for the
466 * event data, the returned stream will have the same events as this stream. 470 * event data, the returned stream will have the same events as this stream.
467 */ 471 */
468 Stream<T> skipWhile(bool test(T value)) { 472 Stream<T> skipWhile(bool test(T value)) {
469 return this.transform(new SkipWhileStream(test)); 473 return this.transform(new SkipWhileTransformer<T>(test));
470 } 474 }
471 475
472 /** 476 /**
473 * Skip data events if they are equal to the previous data event. 477 * Skip data events if they are equal to the previous data event.
474 * 478 *
475 * The returned stream provides the same events as this stream, except 479 * The returned stream provides the same events as this stream, except
476 * that it never provides two consequtive data events that are equal. 480 * that it never provides two consequtive data events that are equal.
477 * 481 *
478 * Equality is determined by the provided [equals] method. If that is 482 * Equality is determined by the provided [equals] method. If that is
479 * omitted, the '==' operator on the last provided data element is used. 483 * omitted, the '==' operator on the last provided data element is used.
480 */ 484 */
481 Stream<T> distinct([bool equals(T previous, T next)]) { 485 Stream<T> distinct([bool equals(T previous, T next)]) {
482 return this.transform(new DistinctStream(equals)); 486 return this.transform(new DistinctTransformer<T>(equals));
483 } 487 }
484 488
485 /** 489 /**
486 * Returns the first element. 490 * Returns the first element.
487 * 491 *
488 * If [this] is empty throws a [StateError]. Otherwise this method is 492 * If [this] is empty throws a [StateError]. Otherwise this method is
489 * equivalent to [:this.elementAt(0):] 493 * equivalent to [:this.elementAt(0):]
490 */ 494 */
491 Future<T> get first { 495 Future<T> get first {
492 _FutureImpl<T> future = new _FutureImpl(); 496 _FutureImpl<T> future = new _FutureImpl<T>();
493 StreamSubscription subscription; 497 StreamSubscription subscription;
494 subscription = this.listen( 498 subscription = this.listen(
495 (T value) { 499 (T value) {
496 future._setValue(value); 500 future._setValue(value);
497 subscription.cancel(); 501 subscription.cancel();
498 return; 502 return;
499 }, 503 },
500 onError: future._setError, 504 onError: future._setError,
501 onDone: () { 505 onDone: () {
502 future._setError(new AsyncError(new StateError("No elements"))); 506 future._setError(new AsyncError(new StateError("No elements")));
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after
697 /** 701 /**
698 * Returns the value of the [index]th data event of this stream. 702 * Returns the value of the [index]th data event of this stream.
699 * 703 *
700 * If an error event occurs, the future will end with this error. 704 * If an error event occurs, the future will end with this error.
701 * 705 *
702 * If this stream provides fewer than [index] elements before closing, 706 * If this stream provides fewer than [index] elements before closing,
703 * an error is reported. 707 * an error is reported.
704 */ 708 */
705 Future<T> elementAt(int index) { 709 Future<T> elementAt(int index) {
706 if (index is! int || index < 0) throw new ArgumentError(index); 710 if (index is! int || index < 0) throw new ArgumentError(index);
707 _FutureImpl<T> future = new _FutureImpl(); 711 _FutureImpl<T> future = new _FutureImpl<T>();
708 StreamSubscription subscription; 712 StreamSubscription subscription;
709 subscription = this.listen( 713 subscription = this.listen(
710 (T value) { 714 (T value) {
711 if (index == 0) { 715 if (index == 0) {
712 future._setValue(value); 716 future._setValue(value);
713 subscription.cancel(); 717 subscription.cancel();
714 return; 718 return;
715 } 719 }
716 index -= 1; 720 index -= 1;
717 }, 721 },
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
779 void signalError(AsyncError errorEvent); 783 void signalError(AsyncError errorEvent);
780 void close(); 784 void close();
781 } 785 }
782 786
783 /** [Stream] wrapper that only exposes the [Stream] interface. */ 787 /** [Stream] wrapper that only exposes the [Stream] interface. */
784 class StreamView<T> extends Stream<T> { 788 class StreamView<T> extends Stream<T> {
785 Stream<T> _stream; 789 Stream<T> _stream;
786 790
787 StreamView(this._stream); 791 StreamView(this._stream);
788 792
793 bool get isSingleSubscription => _stream.isSingleSubscription;
794
789 StreamSubscription<T> listen(void onData(T value), 795 StreamSubscription<T> listen(void onData(T value),
790 { void onError(AsyncError error), 796 { void onError(AsyncError error),
791 void onDone(), 797 void onDone(),
792 bool unsubscribeOnError }) { 798 bool unsubscribeOnError }) {
793 return _stream.listen(onData, onError: onError, onDone: onDone, 799 return _stream.listen(onData, onError: onError, onDone: onDone,
794 unsubscribeOnError: unsubscribeOnError); 800 unsubscribeOnError: unsubscribeOnError);
795 } 801 }
796 } 802 }
797 803
798 /** 804 /**
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
831 * Create a [StreamTransformer] that delegates events to the given functions. 837 * Create a [StreamTransformer] that delegates events to the given functions.
832 * 838 *
833 * If a parameter is omitted, a default handler is used that forwards the 839 * If a parameter is omitted, a default handler is used that forwards the
834 * event directly to the sink. 840 * event directly to the sink.
835 * 841 *
836 * Pauses on the are forwarded to the input stream as well. 842 * Pauses on the are forwarded to the input stream as well.
837 */ 843 */
838 factory StreamTransformer.from({ 844 factory StreamTransformer.from({
839 void onData(S data, StreamSink<T> sink), 845 void onData(S data, StreamSink<T> sink),
840 void onError(AsyncError error, StreamSink<T> sink), 846 void onError(AsyncError error, StreamSink<T> sink),
841 void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper; 847 void onDone(StreamSink<T> sink)}) {
848 return new _StreamTransformerImpl<S, T>(onData, onError, onDone);
849 }
842 850
843 Stream<T> bind(Stream<S> stream); 851 Stream<T> bind(Stream<S> stream);
844 } 852 }
845 853
846 854
847 // TODO(lrn): Remove this class. 855 // TODO(lrn): Remove this class.
848 /** 856 /**
849 * A base class for configuration objects for [TransformStream]. 857 * A base class for configuration objects for [TransformStream].
850 * 858 *
851 * A default implementation forwards all incoming events to the output sink. 859 * A default implementation forwards all incoming events to the output sink.
(...skipping 20 matching lines...) Expand all
872 sink.signalError(error); 880 sink.signalError(error);
873 } 881 }
874 882
875 /** 883 /**
876 * Handle an incoming done event. 884 * Handle an incoming done event.
877 */ 885 */
878 void handleDone(StreamSink<T> sink) { 886 void handleDone(StreamSink<T> sink) {
879 sink.close(); 887 sink.close();
880 } 888 }
881 } 889 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_impl.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698