| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.async; | |
| 6 | |
| 7 // ------------------------------------------------------------------- | |
| 8 // Controller for creating and adding events to a stream. | |
| 9 // ------------------------------------------------------------------- | |
| 10 | |
| 11 /** | |
| 12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. | |
| 13 */ | |
| 14 typedef void ControllerCallback(); | |
| 15 | |
| 16 /** | |
| 17 * Type of stream controller `onCancel` callbacks. | |
| 18 * | |
| 19 * The callback may return either `void` or a future. | |
| 20 */ | |
| 21 typedef ControllerCancelCallback(); | |
| 22 | |
| 23 /** | |
| 24 * A controller with the stream it controls. | |
| 25 * | |
| 26 * This controller allows sending data, error and done events on | |
| 27 * its [stream]. | |
| 28 * This class can be used to create a simple stream that others | |
| 29 * can listen on, and to push events to that stream. | |
| 30 * | |
| 31 * It's possible to check whether the stream is paused or not, and whether | |
| 32 * it has subscribers or not, as well as getting a callback when either of | |
| 33 * these change. | |
| 34 * | |
| 35 * If the stream starts or stops having listeners (first listener subscribing, | |
| 36 * last listener unsubscribing), the `onSubscriptionStateChange` callback | |
| 37 * is notified as soon as possible. If the subscription stat changes during | |
| 38 * an event firing or a callback being executed, the change will not be reported | |
| 39 * until the current event or callback has finished. | |
| 40 * If the pause state has also changed during an event or callback, only the | |
| 41 * subscription state callback is notified. | |
| 42 * | |
| 43 * If the subscriber state has not changed, but the pause state has, the | |
| 44 * `onPauseStateChange` callback is notified as soon as possible, after firing | |
| 45 * a current event or completing another callback. This happens if the stream | |
| 46 * is not paused, and a listener pauses it, or if the stream has been resumed | |
| 47 * from pause and has no pending events. If the listeners resume a paused stream | |
| 48 * while it still has queued events, the controller will still consider the | |
| 49 * stream paused until all queued events have been dispatched. | |
| 50 * | |
| 51 * Whether to invoke a callback depends only on the state before and after | |
| 52 * a stream action, for example firing an event. If the state changes multiple | |
| 53 * times during the action, and then ends up in the same state as before, no | |
| 54 * callback is performed. | |
| 55 * | |
| 56 * If listeners are added after the stream has completed (sent a "done" event), | |
| 57 * the listeners will be sent a "done" event eventually, but they won't affect | |
| 58 * the stream at all, and won't trigger callbacks. From the controller's point | |
| 59 * of view, the stream is completely inert when has completed. | |
| 60 */ | |
| 61 abstract class StreamController<T> implements StreamSink<T> { | |
| 62 /** The stream that this controller is controlling. */ | |
| 63 Stream<T> get stream; | |
| 64 | |
| 65 /** | |
| 66 * A controller with a [stream] that supports only one single subscriber. | |
| 67 * | |
| 68 * If [sync] is true, the returned stream controller is a | |
| 69 * [SynchronousStreamController], and must be used with the care | |
| 70 * and attention necessary to not break the [Stream] contract. | |
| 71 * See [Completer.sync] for some explanations on when a synchronous | |
| 72 * dispatching can be used. | |
| 73 * If in doubt, keep the controller non-sync. | |
| 74 * | |
| 75 * A Stream should be inert until a subscriber starts listening on it (using | |
| 76 * the [onListen] callback to start producing events). Streams should not | |
| 77 * leak resources (like websockets) when no user ever listens on the stream. | |
| 78 * | |
| 79 * The controller buffers all incoming events until a subscriber is | |
| 80 * registered, but this feature should only be used in rare circumstances. | |
| 81 * | |
| 82 * The [onPause] function is called when the stream becomes | |
| 83 * paused. [onResume] is called when the stream resumed. | |
| 84 * | |
| 85 * The [onListen] callback is called when the stream | |
| 86 * receives its listener and [onCancel] when the listener ends | |
| 87 * its subscription. If [onCancel] needs to perform an asynchronous operation, | |
| 88 * [onCancel] should return a future that completes when the cancel operation | |
| 89 * is done. | |
| 90 * | |
| 91 * If the stream is canceled before the controller needs new data the | |
| 92 * [onResume] call might not be executed. | |
| 93 */ | |
| 94 factory StreamController({void onListen(), | |
| 95 void onPause(), | |
| 96 void onResume(), | |
| 97 onCancel(), | |
| 98 bool sync: false}) { | |
| 99 return sync | |
| 100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | |
| 101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | |
| 102 } | |
| 103 | |
| 104 /** | |
| 105 * A controller where [stream] can be listened to more than once. | |
| 106 * | |
| 107 * The [Stream] returned by [stream] is a broadcast stream. | |
| 108 * It can be listened to more than once. | |
| 109 * | |
| 110 * A Stream should be inert until a subscriber starts listening on it (using | |
| 111 * the [onListen] callback to start producing events). Streams should not | |
| 112 * leak resources (like websockets) when no user ever listens on the stream. | |
| 113 * | |
| 114 * Broadcast streams do not buffer events when there is no listener. | |
| 115 * | |
| 116 * The controller distributes any events to all currently subscribed | |
| 117 * listeners at the time when [add], [addError] or [close] is called. | |
| 118 * It is not allowed to call `add`, `addError`, or `close` before a previous | |
| 119 * call has returned. The controller does not have any internal queue of | |
| 120 * events, and if there are no listeners at the time the event is added, | |
| 121 * it will just be dropped, or, if it is an error, be reported as uncaught. | |
| 122 * | |
| 123 * Each listener subscription is handled independently, | |
| 124 * and if one pauses, only the pausing listener is affected. | |
| 125 * A paused listener will buffer events internally until unpaused or canceled. | |
| 126 * | |
| 127 * If [sync] is true, events may be fired directly by the stream's | |
| 128 * subscriptions during an [add], [addError] or [close] call. | |
| 129 * The returned stream controller is a [SynchronousStreamController], | |
| 130 * and must be used with the care and attention necessary to not break | |
| 131 * the [Stream] contract. | |
| 132 * See [Completer.sync] for some explanations on when a synchronous | |
| 133 * dispatching can be used. | |
| 134 * If in doubt, keep the controller non-sync. | |
| 135 * | |
| 136 * If [sync] is false, the event will always be fired at a later time, | |
| 137 * after the code adding the event has completed. | |
| 138 * In that case, no guarantees are given with regard to when | |
| 139 * multiple listeners get the events, except that each listener will get | |
| 140 * all events in the correct order. Each subscription handles the events | |
| 141 * individually. | |
| 142 * If two events are sent on an async controller with two listeners, | |
| 143 * one of the listeners may get both events | |
| 144 * before the other listener gets any. | |
| 145 * A listener must be subscribed both when the event is initiated | |
| 146 * (that is, when [add] is called) | |
| 147 * and when the event is later delivered, | |
| 148 * in order to receive the event. | |
| 149 * | |
| 150 * The [onListen] callback is called when the first listener is subscribed, | |
| 151 * and the [onCancel] is called when there are no longer any active listeners. | |
| 152 * If a listener is added again later, after the [onCancel] was called, | |
| 153 * the [onListen] will be called again. | |
| 154 */ | |
| 155 factory StreamController.broadcast({void onListen(), | |
| 156 void onCancel(), | |
| 157 bool sync: false}) { | |
| 158 return sync | |
| 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | |
| 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | |
| 161 } | |
| 162 | |
| 163 /** | |
| 164 * The callback which is called when the stream is listened to. | |
| 165 * | |
| 166 * May be set to `null`, in which case no callback will happen. | |
| 167 */ | |
| 168 ControllerCallback get onListen; | |
| 169 | |
| 170 void set onListen(void onListenHandler()); | |
| 171 | |
| 172 /** | |
| 173 * The callback which is called when the stream is paused. | |
| 174 * | |
| 175 * May be set to `null`, in which case no callback will happen. | |
| 176 * | |
| 177 * Pause related callbacks are not supported on broadcast stream controllers. | |
| 178 */ | |
| 179 ControllerCallback get onPause; | |
| 180 | |
| 181 void set onPause(void onPauseHandler()); | |
| 182 | |
| 183 /** | |
| 184 * The callback which is called when the stream is resumed. | |
| 185 * | |
| 186 * May be set to `null`, in which case no callback will happen. | |
| 187 * | |
| 188 * Pause related callbacks are not supported on broadcast stream controllers. | |
| 189 */ | |
| 190 ControllerCallback get onResume; | |
| 191 | |
| 192 void set onResume(void onResumeHandler()); | |
| 193 | |
| 194 /** | |
| 195 * The callback which is called when the stream is canceled. | |
| 196 * | |
| 197 * May be set to `null`, in which case no callback will happen. | |
| 198 */ | |
| 199 ControllerCancelCallback get onCancel; | |
| 200 | |
| 201 void set onCancel(onCancelHandler()); | |
| 202 | |
| 203 /** | |
| 204 * Returns a view of this object that only exposes the [StreamSink] interface. | |
| 205 */ | |
| 206 StreamSink<T> get sink; | |
| 207 | |
| 208 /** | |
| 209 * Whether the stream controller is closed for adding more events. | |
| 210 * | |
| 211 * The controller becomes closed by calling the [close] method. | |
| 212 * New events cannot be added, by calling [add] or [addError], | |
| 213 * to a closed controller. | |
| 214 * | |
| 215 * If the controller is closed, | |
| 216 * the "done" event might not have been delivered yet, | |
| 217 * but it has been scheduled, and it is too late to add more events. | |
| 218 */ | |
| 219 bool get isClosed; | |
| 220 | |
| 221 /** | |
| 222 * Whether the subscription would need to buffer events. | |
| 223 * | |
| 224 * This is the case if the controller's stream has a listener and it is | |
| 225 * paused, or if it has not received a listener yet. In that case, the | |
| 226 * controller is considered paused as well. | |
| 227 * | |
| 228 * A broadcast stream controller is never considered paused. It always | |
| 229 * forwards its events to all uncanceled subscriptions, if any, | |
| 230 * and let the subscriptions handle their own pausing and buffering. | |
| 231 */ | |
| 232 bool get isPaused; | |
| 233 | |
| 234 /** Whether there is a subscriber on the [Stream]. */ | |
| 235 bool get hasListener; | |
| 236 | |
| 237 /** | |
| 238 * Send or enqueue an error event. | |
| 239 * | |
| 240 * If [error] is `null`, it is replaced by a [NullThrownError]. | |
| 241 */ | |
| 242 void addError(Object error, [StackTrace stackTrace]); | |
| 243 | |
| 244 /** | |
| 245 * Receives events from [source] and puts them into this controller's stream. | |
| 246 * | |
| 247 * Returns a future which completes when the source stream is done. | |
| 248 * | |
| 249 * Events must not be added directly to this controller using [add], | |
| 250 * [addError], [close] or [addStream], until the returned future | |
| 251 * is complete. | |
| 252 * | |
| 253 * Data and error events are forwarded to this controller's stream. A done | |
| 254 * event on the source will end the `addStream` operation and complete the | |
| 255 * returned future. | |
| 256 * | |
| 257 * If [cancelOnError] is true, only the first error on [source] is | |
| 258 * forwarded to the controller's stream, and the `addStream` ends | |
| 259 * after this. If [cancelOnError] is false, all errors are forwarded | |
| 260 * and only a done event will end the `addStream`. | |
| 261 */ | |
| 262 Future addStream(Stream<T> source, {bool cancelOnError: true}); | |
| 263 } | |
| 264 | |
| 265 | |
| 266 /** | |
| 267 * A stream controller that delivers its events synchronously. | |
| 268 * | |
| 269 * A synchronous stream controller is intended for cases where | |
| 270 * an already asynchronous event triggers an event on a stream. | |
| 271 * | |
| 272 * Instead of adding the event to the stream in a later microtask, | |
| 273 * causing extra latency, the event is instead fired immediately by the | |
| 274 * synchronous stream controller, as if the stream event was | |
| 275 * the current event or microtask. | |
| 276 * | |
| 277 * The synchronous stream controller can be used to break the contract | |
| 278 * on [Stream], and it must be used carefully to avoid doing so. | |
| 279 * | |
| 280 * The only advantage to using a [SynchronousStreamController] over a | |
| 281 * normal [StreamController] is the improved latency. | |
| 282 * Only use the synchronous version if the improvement is significant, | |
| 283 * and if its use is safe. Otherwise just use a normal stream controller, | |
| 284 * which will always have the correct behavior for a [Stream], and won't | |
| 285 * accidentally break other code. | |
| 286 * | |
| 287 * Adding events to a synchronous controller should only happen as the | |
| 288 * very last part of a the handling of the original event. | |
| 289 * At that point, adding an event to the stream is equivalent to | |
| 290 * returning to the event loop and adding the event in the next microtask. | |
| 291 * | |
| 292 * Each listener callback will be run as if it was a top-level event | |
| 293 * or microtask. This means that if it throws, the error will be reported as | |
| 294 * uncaught as soon as possible. | |
| 295 * This is one reason to add the event as the last thing in the original event | |
| 296 * handler - any action done after adding the event will delay the report of | |
| 297 * errors in the event listener callbacks. | |
| 298 * | |
| 299 * If an event is added in a setting that isn't known to be another event, | |
| 300 * it may cause the stream's listener to get that event before the listener | |
| 301 * is ready to handle it. We promise that after calling [Stream.listen], | |
| 302 * you won't get any events until the code doing the listen has completed. | |
| 303 * Calling [add] in response to a function call of unknown origin may break | |
| 304 * that promise. | |
| 305 * | |
| 306 * An [onListen] callback from the controller is *not* an asynchronous event, | |
| 307 * and adding events to the controller in the `onListen` callback is always | |
| 308 * wrong. The events will be delivered before the listener has even received | |
| 309 * the subscription yet. | |
| 310 * | |
| 311 * The synchronous broadcast stream controller also has a restrictions that a | |
| 312 * normal stream controller does not: | |
| 313 * The [add], [addError], [close] and [addStream] methods *must not* be | |
| 314 * called while an event is being delivered. | |
| 315 * That is, if a callback on a subscription on the controller's stream causes | |
| 316 * a call to any of the functions above, the call will fail. | |
| 317 * A broadcast stream may have more than one listener, and if an | |
| 318 * event is added synchronously while another is being also in the process | |
| 319 * of being added, the latter event might reach some listeners before | |
| 320 * the former. To prevent that, an event cannot be added while a previous | |
| 321 * event is being fired. | |
| 322 * This guarantees that an event is fully delivered when the | |
| 323 * first [add], [addError] or [close] returns, | |
| 324 * and further events will be delivered in the correct order. | |
| 325 * | |
| 326 * This still only guarantees that the event is delivered to the subscription. | |
| 327 * If the subscription is paused, the actual callback may still happen later, | |
| 328 * and the event will instead be buffered by the subscription. | |
| 329 * Barring pausing, and the following buffered events that haven't been | |
| 330 * delivered yet, callbacks will be called synchronously when an event is added. | |
| 331 * | |
| 332 * Adding an event to a synchronous non-broadcast stream controller while | |
| 333 * another event is in progress may cause the second event to be delayed | |
| 334 * and not be delivered synchronously, and until that event is delivered, | |
| 335 * the controller will not act synchronously. | |
| 336 */ | |
| 337 abstract class SynchronousStreamController<T> implements StreamController<T> { | |
| 338 /** | |
| 339 * Adds event to the controller's stream. | |
| 340 * | |
| 341 * As [StreamController.add], but must not be called while an event is | |
| 342 * being added by [add], [addError] or [close]. | |
| 343 */ | |
| 344 void add(T data); | |
| 345 | |
| 346 /** | |
| 347 * Adds error to the controller's stream. | |
| 348 * | |
| 349 * As [StreamController.addError], but must not be called while an event is | |
| 350 * being added by [add], [addError] or [close]. | |
| 351 */ | |
| 352 void addError(Object error, [StackTrace stackTrace]); | |
| 353 | |
| 354 /** | |
| 355 * Closes the controller's stream. | |
| 356 * | |
| 357 * As [StreamController.close], but must not be called while an event is | |
| 358 * being added by [add], [addError] or [close]. | |
| 359 */ | |
| 360 Future close(); | |
| 361 } | |
| 362 | |
| 363 abstract class _StreamControllerLifecycle<T> { | |
| 364 StreamSubscription<T> _subscribe( | |
| 365 void onData(T data), | |
| 366 Function onError, | |
| 367 void onDone(), | |
| 368 bool cancelOnError); | |
| 369 void _recordPause(StreamSubscription<T> subscription) {} | |
| 370 void _recordResume(StreamSubscription<T> subscription) {} | |
| 371 Future _recordCancel(StreamSubscription<T> subscription) => null; | |
| 372 } | |
| 373 | |
| 374 /** | |
| 375 * Default implementation of [StreamController]. | |
| 376 * | |
| 377 * Controls a stream that only supports a single controller. | |
| 378 */ | |
| 379 abstract class _StreamController<T> implements StreamController<T>, | |
| 380 _StreamControllerLifecycle<T>, | |
| 381 _EventSink<T>, | |
| 382 _EventDispatch<T> { | |
| 383 // The states are bit-flags. More than one can be set at a time. | |
| 384 // | |
| 385 // The "subscription state" goes through the states: | |
| 386 // initial -> subscribed -> canceled. | |
| 387 // These are mutually exclusive. | |
| 388 // The "closed" state records whether the [close] method has been called | |
| 389 // on the controller. This can be done at any time. If done before | |
| 390 // subscription, the done event is queued. If done after cancel, the done | |
| 391 // event is ignored (just as any other event after a cancel). | |
| 392 | |
| 393 /** The controller is in its initial state with no subscription. */ | |
| 394 static const int _STATE_INITIAL = 0; | |
| 395 /** The controller has a subscription, but hasn't been closed or canceled. */ | |
| 396 static const int _STATE_SUBSCRIBED = 1; | |
| 397 /** The subscription is canceled. */ | |
| 398 static const int _STATE_CANCELED = 2; | |
| 399 /** Mask for the subscription state. */ | |
| 400 static const int _STATE_SUBSCRIPTION_MASK = 3; | |
| 401 | |
| 402 // The following state relate to the controller, not the subscription. | |
| 403 // If closed, adding more events is not allowed. | |
| 404 // If executing an [addStream], new events are not allowed either, but will | |
| 405 // be added by the stream. | |
| 406 | |
| 407 /** | |
| 408 * The controller is closed due to calling [close]. | |
| 409 * | |
| 410 * When the stream is closed, you can neither add new events nor add new | |
| 411 * listeners. | |
| 412 */ | |
| 413 static const int _STATE_CLOSED = 4; | |
| 414 /** | |
| 415 * The controller is in the middle of an [addStream] operation. | |
| 416 * | |
| 417 * While adding events from a stream, no new events can be added directly | |
| 418 * on the controller. | |
| 419 */ | |
| 420 static const int _STATE_ADDSTREAM = 8; | |
| 421 | |
| 422 /** | |
| 423 * Field containing different data depending on the current subscription | |
| 424 * state. | |
| 425 * | |
| 426 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] | |
| 427 * for events added to the controller before a subscription. | |
| 428 * | |
| 429 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. | |
| 430 * | |
| 431 * When [_state] is [_STATE_CANCELED] the field is currently not used. | |
| 432 */ | |
| 433 var _varData; | |
| 434 | |
| 435 /** Current state of the controller. */ | |
| 436 int _state = _STATE_INITIAL; | |
| 437 | |
| 438 /** | |
| 439 * Future completed when the stream sends its last event. | |
| 440 * | |
| 441 * This is also the future returned by [close]. | |
| 442 */ | |
| 443 // TODO(lrn): Could this be stored in the varData field too, if it's not | |
| 444 // accessed until the call to "close"? Then we need to special case if it's | |
| 445 // accessed earlier, or if close is called before subscribing. | |
| 446 _Future _doneFuture; | |
| 447 | |
| 448 ControllerCallback onListen; | |
| 449 ControllerCallback onPause; | |
| 450 ControllerCallback onResume; | |
| 451 ControllerCancelCallback onCancel; | |
| 452 | |
| 453 _StreamController(this.onListen, | |
| 454 this.onPause, | |
| 455 this.onResume, | |
| 456 this.onCancel); | |
| 457 | |
| 458 // Return a new stream every time. The streams are equal, but not identical. | |
| 459 Stream<T> get stream => new _ControllerStream<T>(this); | |
| 460 | |
| 461 /** | |
| 462 * Returns a view of this object that only exposes the [StreamSink] interface. | |
| 463 */ | |
| 464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
| 465 | |
| 466 /** | |
| 467 * Whether a listener has existed and been canceled. | |
| 468 * | |
| 469 * After this, adding more events will be ignored. | |
| 470 */ | |
| 471 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
| 472 | |
| 473 /** Whether there is an active listener. */ | |
| 474 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | |
| 475 | |
| 476 /** Whether there has not been a listener yet. */ | |
| 477 bool get _isInitialState => | |
| 478 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; | |
| 479 | |
| 480 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 481 | |
| 482 bool get isPaused => hasListener ? _subscription._isInputPaused | |
| 483 : !_isCanceled; | |
| 484 | |
| 485 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
| 486 | |
| 487 /** New events may not be added after close, or during addStream. */ | |
| 488 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
| 489 | |
| 490 // Returns the pending events. | |
| 491 // Pending events are events added before a subscription exists. | |
| 492 // They are added to the subscription when it is created. | |
| 493 // Pending events, if any, are kept in the _varData field until the | |
| 494 // stream is listened to. | |
| 495 // While adding a stream, pending events are moved into the | |
| 496 // state object to allow the state object to use the _varData field. | |
| 497 _PendingEvents<T> get _pendingEvents { | |
| 498 assert(_isInitialState); | |
| 499 if (!_isAddingStream) { | |
| 500 return _varData as Object /*=_PendingEvents<T>*/; | |
| 501 } | |
| 502 _StreamControllerAddStreamState<T> state = | |
| 503 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 504 return state.varData as Object /*=_PendingEvents<T>*/; | |
| 505 } | |
| 506 | |
| 507 // Returns the pending events, and creates the object if necessary. | |
| 508 _StreamImplEvents<T> _ensurePendingEvents() { | |
| 509 assert(_isInitialState); | |
| 510 if (!_isAddingStream) { | |
| 511 if (_varData == null) _varData = new _StreamImplEvents<T>(); | |
| 512 return _varData as Object /*=_StreamImplEvents<T>*/; | |
| 513 } | |
| 514 _StreamControllerAddStreamState<T> state = | |
| 515 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 516 if (state.varData == null) state.varData = new _StreamImplEvents<T>(); | |
| 517 return state.varData as Object /*=_StreamImplEvents<T>*/; | |
| 518 } | |
| 519 | |
| 520 // Get the current subscription. | |
| 521 // If we are adding a stream, the subscription is moved into the state | |
| 522 // object to allow the state object to use the _varData field. | |
| 523 _ControllerSubscription<T> get _subscription { | |
| 524 assert(hasListener); | |
| 525 if (_isAddingStream) { | |
| 526 _StreamControllerAddStreamState<T> addState = | |
| 527 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 528 return addState.varData as Object /*=_ControllerSubscription<T>*/; | |
| 529 } | |
| 530 return _varData as Object /*=_ControllerSubscription<T>*/; | |
| 531 } | |
| 532 | |
| 533 /** | |
| 534 * Creates an error describing why an event cannot be added. | |
| 535 * | |
| 536 * The reason, and therefore the error message, depends on the current state. | |
| 537 */ | |
| 538 Error _badEventState() { | |
| 539 if (isClosed) { | |
| 540 return new StateError("Cannot add event after closing"); | |
| 541 } | |
| 542 assert(_isAddingStream); | |
| 543 return new StateError("Cannot add event while adding a stream"); | |
| 544 } | |
| 545 | |
| 546 // StreamSink interface. | |
| 547 Future addStream(Stream<T> source, {bool cancelOnError: true}) { | |
| 548 if (!_mayAddEvent) throw _badEventState(); | |
| 549 if (_isCanceled) return new _Future.immediate(null); | |
| 550 _StreamControllerAddStreamState<T> addState = | |
| 551 new _StreamControllerAddStreamState<T>(this, | |
| 552 _varData, | |
| 553 source, | |
| 554 cancelOnError); | |
| 555 _varData = addState; | |
| 556 _state |= _STATE_ADDSTREAM; | |
| 557 return addState.addStreamFuture; | |
| 558 } | |
| 559 | |
| 560 /** | |
| 561 * Returns a future that is completed when the stream is done | |
| 562 * processing events. | |
| 563 * | |
| 564 * This happens either when the done event has been sent, or if the | |
| 565 * subscriber of a single-subscription stream is cancelled. | |
| 566 */ | |
| 567 Future get done => _ensureDoneFuture(); | |
| 568 | |
| 569 Future _ensureDoneFuture() { | |
| 570 if (_doneFuture == null) { | |
| 571 _doneFuture = _isCanceled ? Future._nullFuture : new _Future(); | |
| 572 } | |
| 573 return _doneFuture; | |
| 574 } | |
| 575 | |
| 576 /** | |
| 577 * Send or enqueue a data event. | |
| 578 */ | |
| 579 void add(T value) { | |
| 580 if (!_mayAddEvent) throw _badEventState(); | |
| 581 _add(value); | |
| 582 } | |
| 583 | |
| 584 /** | |
| 585 * Send or enqueue an error event. | |
| 586 */ | |
| 587 void addError(Object error, [StackTrace stackTrace]) { | |
| 588 if (!_mayAddEvent) throw _badEventState(); | |
| 589 error = _nonNullError(error); | |
| 590 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 591 if (replacement != null) { | |
| 592 error = _nonNullError(replacement.error); | |
| 593 stackTrace = replacement.stackTrace; | |
| 594 } | |
| 595 _addError(error, stackTrace); | |
| 596 } | |
| 597 | |
| 598 /** | |
| 599 * Closes this controller and sends a done event on the stream. | |
| 600 * | |
| 601 * The first time a controller is closed, a "done" event is added to its | |
| 602 * stream. | |
| 603 * | |
| 604 * You are allowed to close the controller more than once, but only the first | |
| 605 * call has any effect. | |
| 606 * | |
| 607 * After closing, no further events may be added using [add], [addError] | |
| 608 * or [addStream]. | |
| 609 * | |
| 610 * The returned future is completed when the done event has been delivered. | |
| 611 */ | |
| 612 Future close() { | |
| 613 if (isClosed) { | |
| 614 return _ensureDoneFuture(); | |
| 615 } | |
| 616 if (!_mayAddEvent) throw _badEventState(); | |
| 617 _closeUnchecked(); | |
| 618 return _ensureDoneFuture(); | |
| 619 } | |
| 620 | |
| 621 void _closeUnchecked() { | |
| 622 _state |= _STATE_CLOSED; | |
| 623 if (hasListener) { | |
| 624 _sendDone(); | |
| 625 } else if (_isInitialState) { | |
| 626 _ensurePendingEvents().add(const _DelayedDone()); | |
| 627 } | |
| 628 } | |
| 629 | |
| 630 // EventSink interface. Used by the [addStream] events. | |
| 631 | |
| 632 // Add data event, used both by the [addStream] events and by [add]. | |
| 633 void _add(T value) { | |
| 634 if (hasListener) { | |
| 635 _sendData(value); | |
| 636 } else if (_isInitialState) { | |
| 637 _ensurePendingEvents().add(new _DelayedData<T>(value)); | |
| 638 } | |
| 639 } | |
| 640 | |
| 641 void _addError(Object error, StackTrace stackTrace) { | |
| 642 if (hasListener) { | |
| 643 _sendError(error, stackTrace); | |
| 644 } else if (_isInitialState) { | |
| 645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | |
| 646 } | |
| 647 } | |
| 648 | |
| 649 void _close() { | |
| 650 // End of addStream stream. | |
| 651 assert(_isAddingStream); | |
| 652 _StreamControllerAddStreamState<T> addState = | |
| 653 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 654 _varData = addState.varData; | |
| 655 _state &= ~_STATE_ADDSTREAM; | |
| 656 addState.complete(); | |
| 657 } | |
| 658 | |
| 659 // _StreamControllerLifeCycle interface | |
| 660 | |
| 661 StreamSubscription<T> _subscribe( | |
| 662 void onData(T data), | |
| 663 Function onError, | |
| 664 void onDone(), | |
| 665 bool cancelOnError) { | |
| 666 if (!_isInitialState) { | |
| 667 throw new StateError("Stream has already been listened to."); | |
| 668 } | |
| 669 _ControllerSubscription<T> subscription = | |
| 670 new _ControllerSubscription<T>(this, onData, onError, onDone, | |
| 671 cancelOnError); | |
| 672 | |
| 673 _PendingEvents<T> pendingEvents = _pendingEvents; | |
| 674 _state |= _STATE_SUBSCRIBED; | |
| 675 if (_isAddingStream) { | |
| 676 _StreamControllerAddStreamState<T> addState = | |
| 677 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 678 addState.varData = subscription; | |
| 679 addState.resume(); | |
| 680 } else { | |
| 681 _varData = subscription; | |
| 682 } | |
| 683 subscription._setPendingEvents(pendingEvents); | |
| 684 subscription._guardCallback(() { | |
| 685 _runGuarded(onListen); | |
| 686 }); | |
| 687 | |
| 688 return subscription; | |
| 689 } | |
| 690 | |
| 691 Future _recordCancel(StreamSubscription<T> subscription) { | |
| 692 // When we cancel, we first cancel any stream being added, | |
| 693 // Then we call `onCancel`, and finally the _doneFuture is completed. | |
| 694 // If either of addStream's cancel or `onCancel` returns a future, | |
| 695 // we wait for it before continuing. | |
| 696 // Any error during this process ends up in the returned future. | |
| 697 // If more errors happen, we act as if it happens inside nested try/finallys | |
| 698 // or whenComplete calls, and only the last error ends up in the | |
| 699 // returned future. | |
| 700 Future result; | |
| 701 if (_isAddingStream) { | |
| 702 _StreamControllerAddStreamState<T> addState = | |
| 703 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 704 result = addState.cancel(); | |
| 705 } | |
| 706 _varData = null; | |
| 707 _state = | |
| 708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | |
| 709 | |
| 710 if (onCancel != null) { | |
| 711 if (result == null) { | |
| 712 // Only introduce a future if one is needed. | |
| 713 // If _onCancel returns null, no future is needed. | |
| 714 try { | |
| 715 result = onCancel(); | |
| 716 } catch (e, s) { | |
| 717 // Return the error in the returned future. | |
| 718 // Complete it asynchronously, so there is time for a listener | |
| 719 // to handle the error. | |
| 720 result = new _Future().._asyncCompleteError(e, s); | |
| 721 } | |
| 722 } else { | |
| 723 // Simpler case when we already know that we will return a future. | |
| 724 result = result.whenComplete(onCancel); | |
| 725 } | |
| 726 } | |
| 727 | |
| 728 void complete() { | |
| 729 if (_doneFuture != null && _doneFuture._mayComplete) { | |
| 730 _doneFuture._asyncComplete(null); | |
| 731 } | |
| 732 } | |
| 733 | |
| 734 if (result != null) { | |
| 735 result = result.whenComplete(complete); | |
| 736 } else { | |
| 737 complete(); | |
| 738 } | |
| 739 | |
| 740 return result; | |
| 741 } | |
| 742 | |
| 743 void _recordPause(StreamSubscription<T> subscription) { | |
| 744 if (_isAddingStream) { | |
| 745 _StreamControllerAddStreamState<T> addState = | |
| 746 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 747 addState.pause(); | |
| 748 } | |
| 749 _runGuarded(onPause); | |
| 750 } | |
| 751 | |
| 752 void _recordResume(StreamSubscription<T> subscription) { | |
| 753 if (_isAddingStream) { | |
| 754 _StreamControllerAddStreamState<T> addState = | |
| 755 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | |
| 756 addState.resume(); | |
| 757 } | |
| 758 _runGuarded(onResume); | |
| 759 } | |
| 760 } | |
| 761 | |
| 762 abstract class _SyncStreamControllerDispatch<T> | |
| 763 implements _StreamController<T>, SynchronousStreamController<T> { | |
| 764 int get _state; | |
| 765 void set _state(int state); | |
| 766 | |
| 767 void _sendData(T data) { | |
| 768 _subscription._add(data); | |
| 769 } | |
| 770 | |
| 771 void _sendError(Object error, StackTrace stackTrace) { | |
| 772 _subscription._addError(error, stackTrace); | |
| 773 } | |
| 774 | |
| 775 void _sendDone() { | |
| 776 _subscription._close(); | |
| 777 } | |
| 778 } | |
| 779 | |
| 780 abstract class _AsyncStreamControllerDispatch<T> | |
| 781 implements _StreamController<T> { | |
| 782 void _sendData(T data) { | |
| 783 _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data)); | |
| 784 } | |
| 785 | |
| 786 void _sendError(Object error, StackTrace stackTrace) { | |
| 787 _subscription._addPending(new _DelayedError(error, stackTrace)); | |
| 788 } | |
| 789 | |
| 790 void _sendDone() { | |
| 791 _subscription._addPending(const _DelayedDone()); | |
| 792 } | |
| 793 } | |
| 794 | |
| 795 // TODO(lrn): Use common superclass for callback-controllers when VM supports | |
| 796 // constructors in mixin superclasses. | |
| 797 | |
| 798 class _AsyncStreamController<T> = _StreamController<T> | |
| 799 with _AsyncStreamControllerDispatch<T>; | |
| 800 | |
| 801 class _SyncStreamController<T> = _StreamController<T> | |
| 802 with _SyncStreamControllerDispatch<T>; | |
| 803 | |
| 804 typedef _NotificationHandler(); | |
| 805 | |
| 806 Future _runGuarded(_NotificationHandler notificationHandler) { | |
| 807 if (notificationHandler == null) return null; | |
| 808 try { | |
| 809 var result = notificationHandler(); | |
| 810 if (result is Future) return result; | |
| 811 return null; | |
| 812 } catch (e, s) { | |
| 813 Zone.current.handleUncaughtError(e, s); | |
| 814 } | |
| 815 } | |
| 816 | |
| 817 class _ControllerStream<T> extends _StreamImpl<T> { | |
| 818 _StreamControllerLifecycle<T> _controller; | |
| 819 | |
| 820 _ControllerStream(this._controller); | |
| 821 | |
| 822 StreamSubscription<T> _createSubscription( | |
| 823 void onData(T data), | |
| 824 Function onError, | |
| 825 void onDone(), | |
| 826 bool cancelOnError) => | |
| 827 _controller._subscribe(onData, onError, onDone, cancelOnError); | |
| 828 | |
| 829 // Override == and hashCode so that new streams returned by the same | |
| 830 // controller are considered equal. The controller returns a new stream | |
| 831 // each time it's queried, but doesn't have to cache the result. | |
| 832 | |
| 833 int get hashCode => _controller.hashCode ^ 0x35323532; | |
| 834 | |
| 835 bool operator==(Object other) { | |
| 836 if (identical(this, other)) return true; | |
| 837 if (other is! _ControllerStream) return false; | |
| 838 _ControllerStream otherStream = other; | |
| 839 return identical(otherStream._controller, this._controller); | |
| 840 } | |
| 841 } | |
| 842 | |
| 843 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | |
| 844 final _StreamControllerLifecycle<T> _controller; | |
| 845 | |
| 846 _ControllerSubscription(this._controller, void onData(T data), | |
| 847 Function onError, void onDone(), bool cancelOnError) | |
| 848 : super(onData, onError, onDone, cancelOnError); | |
| 849 | |
| 850 Future _onCancel() { | |
| 851 return _controller._recordCancel(this); | |
| 852 } | |
| 853 | |
| 854 void _onPause() { | |
| 855 _controller._recordPause(this); | |
| 856 } | |
| 857 | |
| 858 void _onResume() { | |
| 859 _controller._recordResume(this); | |
| 860 } | |
| 861 } | |
| 862 | |
| 863 | |
| 864 /** A class that exposes only the [StreamSink] interface of an object. */ | |
| 865 class _StreamSinkWrapper<T> implements StreamSink<T> { | |
| 866 final StreamController _target; | |
| 867 _StreamSinkWrapper(this._target); | |
| 868 void add(T data) { _target.add(data); } | |
| 869 void addError(Object error, [StackTrace stackTrace]) { | |
| 870 _target.addError(error, stackTrace); | |
| 871 } | |
| 872 Future close() => _target.close(); | |
| 873 Future addStream(Stream<T> source, {bool cancelOnError: true}) => | |
| 874 _target.addStream(source, cancelOnError: cancelOnError); | |
| 875 Future get done => _target.done; | |
| 876 } | |
| 877 | |
| 878 /** | |
| 879 * Object containing the state used to handle [StreamController.addStream]. | |
| 880 */ | |
| 881 class _AddStreamState<T> { | |
| 882 // [_Future] returned by call to addStream. | |
| 883 final _Future addStreamFuture; | |
| 884 | |
| 885 // Subscription on stream argument to addStream. | |
| 886 final StreamSubscription addSubscription; | |
| 887 | |
| 888 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) | |
| 889 : addStreamFuture = new _Future(), | |
| 890 addSubscription = source.listen(controller._add, | |
| 891 onError: cancelOnError | |
| 892 ? makeErrorHandler(controller) | |
| 893 : controller._addError, | |
| 894 onDone: controller._close, | |
| 895 cancelOnError: cancelOnError); | |
| 896 | |
| 897 static makeErrorHandler(_EventSink controller) => | |
| 898 (e, StackTrace s) { | |
| 899 controller._addError(e, s); | |
| 900 controller._close(); | |
| 901 }; | |
| 902 | |
| 903 void pause() { | |
| 904 addSubscription.pause(); | |
| 905 } | |
| 906 | |
| 907 void resume() { | |
| 908 addSubscription.resume(); | |
| 909 } | |
| 910 | |
| 911 /** | |
| 912 * Stop adding the stream. | |
| 913 * | |
| 914 * Complete the future returned by `StreamController.addStream` when | |
| 915 * the cancel is complete. | |
| 916 * | |
| 917 * Return a future if the cancel takes time, otherwise return `null`. | |
| 918 */ | |
| 919 Future cancel() { | |
| 920 var cancel = addSubscription.cancel(); | |
| 921 if (cancel == null) { | |
| 922 addStreamFuture._asyncComplete(null); | |
| 923 return null; | |
| 924 } | |
| 925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); | |
| 926 } | |
| 927 | |
| 928 void complete() { | |
| 929 addStreamFuture._asyncComplete(null); | |
| 930 } | |
| 931 } | |
| 932 | |
| 933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { | |
| 934 // The subscription or pending data of a _StreamController. | |
| 935 // Stored here because we reuse the `_varData` field in the _StreamController | |
| 936 // to store this state object. | |
| 937 var varData; | |
| 938 | |
| 939 _StreamControllerAddStreamState(_StreamController<T> controller, | |
| 940 this.varData, | |
| 941 Stream source, | |
| 942 bool cancelOnError) | |
| 943 : super(controller, source, cancelOnError) { | |
| 944 if (controller.isPaused) { | |
| 945 addSubscription.pause(); | |
| 946 } | |
| 947 } | |
| 948 } | |
| OLD | NEW |