Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
| 40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
| 41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
| 42 * callback is performed. | 42 * callback is performed. |
| 43 * | 43 * |
| 44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
| 45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
| 48 */ | 48 */ |
| 49 abstract class StreamController<T> implements EventSink<T> { | 49 abstract class StreamController<T> implements StreamSink<T> { |
| 50 /** The stream that this controller is controlling. */ | 50 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; | 51 Stream<T> get stream; |
| 52 | 52 |
| 53 /** | 53 /** |
| 54 * A controller with a [stream] that supports only one single subscriber. | 54 * A controller with a [stream] that supports only one single subscriber. |
| 55 * | 55 * |
| 56 * If [sync] is true, events may be passed directly to the stream's listener | 56 * If [sync] is true, events may be passed directly to the stream's listener |
| 57 * during an [add], [addError] or [close] call. If [sync] is false, the event | 57 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 58 * will be passed to the listener at a later time, after the code creating | 58 * will be passed to the listener at a later time, after the code creating |
| 59 * the event has returned. | 59 * the event has returned. |
| 60 * | 60 * |
| 61 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
| 62 * registered. | 62 * registered. |
| 63 * | 63 * |
| 64 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
| 65 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
| 66 * | 66 * |
| 67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
| 68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
| 69 * its subscription. | 69 * its subscription. |
| 70 * | 70 * |
| 71 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
| 72 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
| 73 */ | 73 */ |
| 74 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
| 75 void onPause(), | 75 void onPause(), |
| 76 void onResume(), | 76 void onResume(), |
| 77 void onCancel(), | 77 void onCancel(), |
| 78 bool sync: false}) | 78 bool sync: false}) { |
| 79 => sync | 79 if (onListen == null && onPause == null && |
| 80 onResume == null && onCancel == null) { | |
| 81 return sync | |
| 82 ? new _NoCallbackSyncStreamController<T>() | |
| 83 : new _NoCallbackAsyncStreamController<T>(); | |
| 84 } | |
| 85 return sync | |
| 80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| 88 } | |
| 82 | 89 |
| 83 /** | 90 /** |
| 84 * A controller where [stream] can be listened to more than once. | 91 * A controller where [stream] can be listened to more than once. |
| 85 * | 92 * |
| 86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened | 93 * The [Stream] returned by [stream] is a broadcast stream. It can be listened |
| 87 * to more than once. | 94 * to more than once. |
| 88 * | 95 * |
| 89 * The controller distributes any events to all currently subscribed | 96 * The controller distributes any events to all currently subscribed |
| 90 * listeners. | 97 * listeners. |
| 91 * It is not allowed to call [add], [addError], or [close] before a previous | 98 * It is not allowed to call [add], [addError], or [close] before a previous |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 116 */ | 123 */ |
| 117 factory StreamController.broadcast({void onListen(), | 124 factory StreamController.broadcast({void onListen(), |
| 118 void onCancel(), | 125 void onCancel(), |
| 119 bool sync: false}) { | 126 bool sync: false}) { |
| 120 return sync | 127 return sync |
| 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 128 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 129 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 123 } | 130 } |
| 124 | 131 |
| 125 /** | 132 /** |
| 126 * Returns a view of this object that only exposes the [EventSink] interface. | 133 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 127 */ | 134 */ |
| 128 EventSink<T> get sink; | 135 StreamSink<T> get sink; |
| 129 | 136 |
| 130 /** | 137 /** |
| 131 * Whether the stream is closed for adding more events. | 138 * Whether the stream is closed for adding more events. |
| 132 * | 139 * |
| 133 * If true, the "done" event might not have fired yet, but it has been | 140 * If true, the "done" event might not have _ yet, but it has been |
|
floitsch
2013/06/27 15:15:19
undo change?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 134 * scheduled, and it is too late to add more events. | 141 * scheduled, and it is too late to add more events. |
| 135 */ | 142 */ |
| 136 bool get isClosed; | 143 bool get isClosed; |
| 137 | 144 |
| 138 /** | 145 /** |
| 139 * Whether the subscription would need to buffer events. | 146 * Whether the subscription would need to buffer events. |
| 140 * | 147 * |
| 141 * This is the case if the controller's stream has a listener and it is | 148 * This is the case if the controller's stream has a listener and it is |
| 142 * paused, or if it has not received a listener yet. In that case, the | 149 * paused, or if it has not received a listener yet. In that case, the |
| 143 * controller is considered paused as well. | 150 * controller is considered paused as well. |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 155 * Send or enqueue an error event. | 162 * Send or enqueue an error event. |
| 156 * | 163 * |
| 157 * Also allows an objection stack trace object, on top of what [EventSink] | 164 * Also allows an objection stack trace object, on top of what [EventSink] |
| 158 * allows. | 165 * allows. |
| 159 */ | 166 */ |
| 160 void addError(Object error, [Object stackTrace]); | 167 void addError(Object error, [Object stackTrace]); |
| 161 } | 168 } |
| 162 | 169 |
| 163 | 170 |
| 164 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
| 165 void _recordListen(StreamSubscription<T> subscription) {} | 172 StreamSubscription<T> _subscribe(void onData(T data), |
| 173 void onError(Object error), | |
| 174 void onDone(), | |
| 175 bool cancelOnError); | |
| 166 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
| 167 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
| 168 void _recordCancel(StreamSubscription<T> subscription) {} | 178 void _recordCancel(StreamSubscription<T> subscription) {} |
| 169 } | 179 } |
| 170 | 180 |
| 171 /** | 181 /** |
| 172 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
| 173 * | 183 * |
| 174 * Controls a stream that only supports a single controller. | 184 * Controls a stream that only supports a single controller. |
| 175 */ | 185 */ |
| 176 abstract class _StreamController<T> implements StreamController<T>, | 186 abstract class _StreamController<T> implements StreamController<T>, |
| 177 _StreamControllerLifecycle<T>, | 187 _StreamControllerLifecycle<T>, |
| 188 _EventSink<T>, | |
| 178 _EventDispatch<T> { | 189 _EventDispatch<T> { |
| 179 static const int _STATE_OPEN = 0; | 190 // The states are bit-flags. More than one can be set at a time. |
| 180 static const int _STATE_CANCELLED = 1; | 191 // |
| 181 static const int _STATE_CLOSED = 2; | 192 // The "subscription state" goes through the states: |
| 193 // initial -> subscribed -> canceled. | |
| 194 // These are mutually exclusive. | |
| 195 // The "closed" state records whether the [close] method has been called | |
| 196 // on the controller. This can be done at any time. If done before | |
| 197 // subscription, the done event is queued. If done after cancel, the done | |
| 198 // event is ignored (just as any other event after a cancel). | |
| 182 | 199 |
| 183 final _NotificationHandler _onListen; | 200 /** The controller is in its initial state with no subscription. */ |
| 184 final _NotificationHandler _onPause; | 201 static const int _STATE_INITIAL = 0; |
| 185 final _NotificationHandler _onResume; | 202 /** The controller has a subscription, but hasn't been closed or canceled. */ |
| 186 final _NotificationHandler _onCancel; | 203 static const int _STATE_SUBSCRIBED = 1; |
| 187 _StreamImpl<T> _stream; | 204 /** The subscription is canceled. */ |
| 205 static const int _STATE_CANCELED = 2; | |
| 206 /** Mask for the subscription state. */ | |
| 207 static const int _STATE_SUBSCRIPTION_MASK = 3; | |
| 188 | 208 |
| 189 // An active subscription on the stream, or null if no subscripton is active. | 209 // The following state relate to the controller, not the subscription. |
| 190 _ControllerSubscription<T> _subscription; | 210 // If closed, adding more events is not allowed. |
| 191 | 211 // If executing an [addStream], now events are not allowed either, but will |
|
floitsch
2013/06/27 15:15:19
-now-
Move description of _STATE_ADDSTREAM below.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
now -> new.
Reworded and reordered.
| |
| 192 // Whether we have sent a "done" event. | 212 // be added by the stream. |
| 193 int _state = _STATE_OPEN; | 213 /** The controller is closed due to calling [close]. */ |
| 194 | 214 static const int _STATE_CLOSED = 4; |
| 195 // Events added to the stream before it has an active subscription. | 215 /** The controller is in the middle of an [addStream] call. */ |
| 196 _PendingEvents _pendingEvents = null; | 216 static const int _STATE_ADDSTREAM = 8; |
| 197 | |
| 198 _StreamController(this._onListen, | |
| 199 this._onPause, | |
| 200 this._onResume, | |
| 201 this._onCancel) { | |
| 202 _stream = new _ControllerStream<T>(this); | |
| 203 } | |
| 204 | |
| 205 Stream<T> get stream => _stream; | |
| 206 | 217 |
| 207 /** | 218 /** |
| 208 * Returns a view of this object that only exposes the [EventSink] interface. | 219 * Field containing different data depending on the current subscription |
| 220 * state. | |
| 221 * | |
| 222 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] | |
| 223 * for events added to the controller before a subscription. | |
| 224 * | |
| 225 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. | |
| 226 * | |
| 227 * When [_state] is [_STATE_CANCELED] the field is currently not used. | |
| 209 */ | 228 */ |
| 210 EventSink<T> get sink => new _EventSinkView<T>(this); | 229 var _varData; |
| 230 | |
| 231 /** Current state of the controller. */ | |
| 232 int _state = _STATE_INITIAL; | |
| 211 | 233 |
| 212 /** | 234 /** |
| 213 * Whether a listener has existed and been cancelled. | 235 * Future completed when the stream sends its last event. |
| 236 * | |
| 237 * This is also the future returned by [close]. | |
| 238 */ | |
| 239 // TODO(lrn): Could this be stored in the varData field too, if it's not | |
| 240 // accessed until the call to "close"? Then we need to special case if it's | |
| 241 // accessed earlier, or if close is called before subscribing. | |
| 242 _FutureImpl _doneFuture; | |
| 243 | |
| 244 _StreamController(); | |
| 245 | |
| 246 _NotificationHandler get _onListen; | |
| 247 _NotificationHandler get _onPause; | |
| 248 _NotificationHandler get _onResume; | |
| 249 _NotificationHandler get _onCancel; | |
| 250 | |
| 251 // Return a new stream every time. The streams are equal, but not identical. | |
| 252 Stream<T> get stream => new _ControllerStream(this); | |
| 253 | |
| 254 /** | |
| 255 * Returns a view of this object that only exposes the [StreamSink] interface. | |
| 256 */ | |
| 257 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
| 258 | |
| 259 /** | |
| 260 * Whether a listener has existed and been canceled. | |
| 214 * | 261 * |
| 215 * After this, adding more events will be ignored. | 262 * After this, adding more events will be ignored. |
| 216 */ | 263 */ |
| 217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | 264 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 265 | |
| 266 /** Whether there is an active listener. */ | |
| 267 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | |
| 268 | |
| 269 /** Whether there has not been a listener yet. */ | |
| 270 bool get _isInitialState => | |
| 271 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; | |
| 218 | 272 |
| 219 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 273 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 220 | 274 |
| 221 bool get isPaused => hasListener ? _subscription._isInputPaused | 275 bool get isPaused => hasListener ? _subscription._isInputPaused |
| 222 : !_isCancelled; | 276 : !_isCanceled; |
| 223 | 277 |
| 224 bool get hasListener => _subscription != null; | 278 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| 279 | |
| 280 /** New events may not be added after close, or during addStream. */ | |
| 281 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
| 282 | |
| 283 // Returns the pending events. | |
| 284 // Pending events are events added before a subscription exists. | |
| 285 // They are added to the subscription when it is created. | |
| 286 // Pending events, if any, are kept in the _varData field until the | |
| 287 // stream is listened to. | |
| 288 // While adding a stream, pending events are moved into the | |
| 289 // state object to allow the state object to use the _varData field. | |
| 290 _PendingEvents get _pendingEvents { | |
| 291 assert(_isInitialState); | |
| 292 if (!_isAddingStream) { | |
| 293 return _varData; | |
| 294 } | |
| 295 _StreamControllerAddStreamState state = _varData; | |
| 296 return state.varData; | |
| 297 } | |
| 298 | |
| 299 // Returns the pending events, and creates the object if necessary. | |
| 300 _StreamImplEvents _ensurePendingEvents() { | |
| 301 assert(_isInitialState); | |
| 302 if (!_isAddingStream) { | |
| 303 if (_varData == null) _varData = new _StreamImplEvents(); | |
| 304 return _varData; | |
| 305 } | |
| 306 _StreamControllerAddStreamState state = _varData; | |
| 307 if (state.varData == null) state.varData = new _StreamImplEvents(); | |
| 308 return state.varData; | |
| 309 } | |
| 310 | |
| 311 // Get the current subscription. | |
| 312 // If we are adding a stream, the subscription is moved into the state | |
| 313 // object to allow the state object to use the _varData field. | |
| 314 _ControllerSubscription get _subscription { | |
| 315 assert(hasListener); | |
| 316 if (_isAddingStream) { | |
| 317 _StreamControllerAddStreamState addState = _varData; | |
| 318 return addState.varData; | |
| 319 } | |
| 320 return _varData; | |
| 321 } | |
| 225 | 322 |
| 226 /** | 323 /** |
| 227 * Send or queue a data event. | 324 * Creates an error describing why an event cannot be added. |
| 325 * | |
| 326 * The reason, and therefore the error message, depends on the current state. | |
| 327 */ | |
| 328 Error _badEventState() { | |
| 329 if (isClosed) { | |
| 330 return new StateError("Cannot add event after closing"); | |
| 331 } | |
| 332 assert(_isAddingStream); | |
| 333 return new StateError("Cannot add event while adding a stream"); | |
| 334 } | |
| 335 | |
| 336 // StreamSink interface. | |
| 337 Future addStream(Stream<T> source) { | |
| 338 if (!_mayAddEvent) throw _badEventState(); | |
| 339 if (_isCanceled) return new _FutureImpl.immediate(null); | |
| 340 _StreamControllerAddStreamState addState = | |
| 341 new _StreamControllerAddStreamState(this, _varData, source); | |
| 342 _varData = addState; | |
| 343 _state |= _STATE_ADDSTREAM; | |
| 344 return addState.addStreamFuture; | |
| 345 } | |
| 346 | |
| 347 Future get done => _ensureDoneFuture(); | |
| 348 | |
| 349 Future _ensureDoneFuture() { | |
| 350 if (_doneFuture == null) { | |
| 351 _doneFuture = new _FutureImpl(); | |
| 352 if (_isCanceled) _doneFuture._setValue(null); | |
| 353 } | |
| 354 return _doneFuture; | |
| 355 } | |
| 356 | |
| 357 /** | |
| 358 * Send or enqueue a data event. | |
| 228 */ | 359 */ |
| 229 void add(T value) { | 360 void add(T value) { |
| 230 if (isClosed) throw new StateError("Adding event after close"); | 361 if (!_mayAddEvent) throw _badEventState(); |
| 231 if (_subscription != null) { | 362 _add(value); |
| 232 _sendData(value); | |
| 233 } else if (!_isCancelled) { | |
| 234 _addPendingEvent(new _DelayedData<T>(value)); | |
| 235 } | |
| 236 } | 363 } |
| 237 | 364 |
| 238 /** | 365 /** |
| 239 * Send or enqueue an error event. | 366 * Send or enqueue an error event. |
| 240 */ | 367 */ |
| 241 void addError(Object error, [Object stackTrace]) { | 368 void addError(Object error, [Object stackTrace]) { |
| 242 if (isClosed) throw new StateError("Adding event after close"); | 369 if (!_mayAddEvent) throw _badEventState(); |
| 243 if (stackTrace != null) { | 370 if (stackTrace != null) { |
| 244 // Force stack trace overwrite. Even if the error already contained | 371 // Force stack trace overwrite. Even if the error already contained |
| 245 // a stack trace. | 372 // a stack trace. |
| 246 _attachStackTrace(error, stackTrace); | 373 _attachStackTrace(error, stackTrace); |
| 247 } | 374 } |
| 248 if (_subscription != null) { | 375 if (hasListener) { |
| 249 _sendError(error); | 376 _sendError(error); |
| 250 } else if (!_isCancelled) { | 377 } else if (_isInitialState) { |
| 251 _addPendingEvent(new _DelayedError(error)); | 378 _ensurePendingEvents().add(new _DelayedError(error)); |
| 252 } | 379 } |
| 253 } | 380 } |
| 254 | 381 |
| 255 /** | 382 /** |
| 256 * Closes this controller. | 383 * Closes this controller. |
| 257 * | 384 * |
| 258 * After closing, no further events may be added using [add] or [addError]. | 385 * After closing, no further events may be added using [add] or [addError]. |
| 259 * | 386 * |
| 260 * You are allowed to close the controller more than once, but only the first | 387 * You are allowed to close the controller more than once, but only the first |
| 261 * call has any effect. | 388 * call has any effect. |
| 262 * | 389 * |
| 263 * The first time a controller is closed, a "done" event is sent to its | 390 * The first time a controller is closed, a "done" event is sent to its |
| 264 * stream. | 391 * stream. |
| 265 */ | 392 */ |
| 266 void close() { | 393 Future close() { |
| 267 if (isClosed) return; | 394 if (isClosed) { |
| 395 assert(_doneFuture != null); // Was set when close was first called. | |
| 396 return _doneFuture; | |
| 397 } | |
| 398 if (!_mayAddEvent) throw _badEventState(); | |
| 268 _state |= _STATE_CLOSED; | 399 _state |= _STATE_CLOSED; |
| 269 if (_subscription != null) { | 400 _ensureDoneFuture(); |
| 401 if (hasListener) { | |
| 270 _sendDone(); | 402 _sendDone(); |
| 271 } else if (!_isCancelled) { | 403 } else if (_isInitialState) { |
| 272 _addPendingEvent(const _DelayedDone()); | 404 _ensurePendingEvents().add(const _DelayedDone()); |
| 405 } | |
| 406 return _doneFuture; | |
| 407 } | |
| 408 | |
| 409 // EventSink interface. Used by the [addStream] events. | |
| 410 | |
| 411 // Add data event, used both by the [addStream] events and by [add]. | |
| 412 void _add(T value) { | |
| 413 if (hasListener) { | |
| 414 _sendData(value); | |
| 415 } else if (_isInitialState) { | |
| 416 _ensurePendingEvents().add(new _DelayedData<T>(value)); | |
| 273 } | 417 } |
| 274 } | 418 } |
| 275 | 419 |
| 276 // EventDispatch interface | 420 void _addError(Object error) { |
| 277 | 421 // Error from addStream. Stop the addStream and complete its future with the |
|
floitsch
2013/06/27 15:15:19
I don't think that's the right thing to do. Just p
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 278 void _addPendingEvent(_DelayedEvent event) { | 422 // error. |
| 279 if (_isCancelled) return; | 423 assert(_isAddingStream); |
| 280 _StreamImplEvents events = _pendingEvents; | 424 _StreamControllerAddStreamState addState = _varData; |
| 281 if (events == null) { | 425 _varData = addState.varData; |
| 282 events = _pendingEvents = new _StreamImplEvents(); | 426 _state &= ~_STATE_ADDSTREAM; |
| 283 } | 427 addState.completeWithError(error); |
| 284 events.add(event); | |
| 285 } | 428 } |
| 286 | 429 |
| 287 void _recordListen(_BufferingStreamSubscription<T> subscription) { | 430 void _close() { |
| 288 assert(_subscription == null); | 431 // End of addStream stream. |
| 289 _subscription = subscription; | 432 assert(_isAddingStream); |
| 290 subscription._setPendingEvents(_pendingEvents); | 433 _StreamControllerAddStreamState addState = _varData; |
| 291 _pendingEvents = null; | 434 _varData = addState.varData; |
| 435 _state &= ~_STATE_ADDSTREAM; | |
| 436 addState.complete(); | |
| 437 } | |
| 438 | |
| 439 // _StreamControllerLifeCycle interface | |
| 440 | |
| 441 StreamSubscription<T> _subscribe(void onData(T data), | |
| 442 void onError(Object error), | |
| 443 void onDone(), | |
| 444 bool cancelOnError) { | |
| 445 if (!_isInitialState) { | |
| 446 throw new StateError("Stream has already been listened to."); | |
| 447 } | |
| 448 _ControllerSubscription subscription = new _ControllerSubscription( | |
| 449 this, onData, onError, onDone, cancelOnError); | |
| 450 | |
| 451 _PendingEvents pendingEvents = _pendingEvents; | |
| 452 _state |= _STATE_SUBSCRIBED; | |
| 453 if (_isAddingStream) { | |
| 454 _StreamControllerAddStreamState addState = _varData; | |
| 455 addState.varData = subscription; | |
| 456 } else { | |
| 457 _varData = subscription; | |
| 458 } | |
| 459 subscription._setPendingEvents(pendingEvents); | |
| 292 subscription._guardCallback(() { | 460 subscription._guardCallback(() { |
| 293 _runGuarded(_onListen); | 461 _runGuarded(_onListen); |
| 294 }); | 462 }); |
| 463 | |
| 464 return subscription; | |
| 295 } | 465 } |
| 296 | 466 |
| 297 void _recordCancel(StreamSubscription<T> subscription) { | 467 void _recordCancel(StreamSubscription<T> subscription) { |
| 298 assert(identical(_subscription, subscription)); | 468 if (_isAddingStream) { |
| 299 _subscription = null; | 469 _StreamControllerAddStreamState addState = _varData; |
| 300 _state |= _STATE_CANCELLED; | 470 addState.cancel(); |
| 471 } | |
| 472 _varData = null; | |
| 473 _state = | |
| 474 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | |
| 301 _runGuarded(_onCancel); | 475 _runGuarded(_onCancel); |
| 476 if (_doneFuture != null && _doneFuture._mayComplete) { | |
| 477 _doneFuture._asyncSetValue(null); | |
| 478 } | |
| 302 } | 479 } |
| 303 | 480 |
| 304 void _recordPause(StreamSubscription<T> subscription) { | 481 void _recordPause(StreamSubscription<T> subscription) { |
| 482 if (_isAddingStream) { | |
| 483 _StreamControllerAddStreamState addState = _varData; | |
| 484 addState.pause(); | |
| 485 } | |
| 305 _runGuarded(_onPause); | 486 _runGuarded(_onPause); |
| 306 } | 487 } |
| 307 | 488 |
| 308 void _recordResume(StreamSubscription<T> subscription) { | 489 void _recordResume(StreamSubscription<T> subscription) { |
| 490 if (_isAddingStream) { | |
| 491 _StreamControllerAddStreamState addState = _varData; | |
| 492 addState.resume(); | |
| 493 } | |
| 309 _runGuarded(_onResume); | 494 _runGuarded(_onResume); |
| 310 } | 495 } |
| 311 } | 496 } |
| 312 | 497 |
| 313 class _SyncStreamController<T> extends _StreamController<T> { | 498 abstract class _SyncStreamControllerDispatch<T> |
| 314 _SyncStreamController(void onListen(), | 499 implements _StreamController<T> { |
| 315 void onPause(), | |
| 316 void onResume(), | |
| 317 void onCancel()) | |
| 318 : super(onListen, onPause, onResume, onCancel); | |
| 319 | |
| 320 void _sendData(T data) { | 500 void _sendData(T data) { |
| 321 _subscription._add(data); | 501 _subscription._add(data); |
| 322 } | 502 } |
| 323 | 503 |
| 324 void _sendError(Object error) { | 504 void _sendError(Object error) { |
| 325 _subscription._addError(error); | 505 _subscription._addError(error); |
| 326 } | 506 } |
| 327 | 507 |
| 328 void _sendDone() { | 508 void _sendDone() { |
| 329 _subscription._close(); | 509 _subscription._close(); |
| 330 } | 510 } |
| 331 } | 511 } |
| 332 | 512 |
| 333 class _AsyncStreamController<T> extends _StreamController<T> { | 513 abstract class _AsyncStreamControllerDispatch<T> |
| 334 _AsyncStreamController(void onListen(), | 514 implements _StreamController<T> { |
| 335 void onPause(), | |
| 336 void onResume(), | |
| 337 void onCancel()) | |
| 338 : super(onListen, onPause, onResume, onCancel); | |
| 339 | |
| 340 void _sendData(T data) { | 515 void _sendData(T data) { |
| 341 _subscription._addPending(new _DelayedData(data)); | 516 _subscription._addPending(new _DelayedData(data)); |
| 342 } | 517 } |
| 343 | 518 |
| 344 void _sendError(Object error) { | 519 void _sendError(Object error) { |
| 345 _subscription._addPending(new _DelayedError(error)); | 520 _subscription._addPending(new _DelayedError(error)); |
| 346 } | 521 } |
| 347 | 522 |
| 348 void _sendDone() { | 523 void _sendDone() { |
| 349 _subscription._addPending(const _DelayedDone()); | 524 _subscription._addPending(const _DelayedDone()); |
| 350 } | 525 } |
| 351 } | 526 } |
| 352 | 527 |
| 528 // TODO(lrn): Use common superclass for callback-controllers when VM supports | |
| 529 // constructors in mixin superclasses. | |
| 530 | |
| 531 class _AsyncStreamController<T> extends _StreamController<T> | |
| 532 with _AsyncStreamControllerDispatch<T> { | |
| 533 final _NotificationHandler _onListen; | |
| 534 final _NotificationHandler _onPause; | |
| 535 final _NotificationHandler _onResume; | |
| 536 final _NotificationHandler _onCancel; | |
| 537 | |
| 538 _AsyncStreamController(void this._onListen(), | |
| 539 void this._onPause(), | |
| 540 void this._onResume(), | |
| 541 void this._onCancel()); | |
| 542 } | |
| 543 | |
| 544 class _SyncStreamController<T> extends _StreamController<T> | |
| 545 with _SyncStreamControllerDispatch<T> { | |
| 546 final _NotificationHandler _onListen; | |
| 547 final _NotificationHandler _onPause; | |
| 548 final _NotificationHandler _onResume; | |
| 549 final _NotificationHandler _onCancel; | |
| 550 | |
| 551 _SyncStreamController(void this._onListen(), | |
| 552 void this._onPause(), | |
| 553 void this._onResume(), | |
| 554 void this._onCancel()); | |
| 555 } | |
| 556 | |
| 557 abstract class _NoCallbacks { | |
| 558 _NotificationHandler get _onListen => null; | |
| 559 _NotificationHandler get _onPause => null; | |
| 560 _NotificationHandler get _onResume => null; | |
| 561 _NotificationHandler get _onCancel => null; | |
| 562 } | |
| 563 | |
| 564 typedef _NoCallbackAsyncStreamController<T> = _StreamController<T> | |
| 565 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | |
| 566 | |
| 567 typedef _NoCallbackSyncStreamController<T> = _StreamController<T> | |
| 568 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | |
| 569 | |
| 353 typedef void _NotificationHandler(); | 570 typedef void _NotificationHandler(); |
| 354 | 571 |
| 355 void _runGuarded(_NotificationHandler notificationHandler) { | 572 void _runGuarded(_NotificationHandler notificationHandler) { |
| 356 if (notificationHandler == null) return; | 573 if (notificationHandler == null) return; |
| 357 try { | 574 try { |
| 358 notificationHandler(); | 575 notificationHandler(); |
| 359 } catch (e, s) { | 576 } catch (e, s) { |
| 360 _Zone.current.handleUncaughtError(_asyncError(e, s)); | 577 _Zone.current.handleUncaughtError(_asyncError(e, s)); |
| 361 } | 578 } |
| 362 } | 579 } |
| 363 | 580 |
| 364 class _ControllerStream<T> extends _StreamImpl<T> { | 581 class _ControllerStream<T> extends _StreamImpl<T> { |
| 365 _StreamControllerLifecycle<T> _controller; | 582 _StreamControllerLifecycle<T> _controller; |
| 366 bool _hasListener = false; | |
| 367 | 583 |
| 368 _ControllerStream(this._controller); | 584 _ControllerStream(this._controller); |
| 369 | 585 |
| 370 StreamSubscription<T> _createSubscription( | 586 StreamSubscription<T> _createSubscription( |
| 371 void onData(T data), | 587 void onData(T data), |
| 372 void onError(Object error), | 588 void onError(Object error), |
| 373 void onDone(), | 589 void onDone(), |
| 374 bool cancelOnError) { | 590 bool cancelOnError) => |
| 375 if (_hasListener) { | 591 _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 376 throw new StateError("The stream has already been listened to."); | |
| 377 } | |
| 378 _hasListener = true; | |
| 379 return new _ControllerSubscription<T>( | |
| 380 _controller, onData, onError, onDone, cancelOnError); | |
| 381 } | |
| 382 | 592 |
| 383 void _onListen(_BufferingStreamSubscription subscription) { | 593 // Override == and hashCode so that new streams returned by the same |
| 384 _controller._recordListen(subscription); | 594 // controller are considered equal. The controller returns a new stream |
| 595 // each time it's queried, but doesn't have to cache the result. | |
| 596 | |
| 597 int get hashCode => _controller.hashCode ^ 0x35323532; | |
| 598 | |
| 599 bool operator==(Object other) { | |
| 600 if (other is! _ControllerStream) return false; | |
| 601 _ControllerStream otherStream = other; | |
| 602 return identical(otherStream._controller, this); | |
| 385 } | 603 } |
| 386 } | 604 } |
| 387 | 605 |
| 388 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 606 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 389 final _StreamControllerLifecycle<T> _controller; | 607 final _StreamControllerLifecycle<T> _controller; |
| 390 | 608 |
| 391 _ControllerSubscription(this._controller, | 609 _ControllerSubscription(this._controller, |
| 392 void onData(T data), | 610 void onData(T data), |
| 393 void onError(Object error), | 611 void onError(Object error), |
| 394 void onDone(), | 612 void onDone(), |
| 395 bool cancelOnError) | 613 bool cancelOnError) |
| 396 : super(onData, onError, onDone, cancelOnError); | 614 : super(onData, onError, onDone, cancelOnError); |
| 397 | 615 |
| 398 void _onCancel() { | 616 void _onCancel() { |
| 399 _controller._recordCancel(this); | 617 _controller._recordCancel(this); |
| 400 } | 618 } |
| 401 | 619 |
| 402 void _onPause() { | 620 void _onPause() { |
| 403 _controller._recordPause(this); | 621 _controller._recordPause(this); |
| 404 } | 622 } |
| 405 | 623 |
| 406 void _onResume() { | 624 void _onResume() { |
| 407 _controller._recordResume(this); | 625 _controller._recordResume(this); |
| 408 } | 626 } |
| 409 } | 627 } |
| 410 | 628 |
| 411 class _BroadcastStream<T> extends _StreamImpl<T> { | |
| 412 _BroadcastStreamController _controller; | |
| 413 | 629 |
| 414 _BroadcastStream(this._controller); | 630 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 631 class _StreamSinkWrapper<T> implements StreamSink<T> { | |
| 632 StreamSink _target; | |
|
floitsch
2013/06/27 15:15:19
final.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 633 _StreamSinkWrapper(this._target); | |
| 634 void add(T data) { _target.add(data); } | |
| 635 void addError(Object error) { _target.addError(error); } | |
| 636 Future close() => _target.close(); | |
| 637 Future addStream(Stream<T> source) => _target.addStream(source); | |
| 638 Future get done => _target.done; | |
| 639 } | |
| 415 | 640 |
| 416 bool get isBroadcast => true; | 641 /** |
| 642 * Object containing the state used to handle [StreamController.addStream]. | |
| 643 */ | |
| 644 class _AddStreamState<T> { | |
| 645 // [_FutureImpl] returned by call to addStream. | |
| 646 _FutureImpl addStreamFuture; | |
| 417 | 647 |
| 418 StreamSubscription<T> _createSubscription( | 648 // Subscription on stream argument to addStream. |
| 419 void onData(T data), | 649 StreamSubscription addSubscription; |
| 420 void onError(Object error), | 650 |
| 421 void onDone(), | 651 _AddStreamState(StreamSink controller, Stream source) |
| 422 bool cancelOnError) { | 652 : addStreamFuture = new _FutureImpl(), |
| 423 return new _BroadcastSubscription<T>( | 653 addSubscription = source.listen(controller._add, |
| 424 _controller, onData, onError, onDone, cancelOnError); | 654 onError: controller._addError, |
| 655 onDone: controller._close, | |
| 656 cancelOnError: true); | |
| 657 | |
| 658 void pause() { | |
| 659 addSubscription.pause(); | |
| 425 } | 660 } |
| 426 | 661 |
| 427 void _onListen(_BufferingStreamSubscription subscription) { | 662 void resume() { |
| 428 _controller._recordListen(subscription); | 663 addSubscription.resume(); |
| 664 } | |
| 665 | |
| 666 void cancel() { | |
| 667 addSubscription.cancel(); | |
| 668 complete(); | |
| 669 } | |
| 670 | |
| 671 void completeWithError(Object error) { | |
| 672 addStreamFuture._asyncSetError(error); | |
| 673 } | |
| 674 | |
| 675 void complete() { | |
| 676 addStreamFuture._asyncSetValue(null); | |
| 429 } | 677 } |
| 430 } | 678 } |
| 431 | 679 |
| 432 abstract class _BroadcastSubscriptionLink { | 680 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| 433 _BroadcastSubscriptionLink _next; | 681 // The subscription or pending data of a _StreamController. |
| 434 _BroadcastSubscriptionLink _previous; | 682 // Stored here because we reuse the `_varData` field in the _StreamController |
| 435 } | 683 // to store this state object. |
| 684 var varData; | |
| 436 | 685 |
| 437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> | 686 _StreamControllerAddStreamState(_StreamController controller, |
| 438 implements _BroadcastSubscriptionLink { | 687 this.varData, |
| 439 static const int _STATE_EVENT_ID = 1; | 688 Stream source) : super(controller, source) { |
| 440 static const int _STATE_FIRING = 2; | 689 if (controller.isPaused) { |
| 441 static const int _STATE_REMOVE_AFTER_FIRING = 4; | 690 addSubscription.pause(); |
| 442 int _eventState; | |
| 443 | |
| 444 _BroadcastSubscriptionLink _next; | |
| 445 _BroadcastSubscriptionLink _previous; | |
| 446 | |
| 447 _BroadcastSubscription(_StreamControllerLifecycle controller, | |
| 448 void onData(T data), | |
| 449 void onError(Object error), | |
| 450 void onDone(), | |
| 451 bool cancelOnError) | |
| 452 : super(controller, onData, onError, onDone, cancelOnError) { | |
| 453 _next = _previous = this; | |
| 454 } | |
| 455 | |
| 456 _BroadcastStreamController get _controller => super._controller; | |
| 457 | |
| 458 bool _expectsEvent(int eventId) { | |
| 459 return (_eventState & _STATE_EVENT_ID) == eventId; | |
| 460 } | |
| 461 | |
| 462 void _toggleEventId() { | |
| 463 _eventState ^= _STATE_EVENT_ID; | |
| 464 } | |
| 465 | |
| 466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
| 467 | |
| 468 bool _setRemoveAfterFiring() { | |
| 469 assert(_isFiring); | |
| 470 _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
| 471 } | |
| 472 | |
| 473 bool get _removeAfterFiring => | |
| 474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
| 475 } | |
| 476 | |
| 477 | |
| 478 abstract class _BroadcastStreamController<T> | |
| 479 implements StreamController<T>, | |
| 480 _StreamControllerLifecycle<T>, | |
| 481 _BroadcastSubscriptionLink, | |
| 482 _EventDispatch<T> { | |
| 483 static const int _STATE_INITIAL = 0; | |
| 484 static const int _STATE_EVENT_ID = 1; | |
| 485 static const int _STATE_FIRING = 2; | |
| 486 static const int _STATE_CLOSED = 4; | |
| 487 | |
| 488 final _NotificationHandler _onListen; | |
| 489 final _NotificationHandler _onCancel; | |
| 490 | |
| 491 // State of the controller. | |
| 492 int _state; | |
| 493 | |
| 494 // Double-linked list of active listeners. | |
| 495 _BroadcastSubscriptionLink _next; | |
| 496 _BroadcastSubscriptionLink _previous; | |
| 497 | |
| 498 _BroadcastStreamController(this._onListen, this._onCancel) | |
| 499 : _state = _STATE_INITIAL { | |
| 500 _next = _previous = this; | |
| 501 } | |
| 502 | |
| 503 // StreamController interface. | |
| 504 | |
| 505 Stream<T> get stream => new _BroadcastStream<T>(this); | |
| 506 | |
| 507 EventSink<T> get sink => new _EventSinkView<T>(this); | |
| 508 | |
| 509 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 510 | |
| 511 /** | |
| 512 * A broadcast controller is never paused. | |
| 513 * | |
| 514 * Each receiving stream may be paused individually, and they handle their | |
| 515 * own buffering. | |
| 516 */ | |
| 517 bool get isPaused => false; | |
| 518 | |
| 519 /** Whether there are currently a subscriber on the [Stream]. */ | |
| 520 bool get hasListener => !_isEmpty; | |
| 521 | |
| 522 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
| 523 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
| 524 | |
| 525 // Linked list helpers | |
| 526 | |
| 527 bool get _isEmpty => identical(_next, this); | |
| 528 | |
| 529 /** Adds subscription to linked list of active listeners. */ | |
| 530 void _addListener(_BroadcastSubscription<T> subscription) { | |
| 531 _BroadcastSubscriptionLink previous = _previous; | |
| 532 previous._next = subscription; | |
| 533 _previous = subscription._previous; | |
| 534 subscription._previous._next = this; | |
| 535 subscription._previous = previous; | |
| 536 subscription._eventState = (_state & _STATE_EVENT_ID); | |
| 537 } | |
| 538 | |
| 539 void _removeListener(_BroadcastSubscription<T> subscription) { | |
| 540 assert(identical(subscription._controller, this)); | |
| 541 assert(!identical(subscription._next, subscription)); | |
| 542 subscription._previous._next = subscription._next; | |
| 543 subscription._next._previous = subscription._previous; | |
| 544 subscription._next = subscription._previous = subscription; | |
| 545 } | |
| 546 | |
| 547 // _StreamControllerLifecycle interface. | |
| 548 | |
| 549 void _recordListen(_BroadcastSubscription<T> subscription) { | |
| 550 _addListener(subscription); | |
| 551 if (identical(_next, _previous)) { | |
| 552 // Only one listener, so it must be the first listener. | |
| 553 _runGuarded(_onListen); | |
| 554 } | |
| 555 } | |
| 556 | |
| 557 void _recordCancel(_BroadcastSubscription<T> subscription) { | |
| 558 if (subscription._isFiring) { | |
| 559 subscription._setRemoveAfterFiring(); | |
| 560 } else { | |
| 561 _removeListener(subscription); | |
| 562 // If we are currently firing an event, the empty-check is performed at | |
| 563 // the end of the listener loop instead of here. | |
| 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | |
| 565 _callOnCancel(); | |
| 566 } | |
| 567 } | |
| 568 } | |
| 569 | |
| 570 void _recordPause(StreamSubscription<T> subscription) {} | |
| 571 void _recordResume(StreamSubscription<T> subscription) {} | |
| 572 | |
| 573 // EventSink interface. | |
| 574 | |
| 575 void add(T data) { | |
| 576 if (isClosed) { | |
| 577 throw new StateError("Cannot add new events after calling close()"); | |
| 578 } | |
| 579 _sendData(data); | |
| 580 } | |
| 581 | |
| 582 void addError(Object error, [Object stackTrace]) { | |
| 583 if (isClosed) { | |
| 584 throw new StateError("Cannot add new events after calling close()"); | |
| 585 } | |
| 586 if (stackTrace != null) _attachStackTrace(error, stackTrace); | |
| 587 _sendError(error); | |
| 588 } | |
| 589 | |
| 590 void close() { | |
| 591 if (isClosed) { | |
| 592 throw new StateError("Cannot add new events after calling close()"); | |
| 593 } | |
| 594 _state |= _STATE_CLOSED; | |
| 595 _sendDone(); | |
| 596 } | |
| 597 | |
| 598 void _forEachListener( | |
| 599 void action(_BufferingStreamSubscription<T> subscription)) { | |
| 600 if (_isFiring) { | |
| 601 throw new StateError( | |
| 602 "Cannot fire new event. Controller is already firing an event"); | |
| 603 } | |
| 604 if (_isEmpty) return; | |
| 605 | |
| 606 // Get event id of this event. | |
| 607 int id = (_state & _STATE_EVENT_ID); | |
| 608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | |
| 609 // callbacks while firing, and we prevent reentrancy of this function. | |
| 610 // | |
| 611 // Set [_state]'s event id to the next event's id. | |
| 612 // Any listeners added while firing this event will expect the next event, | |
| 613 // not this one, and won't get notified. | |
| 614 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
| 615 _BroadcastSubscriptionLink link = _next; | |
| 616 while (!identical(link, this)) { | |
| 617 _BroadcastSubscription<T> subscription = link; | |
| 618 if (subscription._expectsEvent(id)) { | |
| 619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
| 620 action(subscription); | |
| 621 subscription._toggleEventId(); | |
| 622 link = subscription._next; | |
| 623 if (subscription._removeAfterFiring) { | |
| 624 _removeListener(subscription); | |
| 625 } | |
| 626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
| 627 } else { | |
| 628 link = subscription._next; | |
| 629 } | |
| 630 } | |
| 631 _state &= ~_STATE_FIRING; | |
| 632 | |
| 633 if (_isEmpty) { | |
| 634 _callOnCancel(); | |
| 635 } | |
| 636 } | |
| 637 | |
| 638 void _callOnCancel() { | |
| 639 _runGuarded(_onCancel); | |
| 640 } | |
| 641 } | |
| 642 | |
| 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 644 _SyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 645 : super(onListen, onCancel); | |
| 646 | |
| 647 // EventDispatch interface. | |
| 648 | |
| 649 void _sendData(T data) { | |
| 650 if (_isEmpty) return; | |
| 651 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 652 subscription._add(data); | |
| 653 }); | |
| 654 } | |
| 655 | |
| 656 void _sendError(Object error) { | |
| 657 if (_isEmpty) return; | |
| 658 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 659 subscription._addError(error); | |
| 660 }); | |
| 661 } | |
| 662 | |
| 663 void _sendDone() { | |
| 664 if (_isEmpty) return; | |
| 665 _forEachListener((_BroadcastSubscription<T> subscription) { | |
| 666 subscription._close(); | |
| 667 subscription._eventState |= | |
| 668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; | |
| 669 }); | |
| 670 } | |
| 671 } | |
| 672 | |
| 673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 674 _AsyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 675 : super(onListen, onCancel); | |
| 676 | |
| 677 // EventDispatch interface. | |
| 678 | |
| 679 void _sendData(T data) { | |
| 680 for (_BroadcastSubscriptionLink link = _next; | |
| 681 !identical(link, this); | |
| 682 link = link._next) { | |
| 683 _BroadcastSubscription<T> subscription = link; | |
| 684 subscription._addPending(new _DelayedData(data)); | |
| 685 } | |
| 686 } | |
| 687 | |
| 688 void _sendError(Object error) { | |
| 689 for (_BroadcastSubscriptionLink link = _next; | |
| 690 !identical(link, this); | |
| 691 link = link._next) { | |
| 692 _BroadcastSubscription<T> subscription = link; | |
| 693 subscription._addPending(new _DelayedError(error)); | |
| 694 } | |
| 695 } | |
| 696 | |
| 697 void _sendDone() { | |
| 698 for (_BroadcastSubscriptionLink link = _next; | |
| 699 !identical(link, this); | |
| 700 link = link._next) { | |
| 701 _BroadcastSubscription<T> subscription = link; | |
| 702 subscription._addPending(const _DelayedDone()); | |
| 703 } | 691 } |
| 704 } | 692 } |
| 705 } | 693 } |
| 706 | |
| 707 /** | |
| 708 * Stream controller that is used by [Stream.asBroadcastStream]. | |
| 709 * | |
| 710 * This stream controller allows incoming events while it is firing | |
| 711 * other events. This is handled by delaying the events until the | |
| 712 * current event is done firing, and then fire the pending events. | |
| 713 * | |
| 714 * This class extends [_SyncBroadcastStreamController]. Events of | |
| 715 * an "asBroadcastStream" stream are always initiated by events | |
| 716 * on another stream, and it is fine to forward them synchronously. | |
| 717 */ | |
| 718 class _AsBroadcastStreamController<T> | |
| 719 extends _SyncBroadcastStreamController<T> | |
| 720 implements _EventDispatch<T> { | |
| 721 _StreamImplEvents _pending; | |
| 722 | |
| 723 _AsBroadcastStreamController(void onListen(), void onCancel()) | |
| 724 : super(onListen, onCancel); | |
| 725 | |
| 726 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
| 727 | |
| 728 void _addPendingEvent(_DelayedEvent event) { | |
| 729 if (_pending == null) { | |
| 730 _pending = new _StreamImplEvents(); | |
| 731 } | |
| 732 _pending.add(event); | |
| 733 } | |
| 734 | |
| 735 void add(T data) { | |
| 736 if (_isFiring) { | |
| 737 _addPendingEvent(new _DelayedData<T>(data)); | |
| 738 return; | |
| 739 } | |
| 740 super.add(data); | |
| 741 while (_hasPending) { | |
| 742 _pending.handleNext(this); | |
| 743 } | |
| 744 } | |
| 745 | |
| 746 void addError(Object error, [StackTrace stackTrace]) { | |
| 747 if (_isFiring) { | |
| 748 _addPendingEvent(new _DelayedError(error)); | |
| 749 return; | |
| 750 } | |
| 751 super.addError(error, stackTrace); | |
| 752 while (_hasPending) { | |
| 753 _pending.handleNext(this); | |
| 754 } | |
| 755 } | |
| 756 | |
| 757 void close() { | |
| 758 if (_isFiring) { | |
| 759 _addPendingEvent(const _DelayedDone()); | |
| 760 _state |= _STATE_CLOSED; | |
| 761 return; | |
| 762 } | |
| 763 super.close(); | |
| 764 assert(!_hasPending); | |
| 765 } | |
| 766 | |
| 767 void _callOnCancel() { | |
| 768 if (_hasPending) { | |
| 769 _pending.clear(); | |
| 770 _pending = null; | |
| 771 } | |
| 772 super._callOnCancel(); | |
| 773 } | |
| 774 } | |
| OLD | NEW |