Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.stream_completer; | |
| 6 | |
| 7 import "dart:async"; | |
| 8 import "delegating_stream_subscription.dart"; | |
| 9 | |
| 10 /// A single-subscription [stream] where the contents are provided later. | |
| 11 /// | |
| 12 /// It is generally recommended that you never create a `Future<Stream>` | |
| 13 /// because you can just directly create a stream that doesn't do anything | |
| 14 /// until it's ready to do so. | |
| 15 /// This class can be used to create such a stream. | |
| 16 /// | |
| 17 /// The [stream] is a normal stream that you can listen to immediately, | |
| 18 /// but until either [setSourceStream] or [setEmpty] is called, | |
| 19 /// the stream won't produce any events. | |
| 20 /// | |
| 21 /// The same effect can be achieved by using a [StreamController] | |
| 22 /// and adding the stream using `addStream` when both | |
| 23 /// the controller's stream is listened to and the source stream is ready. | |
| 24 /// This class attempts to shortcut some of the overhead when possible. | |
| 25 /// For example, if the [stream] is only listened to | |
| 26 /// after the source stream has been set, | |
| 27 /// the listen is performed directly on the source stream. | |
| 28 class StreamCompleter<T> { | |
| 29 /// The stream of this completer. | |
| 30 /// | |
| 31 /// When a source stream is provided, its events will be forwarded to | |
| 32 /// listeners on this stream. | |
| 33 /// | |
| 34 /// The stream can be listened either before or after a source stream | |
| 35 /// is set. | |
| 36 final Stream<T> stream = new _CompleterStream<T>(); | |
|
nweiz
2015/06/16 01:05:23
I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Ack. Done.
| |
| 37 | |
| 38 /// Convert a `Future<Stream>` to a `Stream`. | |
| 39 /// | |
| 40 /// This creates a stream using a stream completer, | |
| 41 /// and sets the source stream to the result of the future when the | |
| 42 /// future completes. | |
| 43 /// | |
| 44 /// If the future completes with an error, the returned stream will | |
| 45 /// instead contain just that error. | |
| 46 static Stream fromFuture(Future<Stream> streamFuture) { | |
| 47 var completer = new StreamCompleter(); | |
| 48 streamFuture.then(completer.setSourceStream, | |
| 49 onError: (e, s) { | |
|
nweiz
2015/06/16 01:05:23
"e, s" -> "_"
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Done.
| |
| 50 completer.setSourceStream(streamFuture.asStream()); | |
| 51 }); | |
| 52 return completer.stream; | |
| 53 } | |
| 54 | |
| 55 /// Set a stream as the source of events for the [StreamCompleter]'s | |
| 56 /// [stream]. | |
| 57 /// | |
| 58 /// The completer's `stream` will act exactly as [sourceStream]. | |
| 59 /// | |
| 60 /// If the source stream is set before [stream] is listened to, | |
| 61 /// the listen call on [stream] is forwarded directly to [sourceStream]. | |
| 62 /// | |
| 63 /// If [stream] is listened to before setting the source stream, | |
| 64 /// an intermediate subscription is created. It looks like a completely | |
| 65 /// normal subscription, and can be paused or canceled, but it won't | |
| 66 /// produce any events until a source stream is provided. | |
| 67 /// | |
| 68 /// If the `stream` subscription is canceled before a source stream is set, | |
| 69 /// then nothing further happens. This means that the `sourceStream` may | |
| 70 /// never be listened to, even if the [stream] has been listened to. | |
| 71 /// | |
| 72 /// Otherwise, when the source stream is then set, | |
| 73 /// it is immediately listened to. | |
| 74 /// If the existing subscription was paused, the source stream subscription | |
| 75 /// is paused as many times, and then all events and callbacks are forwarded | |
|
nweiz
2015/06/16 01:05:23
It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen
2015/06/18 12:10:12
If you pause three times, you need to resume three
| |
| 76 /// between the two subscriptions, so the listener works as if it was | |
| 77 /// listening to the source stream subscription directly. | |
| 78 /// | |
| 79 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
| 80 /// Trying to call either of them again will fail. | |
| 81 void setSourceStream(Stream<T> sourceStream) { | |
| 82 _CompleterStream completerStream = this.stream; | |
| 83 completerStream._linkStream(sourceStream); | |
| 84 } | |
| 85 | |
| 86 /// Equivalent to setting an empty stream using [setSourceStream]. | |
| 87 /// | |
| 88 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
| 89 /// Trying to call either of them again will fail. | |
| 90 void setEmpty() { | |
| 91 // TODO(lrn): Optimize this to not actually create the empty stream. | |
| 92 _CompleterStream completerStream = this.stream; | |
| 93 completerStream._linkStream(new Stream.fromIterable(const [])); | |
| 94 } | |
| 95 } | |
| 96 | |
| 97 /// Stream that acts as a source stream it is (eventually) linked with. | |
| 98 /// | |
| 99 /// The linked source stream can be set after a user has started listening on | |
| 100 /// this stream. No events occur before the source stream is provided. | |
| 101 /// | |
| 102 /// If a user listens before events are available, the state of the | |
| 103 /// subscription is maintained, and the subscription is then linked | |
| 104 /// to the source stream when that becomes available. | |
| 105 class _CompleterStream<T> extends Stream<T> { | |
| 106 // Bit flags used for the value of [_state]. | |
| 107 /// Flag marking that the source stream has been set. | |
| 108 static const int _streamFlag = 1; | |
| 109 /// Flag marking that the stream has been listened to. | |
|
nweiz
2015/06/16 01:05:23
Nit: separate these with newlines.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Gone.
| |
| 110 static const int _listenerFlag = 2; | |
| 111 | |
| 112 // States used as values for _state. | |
| 113 /// Initial state with no listener and no source stream set. | |
| 114 static const int _initial = 0; | |
| 115 /// State where only the stream has been set. | |
| 116 static const int _streamOnly = _streamFlag; | |
| 117 /// State where there is only a listener. | |
| 118 static const int _listenerOnly = _listenerFlag; | |
| 119 /// State where source stream and listener have been linked. | |
| 120 static const int _linked = _streamFlag | _listenerFlag; | |
| 121 | |
| 122 /// The current state. | |
| 123 /// | |
| 124 /// One of [_initial], [_streamOnly], [_listenerOnly], | |
| 125 /// or [_linked]. | |
| 126 int _state = _initial; | |
| 127 | |
| 128 /// Shared variable used to store different values depending on the state. | |
| 129 /// | |
| 130 /// In the [_streamOnly] state it contains the source stream. | |
| 131 /// In the [_listenerOnly] state it contains a delegating subscription | |
| 132 /// controller. | |
| 133 /// In the [_linked] and [_initial] states it contains `null`. | |
| 134 /// | |
| 135 /// Do not access this field directly, | |
| 136 /// instead use [_stream] or [_controller] to read the value | |
| 137 /// appropriate for the current state. | |
| 138 var _stateData; | |
| 139 | |
| 140 /// Returns source stream that has been set. | |
| 141 /// | |
| 142 /// Must only be used when the source stream has been set, | |
| 143 /// but the stream has not been listened to yet (state is [_streamOnly]); | |
|
nweiz
2015/06/16 01:05:23
";" -> "."
| |
| 144 Stream get _stream { | |
| 145 assert(_state == _streamOnly); | |
| 146 return _stateData; | |
| 147 } | |
| 148 | |
| 149 /// Returns the mutable subscription controller with the subscription | |
| 150 /// on this stream | |
| 151 /// | |
| 152 /// Must only be used when the stream has been listened to, | |
| 153 /// but the source stream has not been set to yet (state is [_listenerOnly]); | |
|
nweiz
2015/06/16 01:05:23
";" -> "."
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
| |
| 154 MutableDelegatingStreamSubscriptionController get _controller { | |
| 155 assert(_state == _listenerOnly); | |
| 156 return _stateData; | |
| 157 } | |
| 158 | |
| 159 // State transition functions. | |
| 160 | |
| 161 /// Sets the source stream, and enters the [_streamOnly] state. | |
| 162 /// | |
| 163 /// Must only be called from the initial state, before this stream has | |
| 164 /// been listened to. | |
| 165 void _setStream(Stream stream) { | |
| 166 assert(_state == _initial); | |
| 167 _stateData = stream; | |
| 168 _state = _streamOnly; | |
| 169 } | |
| 170 | |
| 171 /// Sets the listener subscription, and enters the [_listenerOnly] state. | |
| 172 /// | |
| 173 /// Must only be called from the initial state, before the source stream has | |
| 174 /// been set. | |
| 175 void _setListened(MutableDelegatingStreamSubscriptionController subscription) { | |
|
nweiz
2015/06/16 01:05:23
Long line.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Class really needs a better name.
I think I'll rem
| |
| 176 assert(_state == _initial); | |
| 177 _stateData = subscription; | |
| 178 _state = _listenerOnly; | |
| 179 } | |
| 180 | |
| 181 /// Marks the listener and source stream as linked. | |
| 182 /// | |
| 183 /// Enters the [_isLinked] state. | |
| 184 /// This must be called only from either the [_streamOnly] | |
| 185 /// or the [_listenerOnly] state after setting up the link. | |
| 186 void _setLinked() { | |
| 187 assert(_state == _streamOnly || _state == _listenerOnly); | |
| 188 _state = _linked; | |
| 189 _stateData = null; | |
| 190 } | |
| 191 | |
| 192 // end state transition functions. | |
| 193 | |
| 194 /// Called by the controller when the user supplies a stream | |
| 195 void _linkStream(Stream stream) { | |
| 196 if (_state == _listenerOnly) { | |
| 197 var subscription = _controller.sourceSubscription; | |
| 198 if (subscription is _CompleterSubscriptionState) { | |
| 199 _controller.sourceSubscription = subscription._linkStream(stream); | |
| 200 } else { | |
| 201 assert(subscription is _CanceledSubscription); | |
| 202 } | |
| 203 _setLinked(); | |
| 204 } else if (_state == _initial) { | |
| 205 _setStream(stream); | |
| 206 } else { | |
| 207 throw new StateError("Stream already linked."); | |
| 208 } | |
| 209 } | |
| 210 | |
| 211 StreamSubscription listen(void onData(T event), | |
| 212 {Function onError, | |
| 213 void onDone(), | |
| 214 bool cancelOnError}) { | |
| 215 if (_state == _initial) { | |
| 216 if (cancelOnError == null) cancelOnError = false; | |
| 217 var controller = | |
| 218 new MutableDelegatingStreamSubscriptionController<T>(null); | |
| 219 controller.sourceSubscription = | |
|
nweiz
2015/06/16 01:05:23
Why are you assigning this rather than passing it
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Because of the cyclic dependency, the state object
| |
| 220 new _CompleterSubscriptionState(cancelOnError, controller); | |
| 221 var subscription = controller.subscription; | |
| 222 subscription.onData(onData); | |
| 223 subscription.onError(onError); | |
| 224 subscription.onDone(onDone); | |
| 225 _setListened(controller); | |
| 226 return subscription; | |
| 227 } | |
| 228 if (_state == _streamOnly) { | |
|
nweiz
2015/06/16 01:05:23
Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Then I think I should also put the final throw in
| |
| 229 var result = _stream.listen(onData, onError: onError, onDone: onDone, | |
| 230 cancelOnError: cancelOnError); | |
| 231 _setLinked(); | |
| 232 return result; | |
| 233 } | |
| 234 throw new StateError("Stream has already been listened to."); | |
| 235 } | |
| 236 } | |
| 237 | |
| 238 /// Holds subscription callbacks and state for a [_CompleterSubscription] | |
| 239 /// until the real subscription is available. | |
| 240 /// | |
| 241 /// Always used in a [MutableDelegatingStreamSubscriptionController]. | |
| 242 class _CompleterSubscriptionState implements StreamSubscription { | |
| 243 /// The mutable subscription wrapper. | |
| 244 /// | |
| 245 /// Used for handling [pause] with resume futures and [asFuture] which | |
| 246 /// both need to call a different function (`resume` and `cancel` | |
| 247 /// respectively) at a later point. We let those go through the | |
| 248 /// wrapper subscription to ensure that they are forwarded to the | |
| 249 /// controller source subscription that is current at that time. | |
| 250 final MutableDelegatingStreamSubscriptionController _controller; | |
| 251 | |
| 252 /// Whether the subscription cancels on error. | |
| 253 /// | |
| 254 /// This is forwarded to the real subscription when that is created. | |
| 255 final bool _cancelOnError; | |
| 256 | |
| 257 // Callbacks forwarded to the real subscription when it's created. | |
| 258 | |
| 259 ZoneUnaryCallback _onData; | |
| 260 Function _onError; | |
| 261 ZoneCallback _onDone; | |
| 262 | |
| 263 /// Future set when cancel is called. | |
| 264 /// This both marks the subscription as canceled and allows returning | |
|
nweiz
2015/06/16 01:05:23
Nit: newline above this.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
| |
| 265 /// the same future every time the cancel function is called. | |
| 266 Future cancelFuture; | |
| 267 | |
| 268 /// Count of active pauses. | |
| 269 /// | |
| 270 /// When the real subscription is created, it is paused this many times. | |
| 271 int pauseCount = 0; | |
| 272 | |
| 273 _CompleterSubscriptionState(this._cancelOnError, | |
| 274 this._controller); | |
| 275 | |
| 276 void onData(void handleData(data)) { | |
| 277 _onData = handleData; | |
| 278 } | |
| 279 | |
| 280 void onError(Function handleError) { | |
| 281 _onError = handleError; | |
| 282 } | |
| 283 | |
| 284 void onDone(void handleDone()) { | |
| 285 _onDone = handleDone; | |
| 286 } | |
| 287 | |
| 288 void pause([Future resumeFuture]) { | |
| 289 pauseCount++; | |
| 290 if (resumeFuture != null) { | |
| 291 // Go through wrapper subscription in case the real subscription | |
| 292 // is linked before the future completes. | |
| 293 resumeFuture.whenComplete(_controller.subscription.resume); | |
| 294 } | |
| 295 } | |
| 296 | |
| 297 void resume() { | |
| 298 if (pauseCount > 0) pauseCount--; | |
| 299 } | |
| 300 | |
| 301 Future cancel() { | |
| 302 var cancelFuture = new Future.value(); | |
|
nweiz
2015/06/16 01:05:23
Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Yes, sadly. That's a consequence of backwards comp
| |
| 303 // Immediately replace the [_CompleterSubscription._delegate] so | |
| 304 // we won't be called again. | |
| 305 // This also releases any, now unused, callbacks we are holding on to. | |
| 306 _controller.sourceSubscription = new _CanceledSubscription(cancelFuture); | |
| 307 return cancelFuture; | |
| 308 } | |
| 309 | |
| 310 bool get isPaused { | |
| 311 return (cancelFuture != null && pauseCount > 0); | |
| 312 } | |
| 313 | |
| 314 Future asFuture([futureValue]) { | |
| 315 Completer completer = new Completer(); | |
| 316 _onDone = () { | |
| 317 completer.complete(futureValue); | |
| 318 }; | |
| 319 _onError = (error, StackTrace stackTrace) { | |
| 320 // Cancel the wrapper subscription in case the real subscription | |
| 321 // is linked before an error triggers this function. | |
| 322 _controller.subscription.cancel(); | |
| 323 completer.completeError(error, stackTrace); | |
| 324 }; | |
| 325 return completer.future; | |
| 326 } | |
| 327 | |
| 328 StreamSubscription _linkStream(Stream stream) { | |
| 329 if (cancelFuture != null) { | |
| 330 return new _CanceledSubscription(cancelFuture); | |
| 331 } | |
| 332 // If not canceled, create the real subscription and make | |
| 333 // sure it has the requested callbacks, cancelOnErrror flag and | |
| 334 // number of pauses. | |
| 335 var subscription = stream.listen(null, cancelOnError: _cancelOnError); | |
| 336 subscription.onData(_onData); | |
| 337 subscription.onError(_onError); | |
| 338 subscription.onDone(_onDone); | |
| 339 while (pauseCount > 0) { | |
| 340 subscription.pause(); | |
| 341 pauseCount--; | |
| 342 } | |
| 343 return subscription; | |
| 344 } | |
| 345 } | |
| 346 | |
| 347 /// A subscription that acts as if it has been canceled. | |
| 348 /// | |
| 349 /// No events are fired and pausing is ignored. | |
| 350 /// The [cancel] method always returns the same future. | |
| 351 class _CanceledSubscription implements StreamSubscription { | |
| 352 /// The future returned by [cancel]; | |
| 353 Future _doneFuture; | |
| 354 | |
| 355 _CanceledSubscription(this._doneFuture); | |
| 356 | |
| 357 void onData(void handleData(data)) {} | |
| 358 | |
| 359 void onError(Function handleError) {} | |
| 360 | |
| 361 void onDone(void handleDone()) {} | |
| 362 | |
| 363 void pause([Future resumeFuture]) {} | |
| 364 | |
| 365 void resume() {} | |
| 366 | |
| 367 Future cancel() => _doneFuture; | |
| 368 | |
| 369 /// Returns future that never completes. | |
| 370 /// | |
| 371 /// The `asFuture` result is completed by either an error event | |
| 372 /// or a done event. A canceled future never produces either. | |
| 373 Future asFuture([futureValue]) => new Completer().future; | |
| 374 | |
| 375 bool get isPaused => false; | |
| 376 } | |
| OLD | NEW |