| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
| 48 */ | 48 */ |
| 49 abstract class StreamController<T> implements EventSink<T> { | 49 abstract class StreamController<T> implements EventSink<T> { |
| 50 /** The stream that this controller is controlling. */ | 50 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; | 51 Stream<T> get stream; |
| 52 | 52 |
| 53 /** | 53 /** |
| 54 * A controller with a [stream] that supports only one single subscriber. | 54 * A controller with a [stream] that supports only one single subscriber. |
| 55 * | 55 * |
| 56 * If [sync] is true, events may be passed directly to the stream's listener |
| 57 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 58 * will be passed to the listener at a later time, after the code creating |
| 59 * the event has returned. |
| 60 * |
| 56 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
| 57 * registered. | 62 * registered. |
| 58 * | 63 * |
| 59 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
| 60 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
| 61 * | 66 * |
| 62 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
| 63 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
| 64 * its subscription. | 69 * its subscription. |
| 65 * | 70 * |
| 66 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
| 67 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
| 68 */ | 73 */ |
| 69 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
| 70 void onPause(), | 75 void onPause(), |
| 71 void onResume(), | 76 void onResume(), |
| 72 void onCancel()}) | 77 void onCancel(), |
| 73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); | 78 bool sync: false}) |
| 79 => sync |
| 80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| 74 | 82 |
| 75 /** | 83 /** |
| 76 * A controller where [stream] can be listened to more than once. | 84 * A controller where [stream] can be listened to more than once. |
| 77 * | 85 * |
| 78 * The [Stream] returned by [stream] is a broadcast stream. It can be listened | 86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened |
| 79 * to more than once. | 87 * to more than once. |
| 80 * | 88 * |
| 81 * The controller distributes any events to all currently subscribed | 89 * The controller distributes any events to all currently subscribed |
| 82 * listeners. | 90 * listeners. |
| 83 * It is not allowed to call [add], [addError], or [close] before a previous | 91 * It is not allowed to call [add], [addError], or [close] before a previous |
| 84 * call has returned. | 92 * call has returned. |
| 85 * | 93 * |
| 94 * If [sync] is true, events may be passed directly to the stream's listener |
| 95 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 96 * will be passed to the listener at a later time, after the code creating |
| 97 * the event has returned. |
| 98 * |
| 86 * Each listener is handled independently, and if they pause, only the pausing | 99 * Each listener is handled independently, and if they pause, only the pausing |
| 87 * listener is affected. A paused listener will buffer events internally until | 100 * listener is affected. A paused listener will buffer events internally until |
| 88 * unpaused or canceled. | 101 * unpaused or canceled. |
| 89 * | 102 * |
| 103 * If [sync] is false, no guarantees are given with regard to when |
| 104 * multiple listeners get the events, except that each listener will get |
| 105 * all events in the correct order. If two events are sent on an async |
| 106 * controller with two listeners, one of the listeners may get both events |
| 107 * before the other listener gets any. |
| 108 * A listener must be subscribed both when the event is initiated (that is, |
| 109 * when [add] is called) and when the event is later delivered, in order to |
| 110 * get the event. |
| 111 * |
| 90 * The [onListen] callback is called when the first listener is subscribed, | 112 * The [onListen] callback is called when the first listener is subscribed, |
| 91 * and the [onCancel] is called when there are no longer any active listeners. | 113 * and the [onCancel] is called when there are no longer any active listeners. |
| 92 * If a listener is added again later, after the [onCancel] was called, | 114 * If a listener is added again later, after the [onCancel] was called, |
| 93 * the [onListen] will be called again. | 115 * the [onListen] will be called again. |
| 94 */ | 116 */ |
| 95 factory StreamController.broadcast({void onListen(), void onCancel()}) { | 117 factory StreamController.broadcast({void onListen(), |
| 96 return new _MultiplexStreamController<T>(onListen, onCancel); | 118 void onCancel(), |
| 119 bool sync: false}) { |
| 120 return sync |
| 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 97 } | 123 } |
| 98 | 124 |
| 99 /** | 125 /** |
| 100 * Returns a view of this object that only exposes the [EventSink] interface. | 126 * Returns a view of this object that only exposes the [EventSink] interface. |
| 101 */ | 127 */ |
| 102 EventSink<T> get sink; | 128 EventSink<T> get sink; |
| 103 | 129 |
| 104 /** | 130 /** |
| 105 * Whether the stream is closed for adding more events. | 131 * Whether the stream is closed for adding more events. |
| 106 * | 132 * |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 140 void _recordPause(StreamSubscription<T> subscription) {} | 166 void _recordPause(StreamSubscription<T> subscription) {} |
| 141 void _recordResume(StreamSubscription<T> subscription) {} | 167 void _recordResume(StreamSubscription<T> subscription) {} |
| 142 void _recordCancel(StreamSubscription<T> subscription) {} | 168 void _recordCancel(StreamSubscription<T> subscription) {} |
| 143 } | 169 } |
| 144 | 170 |
| 145 /** | 171 /** |
| 146 * Default implementation of [StreamController]. | 172 * Default implementation of [StreamController]. |
| 147 * | 173 * |
| 148 * Controls a stream that only supports a single controller. | 174 * Controls a stream that only supports a single controller. |
| 149 */ | 175 */ |
| 150 class _StreamControllerImpl<T> implements StreamController<T>, | 176 abstract class _StreamController<T> implements StreamController<T>, |
| 151 _StreamControllerLifecycle<T> { | 177 _StreamControllerLifecycle<T>, |
| 178 _EventDispatch<T> { |
| 152 static const int _STATE_OPEN = 0; | 179 static const int _STATE_OPEN = 0; |
| 153 static const int _STATE_CANCELLED = 1; | 180 static const int _STATE_CANCELLED = 1; |
| 154 static const int _STATE_CLOSED = 2; | 181 static const int _STATE_CLOSED = 2; |
| 155 | 182 |
| 156 final _NotificationHandler _onListen; | 183 final _NotificationHandler _onListen; |
| 157 final _NotificationHandler _onPause; | 184 final _NotificationHandler _onPause; |
| 158 final _NotificationHandler _onResume; | 185 final _NotificationHandler _onResume; |
| 159 final _NotificationHandler _onCancel; | 186 final _NotificationHandler _onCancel; |
| 160 _StreamImpl<T> _stream; | 187 _StreamImpl<T> _stream; |
| 161 | 188 |
| 162 // An active subscription on the stream, or null if no subscripton is active. | 189 // An active subscription on the stream, or null if no subscripton is active. |
| 163 _ControllerSubscription<T> _subscription; | 190 _ControllerSubscription<T> _subscription; |
| 164 | 191 |
| 165 // Whether we have sent a "done" event. | 192 // Whether we have sent a "done" event. |
| 166 int _state = _STATE_OPEN; | 193 int _state = _STATE_OPEN; |
| 167 | 194 |
| 168 // Events added to the stream before it has an active subscription. | 195 // Events added to the stream before it has an active subscription. |
| 169 _PendingEvents _pendingEvents = null; | 196 _PendingEvents _pendingEvents = null; |
| 170 | 197 |
| 171 _StreamControllerImpl(this._onListen, | 198 _StreamController(this._onListen, |
| 172 this._onPause, | 199 this._onPause, |
| 173 this._onResume, | 200 this._onResume, |
| 174 this._onCancel) { | 201 this._onCancel) { |
| 175 _stream = new _ControllerStream<T>(this); | 202 _stream = new _ControllerStream<T>(this); |
| 176 } | 203 } |
| 177 | 204 |
| 178 Stream<T> get stream => _stream; | 205 Stream<T> get stream => _stream; |
| 179 | 206 |
| 180 /** | 207 /** |
| 181 * Returns a view of this object that only exposes the [EventSink] interface. | 208 * Returns a view of this object that only exposes the [EventSink] interface. |
| 182 */ | 209 */ |
| 183 EventSink<T> get sink => new _EventSinkView<T>(this); | 210 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 184 | 211 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 195 : !_isCancelled; | 222 : !_isCancelled; |
| 196 | 223 |
| 197 bool get hasListener => _subscription != null; | 224 bool get hasListener => _subscription != null; |
| 198 | 225 |
| 199 /** | 226 /** |
| 200 * Send or queue a data event. | 227 * Send or queue a data event. |
| 201 */ | 228 */ |
| 202 void add(T value) { | 229 void add(T value) { |
| 203 if (isClosed) throw new StateError("Adding event after close"); | 230 if (isClosed) throw new StateError("Adding event after close"); |
| 204 if (_subscription != null) { | 231 if (_subscription != null) { |
| 205 _subscription._add(value); | 232 _sendData(value); |
| 206 } else if (!_isCancelled) { | 233 } else if (!_isCancelled) { |
| 207 _addPendingEvent(new _DelayedData<T>(value)); | 234 _addPendingEvent(new _DelayedData<T>(value)); |
| 208 } | 235 } |
| 209 } | 236 } |
| 210 | 237 |
| 211 /** | 238 /** |
| 212 * Send or enqueue an error event. | 239 * Send or enqueue an error event. |
| 213 */ | 240 */ |
| 214 void addError(Object error, [Object stackTrace]) { | 241 void addError(Object error, [Object stackTrace]) { |
| 215 if (isClosed) throw new StateError("Adding event after close"); | 242 if (isClosed) throw new StateError("Adding event after close"); |
| 216 if (stackTrace != null) { | 243 if (stackTrace != null) { |
| 217 // Force stack trace overwrite. Even if the error already contained | 244 // Force stack trace overwrite. Even if the error already contained |
| 218 // a stack trace. | 245 // a stack trace. |
| 219 _attachStackTrace(error, stackTrace); | 246 _attachStackTrace(error, stackTrace); |
| 220 } | 247 } |
| 221 if (_subscription != null) { | 248 if (_subscription != null) { |
| 222 _subscription._addError(error); | 249 _sendError(error); |
| 223 } else if (!_isCancelled) { | 250 } else if (!_isCancelled) { |
| 224 _addPendingEvent(new _DelayedError(error)); | 251 _addPendingEvent(new _DelayedError(error)); |
| 225 } | 252 } |
| 226 } | 253 } |
| 227 | 254 |
| 228 /** | 255 /** |
| 229 * Closes this controller. | 256 * Closes this controller. |
| 230 * | 257 * |
| 231 * After closing, no further events may be added using [add] or [addError]. | 258 * After closing, no further events may be added using [add] or [addError]. |
| 232 * | 259 * |
| 233 * You are allowed to close the controller more than once, but only the first | 260 * You are allowed to close the controller more than once, but only the first |
| 234 * call has any effect. | 261 * call has any effect. |
| 235 * | 262 * |
| 236 * The first time a controller is closed, a "done" event is sent to its | 263 * The first time a controller is closed, a "done" event is sent to its |
| 237 * stream. | 264 * stream. |
| 238 */ | 265 */ |
| 239 void close() { | 266 void close() { |
| 240 if (isClosed) return; | 267 if (isClosed) return; |
| 241 _state |= _STATE_CLOSED; | 268 _state |= _STATE_CLOSED; |
| 242 if (_subscription != null) { | 269 if (_subscription != null) { |
| 243 _subscription._close(); | 270 _sendDone(); |
| 244 } else if (!_isCancelled) { | 271 } else if (!_isCancelled) { |
| 245 _addPendingEvent(const _DelayedDone()); | 272 _addPendingEvent(const _DelayedDone()); |
| 246 } | 273 } |
| 247 } | 274 } |
| 248 | 275 |
| 276 // EventDispatch interface |
| 277 |
| 249 void _addPendingEvent(_DelayedEvent event) { | 278 void _addPendingEvent(_DelayedEvent event) { |
| 250 if (_isCancelled) return; | 279 if (_isCancelled) return; |
| 251 _StreamImplEvents events = _pendingEvents; | 280 _StreamImplEvents events = _pendingEvents; |
| 252 if (events == null) { | 281 if (events == null) { |
| 253 events = _pendingEvents = new _StreamImplEvents(); | 282 events = _pendingEvents = new _StreamImplEvents(); |
| 254 } | 283 } |
| 255 events.add(event); | 284 events.add(event); |
| 256 } | 285 } |
| 257 | 286 |
| 258 void _recordListen(_BufferingStreamSubscription<T> subscription) { | 287 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 274 | 303 |
| 275 void _recordPause(StreamSubscription<T> subscription) { | 304 void _recordPause(StreamSubscription<T> subscription) { |
| 276 _runGuarded(_onPause); | 305 _runGuarded(_onPause); |
| 277 } | 306 } |
| 278 | 307 |
| 279 void _recordResume(StreamSubscription<T> subscription) { | 308 void _recordResume(StreamSubscription<T> subscription) { |
| 280 _runGuarded(_onResume); | 309 _runGuarded(_onResume); |
| 281 } | 310 } |
| 282 } | 311 } |
| 283 | 312 |
| 313 class _SyncStreamController<T> extends _StreamController<T> { |
| 314 _SyncStreamController(void onListen(), |
| 315 void onPause(), |
| 316 void onResume(), |
| 317 void onCancel()) |
| 318 : super(onListen, onPause, onResume, onCancel); |
| 319 |
| 320 void _sendData(T data) { |
| 321 _subscription._add(data); |
| 322 } |
| 323 |
| 324 void _sendError(Object error) { |
| 325 _subscription._addError(error); |
| 326 } |
| 327 |
| 328 void _sendDone() { |
| 329 _subscription._close(); |
| 330 } |
| 331 } |
| 332 |
| 333 class _AsyncStreamController<T> extends _StreamController<T> { |
| 334 _AsyncStreamController(void onListen(), |
| 335 void onPause(), |
| 336 void onResume(), |
| 337 void onCancel()) |
| 338 : super(onListen, onPause, onResume, onCancel); |
| 339 |
| 340 void _sendData(T data) { |
| 341 _subscription._addPending(new _DelayedData(data)); |
| 342 } |
| 343 |
| 344 void _sendError(Object error) { |
| 345 _subscription._addPending(new _DelayedError(error)); |
| 346 } |
| 347 |
| 348 void _sendDone() { |
| 349 _subscription._addPending(const _DelayedDone()); |
| 350 } |
| 351 } |
| 352 |
| 284 typedef void _NotificationHandler(); | 353 typedef void _NotificationHandler(); |
| 285 | 354 |
| 286 void _runGuarded(_NotificationHandler notificationHandler) { | 355 void _runGuarded(_NotificationHandler notificationHandler) { |
| 287 if (notificationHandler == null) return; | 356 if (notificationHandler == null) return; |
| 288 try { | 357 try { |
| 289 notificationHandler(); | 358 notificationHandler(); |
| 290 } catch (e, s) { | 359 } catch (e, s) { |
| 291 _throwDelayed(e, s); | 360 _throwDelayed(e, s); |
| 292 } | 361 } |
| 293 } | 362 } |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 332 | 401 |
| 333 void _onPause() { | 402 void _onPause() { |
| 334 _controller._recordPause(this); | 403 _controller._recordPause(this); |
| 335 } | 404 } |
| 336 | 405 |
| 337 void _onResume() { | 406 void _onResume() { |
| 338 _controller._recordResume(this); | 407 _controller._recordResume(this); |
| 339 } | 408 } |
| 340 } | 409 } |
| 341 | 410 |
| 342 class _MultiplexStream<T> extends _StreamImpl<T> { | 411 class _BroadcastStream<T> extends _StreamImpl<T> { |
| 343 _MultiplexStreamController _controller; | 412 _BroadcastStreamController _controller; |
| 344 | 413 |
| 345 _MultiplexStream(this._controller); | 414 _BroadcastStream(this._controller); |
| 346 | 415 |
| 347 bool get isBroadcast => true; | 416 bool get isBroadcast => true; |
| 348 | 417 |
| 349 StreamSubscription<T> _createSubscription( | 418 StreamSubscription<T> _createSubscription( |
| 350 void onData(T data), | 419 void onData(T data), |
| 351 void onError(Object error), | 420 void onError(Object error), |
| 352 void onDone(), | 421 void onDone(), |
| 353 bool cancelOnError) { | 422 bool cancelOnError) { |
| 354 return new _MultiplexSubscription<T>( | 423 return new _BroadcastSubscription<T>( |
| 355 _controller, onData, onError, onDone, cancelOnError); | 424 _controller, onData, onError, onDone, cancelOnError); |
| 356 } | 425 } |
| 357 | 426 |
| 358 void _onListen(_BufferingStreamSubscription subscription) { | 427 void _onListen(_BufferingStreamSubscription subscription) { |
| 359 _controller._recordListen(subscription); | 428 _controller._recordListen(subscription); |
| 360 } | 429 } |
| 361 } | 430 } |
| 362 | 431 |
| 363 abstract class _MultiplexSubscriptionLink { | 432 abstract class _BroadcastSubscriptionLink { |
| 364 _MultiplexSubscriptionLink _next; | 433 _BroadcastSubscriptionLink _next; |
| 365 _MultiplexSubscriptionLink _previous; | 434 _BroadcastSubscriptionLink _previous; |
| 366 } | 435 } |
| 367 | 436 |
| 368 class _MultiplexSubscription<T> extends _ControllerSubscription<T> | 437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
| 369 implements _MultiplexSubscriptionLink { | 438 implements _BroadcastSubscriptionLink { |
| 370 static const int _STATE_EVENT_ID = 1; | 439 static const int _STATE_EVENT_ID = 1; |
| 371 static const int _STATE_FIRING = 2; | 440 static const int _STATE_FIRING = 2; |
| 372 static const int _STATE_REMOVE_AFTER_FIRING = 4; | 441 static const int _STATE_REMOVE_AFTER_FIRING = 4; |
| 373 int _eventState; | 442 int _eventState; |
| 374 | 443 |
| 375 _MultiplexSubscriptionLink _next; | 444 _BroadcastSubscriptionLink _next; |
| 376 _MultiplexSubscriptionLink _previous; | 445 _BroadcastSubscriptionLink _previous; |
| 377 | 446 |
| 378 _MultiplexSubscription(_StreamControllerLifecycle controller, | 447 _BroadcastSubscription(_StreamControllerLifecycle controller, |
| 379 void onData(T data), | 448 void onData(T data), |
| 380 void onError(Object error), | 449 void onError(Object error), |
| 381 void onDone(), | 450 void onDone(), |
| 382 bool cancelOnError) | 451 bool cancelOnError) |
| 383 : super(controller, onData, onError, onDone, cancelOnError) { | 452 : super(controller, onData, onError, onDone, cancelOnError) { |
| 384 _next = _previous = this; | 453 _next = _previous = this; |
| 385 } | 454 } |
| 386 | 455 |
| 387 _MultiplexStreamController get _controller => super._controller; | 456 _BroadcastStreamController get _controller => super._controller; |
| 388 | 457 |
| 389 bool _expectsEvent(int eventId) { | 458 bool _expectsEvent(int eventId) { |
| 390 return (_eventState & _STATE_EVENT_ID) == eventId; | 459 return (_eventState & _STATE_EVENT_ID) == eventId; |
| 391 } | 460 } |
| 392 | 461 |
| 393 void _toggleEventId() { | 462 void _toggleEventId() { |
| 394 _eventState ^= _STATE_EVENT_ID; | 463 _eventState ^= _STATE_EVENT_ID; |
| 395 } | 464 } |
| 396 | 465 |
| 397 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | 466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
| 398 | 467 |
| 399 bool _setRemoveAfterFiring() { | 468 bool _setRemoveAfterFiring() { |
| 400 assert(_isFiring); | 469 assert(_isFiring); |
| 401 _eventState |= _STATE_REMOVE_AFTER_FIRING; | 470 _eventState |= _STATE_REMOVE_AFTER_FIRING; |
| 402 } | 471 } |
| 403 | 472 |
| 404 bool get _removeAfterFiring => | 473 bool get _removeAfterFiring => |
| 405 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | 474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
| 406 } | 475 } |
| 407 | 476 |
| 408 | 477 |
| 409 class _MultiplexStreamController<T> implements StreamController<T>, | 478 abstract class _BroadcastStreamController<T> |
| 410 _StreamControllerLifecycle<T>, | 479 implements StreamController<T>, |
| 411 _MultiplexSubscriptionLink { | 480 _StreamControllerLifecycle<T>, |
| 481 _BroadcastSubscriptionLink, |
| 482 _EventDispatch<T> { |
| 412 static const int _STATE_INITIAL = 0; | 483 static const int _STATE_INITIAL = 0; |
| 413 static const int _STATE_EVENT_ID = 1; | 484 static const int _STATE_EVENT_ID = 1; |
| 414 static const int _STATE_FIRING = 2; | 485 static const int _STATE_FIRING = 2; |
| 415 static const int _STATE_CLOSED = 4; | 486 static const int _STATE_CLOSED = 4; |
| 416 | 487 |
| 417 final _NotificationHandler _onListen; | 488 final _NotificationHandler _onListen; |
| 418 final _NotificationHandler _onCancel; | 489 final _NotificationHandler _onCancel; |
| 419 | 490 |
| 420 // State of the controller. | 491 // State of the controller. |
| 421 int _state; | 492 int _state; |
| 422 | 493 |
| 423 // Double-linked list of active listeners. | 494 // Double-linked list of active listeners. |
| 424 _MultiplexSubscriptionLink _next; | 495 _BroadcastSubscriptionLink _next; |
| 425 _MultiplexSubscriptionLink _previous; | 496 _BroadcastSubscriptionLink _previous; |
| 426 | 497 |
| 427 _MultiplexStreamController(this._onListen, this._onCancel) | 498 _BroadcastStreamController(this._onListen, this._onCancel) |
| 428 : _state = _STATE_INITIAL { | 499 : _state = _STATE_INITIAL { |
| 429 _next = _previous = this; | 500 _next = _previous = this; |
| 430 } | 501 } |
| 431 | 502 |
| 432 // StreamController interface. | 503 // StreamController interface. |
| 433 | 504 |
| 434 Stream<T> get stream => new _MultiplexStream<T>(this); | 505 Stream<T> get stream => new _BroadcastStream<T>(this); |
| 435 | 506 |
| 436 EventSink<T> get sink => new _EventSinkView<T>(this); | 507 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 437 | 508 |
| 438 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 509 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 439 | 510 |
| 440 /** | 511 /** |
| 441 * A multiplex controller is never paused. | 512 * A broadcast controller is never paused. |
| 442 * | 513 * |
| 443 * Each receiving stream may be paused individually, and they handle their | 514 * Each receiving stream may be paused individually, and they handle their |
| 444 * own buffering. | 515 * own buffering. |
| 445 */ | 516 */ |
| 446 bool get isPaused => false; | 517 bool get isPaused => false; |
| 447 | 518 |
| 448 /** Whether there are currently a subscriber on the [Stream]. */ | 519 /** Whether there are currently a subscriber on the [Stream]. */ |
| 449 bool get hasListener => !_isEmpty; | 520 bool get hasListener => !_isEmpty; |
| 450 | 521 |
| 451 /** Whether an event is being fired (sent to some, but not all, listeners). */ | 522 /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| 452 bool get _isFiring => (_state & _STATE_FIRING) != 0; | 523 bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| 453 | 524 |
| 454 // Linked list helpers | 525 // Linked list helpers |
| 455 | 526 |
| 456 bool get _isEmpty => identical(_next, this); | 527 bool get _isEmpty => identical(_next, this); |
| 457 | 528 |
| 458 /** Adds subscription to linked list of active listeners. */ | 529 /** Adds subscription to linked list of active listeners. */ |
| 459 void _addListener(_MultiplexSubscription<T> subscription) { | 530 void _addListener(_BroadcastSubscription<T> subscription) { |
| 460 _MultiplexSubscriptionLink previous = _previous; | 531 _BroadcastSubscriptionLink previous = _previous; |
| 461 previous._next = subscription; | 532 previous._next = subscription; |
| 462 _previous = subscription._previous; | 533 _previous = subscription._previous; |
| 463 subscription._previous._next = this; | 534 subscription._previous._next = this; |
| 464 subscription._previous = previous; | 535 subscription._previous = previous; |
| 465 subscription._eventState = (_state & _STATE_EVENT_ID); | 536 subscription._eventState = (_state & _STATE_EVENT_ID); |
| 466 } | 537 } |
| 467 | 538 |
| 468 void _removeListener(_MultiplexSubscription<T> subscription) { | 539 void _removeListener(_BroadcastSubscription<T> subscription) { |
| 469 assert(identical(subscription._controller, this)); | 540 assert(identical(subscription._controller, this)); |
| 470 assert(!identical(subscription._next, subscription)); | 541 assert(!identical(subscription._next, subscription)); |
| 471 subscription._previous._next = subscription._next; | 542 subscription._previous._next = subscription._next; |
| 472 subscription._next._previous = subscription._previous; | 543 subscription._next._previous = subscription._previous; |
| 473 subscription._next = subscription._previous = subscription; | 544 subscription._next = subscription._previous = subscription; |
| 474 } | 545 } |
| 475 | 546 |
| 476 // _StreamControllerLifecycle interface. | 547 // _StreamControllerLifecycle interface. |
| 477 | 548 |
| 478 void _recordListen(_MultiplexSubscription<T> subscription) { | 549 void _recordListen(_BroadcastSubscription<T> subscription) { |
| 479 _addListener(subscription); | 550 _addListener(subscription); |
| 480 if (identical(_next, _previous)) { | 551 if (identical(_next, _previous)) { |
| 481 // Only one listener, so it must be the first listener. | 552 // Only one listener, so it must be the first listener. |
| 482 _runGuarded(_onListen); | 553 _runGuarded(_onListen); |
| 483 } | 554 } |
| 484 } | 555 } |
| 485 | 556 |
| 486 void _recordCancel(_MultiplexSubscription<T> subscription) { | 557 void _recordCancel(_BroadcastSubscription<T> subscription) { |
| 487 if (subscription._isFiring) { | 558 if (subscription._isFiring) { |
| 488 subscription._setRemoveAfterFiring(); | 559 subscription._setRemoveAfterFiring(); |
| 489 } else { | 560 } else { |
| 490 _removeListener(subscription); | 561 _removeListener(subscription); |
| 491 // If we are currently firing an event, the empty-check is performed at | 562 // If we are currently firing an event, the empty-check is performed at |
| 492 // the end of the listener loop instead of here. | 563 // the end of the listener loop instead of here. |
| 493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
| 494 _callOnCancel(); | 565 _callOnCancel(); |
| 495 } | 566 } |
| 496 } | 567 } |
| (...skipping 20 matching lines...) Expand all Loading... |
| 517 } | 588 } |
| 518 | 589 |
| 519 void close() { | 590 void close() { |
| 520 if (isClosed) { | 591 if (isClosed) { |
| 521 throw new StateError("Cannot add new events after calling close()"); | 592 throw new StateError("Cannot add new events after calling close()"); |
| 522 } | 593 } |
| 523 _state |= _STATE_CLOSED; | 594 _state |= _STATE_CLOSED; |
| 524 _sendDone(); | 595 _sendDone(); |
| 525 } | 596 } |
| 526 | 597 |
| 527 // EventDispatch interface. | |
| 528 | |
| 529 void _sendData(T data) { | |
| 530 if (_isEmpty) return; | |
| 531 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 532 subscription._add(data); | |
| 533 }); | |
| 534 } | |
| 535 | |
| 536 void _sendError(Object error) { | |
| 537 if (_isEmpty) return; | |
| 538 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 539 subscription._addError(error); | |
| 540 }); | |
| 541 } | |
| 542 | |
| 543 void _sendDone() { | |
| 544 if (_isEmpty) return; | |
| 545 _forEachListener((_MultiplexSubscription<T> subscription) { | |
| 546 subscription._close(); | |
| 547 subscription._eventState |= | |
| 548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | |
| 549 }); | |
| 550 } | |
| 551 | |
| 552 void _forEachListener( | 598 void _forEachListener( |
| 553 void action(_BufferingStreamSubscription<T> subscription)) { | 599 void action(_BufferingStreamSubscription<T> subscription)) { |
| 554 if (_isFiring) { | 600 if (_isFiring) { |
| 555 throw new StateError( | 601 throw new StateError( |
| 556 "Cannot fire new event. Controller is already firing an event"); | 602 "Cannot fire new event. Controller is already firing an event"); |
| 557 } | 603 } |
| 558 if (_isEmpty) return; | 604 if (_isEmpty) return; |
| 559 | 605 |
| 560 // Get event id of this event. | 606 // Get event id of this event. |
| 561 int id = (_state & _STATE_EVENT_ID); | 607 int id = (_state & _STATE_EVENT_ID); |
| 562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
| 563 // callbacks while firing, and we prevent reentrancy of this function. | 609 // callbacks while firing, and we prevent reentrancy of this function. |
| 564 // | 610 // |
| 565 // Set [_state]'s event id to the next event's id. | 611 // Set [_state]'s event id to the next event's id. |
| 566 // Any listeners added while firing this event will expect the next event, | 612 // Any listeners added while firing this event will expect the next event, |
| 567 // not this one, and won't get notified. | 613 // not this one, and won't get notified. |
| 568 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | 614 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| 569 _MultiplexSubscriptionLink link = _next; | 615 _BroadcastSubscriptionLink link = _next; |
| 570 while (!identical(link, this)) { | 616 while (!identical(link, this)) { |
| 571 _MultiplexSubscription<T> subscription = link; | 617 _BroadcastSubscription<T> subscription = link; |
| 572 if (subscription._expectsEvent(id)) { | 618 if (subscription._expectsEvent(id)) { |
| 573 subscription._eventState |= _MultiplexSubscription._STATE_FIRING; | 619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
| 574 action(subscription); | 620 action(subscription); |
| 575 subscription._toggleEventId(); | 621 subscription._toggleEventId(); |
| 576 link = subscription._next; | 622 link = subscription._next; |
| 577 if (subscription._removeAfterFiring) { | 623 if (subscription._removeAfterFiring) { |
| 578 _removeListener(subscription); | 624 _removeListener(subscription); |
| 579 } | 625 } |
| 580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; | 626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
| 581 } else { | 627 } else { |
| 582 link = subscription._next; | 628 link = subscription._next; |
| 583 } | 629 } |
| 584 } | 630 } |
| 585 _state &= ~_STATE_FIRING; | 631 _state &= ~_STATE_FIRING; |
| 586 | 632 |
| 587 if (_isEmpty) { | 633 if (_isEmpty) { |
| 588 _callOnCancel(); | 634 _callOnCancel(); |
| 589 } | 635 } |
| 590 } | 636 } |
| 591 | 637 |
| 592 void _callOnCancel() { | 638 void _callOnCancel() { |
| 593 _runGuarded(_onCancel); | 639 _runGuarded(_onCancel); |
| 594 } | 640 } |
| 595 } | 641 } |
| 596 | 642 |
| 597 class _BufferingMultiplexStreamController<T> | 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 598 extends _MultiplexStreamController<T> | 644 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| 645 : super(onListen, onCancel); |
| 646 |
| 647 // EventDispatch interface. |
| 648 |
| 649 void _sendData(T data) { |
| 650 if (_isEmpty) return; |
| 651 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 652 subscription._add(data); |
| 653 }); |
| 654 } |
| 655 |
| 656 void _sendError(Object error) { |
| 657 if (_isEmpty) return; |
| 658 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 659 subscription._addError(error); |
| 660 }); |
| 661 } |
| 662 |
| 663 void _sendDone() { |
| 664 if (_isEmpty) return; |
| 665 _forEachListener((_BroadcastSubscription<T> subscription) { |
| 666 subscription._close(); |
| 667 subscription._eventState |= |
| 668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
| 669 }); |
| 670 } |
| 671 } |
| 672 |
| 673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 674 _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| 675 : super(onListen, onCancel); |
| 676 |
| 677 // EventDispatch interface. |
| 678 |
| 679 void _sendData(T data) { |
| 680 for (_BroadcastSubscriptionLink link = _next; |
| 681 !identical(link, this); |
| 682 link = link._next) { |
| 683 _BroadcastSubscription<T> subscription = link; |
| 684 subscription._addPending(new _DelayedData(data)); |
| 685 } |
| 686 } |
| 687 |
| 688 void _sendError(Object error) { |
| 689 for (_BroadcastSubscriptionLink link = _next; |
| 690 !identical(link, this); |
| 691 link = link._next) { |
| 692 _BroadcastSubscription<T> subscription = link; |
| 693 subscription._addPending(new _DelayedError(error)); |
| 694 } |
| 695 } |
| 696 |
| 697 void _sendDone() { |
| 698 for (_BroadcastSubscriptionLink link = _next; |
| 699 !identical(link, this); |
| 700 link = link._next) { |
| 701 _BroadcastSubscription<T> subscription = link; |
| 702 subscription._addPending(const _DelayedDone()); |
| 703 } |
| 704 } |
| 705 } |
| 706 |
| 707 /** |
| 708 * Stream controller that is used by [Stream.asBroadcastStream]. |
| 709 * |
| 710 * This stream controller allows incoming events while it is firing |
| 711 * other events. This is handled by delaying the events until the |
| 712 * current event is done firing, and then fire the pending events. |
| 713 * |
| 714 * This class extends [_SyncBroadcastStreamController]. Events of |
| 715 * an "asBroadcastStream" stream are always initiated by events |
| 716 * on another stream, and it is fine to forward them synchronously. |
| 717 */ |
| 718 class _AsBroadcastStreamController<T> |
| 719 extends _SyncBroadcastStreamController<T> |
| 599 implements _EventDispatch<T> { | 720 implements _EventDispatch<T> { |
| 600 _StreamImplEvents _pending; | 721 _StreamImplEvents _pending; |
| 601 | 722 |
| 602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) | 723 _AsBroadcastStreamController(void onListen(), void onCancel()) |
| 603 : super(onListen, onCancel); | 724 : super(onListen, onCancel); |
| 604 | 725 |
| 605 bool get _hasPending => _pending != null && ! _pending.isEmpty; | 726 bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| 606 | 727 |
| 607 void _addPendingEvent(_DelayedEvent event) { | 728 void _addPendingEvent(_DelayedEvent event) { |
| 608 if (_pending == null) { | 729 if (_pending == null) { |
| 609 _pending = new _StreamImplEvents(); | 730 _pending = new _StreamImplEvents(); |
| 610 } | 731 } |
| 611 _pending.add(event); | 732 _pending.add(event); |
| 612 } | 733 } |
| (...skipping 29 matching lines...) Expand all Loading... |
| 642 super.close(); | 763 super.close(); |
| 643 assert(!_hasPending); | 764 assert(!_hasPending); |
| 644 } | 765 } |
| 645 | 766 |
| 646 void _callOnCancel() { | 767 void _callOnCancel() { |
| 647 if (_hasPending) { | 768 if (_hasPending) { |
| 648 _pending.clear(); | 769 _pending.clear(); |
| 649 _pending = null; | 770 _pending = null; |
| 650 } | 771 } |
| 651 super._callOnCancel(); | 772 super._callOnCancel(); |
| 652 | |
| 653 } | 773 } |
| 654 } | 774 } |
| OLD | NEW |