| OLD | NEW |
| (Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 part of dart.async; |
| 6 |
| 7 class _BroadcastStream<T> extends _StreamImpl<T> { |
| 8 _BroadcastStreamController _controller; |
| 9 |
| 10 _BroadcastStream(this._controller); |
| 11 |
| 12 bool get isBroadcast => true; |
| 13 |
| 14 StreamSubscription<T> _createSubscription( |
| 15 void onData(T data), |
| 16 void onError(Object error), |
| 17 void onDone(), |
| 18 bool cancelOnError) => |
| 19 _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 20 } |
| 21 |
| 22 abstract class _BroadcastSubscriptionLink { |
| 23 _BroadcastSubscriptionLink _next; |
| 24 _BroadcastSubscriptionLink _previous; |
| 25 } |
| 26 |
| 27 class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
| 28 implements _BroadcastSubscriptionLink { |
| 29 static const int _STATE_EVENT_ID = 1; |
| 30 static const int _STATE_FIRING = 2; |
| 31 static const int _STATE_REMOVE_AFTER_FIRING = 4; |
| 32 // TODO(lrn): Use the _state field on _ControllerSubscription to |
| 33 // also store this state. Requires that the subscription implementation |
| 34 // does not assume that it's use of the state integer is the only use. |
| 35 int _eventState; |
| 36 |
| 37 _BroadcastSubscriptionLink _next; |
| 38 _BroadcastSubscriptionLink _previous; |
| 39 |
| 40 _BroadcastSubscription(_StreamControllerLifecycle controller, |
| 41 void onData(T data), |
| 42 void onError(Object error), |
| 43 void onDone(), |
| 44 bool cancelOnError) |
| 45 : super(controller, onData, onError, onDone, cancelOnError) { |
| 46 _next = _previous = this; |
| 47 } |
| 48 |
| 49 _BroadcastStreamController get _controller => super._controller; |
| 50 |
| 51 bool _expectsEvent(int eventId) => |
| 52 (_eventState & _STATE_EVENT_ID) == eventId; |
| 53 |
| 54 |
| 55 void _toggleEventId() { |
| 56 _eventState ^= _STATE_EVENT_ID; |
| 57 } |
| 58 |
| 59 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
| 60 |
| 61 bool _setRemoveAfterFiring() { |
| 62 assert(_isFiring); |
| 63 _eventState |= _STATE_REMOVE_AFTER_FIRING; |
| 64 } |
| 65 |
| 66 bool get _removeAfterFiring => |
| 67 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
| 68 |
| 69 // The controller._recordPause doesn't do anything for a broadcast controller, |
| 70 // so we don't bother calling it. |
| 71 void _onPause() { } |
| 72 |
| 73 // The controller._recordResume doesn't do anything for a broadcast |
| 74 // controller, so we don't bother calling it. |
| 75 void _onResume() { } |
| 76 |
| 77 // _onCancel is inherited. |
| 78 } |
| 79 |
| 80 |
| 81 abstract class _BroadcastStreamController<T> |
| 82 implements StreamController<T>, |
| 83 _StreamControllerLifecycle<T>, |
| 84 _BroadcastSubscriptionLink, |
| 85 _EventSink<T>, |
| 86 _EventDispatch<T> { |
| 87 static const int _STATE_INITIAL = 0; |
| 88 static const int _STATE_EVENT_ID = 1; |
| 89 static const int _STATE_FIRING = 2; |
| 90 static const int _STATE_CLOSED = 4; |
| 91 static const int _STATE_ADDSTREAM = 8; |
| 92 |
| 93 final _NotificationHandler _onListen; |
| 94 final _NotificationHandler _onCancel; |
| 95 |
| 96 // State of the controller. |
| 97 int _state; |
| 98 |
| 99 // Double-linked list of active listeners. |
| 100 _BroadcastSubscriptionLink _next; |
| 101 _BroadcastSubscriptionLink _previous; |
| 102 |
| 103 // Extra state used during an [addStream] call. |
| 104 _AddStreamState<T> _addStreamState; |
| 105 |
| 106 /** |
| 107 * Future returned by [close] and [done]. |
| 108 * |
| 109 * The future is completed whenever the done event has been sent to all |
| 110 * relevant listeners. |
| 111 * The relevant listeners are the ones that were listening when [close] was |
| 112 * called. When all of these have been canceled (sending the done event makes |
| 113 * them cancel, but they can also be canceled before sending the event), |
| 114 * this future completes. |
| 115 * |
| 116 * Any attempt to listen after calling [close] will throw, so there won't |
| 117 * be any further listeners. |
| 118 */ |
| 119 _FutureImpl _doneFuture; |
| 120 |
| 121 _BroadcastStreamController(this._onListen, this._onCancel) |
| 122 : _state = _STATE_INITIAL { |
| 123 _next = _previous = this; |
| 124 } |
| 125 |
| 126 // StreamController interface. |
| 127 |
| 128 Stream<T> get stream => new _BroadcastStream<T>(this); |
| 129 |
| 130 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 131 |
| 132 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 133 |
| 134 /** |
| 135 * A broadcast controller is never paused. |
| 136 * |
| 137 * Each receiving stream may be paused individually, and they handle their |
| 138 * own buffering. |
| 139 */ |
| 140 bool get isPaused => false; |
| 141 |
| 142 /** Whether there are currently one or more subscribers. */ |
| 143 bool get hasListener => !_isEmpty; |
| 144 |
| 145 /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| 146 bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| 147 |
| 148 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| 149 |
| 150 bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| 151 |
| 152 _FutureImpl _ensureDoneFuture() { |
| 153 if (_doneFuture != null) return _doneFuture; |
| 154 return _doneFuture = new _FutureImpl(); |
| 155 } |
| 156 |
| 157 // Linked list helpers |
| 158 |
| 159 bool get _isEmpty => identical(_next, this); |
| 160 |
| 161 /** Adds subscription to linked list of active listeners. */ |
| 162 void _addListener(_BroadcastSubscription<T> subscription) { |
| 163 assert(identical(subscription._next, subscription)); |
| 164 // Insert in linked list just before `this`. |
| 165 subscription._previous = _previous; |
| 166 subscription._next = this; |
| 167 this._previous._next = subscription; |
| 168 this._previous = subscription; |
| 169 subscription._eventState = (_state & _STATE_EVENT_ID); |
| 170 } |
| 171 |
| 172 void _removeListener(_BroadcastSubscription<T> subscription) { |
| 173 assert(identical(subscription._controller, this)); |
| 174 assert(!identical(subscription._next, subscription)); |
| 175 _BroadcastSubscriptionLink previous = subscription._previous; |
| 176 _BroadcastSubscriptionLink next = subscription._next; |
| 177 previous._next = next; |
| 178 next._previous = previous; |
| 179 subscription._next = subscription._previous = subscription; |
| 180 } |
| 181 |
| 182 // _StreamControllerLifecycle interface. |
| 183 |
| 184 StreamSubscription<T> _subscribe(void onData(T data), |
| 185 void onError(Object error), |
| 186 void onDone(), |
| 187 bool cancelOnError) { |
| 188 if (isClosed) { |
| 189 throw new StateError("Subscribing to closed stream"); |
| 190 } |
| 191 StreamSubscription subscription = new _BroadcastSubscription<T>( |
| 192 this, onData, onError, onDone, cancelOnError); |
| 193 _addListener(subscription); |
| 194 if (identical(_next, _previous)) { |
| 195 // Only one listener, so it must be the first listener. |
| 196 _runGuarded(_onListen); |
| 197 } |
| 198 return subscription; |
| 199 } |
| 200 |
| 201 void _recordCancel(_BroadcastSubscription<T> subscription) { |
| 202 // If already removed by the stream, don't remove it again. |
| 203 if (identical(subscription._next, subscription)) return; |
| 204 assert(!identical(subscription._next, subscription)); |
| 205 if (subscription._isFiring) { |
| 206 subscription._setRemoveAfterFiring(); |
| 207 } else { |
| 208 assert(!identical(subscription._next, subscription)); |
| 209 _removeListener(subscription); |
| 210 // If we are currently firing an event, the empty-check is performed at |
| 211 // the end of the listener loop instead of here. |
| 212 if (!_isFiring && _isEmpty) { |
| 213 _callOnCancel(); |
| 214 } |
| 215 } |
| 216 } |
| 217 |
| 218 void _recordPause(StreamSubscription<T> subscription) {} |
| 219 void _recordResume(StreamSubscription<T> subscription) {} |
| 220 |
| 221 // EventSink interface. |
| 222 |
| 223 Error _addEventError() { |
| 224 if (isClosed) { |
| 225 return new StateError("Cannot add new events after calling close"); |
| 226 } |
| 227 assert(_isAddingStream); |
| 228 return new StateError("Cannot add new events while doing an addStream"); |
| 229 } |
| 230 |
| 231 void add(T data) { |
| 232 if (!_mayAddEvent) throw _addEventError(); |
| 233 _sendData(data); |
| 234 } |
| 235 |
| 236 void addError(Object error, [Object stackTrace]) { |
| 237 if (!_mayAddEvent) throw _addEventError(); |
| 238 if (stackTrace != null) _attachStackTrace(error, stackTrace); |
| 239 _sendError(error); |
| 240 } |
| 241 |
| 242 Future close() { |
| 243 if (isClosed) { |
| 244 assert(_doneFuture != null); |
| 245 return _doneFuture; |
| 246 } |
| 247 if (!_mayAddEvent) throw _addEventError(); |
| 248 _state |= _STATE_CLOSED; |
| 249 Future doneFuture = _ensureDoneFuture(); |
| 250 _sendDone(); |
| 251 return doneFuture; |
| 252 } |
| 253 |
| 254 Future get done => _ensureDoneFuture(); |
| 255 |
| 256 Future addStream(Stream<T> stream) { |
| 257 if (!_mayAddEvent) throw _addEventError(); |
| 258 _state |= _STATE_ADDSTREAM; |
| 259 _addStreamState = new _AddStreamState(this, stream); |
| 260 return _addStreamState.addStreamFuture; |
| 261 } |
| 262 |
| 263 // _EventSink interface, called from AddStreamState. |
| 264 void _add(T data) { |
| 265 _sendData(data); |
| 266 } |
| 267 |
| 268 void _addError(Object error) { |
| 269 assert(_isAddingStream); |
| 270 _sendError(error); |
| 271 } |
| 272 |
| 273 void _close() { |
| 274 assert(_isAddingStream); |
| 275 _AddStreamState addState = _addStreamState; |
| 276 _addStreamState = null; |
| 277 _state &= ~_STATE_ADDSTREAM; |
| 278 addState.complete(); |
| 279 } |
| 280 |
| 281 // Event handling. |
| 282 void _forEachListener( |
| 283 void action(_BufferingStreamSubscription<T> subscription)) { |
| 284 if (_isFiring) { |
| 285 throw new StateError( |
| 286 "Cannot fire new event. Controller is already firing an event"); |
| 287 } |
| 288 if (_isEmpty) return; |
| 289 |
| 290 // Get event id of this event. |
| 291 int id = (_state & _STATE_EVENT_ID); |
| 292 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
| 293 // callbacks while firing, and we prevent reentrancy of this function. |
| 294 // |
| 295 // Set [_state]'s event id to the next event's id. |
| 296 // Any listeners added while firing this event will expect the next event, |
| 297 // not this one, and won't get notified. |
| 298 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| 299 _BroadcastSubscriptionLink link = _next; |
| 300 while (!identical(link, this)) { |
| 301 _BroadcastSubscription<T> subscription = link; |
| 302 if (subscription._expectsEvent(id)) { |
| 303 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
| 304 action(subscription); |
| 305 subscription._toggleEventId(); |
| 306 link = subscription._next; |
| 307 if (subscription._removeAfterFiring) { |
| 308 _removeListener(subscription); |
| 309 } |
| 310 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
| 311 } else { |
| 312 link = subscription._next; |
| 313 } |
| 314 } |
| 315 _state &= ~_STATE_FIRING; |
| 316 |
| 317 if (_isEmpty) { |
| 318 _callOnCancel(); |
| 319 } |
| 320 } |
| 321 |
| 322 void _callOnCancel() { |
| 323 assert(_isEmpty); |
| 324 if (isClosed && _doneFuture._mayComplete) { |
| 325 // When closed, _doneFuture is not null. |
| 326 _doneFuture._asyncSetValue(null); |
| 327 } |
| 328 _runGuarded(_onCancel); |
| 329 } |
| 330 } |
| 331 |
| 332 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 333 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| 334 : super(onListen, onCancel); |
| 335 |
| 336 // EventDispatch interface. |
| 337 |
| 338 void _sendData(T data) { |
| 339 if (_isEmpty) return; |
| 340 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 341 subscription._add(data); |
| 342 }); |
| 343 } |
| 344 |
| 345 void _sendError(Object error) { |
| 346 if (_isEmpty) return; |
| 347 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 348 subscription._addError(error); |
| 349 }); |
| 350 } |
| 351 |
| 352 void _sendDone() { |
| 353 if (!_isEmpty) { |
| 354 _forEachListener((_BroadcastSubscription<T> subscription) { |
| 355 subscription._close(); |
| 356 }); |
| 357 } else { |
| 358 assert(_doneFuture != null); |
| 359 assert(_doneFuture._mayComplete); |
| 360 _doneFuture._asyncSetValue(null); |
| 361 } |
| 362 } |
| 363 } |
| 364 |
| 365 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 366 _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| 367 : super(onListen, onCancel); |
| 368 |
| 369 // EventDispatch interface. |
| 370 |
| 371 void _sendData(T data) { |
| 372 for (_BroadcastSubscriptionLink link = _next; |
| 373 !identical(link, this); |
| 374 link = link._next) { |
| 375 _BroadcastSubscription<T> subscription = link; |
| 376 subscription._addPending(new _DelayedData(data)); |
| 377 } |
| 378 } |
| 379 |
| 380 void _sendError(Object error) { |
| 381 for (_BroadcastSubscriptionLink link = _next; |
| 382 !identical(link, this); |
| 383 link = link._next) { |
| 384 _BroadcastSubscription<T> subscription = link; |
| 385 subscription._addPending(new _DelayedError(error)); |
| 386 } |
| 387 } |
| 388 |
| 389 void _sendDone() { |
| 390 if (!_isEmpty) { |
| 391 for (_BroadcastSubscriptionLink link = _next; |
| 392 !identical(link, this); |
| 393 link = link._next) { |
| 394 _BroadcastSubscription<T> subscription = link; |
| 395 subscription._addPending(const _DelayedDone()); |
| 396 } |
| 397 } else { |
| 398 assert(_doneFuture != null); |
| 399 assert(_doneFuture._mayComplete); |
| 400 _doneFuture._asyncSetValue(null); |
| 401 } |
| 402 } |
| 403 } |
| 404 |
| 405 /** |
| 406 * Stream controller that is used by [Stream.asBroadcastStream]. |
| 407 * |
| 408 * This stream controller allows incoming events while it is firing |
| 409 * other events. This is handled by delaying the events until the |
| 410 * current event is done firing, and then fire the pending events. |
| 411 * |
| 412 * This class extends [_SyncBroadcastStreamController]. Events of |
| 413 * an "asBroadcastStream" stream are always initiated by events |
| 414 * on another stream, and it is fine to forward them synchronously. |
| 415 */ |
| 416 class _AsBroadcastStreamController<T> |
| 417 extends _SyncBroadcastStreamController<T> |
| 418 implements _EventDispatch<T> { |
| 419 _StreamImplEvents _pending; |
| 420 |
| 421 _AsBroadcastStreamController(void onListen(), void onCancel()) |
| 422 : super(onListen, onCancel); |
| 423 |
| 424 bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| 425 |
| 426 void _addPendingEvent(_DelayedEvent event) { |
| 427 if (_pending == null) { |
| 428 _pending = new _StreamImplEvents(); |
| 429 } |
| 430 _pending.add(event); |
| 431 } |
| 432 |
| 433 void add(T data) { |
| 434 if (!isClosed && _isFiring) { |
| 435 _addPendingEvent(new _DelayedData<T>(data)); |
| 436 return; |
| 437 } |
| 438 super.add(data); |
| 439 while (_hasPending) { |
| 440 _pending.handleNext(this); |
| 441 } |
| 442 } |
| 443 |
| 444 void addError(Object error, [StackTrace stackTrace]) { |
| 445 if (!isClosed && _isFiring) { |
| 446 _addPendingEvent(new _DelayedError(error)); |
| 447 return; |
| 448 } |
| 449 super.addError(error, stackTrace); |
| 450 while (_hasPending) { |
| 451 _pending.handleNext(this); |
| 452 } |
| 453 } |
| 454 |
| 455 void close() { |
| 456 if (!isClosed && _isFiring) { |
| 457 _addPendingEvent(const _DelayedDone()); |
| 458 _state |= _STATE_CLOSED; |
| 459 return; |
| 460 } |
| 461 super.close(); |
| 462 assert(!_hasPending); |
| 463 } |
| 464 |
| 465 void _callOnCancel() { |
| 466 if (_hasPending) { |
| 467 _pending.clear(); |
| 468 _pending = null; |
| 469 } |
| 470 super._callOnCancel(); |
| 471 } |
| 472 } |
| 473 |
| 474 // A subscription that never receives any events. |
| 475 // It can simulate pauses, but otherwise does nothing. |
| 476 class _DoneSubscription<T> implements StreamSubscription<T> { |
| 477 int _pauseCount = 0; |
| 478 void onData(void handleData(T data)) {} |
| 479 void onError(void handleErrr(Object error)) {} |
| 480 void onDone(void handleDone()) {} |
| 481 void pause([Future resumeSignal]) { |
| 482 if (resumeSignal != null) resumeSignal.then(_resume); |
| 483 _pauseCount++; |
| 484 } |
| 485 void resume() { _resume(null); } |
| 486 void _resume(_) { |
| 487 if (_pauseCount > 0) _pauseCount--; |
| 488 } |
| 489 void cancel() {} |
| 490 bool get isPaused => _pauseCount > 0; |
| 491 Future asFuture(Object value) => new _FutureImpl(); |
| 492 } |
| OLD | NEW |