| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
| 40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
| 41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
| 42 * callback is performed. | 42 * callback is performed. |
| 43 * | 43 * |
| 44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
| 45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
| 48 */ | 48 */ |
| 49 abstract class StreamController<T> implements EventSink<T> { | 49 abstract class StreamController<T> implements StreamSink<T> { |
| 50 /** The stream that this controller is controlling. */ | 50 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; | 51 Stream<T> get stream; |
| 52 | 52 |
| 53 /** | 53 /** |
| 54 * A controller with a [stream] that supports only one single subscriber. | 54 * A controller with a [stream] that supports only one single subscriber. |
| 55 * | 55 * |
| 56 * If [sync] is true, events may be passed directly to the stream's listener | 56 * If [sync] is true, events may be passed directly to the stream's listener |
| 57 * during an [add], [addError] or [close] call. If [sync] is false, the event | 57 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 58 * will be passed to the listener at a later time, after the code creating | 58 * will be passed to the listener at a later time, after the code creating |
| 59 * the event has returned. | 59 * the event has returned. |
| 60 * | 60 * |
| 61 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
| 62 * registered. | 62 * registered. |
| 63 * | 63 * |
| 64 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
| 65 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
| 66 * | 66 * |
| 67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
| 68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
| 69 * its subscription. | 69 * its subscription. |
| 70 * | 70 * |
| 71 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
| 72 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
| 73 */ | 73 */ |
| 74 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
| 75 void onPause(), | 75 void onPause(), |
| 76 void onResume(), | 76 void onResume(), |
| 77 void onCancel(), | 77 void onCancel(), |
| 78 bool sync: false}) | 78 bool sync: false}) { |
| 79 => sync | 79 if (onListen == null && onPause == null && |
| 80 onResume == null && onCancel == null) { |
| 81 return sync |
| 82 ? new _NoCallbackSyncStreamController<T>() |
| 83 : new _NoCallbackAsyncStreamController<T>(); |
| 84 } |
| 85 return sync |
| 80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| 88 } |
| 82 | 89 |
| 83 /** | 90 /** |
| 84 * A controller where [stream] can be listened to more than once. | 91 * A controller where [stream] can be listened to more than once. |
| 85 * | 92 * |
| 86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened | 93 * The [Stream] returned by [stream] is a broadcast stream. It can be listened |
| 87 * to more than once. | 94 * to more than once. |
| 88 * | 95 * |
| 89 * The controller distributes any events to all currently subscribed | 96 * The controller distributes any events to all currently subscribed |
| 90 * listeners. | 97 * listeners. |
| 91 * It is not allowed to call [add], [addError], or [close] before a previous | 98 * It is not allowed to call [add], [addError], or [close] before a previous |
| (...skipping 24 matching lines...) Expand all Loading... |
| 116 */ | 123 */ |
| 117 factory StreamController.broadcast({void onListen(), | 124 factory StreamController.broadcast({void onListen(), |
| 118 void onCancel(), | 125 void onCancel(), |
| 119 bool sync: false}) { | 126 bool sync: false}) { |
| 120 return sync | 127 return sync |
| 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 128 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 129 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 123 } | 130 } |
| 124 | 131 |
| 125 /** | 132 /** |
| 126 * Returns a view of this object that only exposes the [EventSink] interface. | 133 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 127 */ | 134 */ |
| 128 EventSink<T> get sink; | 135 StreamSink<T> get sink; |
| 129 | 136 |
| 130 /** | 137 /** |
| 131 * Whether the stream is closed for adding more events. | 138 * Whether the stream is closed for adding more events. |
| 132 * | 139 * |
| 133 * If true, the "done" event might not have fired yet, but it has been | 140 * If true, the "done" event might not have fired yet, but it has been |
| 134 * scheduled, and it is too late to add more events. | 141 * scheduled, and it is too late to add more events. |
| 135 */ | 142 */ |
| 136 bool get isClosed; | 143 bool get isClosed; |
| 137 | 144 |
| 138 /** | 145 /** |
| (...skipping 16 matching lines...) Expand all Loading... |
| 155 * Send or enqueue an error event. | 162 * Send or enqueue an error event. |
| 156 * | 163 * |
| 157 * Also allows an objection stack trace object, on top of what [EventSink] | 164 * Also allows an objection stack trace object, on top of what [EventSink] |
| 158 * allows. | 165 * allows. |
| 159 */ | 166 */ |
| 160 void addError(Object error, [Object stackTrace]); | 167 void addError(Object error, [Object stackTrace]); |
| 161 } | 168 } |
| 162 | 169 |
| 163 | 170 |
| 164 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
| 165 void _recordListen(StreamSubscription<T> subscription) {} | 172 StreamSubscription<T> _subscribe(void onData(T data), |
| 173 void onError(Object error), |
| 174 void onDone(), |
| 175 bool cancelOnError); |
| 166 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
| 167 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
| 168 void _recordCancel(StreamSubscription<T> subscription) {} | 178 void _recordCancel(StreamSubscription<T> subscription) {} |
| 169 } | 179 } |
| 170 | 180 |
| 171 /** | 181 /** |
| 172 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
| 173 * | 183 * |
| 174 * Controls a stream that only supports a single controller. | 184 * Controls a stream that only supports a single controller. |
| 175 */ | 185 */ |
| 176 abstract class _StreamController<T> implements StreamController<T>, | 186 abstract class _StreamController<T> implements StreamController<T>, |
| 177 _StreamControllerLifecycle<T>, | 187 _StreamControllerLifecycle<T>, |
| 188 _EventSink<T>, |
| 178 _EventDispatch<T> { | 189 _EventDispatch<T> { |
| 179 static const int _STATE_OPEN = 0; | 190 // The states are bit-flags. More than one can be set at a time. |
| 180 static const int _STATE_CANCELLED = 1; | 191 // |
| 181 static const int _STATE_CLOSED = 2; | 192 // The "subscription state" goes through the states: |
| 182 | 193 // initial -> subscribed -> canceled. |
| 183 final _NotificationHandler _onListen; | 194 // These are mutually exclusive. |
| 184 final _NotificationHandler _onPause; | 195 // The "closed" state records whether the [close] method has been called |
| 185 final _NotificationHandler _onResume; | 196 // on the controller. This can be done at any time. If done before |
| 186 final _NotificationHandler _onCancel; | 197 // subscription, the done event is queued. If done after cancel, the done |
| 187 _StreamImpl<T> _stream; | 198 // event is ignored (just as any other event after a cancel). |
| 188 | 199 |
| 189 // An active subscription on the stream, or null if no subscripton is active. | 200 /** The controller is in its initial state with no subscription. */ |
| 190 _ControllerSubscription<T> _subscription; | 201 static const int _STATE_INITIAL = 0; |
| 191 | 202 /** The controller has a subscription, but hasn't been closed or canceled. */ |
| 192 // Whether we have sent a "done" event. | 203 static const int _STATE_SUBSCRIBED = 1; |
| 193 int _state = _STATE_OPEN; | 204 /** The subscription is canceled. */ |
| 194 | 205 static const int _STATE_CANCELED = 2; |
| 195 // Events added to the stream before it has an active subscription. | 206 /** Mask for the subscription state. */ |
| 196 _PendingEvents _pendingEvents = null; | 207 static const int _STATE_SUBSCRIPTION_MASK = 3; |
| 197 | 208 |
| 198 _StreamController(this._onListen, | 209 // The following state relate to the controller, not the subscription. |
| 199 this._onPause, | 210 // If closed, adding more events is not allowed. |
| 200 this._onResume, | 211 // If executing an [addStream], new events are not allowed either, but will |
| 201 this._onCancel) { | 212 // be added by the stream. |
| 202 _stream = new _ControllerStream<T>(this); | 213 |
| 203 } | 214 /** |
| 204 | 215 * The controller is closed due to calling [close]. |
| 205 Stream<T> get stream => _stream; | 216 * |
| 206 | 217 * When the stream is closed, you can neither add new events nor add new |
| 207 /** | 218 * listeners. |
| 208 * Returns a view of this object that only exposes the [EventSink] interface. | 219 */ |
| 209 */ | 220 static const int _STATE_CLOSED = 4; |
| 210 EventSink<T> get sink => new _EventSinkView<T>(this); | 221 /** |
| 211 | 222 * The controller is in the middle of an [addStream] operation. |
| 212 /** | 223 * |
| 213 * Whether a listener has existed and been cancelled. | 224 * While adding events from a stream, no new events can be added directly |
| 225 * on the controller. |
| 226 */ |
| 227 static const int _STATE_ADDSTREAM = 8; |
| 228 |
| 229 /** |
| 230 * Field containing different data depending on the current subscription |
| 231 * state. |
| 232 * |
| 233 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] |
| 234 * for events added to the controller before a subscription. |
| 235 * |
| 236 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. |
| 237 * |
| 238 * When [_state] is [_STATE_CANCELED] the field is currently not used. |
| 239 */ |
| 240 var _varData; |
| 241 |
| 242 /** Current state of the controller. */ |
| 243 int _state = _STATE_INITIAL; |
| 244 |
| 245 /** |
| 246 * Future completed when the stream sends its last event. |
| 247 * |
| 248 * This is also the future returned by [close]. |
| 249 */ |
| 250 // TODO(lrn): Could this be stored in the varData field too, if it's not |
| 251 // accessed until the call to "close"? Then we need to special case if it's |
| 252 // accessed earlier, or if close is called before subscribing. |
| 253 _FutureImpl _doneFuture; |
| 254 |
| 255 _StreamController(); |
| 256 |
| 257 _NotificationHandler get _onListen; |
| 258 _NotificationHandler get _onPause; |
| 259 _NotificationHandler get _onResume; |
| 260 _NotificationHandler get _onCancel; |
| 261 |
| 262 // Return a new stream every time. The streams are equal, but not identical. |
| 263 Stream<T> get stream => new _ControllerStream(this); |
| 264 |
| 265 /** |
| 266 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 267 */ |
| 268 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 269 |
| 270 /** |
| 271 * Whether a listener has existed and been canceled. |
| 214 * | 272 * |
| 215 * After this, adding more events will be ignored. | 273 * After this, adding more events will be ignored. |
| 216 */ | 274 */ |
| 217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | 275 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 276 |
| 277 /** Whether there is an active listener. */ |
| 278 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; |
| 279 |
| 280 /** Whether there has not been a listener yet. */ |
| 281 bool get _isInitialState => |
| 282 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; |
| 218 | 283 |
| 219 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 284 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 220 | 285 |
| 221 bool get isPaused => hasListener ? _subscription._isInputPaused | 286 bool get isPaused => hasListener ? _subscription._isInputPaused |
| 222 : !_isCancelled; | 287 : !_isCanceled; |
| 223 | 288 |
| 224 bool get hasListener => _subscription != null; | 289 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| 225 | 290 |
| 226 /** | 291 /** New events may not be added after close, or during addStream. */ |
| 227 * Send or queue a data event. | 292 bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| 293 |
| 294 // Returns the pending events. |
| 295 // Pending events are events added before a subscription exists. |
| 296 // They are added to the subscription when it is created. |
| 297 // Pending events, if any, are kept in the _varData field until the |
| 298 // stream is listened to. |
| 299 // While adding a stream, pending events are moved into the |
| 300 // state object to allow the state object to use the _varData field. |
| 301 _PendingEvents get _pendingEvents { |
| 302 assert(_isInitialState); |
| 303 if (!_isAddingStream) { |
| 304 return _varData; |
| 305 } |
| 306 _StreamControllerAddStreamState state = _varData; |
| 307 return state.varData; |
| 308 } |
| 309 |
| 310 // Returns the pending events, and creates the object if necessary. |
| 311 _StreamImplEvents _ensurePendingEvents() { |
| 312 assert(_isInitialState); |
| 313 if (!_isAddingStream) { |
| 314 if (_varData == null) _varData = new _StreamImplEvents(); |
| 315 return _varData; |
| 316 } |
| 317 _StreamControllerAddStreamState state = _varData; |
| 318 if (state.varData == null) state.varData = new _StreamImplEvents(); |
| 319 return state.varData; |
| 320 } |
| 321 |
| 322 // Get the current subscription. |
| 323 // If we are adding a stream, the subscription is moved into the state |
| 324 // object to allow the state object to use the _varData field. |
| 325 _ControllerSubscription get _subscription { |
| 326 assert(hasListener); |
| 327 if (_isAddingStream) { |
| 328 _StreamControllerAddStreamState addState = _varData; |
| 329 return addState.varData; |
| 330 } |
| 331 return _varData; |
| 332 } |
| 333 |
| 334 /** |
| 335 * Creates an error describing why an event cannot be added. |
| 336 * |
| 337 * The reason, and therefore the error message, depends on the current state. |
| 338 */ |
| 339 Error _badEventState() { |
| 340 if (isClosed) { |
| 341 return new StateError("Cannot add event after closing"); |
| 342 } |
| 343 assert(_isAddingStream); |
| 344 return new StateError("Cannot add event while adding a stream"); |
| 345 } |
| 346 |
| 347 // StreamSink interface. |
| 348 Future addStream(Stream<T> source) { |
| 349 if (!_mayAddEvent) throw _badEventState(); |
| 350 if (_isCanceled) return new _FutureImpl.immediate(null); |
| 351 _StreamControllerAddStreamState addState = |
| 352 new _StreamControllerAddStreamState(this, _varData, source); |
| 353 _varData = addState; |
| 354 _state |= _STATE_ADDSTREAM; |
| 355 return addState.addStreamFuture; |
| 356 } |
| 357 |
| 358 Future get done => _ensureDoneFuture(); |
| 359 |
| 360 Future _ensureDoneFuture() { |
| 361 if (_doneFuture == null) { |
| 362 _doneFuture = new _FutureImpl(); |
| 363 if (_isCanceled) _doneFuture._setValue(null); |
| 364 } |
| 365 return _doneFuture; |
| 366 } |
| 367 |
| 368 /** |
| 369 * Send or enqueue a data event. |
| 228 */ | 370 */ |
| 229 void add(T value) { | 371 void add(T value) { |
| 230 if (isClosed) throw new StateError("Adding event after close"); | 372 if (!_mayAddEvent) throw _badEventState(); |
| 231 if (_subscription != null) { | 373 _add(value); |
| 232 _sendData(value); | |
| 233 } else if (!_isCancelled) { | |
| 234 _addPendingEvent(new _DelayedData<T>(value)); | |
| 235 } | |
| 236 } | 374 } |
| 237 | 375 |
| 238 /** | 376 /** |
| 239 * Send or enqueue an error event. | 377 * Send or enqueue an error event. |
| 240 */ | 378 */ |
| 241 void addError(Object error, [Object stackTrace]) { | 379 void addError(Object error, [Object stackTrace]) { |
| 242 if (isClosed) throw new StateError("Adding event after close"); | 380 if (!_mayAddEvent) throw _badEventState(); |
| 243 if (stackTrace != null) { | 381 if (stackTrace != null) { |
| 244 // Force stack trace overwrite. Even if the error already contained | 382 // Force stack trace overwrite. Even if the error already contained |
| 245 // a stack trace. | 383 // a stack trace. |
| 246 _attachStackTrace(error, stackTrace); | 384 _attachStackTrace(error, stackTrace); |
| 247 } | 385 } |
| 248 if (_subscription != null) { | 386 _addError(error); |
| 249 _sendError(error); | 387 } |
| 250 } else if (!_isCancelled) { | 388 |
| 251 _addPendingEvent(new _DelayedError(error)); | 389 /** |
| 252 } | |
| 253 } | |
| 254 | |
| 255 /** | |
| 256 * Closes this controller. | 390 * Closes this controller. |
| 257 * | 391 * |
| 258 * After closing, no further events may be added using [add] or [addError]. | 392 * After closing, no further events may be added using [add] or [addError]. |
| 259 * | 393 * |
| 260 * You are allowed to close the controller more than once, but only the first | 394 * You are allowed to close the controller more than once, but only the first |
| 261 * call has any effect. | 395 * call has any effect. |
| 262 * | 396 * |
| 263 * The first time a controller is closed, a "done" event is sent to its | 397 * The first time a controller is closed, a "done" event is sent to its |
| 264 * stream. | 398 * stream. |
| 265 */ | 399 */ |
| 266 void close() { | 400 Future close() { |
| 267 if (isClosed) return; | 401 if (isClosed) { |
| 402 assert(_doneFuture != null); // Was set when close was first called. |
| 403 return _doneFuture; |
| 404 } |
| 405 if (!_mayAddEvent) throw _badEventState(); |
| 268 _state |= _STATE_CLOSED; | 406 _state |= _STATE_CLOSED; |
| 269 if (_subscription != null) { | 407 _ensureDoneFuture(); |
| 408 if (hasListener) { |
| 270 _sendDone(); | 409 _sendDone(); |
| 271 } else if (!_isCancelled) { | 410 } else if (_isInitialState) { |
| 272 _addPendingEvent(const _DelayedDone()); | 411 _ensurePendingEvents().add(const _DelayedDone()); |
| 412 } |
| 413 return _doneFuture; |
| 414 } |
| 415 |
| 416 // EventSink interface. Used by the [addStream] events. |
| 417 |
| 418 // Add data event, used both by the [addStream] events and by [add]. |
| 419 void _add(T value) { |
| 420 if (hasListener) { |
| 421 _sendData(value); |
| 422 } else if (_isInitialState) { |
| 423 _ensurePendingEvents().add(new _DelayedData<T>(value)); |
| 273 } | 424 } |
| 274 } | 425 } |
| 275 | 426 |
| 276 // EventDispatch interface | 427 void _addError(Object error) { |
| 277 | 428 if (hasListener) { |
| 278 void _addPendingEvent(_DelayedEvent event) { | 429 _sendError(error); |
| 279 if (_isCancelled) return; | 430 } else if (_isInitialState) { |
| 280 _StreamImplEvents events = _pendingEvents; | 431 _ensurePendingEvents().add(new _DelayedError(error)); |
| 281 if (events == null) { | |
| 282 events = _pendingEvents = new _StreamImplEvents(); | |
| 283 } | 432 } |
| 284 events.add(event); | |
| 285 } | 433 } |
| 286 | 434 |
| 287 void _recordListen(_BufferingStreamSubscription<T> subscription) { | 435 void _close() { |
| 288 assert(_subscription == null); | 436 // End of addStream stream. |
| 289 _subscription = subscription; | 437 assert(_isAddingStream); |
| 290 subscription._setPendingEvents(_pendingEvents); | 438 _StreamControllerAddStreamState addState = _varData; |
| 291 _pendingEvents = null; | 439 _varData = addState.varData; |
| 440 _state &= ~_STATE_ADDSTREAM; |
| 441 addState.complete(); |
| 442 } |
| 443 |
| 444 // _StreamControllerLifeCycle interface |
| 445 |
| 446 StreamSubscription<T> _subscribe(void onData(T data), |
| 447 void onError(Object error), |
| 448 void onDone(), |
| 449 bool cancelOnError) { |
| 450 if (!_isInitialState) { |
| 451 throw new StateError("Stream has already been listened to."); |
| 452 } |
| 453 _ControllerSubscription subscription = new _ControllerSubscription( |
| 454 this, onData, onError, onDone, cancelOnError); |
| 455 |
| 456 _PendingEvents pendingEvents = _pendingEvents; |
| 457 _state |= _STATE_SUBSCRIBED; |
| 458 if (_isAddingStream) { |
| 459 _StreamControllerAddStreamState addState = _varData; |
| 460 addState.varData = subscription; |
| 461 } else { |
| 462 _varData = subscription; |
| 463 } |
| 464 subscription._setPendingEvents(pendingEvents); |
| 292 subscription._guardCallback(() { | 465 subscription._guardCallback(() { |
| 293 _runGuarded(_onListen); | 466 _runGuarded(_onListen); |
| 294 }); | 467 }); |
| 468 |
| 469 return subscription; |
| 295 } | 470 } |
| 296 | 471 |
| 297 void _recordCancel(StreamSubscription<T> subscription) { | 472 void _recordCancel(StreamSubscription<T> subscription) { |
| 298 assert(identical(_subscription, subscription)); | 473 if (_isAddingStream) { |
| 299 _subscription = null; | 474 _StreamControllerAddStreamState addState = _varData; |
| 300 _state |= _STATE_CANCELLED; | 475 addState.cancel(); |
| 476 } |
| 477 _varData = null; |
| 478 _state = |
| 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 301 _runGuarded(_onCancel); | 480 _runGuarded(_onCancel); |
| 481 if (_doneFuture != null && _doneFuture._mayComplete) { |
| 482 _doneFuture._asyncSetValue(null); |
| 483 } |
| 302 } | 484 } |
| 303 | 485 |
| 304 void _recordPause(StreamSubscription<T> subscription) { | 486 void _recordPause(StreamSubscription<T> subscription) { |
| 487 if (_isAddingStream) { |
| 488 _StreamControllerAddStreamState addState = _varData; |
| 489 addState.pause(); |
| 490 } |
| 305 _runGuarded(_onPause); | 491 _runGuarded(_onPause); |
| 306 } | 492 } |
| 307 | 493 |
| 308 void _recordResume(StreamSubscription<T> subscription) { | 494 void _recordResume(StreamSubscription<T> subscription) { |
| 495 if (_isAddingStream) { |
| 496 _StreamControllerAddStreamState addState = _varData; |
| 497 addState.resume(); |
| 498 } |
| 309 _runGuarded(_onResume); | 499 _runGuarded(_onResume); |
| 310 } | 500 } |
| 311 } | 501 } |
| 312 | 502 |
| 313 class _SyncStreamController<T> extends _StreamController<T> { | 503 abstract class _SyncStreamControllerDispatch<T> |
| 314 _SyncStreamController(void onListen(), | 504 implements _StreamController<T> { |
| 315 void onPause(), | |
| 316 void onResume(), | |
| 317 void onCancel()) | |
| 318 : super(onListen, onPause, onResume, onCancel); | |
| 319 | |
| 320 void _sendData(T data) { | 505 void _sendData(T data) { |
| 321 _subscription._add(data); | 506 _subscription._add(data); |
| 322 } | 507 } |
| 323 | 508 |
| 324 void _sendError(Object error) { | 509 void _sendError(Object error) { |
| 325 _subscription._addError(error); | 510 _subscription._addError(error); |
| 326 } | 511 } |
| 327 | 512 |
| 328 void _sendDone() { | 513 void _sendDone() { |
| 329 _subscription._close(); | 514 _subscription._close(); |
| 330 } | 515 } |
| 331 } | 516 } |
| 332 | 517 |
| 333 class _AsyncStreamController<T> extends _StreamController<T> { | 518 abstract class _AsyncStreamControllerDispatch<T> |
| 334 _AsyncStreamController(void onListen(), | 519 implements _StreamController<T> { |
| 335 void onPause(), | |
| 336 void onResume(), | |
| 337 void onCancel()) | |
| 338 : super(onListen, onPause, onResume, onCancel); | |
| 339 | |
| 340 void _sendData(T data) { | 520 void _sendData(T data) { |
| 341 _subscription._addPending(new _DelayedData(data)); | 521 _subscription._addPending(new _DelayedData(data)); |
| 342 } | 522 } |
| 343 | 523 |
| 344 void _sendError(Object error) { | 524 void _sendError(Object error) { |
| 345 _subscription._addPending(new _DelayedError(error)); | 525 _subscription._addPending(new _DelayedError(error)); |
| 346 } | 526 } |
| 347 | 527 |
| 348 void _sendDone() { | 528 void _sendDone() { |
| 349 _subscription._addPending(const _DelayedDone()); | 529 _subscription._addPending(const _DelayedDone()); |
| 350 } | 530 } |
| 351 } | 531 } |
| 352 | 532 |
| 533 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
| 534 // constructors in mixin superclasses. |
| 535 |
| 536 class _AsyncStreamController<T> extends _StreamController<T> |
| 537 with _AsyncStreamControllerDispatch<T> { |
| 538 final _NotificationHandler _onListen; |
| 539 final _NotificationHandler _onPause; |
| 540 final _NotificationHandler _onResume; |
| 541 final _NotificationHandler _onCancel; |
| 542 |
| 543 _AsyncStreamController(void this._onListen(), |
| 544 void this._onPause(), |
| 545 void this._onResume(), |
| 546 void this._onCancel()); |
| 547 } |
| 548 |
| 549 class _SyncStreamController<T> extends _StreamController<T> |
| 550 with _SyncStreamControllerDispatch<T> { |
| 551 final _NotificationHandler _onListen; |
| 552 final _NotificationHandler _onPause; |
| 553 final _NotificationHandler _onResume; |
| 554 final _NotificationHandler _onCancel; |
| 555 |
| 556 _SyncStreamController(void this._onListen(), |
| 557 void this._onPause(), |
| 558 void this._onResume(), |
| 559 void this._onCancel()); |
| 560 } |
| 561 |
| 562 abstract class _NoCallbacks { |
| 563 _NotificationHandler get _onListen => null; |
| 564 _NotificationHandler get _onPause => null; |
| 565 _NotificationHandler get _onResume => null; |
| 566 _NotificationHandler get _onCancel => null; |
| 567 } |
| 568 |
| 569 typedef _NoCallbackAsyncStreamController<T> = _StreamController<T> |
| 570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 571 |
| 572 typedef _NoCallbackSyncStreamController<T> = _StreamController<T> |
| 573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 574 |
| 353 typedef void _NotificationHandler(); | 575 typedef void _NotificationHandler(); |
| 354 | 576 |
| 355 void _runGuarded(_NotificationHandler notificationHandler) { | 577 void _runGuarded(_NotificationHandler notificationHandler) { |
| 356 if (notificationHandler == null) return; | 578 if (notificationHandler == null) return; |
| 357 try { | 579 try { |
| 358 notificationHandler(); | 580 notificationHandler(); |
| 359 } catch (e, s) { | 581 } catch (e, s) { |
| 360 _Zone.current.handleUncaughtError(_asyncError(e, s)); | 582 _Zone.current.handleUncaughtError(_asyncError(e, s)); |
| 361 } | 583 } |
| 362 } | 584 } |
| 363 | 585 |
| 364 class _ControllerStream<T> extends _StreamImpl<T> { | 586 class _ControllerStream<T> extends _StreamImpl<T> { |
| 365 _StreamControllerLifecycle<T> _controller; | 587 _StreamControllerLifecycle<T> _controller; |
| 366 bool _hasListener = false; | |
| 367 | 588 |
| 368 _ControllerStream(this._controller); | 589 _ControllerStream(this._controller); |
| 369 | 590 |
| 370 StreamSubscription<T> _createSubscription( | 591 StreamSubscription<T> _createSubscription( |
| 371 void onData(T data), | 592 void onData(T data), |
| 372 void onError(Object error), | 593 void onError(Object error), |
| 373 void onDone(), | 594 void onDone(), |
| 374 bool cancelOnError) { | 595 bool cancelOnError) => |
| 375 if (_hasListener) { | 596 _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 376 throw new StateError("The stream has already been listened to."); | |
| 377 } | |
| 378 _hasListener = true; | |
| 379 return new _ControllerSubscription<T>( | |
| 380 _controller, onData, onError, onDone, cancelOnError); | |
| 381 } | |
| 382 | 597 |
| 383 void _onListen(_BufferingStreamSubscription subscription) { | 598 // Override == and hashCode so that new streams returned by the same |
| 384 _controller._recordListen(subscription); | 599 // controller are considered equal. The controller returns a new stream |
| 600 // each time it's queried, but doesn't have to cache the result. |
| 601 |
| 602 int get hashCode => _controller.hashCode ^ 0x35323532; |
| 603 |
| 604 bool operator==(Object other) { |
| 605 if (other is! _ControllerStream) return false; |
| 606 _ControllerStream otherStream = other; |
| 607 return identical(otherStream._controller, this); |
| 385 } | 608 } |
| 386 } | 609 } |
| 387 | 610 |
| 388 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 611 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 389 final _StreamControllerLifecycle<T> _controller; | 612 final _StreamControllerLifecycle<T> _controller; |
| 390 | 613 |
| 391 _ControllerSubscription(this._controller, | 614 _ControllerSubscription(this._controller, |
| 392 void onData(T data), | 615 void onData(T data), |
| 393 void onError(Object error), | 616 void onError(Object error), |
| 394 void onDone(), | 617 void onDone(), |
| 395 bool cancelOnError) | 618 bool cancelOnError) |
| 396 : super(onData, onError, onDone, cancelOnError); | 619 : super(onData, onError, onDone, cancelOnError); |
| 397 | 620 |
| 398 void _onCancel() { | 621 void _onCancel() { |
| 399 _controller._recordCancel(this); | 622 _controller._recordCancel(this); |
| 400 } | 623 } |
| 401 | 624 |
| 402 void _onPause() { | 625 void _onPause() { |
| 403 _controller._recordPause(this); | 626 _controller._recordPause(this); |
| 404 } | 627 } |
| 405 | 628 |
| 406 void _onResume() { | 629 void _onResume() { |
| 407 _controller._recordResume(this); | 630 _controller._recordResume(this); |
| 408 } | 631 } |
| 409 } | 632 } |
| 410 | 633 |
| 411 class _BroadcastStream<T> extends _StreamImpl<T> { | |
| 412 _BroadcastStreamController _controller; | |
| 413 | 634 |
| 414 _BroadcastStream(this._controller); | 635 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 636 class _StreamSinkWrapper<T> implements StreamSink<T> { |
| 637 final StreamSink _target; |
| 638 _StreamSinkWrapper(this._target); |
| 639 void add(T data) { _target.add(data); } |
| 640 void addError(Object error) { _target.addError(error); } |
| 641 Future close() => _target.close(); |
| 642 Future addStream(Stream<T> source) => _target.addStream(source); |
| 643 Future get done => _target.done; |
| 644 } |
| 415 | 645 |
| 416 bool get isBroadcast => true; | 646 /** |
| 647 * Object containing the state used to handle [StreamController.addStream]. |
| 648 */ |
| 649 class _AddStreamState<T> { |
| 650 // [_FutureImpl] returned by call to addStream. |
| 651 _FutureImpl addStreamFuture; |
| 417 | 652 |
| 418 StreamSubscription<T> _createSubscription( | 653 // Subscription on stream argument to addStream. |
| 419 void onData(T data), | 654 StreamSubscription addSubscription; |
| 420 void onError(Object error), | 655 |
| 421 void onDone(), | 656 _AddStreamState(StreamSink controller, Stream source) |
| 422 bool cancelOnError) { | 657 : addStreamFuture = new _FutureImpl(), |
| 423 return new _BroadcastSubscription<T>( | 658 addSubscription = source.listen(controller._add, |
| 424 _controller, onData, onError, onDone, cancelOnError); | 659 onError: controller._addError, |
| 660 onDone: controller._close, |
| 661 cancelOnError: true); |
| 662 |
| 663 void pause() { |
| 664 addSubscription.pause(); |
| 425 } | 665 } |
| 426 | 666 |
| 427 void _onListen(_BufferingStreamSubscription subscription) { | 667 void resume() { |
| 428 _controller._recordListen(subscription); | 668 addSubscription.resume(); |
| 669 } |
| 670 |
| 671 void cancel() { |
| 672 addSubscription.cancel(); |
| 673 complete(); |
| 674 } |
| 675 |
| 676 void complete() { |
| 677 addStreamFuture._asyncSetValue(null); |
| 429 } | 678 } |
| 430 } | 679 } |
| 431 | 680 |
| 432 abstract class _BroadcastSubscriptionLink { | 681 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| 433 _BroadcastSubscriptionLink _next; | 682 // The subscription or pending data of a _StreamController. |
| 434 _BroadcastSubscriptionLink _previous; | 683 // Stored here because we reuse the `_varData` field in the _StreamController |
| 435 } | 684 // to store this state object. |
| 685 var varData; |
| 436 | 686 |
| 437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> | 687 _StreamControllerAddStreamState(_StreamController controller, |
| 438 implements _BroadcastSubscriptionLink { | 688 this.varData, |
| 439 static const int _STATE_EVENT_ID = 1; | 689 Stream source) : super(controller, source) { |
| 440 static const int _STATE_FIRING = 2; | 690 if (controller.isPaused) { |
| 441 static const int _STATE_REMOVE_AFTER_FIRING = 4; | 691 addSubscription.pause(); |
| 442 int _eventState; | |
| 443 | |
| 444 _BroadcastSubscriptionLink _next; | |
| 445 _BroadcastSubscriptionLink _previous; | |
| 446 | |
| 447 _BroadcastSubscription(_StreamControllerLifecycle controller, | |
| 448 void onData(T data), | |
| 449 void onError(Object error), | |
| 450 void onDone(), | |
| 451 bool cancelOnError) | |
| 452 : super(controller, onData, onError, onDone, cancelOnError) { | |
| 453 _next = _previous = this; | |
| 454 } | |
| 455 | |
| 456 _BroadcastStreamController get _controller => super._controller; | |
| 457 | |
| 458 bool _expectsEvent(int eventId) { | |
| 459 return (_eventState & _STATE_EVENT_ID) == eventId; | |
| 460 } | |
| 461 | |
| 462 void _toggleEventId() { | |
| 463 _eventState ^= _STATE_EVENT_ID; | |
| 464 } | |
| 465 | |
| 466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
| 467 | |
| 468 bool _setRemoveAfterFiring() { | |
| 469 assert(_isFiring); | |
| 470 _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
| 471 } | |
| 472 | |
| 473 bool get _removeAfterFiring => | |
| 474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
| 475 } | |
| 476 | |
| 477 | |
| 478 abstract class _BroadcastStreamController<T> | |
| 479 implements StreamController<T>, | |
| 480 _StreamControllerLifecycle<T>, | |
| 481 _BroadcastSubscriptionLink, | |
| 482 _EventDispatch<T> { | |
| 483 static const int _STATE_INITIAL = 0; | |
| 484 static const int _STATE_EVENT_ID = 1; | |
| 485 static const int _STATE_FIRING = 2; | |
| 486 static const int _STATE_CLOSED = 4; | |
| 487 | |
| 488 final _NotificationHandler _onListen; | |
| 489 final _NotificationHandler _onCancel; | |
| 490 | |
| 491 // State of the controller. | |
| 492 int _state; | |
| 493 | |
| 494 // Double-linked list of active listeners. | |
| 495 _BroadcastSubscriptionLink _next; | |
| 496 _BroadcastSubscriptionLink _previous; | |
| 497 | |
| 498 _BroadcastStreamController(this._onListen, this._onCancel) | |
| 499 : _state = _STATE_INITIAL { | |
| 500 _next = _previous = this; | |
| 501 } | |
| 502 | |
| 503 // StreamController interface. | |
| 504 | |
| 505 Stream<T> get stream => new _BroadcastStream<T>(this); | |
| 506 | |
| 507 EventSink<T> get sink => new _EventSinkView<T>(this); | |
| 508 | |
| 509 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 510 | |
| 511 /** | |
| 512 * A broadcast controller is never paused. | |
| 513 * | |
| 514 * Each receiving stream may be paused individually, and they handle their | |
| 515 * own buffering. | |
| 516 */ | |
| 517 bool get isPaused => false; | |
| 518 | |
| 519 /** Whether there are currently a subscriber on the [Stream]. */ | |
| 520 bool get hasListener => !_isEmpty; | |
| 521 | |
| 522 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
| 523 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
| 524 | |
| 525 // Linked list helpers | |
| 526 | |
| 527 bool get _isEmpty => identical(_next, this); | |
| 528 | |
| 529 /** Adds subscription to linked list of active listeners. */ | |
| 530 void _addListener(_BroadcastSubscription<T> subscription) { | |
| 531 _BroadcastSubscriptionLink previous = _previous; | |
| 532 previous._next = subscription; | |
| 533 _previous = subscription._previous; | |
| 534 subscription._previous._next = this; | |
| 535 subscription._previous = previous; | |
| 536 subscription._eventState = (_state & _STATE_EVENT_ID); | |
| 537 } | |
| 538 | |
| 539 void _removeListener(_BroadcastSubscription<T> subscription) { | |
| 540 assert(identical(subscription._controller, this)); | |
| 541 assert(!identical(subscription._next, subscription)); | |
| 542 subscription._previous._next = subscription._next; | |
| 543 subscription._next._previous = subscription._previous; | |
| 544 subscription._next = subscription._previous = subscription; | |
| 545 } | |
| 546 | |
| 547 // _StreamControllerLifecycle interface. | |
| 548 | |
| 549 void _recordListen(_BroadcastSubscription<T> subscription) { | |
| 550 _addListener(subscription); | |
| 551 if (identical(_next, _previous)) { | |
| 552 // Only one listener, so it must be the first listener. | |
| 553 _runGuarded(_onListen); | |
| 554 } | |
| 555 } | |
| 556 | |
| 557 void _recordCancel(_BroadcastSubscription<T> subscription) { | |
| 558 if (subscription._isFiring) { | |
| 559 subscription._setRemoveAfterFiring(); | |
| 560 } else { | |
| 561 _removeListener(subscription); | |
| 562 // If we are currently firing an event, the empty-check is performed at | |
| 563 // the end of the listener loop instead of here. | |
| 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | |
| 565 _callOnCancel(); | |
| 566 } | |
| 567 } | |
| 568 } | |
| 569 | |
| 570 void _recordPause(StreamSubscription<T> subscription) {} | |
| 571 void _recordResume(StreamSubscription<T> subscription) {} | |
| 572 | |
| 573 // EventSink interface. | |
| 574 | |
| 575 void add(T data) { | |
| 576 if (isClosed) { | |
| 577 throw new StateError("Cannot add new events after calling close()"); | |
| 578 } | |
| 579 _sendData(data); | |
| 580 } | |
| 581 | |
| 582 void addError(Object error, [Object stackTrace]) { | |
| 583 if (isClosed) { | |
| 584 throw new StateError("Cannot add new events after calling close()"); | |
| 585 } | |
| 586 if (stackTrace != null) _attachStackTrace(error, stackTrace); | |
| 587 _sendError(error); | |
| 588 } | |
| 589 | |
| 590 void close() { | |
| 591 if (isClosed) { | |
| 592 throw new StateError("Cannot add new events after calling close()"); | |
| 593 } | |
| 594 _state |= _STATE_CLOSED; | |
| 595 _sendDone(); | |
| 596 } | |
| 597 | |
| 598 void _forEachListener( | |
| 599 void action(_BufferingStreamSubscription<T> subscription)) { | |
| 600 if (_isFiring) { | |
| 601 throw new StateError( | |
| 602 "Cannot fire new event. Controller is already firing an event"); | |
| 603 } | |
| 604 if (_isEmpty) return; | |
| 605 | |
| 606 // Get event id of this event. | |
| 607 int id = (_state & _STATE_EVENT_ID); | |
| 608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | |
| 609 // callbacks while firing, and we prevent reentrancy of this function. | |
| 610 // | |
| 611 // Set [_state]'s event id to the next event's id. | |
| 612 // Any listeners added while firing this event will expect the next event, | |
| 613 // not this one, and won't get notified. | |
| 614 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
| 615 _BroadcastSubscriptionLink link = _next; | |
| 616 while (!identical(link, this)) { | |
| 617 _BroadcastSubscription<T> subscription = link; | |
| 618 if (subscription._expectsEvent(id)) { | |
| 619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
| 620 action(subscription); | |
| 621 subscription._toggleEventId(); | |
| 622 link = subscription._next; | |
| 623 if (subscription._removeAfterFiring) { | |
| 624 _removeListener(subscription); | |
| 625 } | |
| 626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
| 627 } else { | |
| 628 link = subscription._next; | |
| 629 } | |
| 630 } | |
| 631 _state &= ~_STATE_FIRING; | |
| 632 | |
| 633 if (_isEmpty) { | |
| 634 _callOnCancel(); | |
| 635 } | |
| 636 } | |
| 637 | |
| 638 void _callOnCancel() { | |
| 639 _runGuarded(_onCancel); | |
| 640 } | |
| 641 } | |
| 642 | |
| 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 644 _SyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 645 : super(onListen, onCancel); | |
| 646 | |
| 647 // EventDispatch interface. | |
| 648 | |
| 649 void _sendData(T data) { | |
| 650 if (_isEmpty) return; | |
| 651 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 652 subscription._add(data); | |
| 653 }); | |
| 654 } | |
| 655 | |
| 656 void _sendError(Object error) { | |
| 657 if (_isEmpty) return; | |
| 658 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 659 subscription._addError(error); | |
| 660 }); | |
| 661 } | |
| 662 | |
| 663 void _sendDone() { | |
| 664 if (_isEmpty) return; | |
| 665 _forEachListener((_BroadcastSubscription<T> subscription) { | |
| 666 subscription._close(); | |
| 667 subscription._eventState |= | |
| 668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; | |
| 669 }); | |
| 670 } | |
| 671 } | |
| 672 | |
| 673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 674 _AsyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 675 : super(onListen, onCancel); | |
| 676 | |
| 677 // EventDispatch interface. | |
| 678 | |
| 679 void _sendData(T data) { | |
| 680 for (_BroadcastSubscriptionLink link = _next; | |
| 681 !identical(link, this); | |
| 682 link = link._next) { | |
| 683 _BroadcastSubscription<T> subscription = link; | |
| 684 subscription._addPending(new _DelayedData(data)); | |
| 685 } | |
| 686 } | |
| 687 | |
| 688 void _sendError(Object error) { | |
| 689 for (_BroadcastSubscriptionLink link = _next; | |
| 690 !identical(link, this); | |
| 691 link = link._next) { | |
| 692 _BroadcastSubscription<T> subscription = link; | |
| 693 subscription._addPending(new _DelayedError(error)); | |
| 694 } | |
| 695 } | |
| 696 | |
| 697 void _sendDone() { | |
| 698 for (_BroadcastSubscriptionLink link = _next; | |
| 699 !identical(link, this); | |
| 700 link = link._next) { | |
| 701 _BroadcastSubscription<T> subscription = link; | |
| 702 subscription._addPending(const _DelayedDone()); | |
| 703 } | 692 } |
| 704 } | 693 } |
| 705 } | 694 } |
| 706 | |
| 707 /** | |
| 708 * Stream controller that is used by [Stream.asBroadcastStream]. | |
| 709 * | |
| 710 * This stream controller allows incoming events while it is firing | |
| 711 * other events. This is handled by delaying the events until the | |
| 712 * current event is done firing, and then fire the pending events. | |
| 713 * | |
| 714 * This class extends [_SyncBroadcastStreamController]. Events of | |
| 715 * an "asBroadcastStream" stream are always initiated by events | |
| 716 * on another stream, and it is fine to forward them synchronously. | |
| 717 */ | |
| 718 class _AsBroadcastStreamController<T> | |
| 719 extends _SyncBroadcastStreamController<T> | |
| 720 implements _EventDispatch<T> { | |
| 721 _StreamImplEvents _pending; | |
| 722 | |
| 723 _AsBroadcastStreamController(void onListen(), void onCancel()) | |
| 724 : super(onListen, onCancel); | |
| 725 | |
| 726 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
| 727 | |
| 728 void _addPendingEvent(_DelayedEvent event) { | |
| 729 if (_pending == null) { | |
| 730 _pending = new _StreamImplEvents(); | |
| 731 } | |
| 732 _pending.add(event); | |
| 733 } | |
| 734 | |
| 735 void add(T data) { | |
| 736 if (_isFiring) { | |
| 737 _addPendingEvent(new _DelayedData<T>(data)); | |
| 738 return; | |
| 739 } | |
| 740 super.add(data); | |
| 741 while (_hasPending) { | |
| 742 _pending.handleNext(this); | |
| 743 } | |
| 744 } | |
| 745 | |
| 746 void addError(Object error, [StackTrace stackTrace]) { | |
| 747 if (_isFiring) { | |
| 748 _addPendingEvent(new _DelayedError(error)); | |
| 749 return; | |
| 750 } | |
| 751 super.addError(error, stackTrace); | |
| 752 while (_hasPending) { | |
| 753 _pending.handleNext(this); | |
| 754 } | |
| 755 } | |
| 756 | |
| 757 void close() { | |
| 758 if (_isFiring) { | |
| 759 _addPendingEvent(const _DelayedDone()); | |
| 760 _state |= _STATE_CLOSED; | |
| 761 return; | |
| 762 } | |
| 763 super.close(); | |
| 764 assert(!_hasPending); | |
| 765 } | |
| 766 | |
| 767 void _callOnCancel() { | |
| 768 if (_hasPending) { | |
| 769 _pending.clear(); | |
| 770 _pending = null; | |
| 771 } | |
| 772 super._callOnCancel(); | |
| 773 } | |
| 774 } | |
| OLD | NEW |