Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
|
nweiz
2015/06/16 01:05:23
As part of moving a bunch of existing async utilit
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True. And if I was just using a controller this co
nweiz
2015/06/16 22:34:11
Here's a (rough untested) sketch of what it would
Lasse Reichstein Nielsen
2015/06/17 11:08:29
And, to make it even more obvious that this is a g
nweiz
2015/06/17 22:58:45
Oh dang, I thought I published that CL yesterday,
| |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.streams.stream_completer; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 /// A [stream] where the contents aren't known at creation time. | |
| 10 /// | |
| 11 /// It is generally recommended that you never create a `Future<Stream>` | |
| 12 /// because you can just use directly create a stream that doesn't do anything | |
|
nweiz
2015/06/12 01:24:23
"use directly" -> "directly"
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
| 13 /// until it's ready to do so. | |
| 14 /// This class can be used to create such a stream. | |
| 15 /// | |
| 16 /// The [stream] is a normal stream that you can listen to immediately, | |
| 17 /// but until [setSourceStream] is called, the stream won't produce | |
| 18 /// any events. | |
| 19 /// | |
| 20 /// The same effect can be achieved by using a [StreamController] | |
| 21 /// and adding the stream using `addStream` when both | |
| 22 /// the controller's stream is listened to and the source stream is ready. | |
| 23 /// This class attempts to shortcut some of the overhead when possible. | |
| 24 /// For example, if the [stream] is only listened to | |
| 25 /// after the source stream has been set, | |
| 26 /// the listen is performed directly on the source stream. | |
| 27 class StreamCompleter<T> { | |
|
Søren Gjesse
2015/06/11 07:57:06
Maybe add a constructor with an optional onListen
nweiz
2015/06/12 01:24:23
Consider adding a constructor so that the user can
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I think I'd use a completely different class for a
Lasse Reichstein Nielsen
2015/06/12 13:04:20
You are thinking the case where you don't even wan
| |
| 28 final Stream<T> stream = new _PromiseStream<T>(); | |
|
nweiz
2015/06/12 01:24:22
Nit: document this
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
| 29 | |
|
nweiz
2015/06/12 01:24:23
Suggestion: add "static Stream fromFuture(Future<S
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Good idea. Done.
| |
| 30 /// Set a stream as the source of events for the [StreamCompleter]. | |
| 31 /// | |
| 32 /// There is no guarantee that the stream will ever be listened to. | |
|
nweiz
2015/06/12 01:24:23
It would be nice to go further and fully explain h
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
| 33 void setSourceStream(Stream<T> stream) { | |
| 34 _PromiseStream promiseStream = this.stream; | |
| 35 promiseStream._linkStream(stream); | |
| 36 } | |
| 37 | |
| 38 /// As setting an empty stream using [setSourceStream]. | |
| 39 void setEmpty() { | |
|
nweiz
2015/06/12 01:24:23
Document that this is mutually exclusive with [set
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
| 40 // TODO(lrn): Optimize this to not actually create the empty stream. | |
| 41 _PromiseStream promiseStream = this.stream; | |
| 42 promiseStream._linkStream(new Stream.fromIterable(const [])); | |
| 43 } | |
| 44 } | |
| 45 | |
| 46 /// Stream that acts as a source stream it is (eventually) linked with. | |
| 47 /// | |
| 48 /// The linked source stream can be set after a user has started listening on | |
| 49 /// this stream. No events occur before the source stream is provided. | |
| 50 /// | |
| 51 /// If a user listens before events are available, the state of the | |
| 52 /// subscription is maintained, and the subscription is then linked | |
| 53 /// to the source stream when that becomes available. | |
| 54 class _PromiseStream<T> extends Stream<T> { | |
|
nweiz
2015/06/12 01:24:23
Even though it's not public, it would be nice to h
Lasse Reichstein Nielsen
2015/06/12 13:04:19
True. I renamed the public class at some point, so
| |
| 55 static const int _UNINITIALIZED = 0; | |
| 56 static const int _STREAM_SET = 1; | |
| 57 static const int _LISTENED = 2; | |
|
nweiz
2015/06/12 01:24:22
Document the semantics of each of these variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done
| |
| 58 | |
| 59 int _state = _UNINITIALIZED; | |
|
nweiz
2015/06/12 01:24:23
Document that this is a bitfield. At first glance,
Lasse Reichstein Nielsen
2015/06/12 13:04:20
This is no longer used as a bit field. I have cons
| |
| 60 var _streamOrSubscription; | |
|
nweiz
2015/06/12 01:24:22
This really seems like it should be two separate f
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I think the way I have (now) encapsulated it shoul
nweiz
2015/06/16 01:05:23
It's definitely better, but it's still a lot of ex
Lasse Reichstein Nielsen
2015/06/16 13:05:44
True. I think I'll try rewriting it without the st
| |
| 61 | |
| 62 void _linkStream(Stream stream) { | |
|
nweiz
2015/06/12 01:24:23
It would be really nice to have all these private
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
| 63 if (_state == _UNINITIALIZED) { | |
| 64 _streamOrSubscription = stream; | |
| 65 _state |= _STREAM_SET; | |
| 66 } else if (_state == _LISTENED) { | |
| 67 _PromiseSubscription promiseSubscription = _streamOrSubscription; | |
| 68 promiseSubscription._linkStream(stream); | |
| 69 _state |= _STREAM_SET; | |
| 70 } else { | |
| 71 throw new StateError("Stream already linked."); | |
| 72 } | |
| 73 } | |
| 74 | |
| 75 StreamSubscription listen(void onData(T event), | |
| 76 {Function onError, | |
| 77 void onDone(), | |
| 78 bool cancelOnError}) { | |
| 79 int state = _state; | |
|
nweiz
2015/06/12 01:24:22
Nit: don't type-annotate local variables. There ar
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Yeah, about that.
I try to use more "var", but I
nweiz
2015/06/16 01:05:22
As Bob likes to say, code is for people to read it
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True.
It's mainly for myself. I like to see the t
| |
| 80 _state = state | _LISTENED; | |
| 81 if (state == _UNINITIALIZED) { | |
|
Søren Gjesse
2015/06/11 07:57:05
Why not just check (_state & _STREAM_SET) == 0? Th
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I still need to read the state before setting it,
| |
| 82 _PromiseSubscription subscription = | |
| 83 new _PromiseSubscription<T>(true == cancelOnError); | |
|
nweiz
2015/06/12 01:24:23
"true == cancelOnError" is really hard to read. It
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Now forwards the parameter all the way to the fina
| |
| 84 subscription.onData(onData); | |
| 85 subscription.onError(onError); | |
| 86 subscription.onDone(onDone); | |
| 87 _streamOrSubscription = subscription; | |
| 88 return subscription; | |
| 89 } | |
| 90 if (state == _STREAM_SET) { | |
| 91 Stream stream = _streamOrSubscription; | |
| 92 return stream.listen(onData, onError: onError, onDone: onDone, | |
| 93 cancelOnError: cancelOnError); | |
| 94 } | |
| 95 print(state); | |
|
Søren Gjesse
2015/06/11 07:57:06
Debug print.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Acknowledged.
| |
| 96 throw new StateError("Stream has already been listened to."); | |
| 97 } | |
| 98 } | |
| 99 | |
| 100 /// Subscription for a [_PromiseStream] that is listened to but has no data. | |
|
nweiz
2015/06/12 01:24:23
"has no data" is a little misleading, since this w
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have reworded it. It's a little constrained by w
| |
| 101 /// | |
| 102 /// Maintains the state of a stream subscription that hasn't received any | |
| 103 /// events until events are available, then it starts forwarding to another | |
|
nweiz
2015/06/12 01:24:22
"events are available" has the same issue as above
Lasse Reichstein Nielsen
2015/06/12 13:04:20
reworded.
| |
| 104 /// subscription. | |
| 105 class _PromiseSubscription<T> implements StreamSubscription<T> { | |
| 106 // State values | |
| 107 // The subscription is in one of three distinct states: | |
| 108 // - Initial (remembers whether it's cancelOnError and paused). | |
| 109 // - Cancelled (before being linked). | |
| 110 // - Linked (before being cancelled). | |
| 111 static const int _INIT = 0; | |
| 112 static const int _INIT_CANCEL_ON_ERROR = 1; | |
| 113 static const int _CANCELLED = 2; // Exclusive. | |
| 114 static const int _LINKED = 3; // Exclusive | |
| 115 static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. | |
|
nweiz
2015/06/12 01:24:23
Similarly to the above, it would be a lot clearer
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have moved the entire state into a separate obje
| |
| 116 | |
| 117 /// State represents the status of the subscription until the | |
| 118 /// real subscription becomes available. | |
| 119 /// | |
| 120 /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains | |
| 121 /// a list of length three with the data, error and done handlers that | |
| 122 /// have been set. | |
| 123 /// | |
| 124 /// While `_state` is [_LINKED], [_stateData] contains the real | |
| 125 /// stream subscription. | |
| 126 /// | |
| 127 /// When `_state` is `_CANCELED`, `_stateData` is cleared since the | |
| 128 /// event handlers won't be needed anyway. | |
| 129 int _state; | |
| 130 var _stateData = new List(3); | |
|
Søren Gjesse
2015/06/11 07:57:05
Why use a list with three members instead of three
nweiz
2015/06/12 01:24:23
+1
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Space! Saving of!
I started writing it as a real o
| |
| 131 | |
| 132 _PromiseSubscription(bool cancelOnError) | |
| 133 : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT); | |
| 134 | |
| 135 bool get _isLinked => _state == _LINKED; | |
|
Søren Gjesse
2015/06/11 07:57:06
If you relay want to use a bit-field why not ave g
Lasse Reichstein Nielsen
2015/06/12 13:04:21
It's not always a bit field.
It's really a combo:
| |
| 136 bool get _isCancelled => _state == _CANCELLED; | |
| 137 bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR; | |
| 138 | |
| 139 void _linkStream(Stream stream) { | |
| 140 if (_isLinked) { | |
| 141 throw new StateError("Already linked to a stream."); | |
| 142 } | |
| 143 if (_isCancelled) { | |
|
Søren Gjesse
2015/06/11 07:57:06
Shouldn't we listen on and cancel the stream in th
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Probably not.
For one thing, it's unnecessary ove
nweiz
2015/06/16 01:05:23
I agree with Søren that the default should be to s
Lasse Reichstein Nielsen
2015/06/16 13:05:45
Ok. The only problem with that is that listen+canc
| |
| 144 return; | |
| 145 } | |
| 146 bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0; | |
| 147 StreamSubscription subscription = | |
| 148 stream.listen(null, cancelOnError: cancelOnError); | |
| 149 List handlers = _stateData; | |
|
Søren Gjesse
2015/06/11 07:57:05
Why this 'cast' - documentation?
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Because I like it. It's not obvious what the type
| |
| 150 subscription.onData(handlers[0]); | |
| 151 subscription.onError(handlers[1]); | |
| 152 subscription.onDone(handlers[2]); | |
| 153 int state = _state; | |
| 154 _subscription = subscription; | |
| 155 while (state >= _PAUSE) { | |
| 156 subscription.pause(); | |
| 157 state -= _PAUSE; | |
|
nweiz
2015/06/12 01:24:22
I have no idea what's going on here :-/. If you're
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten completely (twice), and there is no fanc
| |
| 158 } | |
| 159 } | |
| 160 | |
| 161 List get _handlers { | |
| 162 assert(_isInitial); | |
| 163 return _stateData; | |
| 164 } | |
| 165 | |
| 166 StreamSubscription get _subscription { | |
| 167 assert(_isLinked); | |
| 168 return _stateData; | |
| 169 } | |
| 170 | |
| 171 // Sets state to linked. | |
| 172 void set _subscription(StreamSubscription subscription) { | |
|
nweiz
2015/06/12 01:24:22
Since this is only called once, I think it would b
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Now a function named setListened that does the sta
| |
| 173 assert(_isInitial); | |
| 174 _stateData = subscription; | |
| 175 _state = _LINKED; | |
| 176 } | |
| 177 | |
| 178 void onData(void handleData(T data)) { | |
| 179 if (_isLinked) { | |
| 180 _subscription.onData(handleData); | |
| 181 } else { | |
| 182 assert(_isInitial); | |
| 183 _handlers[0] = handleData; | |
| 184 } | |
| 185 } | |
| 186 | |
| 187 void onError(void handleError(error, StackTrace stackTrace)) { | |
| 188 if (_isLinked) { | |
| 189 _subscription.onError(handleError); | |
| 190 } else { | |
| 191 assert(_isInitial); | |
| 192 _handlers[1] = handleError; | |
| 193 } | |
| 194 } | |
| 195 | |
| 196 void onDone(void handleDone()) { | |
| 197 if (_isLinked) { | |
| 198 _subscription.onDone(handleDone); | |
| 199 } else { | |
| 200 assert(_isInitial); | |
| 201 _handlers[2] = handleDone; | |
| 202 } | |
| 203 } | |
| 204 | |
| 205 void pause([Future resumeFuture]) { | |
| 206 if (_isLinked) { | |
| 207 _subscription.pause(resumeFuture); | |
| 208 } else if (!_isCancelled) { | |
|
Søren Gjesse
2015/06/11 07:57:05
assert that it is not already paused (or use |).
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Pauses are cumulative, it can be paused more than
| |
| 209 _state += _PAUSE; | |
| 210 if (resumeFuture != null) { | |
| 211 resumeFuture.whenComplete(this.resume); | |
| 212 } | |
| 213 } | |
| 214 } | |
| 215 | |
| 216 void resume() { | |
| 217 if (_isLinked) { | |
| 218 _subscription.resume(); | |
| 219 } else if (_state >= _PAUSE) { | |
|
Søren Gjesse
2015/06/11 07:57:05
Just check the bit instead of using >=.
Lasse Reichstein Nielsen
2015/06/12 13:04:20
It can be paused more than once, so the bit will o
| |
| 220 _state -= _PAUSE; | |
| 221 } | |
| 222 } | |
| 223 | |
| 224 Future cancel() { | |
|
Søren Gjesse
2015/06/11 07:57:06
What is the semantics of calling cancel several ti
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I'm not sure it's specified, but that's what we us
| |
| 225 if (_isLinked) { | |
| 226 return _subscription.cancel(); | |
| 227 } else { | |
| 228 _stateData = null; | |
| 229 _state = _CANCELLED; | |
| 230 return new Future.value(); | |
| 231 } | |
| 232 } | |
| 233 | |
| 234 Future asFuture([futureValue]) { | |
| 235 if (_isLinked) { | |
| 236 return _subscription.asFuture(futureValue); | |
| 237 } | |
| 238 Completer completer = new Completer(); | |
| 239 if (!_isCancelled) { | |
|
nweiz
2015/06/12 01:24:23
Nit: Short-circuit if it is cancelled to save some
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten.
| |
| 240 // Asking for a future of a cancelled subscription gives a future | |
| 241 // which never completes. | |
| 242 _handlers[1] = _cancelBeforeError(completer.completeError); | |
| 243 if (futureValue == null) { | |
| 244 _handlers[2] = completer.complete; | |
| 245 } else { | |
| 246 _handlers[2] = () { completer.complete(futureValue); }; | |
|
nweiz
2015/06/12 01:24:23
Nit: Either use "=>" or make this multiple lines.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
I skipped the special casing of null completely (a
| |
| 247 } | |
| 248 } | |
| 249 return completer.future; | |
| 250 } | |
| 251 | |
| 252 bool get isPaused { | |
| 253 if (_isLinked) { | |
| 254 return _subscription.isPaused; | |
| 255 } | |
| 256 return _state >= _PAUSE; | |
| 257 } | |
| 258 | |
| 259 /// Helper function used by [asFuture]. | |
| 260 /// | |
| 261 /// Returns an error handler which cancels the stream when it receives an | |
| 262 /// error. | |
| 263 Function _cancelBeforeError(Function handleError) { | |
| 264 return (e, s) { | |
|
nweiz
2015/06/12 01:24:23
Nit: use full words for variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
| 265 cancel(); | |
| 266 if (handleError is _BinaryCallback) { | |
| 267 handleError(e, s); | |
| 268 } else { | |
| 269 handleError(e); | |
| 270 } | |
| 271 }; | |
| 272 } | |
| 273 } | |
| 274 | |
| 275 typedef _BinaryCallback(a, b); | |
|
Søren Gjesse
2015/06/11 07:57:06
We don't have this typedef publid in and dart: lib
Lasse Reichstein Nielsen
2015/06/12 13:04:19
We do: ZoneBinaryCallback in dart:async.
Also not
| |
| OLD | NEW |