Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: more docs and tests. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
nweiz 2015/06/16 01:05:23 As part of moving a bunch of existing async utilit
Lasse Reichstein Nielsen 2015/06/16 13:05:45 True. And if I was just using a controller this co
nweiz 2015/06/16 22:34:11 Here's a (rough untested) sketch of what it would
Lasse Reichstein Nielsen 2015/06/17 11:08:29 And, to make it even more obvious that this is a g
nweiz 2015/06/17 22:58:45 Oh dang, I thought I published that CL yesterday,
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.streams.stream_completer;
6
7 import 'dart:async';
8
9 /// A [stream] where the contents aren't known at creation time.
10 ///
11 /// It is generally recommended that you never create a `Future<Stream>`
12 /// because you can just use directly create a stream that doesn't do anything
nweiz 2015/06/12 01:24:23 "use directly" -> "directly"
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Done.
13 /// until it's ready to do so.
14 /// This class can be used to create such a stream.
15 ///
16 /// The [stream] is a normal stream that you can listen to immediately,
17 /// but until [setSourceStream] is called, the stream won't produce
18 /// any events.
19 ///
20 /// The same effect can be achieved by using a [StreamController]
21 /// and adding the stream using `addStream` when both
22 /// the controller's stream is listened to and the source stream is ready.
23 /// This class attempts to shortcut some of the overhead when possible.
24 /// For example, if the [stream] is only listened to
25 /// after the source stream has been set,
26 /// the listen is performed directly on the source stream.
27 class StreamCompleter<T> {
Søren Gjesse 2015/06/11 07:57:06 Maybe add a constructor with an optional onListen
nweiz 2015/06/12 01:24:23 Consider adding a constructor so that the user can
Lasse Reichstein Nielsen 2015/06/12 13:04:19 I think I'd use a completely different class for a
Lasse Reichstein Nielsen 2015/06/12 13:04:20 You are thinking the case where you don't even wan
28 final Stream<T> stream = new _PromiseStream<T>();
nweiz 2015/06/12 01:24:22 Nit: document this
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
29
nweiz 2015/06/12 01:24:23 Suggestion: add "static Stream fromFuture(Future<S
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Good idea. Done.
30 /// Set a stream as the source of events for the [StreamCompleter].
31 ///
32 /// There is no guarantee that the stream will ever be listened to.
nweiz 2015/06/12 01:24:23 It would be nice to go further and fully explain h
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Done.
33 void setSourceStream(Stream<T> stream) {
34 _PromiseStream promiseStream = this.stream;
35 promiseStream._linkStream(stream);
36 }
37
38 /// As setting an empty stream using [setSourceStream].
39 void setEmpty() {
nweiz 2015/06/12 01:24:23 Document that this is mutually exclusive with [set
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Done.
40 // TODO(lrn): Optimize this to not actually create the empty stream.
41 _PromiseStream promiseStream = this.stream;
42 promiseStream._linkStream(new Stream.fromIterable(const []));
43 }
44 }
45
46 /// Stream that acts as a source stream it is (eventually) linked with.
47 ///
48 /// The linked source stream can be set after a user has started listening on
49 /// this stream. No events occur before the source stream is provided.
50 ///
51 /// If a user listens before events are available, the state of the
52 /// subscription is maintained, and the subscription is then linked
53 /// to the source stream when that becomes available.
54 class _PromiseStream<T> extends Stream<T> {
nweiz 2015/06/12 01:24:23 Even though it's not public, it would be nice to h
Lasse Reichstein Nielsen 2015/06/12 13:04:19 True. I renamed the public class at some point, so
55 static const int _UNINITIALIZED = 0;
56 static const int _STREAM_SET = 1;
57 static const int _LISTENED = 2;
nweiz 2015/06/12 01:24:22 Document the semantics of each of these variables.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done
58
59 int _state = _UNINITIALIZED;
nweiz 2015/06/12 01:24:23 Document that this is a bitfield. At first glance,
Lasse Reichstein Nielsen 2015/06/12 13:04:20 This is no longer used as a bit field. I have cons
60 var _streamOrSubscription;
nweiz 2015/06/12 01:24:22 This really seems like it should be two separate f
Lasse Reichstein Nielsen 2015/06/12 13:04:20 I think the way I have (now) encapsulated it shoul
nweiz 2015/06/16 01:05:23 It's definitely better, but it's still a lot of ex
Lasse Reichstein Nielsen 2015/06/16 13:05:44 True. I think I'll try rewriting it without the st
61
62 void _linkStream(Stream stream) {
nweiz 2015/06/12 01:24:23 It would be really nice to have all these private
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Done.
63 if (_state == _UNINITIALIZED) {
64 _streamOrSubscription = stream;
65 _state |= _STREAM_SET;
66 } else if (_state == _LISTENED) {
67 _PromiseSubscription promiseSubscription = _streamOrSubscription;
68 promiseSubscription._linkStream(stream);
69 _state |= _STREAM_SET;
70 } else {
71 throw new StateError("Stream already linked.");
72 }
73 }
74
75 StreamSubscription listen(void onData(T event),
76 {Function onError,
77 void onDone(),
78 bool cancelOnError}) {
79 int state = _state;
nweiz 2015/06/12 01:24:22 Nit: don't type-annotate local variables. There ar
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Yeah, about that. I try to use more "var", but I
nweiz 2015/06/16 01:05:22 As Bob likes to say, code is for people to read it
Lasse Reichstein Nielsen 2015/06/16 13:05:45 True. It's mainly for myself. I like to see the t
80 _state = state | _LISTENED;
81 if (state == _UNINITIALIZED) {
Søren Gjesse 2015/06/11 07:57:05 Why not just check (_state & _STREAM_SET) == 0? Th
Lasse Reichstein Nielsen 2015/06/12 13:04:19 I still need to read the state before setting it,
82 _PromiseSubscription subscription =
83 new _PromiseSubscription<T>(true == cancelOnError);
nweiz 2015/06/12 01:24:23 "true == cancelOnError" is really hard to read. It
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Now forwards the parameter all the way to the fina
84 subscription.onData(onData);
85 subscription.onError(onError);
86 subscription.onDone(onDone);
87 _streamOrSubscription = subscription;
88 return subscription;
89 }
90 if (state == _STREAM_SET) {
91 Stream stream = _streamOrSubscription;
92 return stream.listen(onData, onError: onError, onDone: onDone,
93 cancelOnError: cancelOnError);
94 }
95 print(state);
Søren Gjesse 2015/06/11 07:57:06 Debug print.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Acknowledged.
96 throw new StateError("Stream has already been listened to.");
97 }
98 }
99
100 /// Subscription for a [_PromiseStream] that is listened to but has no data.
nweiz 2015/06/12 01:24:23 "has no data" is a little misleading, since this w
Lasse Reichstein Nielsen 2015/06/12 13:04:20 I have reworded it. It's a little constrained by w
101 ///
102 /// Maintains the state of a stream subscription that hasn't received any
103 /// events until events are available, then it starts forwarding to another
nweiz 2015/06/12 01:24:22 "events are available" has the same issue as above
Lasse Reichstein Nielsen 2015/06/12 13:04:20 reworded.
104 /// subscription.
105 class _PromiseSubscription<T> implements StreamSubscription<T> {
106 // State values
107 // The subscription is in one of three distinct states:
108 // - Initial (remembers whether it's cancelOnError and paused).
109 // - Cancelled (before being linked).
110 // - Linked (before being cancelled).
111 static const int _INIT = 0;
112 static const int _INIT_CANCEL_ON_ERROR = 1;
113 static const int _CANCELLED = 2; // Exclusive.
114 static const int _LINKED = 3; // Exclusive
115 static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR.
nweiz 2015/06/12 01:24:23 Similarly to the above, it would be a lot clearer
Lasse Reichstein Nielsen 2015/06/12 13:04:20 I have moved the entire state into a separate obje
116
117 /// State represents the status of the subscription until the
118 /// real subscription becomes available.
119 ///
120 /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains
121 /// a list of length three with the data, error and done handlers that
122 /// have been set.
123 ///
124 /// While `_state` is [_LINKED], [_stateData] contains the real
125 /// stream subscription.
126 ///
127 /// When `_state` is `_CANCELED`, `_stateData` is cleared since the
128 /// event handlers won't be needed anyway.
129 int _state;
130 var _stateData = new List(3);
Søren Gjesse 2015/06/11 07:57:05 Why use a list with three members instead of three
nweiz 2015/06/12 01:24:23 +1
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Space! Saving of! I started writing it as a real o
131
132 _PromiseSubscription(bool cancelOnError)
133 : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT);
134
135 bool get _isLinked => _state == _LINKED;
Søren Gjesse 2015/06/11 07:57:06 If you relay want to use a bit-field why not ave g
Lasse Reichstein Nielsen 2015/06/12 13:04:21 It's not always a bit field. It's really a combo:
136 bool get _isCancelled => _state == _CANCELLED;
137 bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR;
138
139 void _linkStream(Stream stream) {
140 if (_isLinked) {
141 throw new StateError("Already linked to a stream.");
142 }
143 if (_isCancelled) {
Søren Gjesse 2015/06/11 07:57:06 Shouldn't we listen on and cancel the stream in th
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Probably not. For one thing, it's unnecessary ove
nweiz 2015/06/16 01:05:23 I agree with Søren that the default should be to s
Lasse Reichstein Nielsen 2015/06/16 13:05:45 Ok. The only problem with that is that listen+canc
144 return;
145 }
146 bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0;
147 StreamSubscription subscription =
148 stream.listen(null, cancelOnError: cancelOnError);
149 List handlers = _stateData;
Søren Gjesse 2015/06/11 07:57:05 Why this 'cast' - documentation?
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Because I like it. It's not obvious what the type
150 subscription.onData(handlers[0]);
151 subscription.onError(handlers[1]);
152 subscription.onDone(handlers[2]);
153 int state = _state;
154 _subscription = subscription;
155 while (state >= _PAUSE) {
156 subscription.pause();
157 state -= _PAUSE;
nweiz 2015/06/12 01:24:22 I have no idea what's going on here :-/. If you're
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Rewritten completely (twice), and there is no fanc
158 }
159 }
160
161 List get _handlers {
162 assert(_isInitial);
163 return _stateData;
164 }
165
166 StreamSubscription get _subscription {
167 assert(_isLinked);
168 return _stateData;
169 }
170
171 // Sets state to linked.
172 void set _subscription(StreamSubscription subscription) {
nweiz 2015/06/12 01:24:22 Since this is only called once, I think it would b
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Now a function named setListened that does the sta
173 assert(_isInitial);
174 _stateData = subscription;
175 _state = _LINKED;
176 }
177
178 void onData(void handleData(T data)) {
179 if (_isLinked) {
180 _subscription.onData(handleData);
181 } else {
182 assert(_isInitial);
183 _handlers[0] = handleData;
184 }
185 }
186
187 void onError(void handleError(error, StackTrace stackTrace)) {
188 if (_isLinked) {
189 _subscription.onError(handleError);
190 } else {
191 assert(_isInitial);
192 _handlers[1] = handleError;
193 }
194 }
195
196 void onDone(void handleDone()) {
197 if (_isLinked) {
198 _subscription.onDone(handleDone);
199 } else {
200 assert(_isInitial);
201 _handlers[2] = handleDone;
202 }
203 }
204
205 void pause([Future resumeFuture]) {
206 if (_isLinked) {
207 _subscription.pause(resumeFuture);
208 } else if (!_isCancelled) {
Søren Gjesse 2015/06/11 07:57:05 assert that it is not already paused (or use |).
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Pauses are cumulative, it can be paused more than
209 _state += _PAUSE;
210 if (resumeFuture != null) {
211 resumeFuture.whenComplete(this.resume);
212 }
213 }
214 }
215
216 void resume() {
217 if (_isLinked) {
218 _subscription.resume();
219 } else if (_state >= _PAUSE) {
Søren Gjesse 2015/06/11 07:57:05 Just check the bit instead of using >=.
Lasse Reichstein Nielsen 2015/06/12 13:04:20 It can be paused more than once, so the bit will o
220 _state -= _PAUSE;
221 }
222 }
223
224 Future cancel() {
Søren Gjesse 2015/06/11 07:57:06 What is the semantics of calling cancel several ti
Lasse Reichstein Nielsen 2015/06/12 13:04:20 I'm not sure it's specified, but that's what we us
225 if (_isLinked) {
226 return _subscription.cancel();
227 } else {
228 _stateData = null;
229 _state = _CANCELLED;
230 return new Future.value();
231 }
232 }
233
234 Future asFuture([futureValue]) {
235 if (_isLinked) {
236 return _subscription.asFuture(futureValue);
237 }
238 Completer completer = new Completer();
239 if (!_isCancelled) {
nweiz 2015/06/12 01:24:23 Nit: Short-circuit if it is cancelled to save some
Lasse Reichstein Nielsen 2015/06/12 13:04:20 Rewritten.
240 // Asking for a future of a cancelled subscription gives a future
241 // which never completes.
242 _handlers[1] = _cancelBeforeError(completer.completeError);
243 if (futureValue == null) {
244 _handlers[2] = completer.complete;
245 } else {
246 _handlers[2] = () { completer.complete(futureValue); };
nweiz 2015/06/12 01:24:23 Nit: Either use "=>" or make this multiple lines.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 I skipped the special casing of null completely (a
247 }
248 }
249 return completer.future;
250 }
251
252 bool get isPaused {
253 if (_isLinked) {
254 return _subscription.isPaused;
255 }
256 return _state >= _PAUSE;
257 }
258
259 /// Helper function used by [asFuture].
260 ///
261 /// Returns an error handler which cancels the stream when it receives an
262 /// error.
263 Function _cancelBeforeError(Function handleError) {
264 return (e, s) {
nweiz 2015/06/12 01:24:23 Nit: use full words for variables.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
265 cancel();
266 if (handleError is _BinaryCallback) {
267 handleError(e, s);
268 } else {
269 handleError(e);
270 }
271 };
272 }
273 }
274
275 typedef _BinaryCallback(a, b);
Søren Gjesse 2015/06/11 07:57:06 We don't have this typedef publid in and dart: lib
Lasse Reichstein Nielsen 2015/06/12 13:04:19 We do: ZoneBinaryCallback in dart:async. Also not
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698