Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(126)

Side by Side Diff: packages/async/lib/src/stream_completer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_completer;
6
7 import "dart:async"; 5 import "dart:async";
8 6
9 /// A single-subscription [stream] where the contents are provided later. 7 /// A single-subscription [stream] where the contents are provided later.
10 /// 8 ///
11 /// It is generally recommended that you never create a `Future<Stream>` 9 /// It is generally recommended that you never create a `Future<Stream>`
12 /// because you can just directly create a stream that doesn't do anything 10 /// because you can just directly create a stream that doesn't do anything
13 /// until it's ready to do so. 11 /// until it's ready to do so.
14 /// This class can be used to create such a stream. 12 /// This class can be used to create such a stream.
15 /// 13 ///
16 /// The [stream] is a normal stream that you can listen to immediately, 14 /// The [stream] is a normal stream that you can listen to immediately,
17 /// but until either [setSourceStream] or [setEmpty] is called, 15 /// but until either [setSourceStream] or [setEmpty] is called,
18 /// the stream won't produce any events. 16 /// the stream won't produce any events.
19 /// 17 ///
20 /// The same effect can be achieved by using a [StreamController] 18 /// The same effect can be achieved by using a [StreamController]
21 /// and adding the stream using `addStream` when both 19 /// and adding the stream using `addStream` when both
22 /// the controller's stream is listened to and the source stream is ready. 20 /// the controller's stream is listened to and the source stream is ready.
23 /// This class attempts to shortcut some of the overhead when possible. 21 /// This class attempts to shortcut some of the overhead when possible.
24 /// For example, if the [stream] is only listened to 22 /// For example, if the [stream] is only listened to
25 /// after the source stream has been set, 23 /// after the source stream has been set,
26 /// the listen is performed directly on the source stream. 24 /// the listen is performed directly on the source stream.
27 class StreamCompleter<T> { 25 class StreamCompleter<T> {
28 /// The stream doing the actual work, is returned by [stream]. 26 /// The stream doing the actual work, is returned by [stream].
29 final _CompleterStream _stream = new _CompleterStream<T>(); 27 final _stream = new _CompleterStream<T>();
30 28
31 /// Convert a `Future<Stream>` to a `Stream`. 29 /// Convert a `Future<Stream>` to a `Stream`.
32 /// 30 ///
33 /// This creates a stream using a stream completer, 31 /// This creates a stream using a stream completer,
34 /// and sets the source stream to the result of the future when the 32 /// and sets the source stream to the result of the future when the
35 /// future completes. 33 /// future completes.
36 /// 34 ///
37 /// If the future completes with an error, the returned stream will 35 /// If the future completes with an error, the returned stream will
38 /// instead contain just that error. 36 /// instead contain just that error.
39 static Stream fromFuture(Future<Stream> streamFuture) { 37 static Stream<T> fromFuture<T>(Future<Stream<T>> streamFuture) {
40 var completer = new StreamCompleter(); 38 var completer = new StreamCompleter<T>();
41 streamFuture.then(completer.setSourceStream, 39 streamFuture.then(completer.setSourceStream, onError: completer.setError);
42 onError: (e, s) {
43 completer.setSourceStream(streamFuture.asStream());
44 });
45 return completer.stream; 40 return completer.stream;
46 } 41 }
47 42
48 /// The stream of this completer. 43 /// The stream of this completer.
49 /// 44 ///
50 /// This stream is always a single-subscription stream. 45 /// This stream is always a single-subscription stream.
51 /// 46 ///
52 /// When a source stream is provided, its events will be forwarded to 47 /// When a source stream is provided, its events will be forwarded to
53 /// listeners on this stream. 48 /// listeners on this stream.
54 /// 49 ///
(...skipping 14 matching lines...) Expand all
69 /// normal subscription, and can be paused or canceled, but it won't 64 /// normal subscription, and can be paused or canceled, but it won't
70 /// produce any events until a source stream is provided. 65 /// produce any events until a source stream is provided.
71 /// 66 ///
72 /// If the `stream` subscription is canceled before a source stream is set, 67 /// If the `stream` subscription is canceled before a source stream is set,
73 /// the source stream will be listened to and immediately canceled again. 68 /// the source stream will be listened to and immediately canceled again.
74 /// 69 ///
75 /// Otherwise, when the source stream is then set, 70 /// Otherwise, when the source stream is then set,
76 /// it is immediately listened to, and its events are forwarded to the 71 /// it is immediately listened to, and its events are forwarded to the
77 /// existing subscription. 72 /// existing subscription.
78 /// 73 ///
79 /// Either [setSourceStream] or [setEmpty] may be called at most once. 74 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
80 /// Trying to call either of them again will fail. 75 /// most once. Trying to call any of them again will fail.
81 void setSourceStream(Stream<T> sourceStream) { 76 void setSourceStream(Stream<T> sourceStream) {
82 if (_stream._isSourceStreamSet) { 77 if (_stream._isSourceStreamSet) {
83 throw new StateError("Source stream already set"); 78 throw new StateError("Source stream already set");
84 } 79 }
85 _stream._setSourceStream(sourceStream); 80 _stream._setSourceStream(sourceStream);
86 } 81 }
87 82
88 /// Equivalent to setting an empty stream using [setSourceStream]. 83 /// Equivalent to setting an empty stream using [setSourceStream].
89 /// 84 ///
90 /// Either [setSourceStream] or [setEmpty] may be called at most once. 85 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
91 /// Trying to call either of them again will fail. 86 /// most once. Trying to call any of them again will fail.
92 void setEmpty() { 87 void setEmpty() {
93 if (_stream._isSourceStreamSet) { 88 if (_stream._isSourceStreamSet) {
94 throw new StateError("Source stream already set"); 89 throw new StateError("Source stream already set");
95 } 90 }
96 _stream._setEmpty(); 91 _stream._setEmpty();
97 } 92 }
93
94 /// Completes this to a stream that emits [error] and then closes.
95 ///
96 /// This is useful when the process of creating the data for the stream fails.
97 ///
98 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
99 /// most once. Trying to call any of them again will fail.
100 void setError(error, [StackTrace stackTrace]) {
101 setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace)));
102 }
98 } 103 }
99 104
100 /// Stream completed by [StreamCompleter]. 105 /// Stream completed by [StreamCompleter].
101 class _CompleterStream<T> extends Stream<T> { 106 class _CompleterStream<T> extends Stream<T> {
102 /// Controller for an intermediate stream. 107 /// Controller for an intermediate stream.
103 /// 108 ///
104 /// Created if the user listens on this stream before the source stream 109 /// Created if the user listens on this stream before the source stream
105 /// is set, or if using [_setEmpty] so there is no source stream. 110 /// is set, or if using [_setEmpty] so there is no source stream.
106 StreamController _controller; 111 StreamController<T> _controller;
107 112
108 /// Source stream for the events provided by this stream. 113 /// Source stream for the events provided by this stream.
109 /// 114 ///
110 /// Set when the completer sets the source stream using [_setSourceStream] 115 /// Set when the completer sets the source stream using [_setSourceStream]
111 /// or [_setEmpty]. 116 /// or [_setEmpty].
112 Stream _sourceStream; 117 Stream<T> _sourceStream;
113 118
114 StreamSubscription<T> listen(onData(T data), 119 StreamSubscription<T> listen(onData(T data),
115 {Function onError, 120 {Function onError, void onDone(), bool cancelOnError}) {
116 void onDone(),
117 bool cancelOnError}) {
118 if (_controller == null) { 121 if (_controller == null) {
119 if (_sourceStream != null && !_sourceStream.isBroadcast) { 122 if (_sourceStream != null && !_sourceStream.isBroadcast) {
120 // If the source stream is itself single subscription, 123 // If the source stream is itself single subscription,
121 // just listen to it directly instead of creating a controller. 124 // just listen to it directly instead of creating a controller.
122 return _sourceStream.listen(onData, onError: onError, onDone: onDone, 125 return _sourceStream.listen(onData,
123 cancelOnError: cancelOnError); 126 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
124 } 127 }
125 _createController(); 128 _createController();
126 if (_sourceStream != null) { 129 if (_sourceStream != null) {
127 _linkStreamToController(); 130 _linkStreamToController();
128 } 131 }
129 } 132 }
130 return _controller.stream.listen(onData, onError: onError, onDone: onDone, 133 return _controller.stream.listen(onData,
131 cancelOnError: cancelOnError); 134 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
132 } 135 }
133 136
134 /// Whether a source stream has been set. 137 /// Whether a source stream has been set.
135 /// 138 ///
136 /// Used to throw an error if trying to set a source stream twice. 139 /// Used to throw an error if trying to set a source stream twice.
137 bool get _isSourceStreamSet => _sourceStream != null; 140 bool get _isSourceStreamSet => _sourceStream != null;
138 141
139 /// Sets the source stream providing the events for this stream. 142 /// Sets the source stream providing the events for this stream.
140 /// 143 ///
141 /// If set before the user listens, listen calls will be directed directly 144 /// If set before the user listens, listen calls will be directed directly
142 /// to the source stream. If the user listenes earlier, and intermediate 145 /// to the source stream. If the user listenes earlier, and intermediate
143 /// stream is created using a stream controller, and the source stream is 146 /// stream is created using a stream controller, and the source stream is
144 /// linked into that stream later. 147 /// linked into that stream later.
145 void _setSourceStream(Stream<T> sourceStream) { 148 void _setSourceStream(Stream<T> sourceStream) {
146 assert(_sourceStream == null); 149 assert(_sourceStream == null);
147 _sourceStream = sourceStream; 150 _sourceStream = sourceStream;
148 if (_controller != null) { 151 if (_controller != null) {
149 // User has already listened, so provide the data through controller. 152 // User has already listened, so provide the data through controller.
150 _linkStreamToController(); 153 _linkStreamToController();
151 } 154 }
152 } 155 }
153 156
154 /// Links source stream to controller when both are available. 157 /// Links source stream to controller when both are available.
155 void _linkStreamToController() { 158 void _linkStreamToController() {
156 assert(_controller != null); 159 assert(_controller != null);
157 assert(_sourceStream != null); 160 assert(_sourceStream != null);
158 _controller.addStream(_sourceStream, cancelOnError: false) 161 _controller
159 .whenComplete(_controller.close); 162 .addStream(_sourceStream, cancelOnError: false)
163 .whenComplete(_controller.close);
160 } 164 }
161 165
162 /// Sets an empty source stream. 166 /// Sets an empty source stream.
163 /// 167 ///
164 /// Uses [_controller] for the stream, then closes the controller 168 /// Uses [_controller] for the stream, then closes the controller
165 /// immediately. 169 /// immediately.
166 void _setEmpty() { 170 void _setEmpty() {
167 assert(_sourceStream == null); 171 assert(_sourceStream == null);
168 if (_controller == null) { 172 if (_controller == null) {
169 _createController(); 173 _createController();
170 } 174 }
171 _sourceStream = _controller.stream; // Mark stream as set. 175 _sourceStream = _controller.stream; // Mark stream as set.
172 _controller.close(); 176 _controller.close();
173 } 177 }
174 178
175 // Creates the [_controller]. 179 // Creates the [_controller].
176 void _createController() { 180 void _createController() {
177 assert(_controller == null); 181 assert(_controller == null);
178 _controller = new StreamController<T>(sync: true); 182 _controller = new StreamController<T>(sync: true);
179 } 183 }
180 } 184 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/single_subscription_transformer.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698