Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.async; | |
| 6 | |
| 7 class _BroadcastStream<T> extends _StreamImpl<T> { | |
| 8 _BroadcastStreamController _controller; | |
| 9 | |
| 10 _BroadcastStream(this._controller); | |
| 11 | |
| 12 bool get isBroadcast => true; | |
| 13 | |
| 14 StreamSubscription<T> _createSubscription( | |
| 15 void onData(T data), | |
| 16 void onError(Object error), | |
| 17 void onDone(), | |
| 18 bool cancelOnError) => | |
| 19 _controller._subscribe(onData, onError, onDone, cancelOnError); | |
| 20 } | |
| 21 | |
| 22 abstract class _BroadcastSubscriptionLink { | |
| 23 _BroadcastSubscriptionLink _next; | |
| 24 _BroadcastSubscriptionLink _previous; | |
| 25 } | |
| 26 | |
| 27 class _BroadcastSubscription<T> extends _ControllerSubscription<T> | |
| 28 implements _BroadcastSubscriptionLink { | |
| 29 static const int _STATE_EVENT_ID = 1; | |
| 30 static const int _STATE_FIRING = 2; | |
| 31 static const int _STATE_REMOVE_AFTER_FIRING = 4; | |
| 32 // TODO(lrn): Use the _state field on _ControllerSubscription to | |
| 33 // also store this state. Requires that the subscription implementation | |
| 34 // does not assume that it's use of the state integer is the only use. | |
| 35 int _eventState; | |
| 36 | |
| 37 _BroadcastSubscriptionLink _next; | |
| 38 _BroadcastSubscriptionLink _previous; | |
| 39 | |
| 40 _BroadcastSubscription(_StreamControllerLifecycle controller, | |
| 41 void onData(T data), | |
| 42 void onError(Object error), | |
| 43 void onDone(), | |
| 44 bool cancelOnError) | |
| 45 : super(controller, onData, onError, onDone, cancelOnError) { | |
| 46 _next = _previous = this; | |
| 47 } | |
| 48 | |
| 49 _BroadcastStreamController get _controller => super._controller; | |
| 50 | |
| 51 bool _expectsEvent(int eventId) { | |
|
floitsch
2013/06/27 15:15:19
=> ?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Will do.
Probably added the block to be able to d
| |
| 52 return (_eventState & _STATE_EVENT_ID) == eventId; | |
| 53 } | |
| 54 | |
| 55 void _toggleEventId() { | |
|
floitsch
2013/06/27 15:15:19
=> ?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
doesn't return a value.
| |
| 56 _eventState ^= _STATE_EVENT_ID; | |
| 57 } | |
| 58 | |
| 59 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
| 60 | |
| 61 bool _setRemoveAfterFiring() { | |
| 62 assert(_isFiring); | |
| 63 _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
| 64 } | |
| 65 | |
| 66 bool get _removeAfterFiring => | |
|
floitsch
2013/06/27 15:15:19
_shouldRemoveAfterFiring
Lasse Reichstein Nielsen
2013/06/28 12:57:38
This is an imperative. It must be removed after fi
| |
| 67 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
| 68 | |
| 69 void _onPause() { } | |
| 70 | |
| 71 void _onResume() { } | |
| 72 } | |
|
floitsch
2013/06/27 15:15:19
missing _onCancel() from the _ControllerSubscripti
Lasse Reichstein Nielsen
2013/06/28 12:57:38
It's inherited. We overwrite _onPause and _onResum
| |
| 73 | |
| 74 | |
| 75 abstract class _BroadcastStreamController<T> | |
| 76 implements StreamController<T>, | |
| 77 _StreamControllerLifecycle<T>, | |
| 78 _BroadcastSubscriptionLink, | |
| 79 _EventSink<T>, | |
| 80 _EventDispatch<T> { | |
| 81 static const int _STATE_INITIAL = 0; | |
| 82 static const int _STATE_EVENT_ID = 1; | |
| 83 static const int _STATE_FIRING = 2; | |
| 84 static const int _STATE_CLOSED = 4; | |
| 85 static const int _STATE_ADDSTREAM = 8; | |
| 86 | |
| 87 final _NotificationHandler _onListen; | |
| 88 final _NotificationHandler _onCancel; | |
| 89 | |
| 90 // State of the controller. | |
| 91 int _state; | |
| 92 | |
| 93 // Double-linked list of active listeners. | |
| 94 _BroadcastSubscriptionLink _next; | |
| 95 _BroadcastSubscriptionLink _previous; | |
| 96 | |
| 97 // Extra state used during an [addStream] call. | |
| 98 _AddStreamState<T> _addStreamState; | |
| 99 | |
| 100 /** | |
| 101 * Future returned by [close] and [done]. | |
| 102 * | |
| 103 * The future is completed whenever the done event has been sent to all | |
| 104 * relevant listeners. | |
| 105 * This means when all listeners at the time when the done event was | |
|
floitsch
2013/06/27 15:15:19
bad English sentence.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Reworded.
| |
| 106 * scheduled have been canceled (sending the done event makes them cancel, | |
| 107 * but they can also be canceled before sending the event). | |
| 108 * | |
| 109 * To make this easier to handle, all listeners added after calling "close" | |
| 110 * will never receive any events, so we don't remember them. That means that | |
| 111 * this future can be completed whenever the controller [isClosed] and | |
| 112 * [hasListener] is false. This is checked in [close] and [_callOnCancel]. | |
| 113 */ | |
| 114 _FutureImpl _doneFuture; | |
| 115 | |
| 116 _BroadcastStreamController(this._onListen, this._onCancel) | |
| 117 : _state = _STATE_INITIAL { | |
| 118 _next = _previous = this; | |
| 119 } | |
| 120 | |
| 121 // StreamController interface. | |
| 122 | |
| 123 Stream<T> get stream => new _BroadcastStream<T>(this); | |
| 124 | |
| 125 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
| 126 | |
| 127 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 128 | |
| 129 /** | |
| 130 * A broadcast controller is never paused. | |
| 131 * | |
| 132 * Each receiving stream may be paused individually, and they handle their | |
| 133 * own buffering. | |
| 134 */ | |
| 135 bool get isPaused => false; | |
| 136 | |
| 137 /** Whether there are currently one or more subscribers. */ | |
| 138 bool get hasListener => !_isEmpty; | |
| 139 | |
| 140 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
| 141 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
| 142 | |
| 143 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
| 144 | |
| 145 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
| 146 | |
| 147 _FutureImpl _ensureDoneFuture() { | |
| 148 if (_doneFuture != null) return _doneFuture; | |
| 149 return _doneFuture = new _FutureImpl(); | |
| 150 } | |
| 151 | |
| 152 // Linked list helpers | |
| 153 | |
| 154 bool get _isEmpty => identical(_next, this); | |
| 155 | |
| 156 /** Adds subscription to linked list of active listeners. */ | |
| 157 void _addListener(_BroadcastSubscription<T> subscription) { | |
| 158 _BroadcastSubscriptionLink previous = _previous; | |
| 159 previous._next = subscription; | |
|
floitsch
2013/06/27 15:15:19
needs comments.
Either explain that you want to ac
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 160 _previous = subscription._previous; | |
| 161 subscription._previous._next = this; | |
| 162 subscription._previous = previous; | |
| 163 subscription._eventState = (_state & _STATE_EVENT_ID); | |
| 164 } | |
| 165 | |
| 166 void _removeListener(_BroadcastSubscription<T> subscription) { | |
| 167 assert(identical(subscription._controller, this)); | |
| 168 assert(!identical(subscription._next, subscription)); | |
| 169 subscription._previous._next = subscription._next; | |
|
floitsch
2013/06/27 15:15:19
please make this nicer to read:
var prev = sub.pre
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 170 subscription._next._previous = subscription._previous; | |
| 171 subscription._next = subscription._previous = subscription; | |
| 172 } | |
| 173 | |
| 174 // _StreamControllerLifecycle interface. | |
| 175 | |
| 176 StreamSubscription<T> _subscribe(void onData(T data), | |
| 177 void onError(Object error), | |
| 178 void onDone(), | |
| 179 bool cancelOnError) { | |
| 180 if (isClosed) { | |
| 181 // No events will ever reach the new subscription, so we don't attach | |
| 182 // it to anything. | |
| 183 return new _DoneSubscription<T>(); | |
|
floitsch
2013/06/27 15:15:19
Let's throw instead.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 184 } | |
| 185 StreamSubscription subscription = new _BroadcastSubscription<T>( | |
| 186 this, onData, onError, onDone, cancelOnError); | |
| 187 _addListener(subscription); | |
| 188 if (identical(_next, _previous)) { | |
| 189 // Only one listener, so it must be the first listener. | |
| 190 _runGuarded(_onListen); | |
| 191 } | |
| 192 return subscription; | |
| 193 } | |
| 194 | |
| 195 void _recordCancel(_BroadcastSubscription<T> subscription) { | |
| 196 // If already removed by the stream, don't remove it again. | |
| 197 if (identical(subscription._next, subscription)) return; | |
| 198 assert(!identical(subscription._next, subscription)); | |
| 199 if (subscription._isFiring) { | |
| 200 subscription._setRemoveAfterFiring(); | |
| 201 } else { | |
| 202 assert(!identical(subscription._next, subscription)); | |
| 203 _removeListener(subscription); | |
| 204 // If we are currently firing an event, the empty-check is performed at | |
| 205 // the end of the listener loop instead of here. | |
| 206 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | |
|
floitsch
2013/06/27 15:15:19
if (!_isFiring && _isEmpty)
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 207 _callOnCancel(); | |
| 208 } | |
| 209 } | |
| 210 } | |
| 211 | |
| 212 void _recordPause(StreamSubscription<T> subscription) {} | |
| 213 void _recordResume(StreamSubscription<T> subscription) {} | |
| 214 | |
| 215 // EventSink interface. | |
| 216 | |
| 217 Error _addEventError() { | |
| 218 if (isClosed) { | |
| 219 return new StateError("Cannot add new events after calling close"); | |
| 220 } | |
| 221 assert(_isAddingStream); | |
| 222 return new StateError("Cannot add new events while doing an addStream"); | |
| 223 } | |
| 224 | |
| 225 void add(T data) { | |
| 226 if (!_mayAddEvent) throw _addEventError(); | |
| 227 _sendData(data); | |
| 228 } | |
| 229 | |
| 230 void addError(Object error, [Object stackTrace]) { | |
| 231 if (!_mayAddEvent) throw _addEventError(); | |
| 232 if (stackTrace != null) _attachStackTrace(error, stackTrace); | |
| 233 _sendError(error); | |
| 234 } | |
| 235 | |
| 236 Future close() { | |
| 237 if (isClosed) { | |
| 238 assert(_doneFuture != null); | |
| 239 return _doneFuture; | |
| 240 } | |
| 241 if (!_mayAddEvent) throw _addEventError(); | |
| 242 _state |= _STATE_CLOSED; | |
| 243 Future doneFuture = _ensureDoneFuture(); | |
| 244 _sendDone(); | |
| 245 return doneFuture; | |
| 246 } | |
| 247 | |
| 248 Future get done => _ensureDoneFuture(); | |
| 249 | |
| 250 Future addStream(Stream<T> stream) { | |
| 251 if (!_mayAddEvent) throw _addEventError(); | |
| 252 _state |= _STATE_ADDSTREAM; | |
| 253 _addStreamState = new _AddStreamState(this, stream); | |
| 254 return _addStreamState.addStreamFuture; | |
| 255 } | |
| 256 | |
| 257 // _EventSink interface, called from AddStramState. | |
|
floitsch
2013/06/27 15:15:19
AddStreamState
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
| |
| 258 void _add(T data) { | |
| 259 _sendData(data); | |
| 260 } | |
| 261 | |
| 262 void _addError(Object error) { | |
| 263 assert(_isAddingStream); | |
|
floitsch
2013/06/27 15:15:19
Why is an error fatal? Isn't it just passed throug
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Let's pass it through, the controller can handle i
| |
| 264 _AddStreamState addState = _addStreamState; | |
| 265 _addStreamState = null; | |
| 266 _state &= ~_STATE_ADDSTREAM; | |
| 267 addState.completeWithError(error); | |
| 268 } | |
| 269 | |
| 270 void _close() { | |
| 271 assert(_isAddingStream); | |
| 272 _AddStreamState addState = _addStreamState; | |
| 273 _addStreamState = null; | |
| 274 _state &= ~_STATE_ADDSTREAM; | |
| 275 addState.complete(); | |
| 276 } | |
| 277 | |
| 278 // Event handling. | |
| 279 void _forEachListener( | |
| 280 void action(_BufferingStreamSubscription<T> subscription)) { | |
| 281 if (_isFiring) { | |
| 282 throw new StateError( | |
| 283 "Cannot fire new event. Controller is already firing an event"); | |
| 284 } | |
| 285 if (_isEmpty) return; | |
| 286 | |
| 287 // Get event id of this event. | |
| 288 int id = (_state & _STATE_EVENT_ID); | |
| 289 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | |
| 290 // callbacks while firing, and we prevent reentrancy of this function. | |
| 291 // | |
| 292 // Set [_state]'s event id to the next event's id. | |
| 293 // Any listeners added while firing this event will expect the next event, | |
| 294 // not this one, and won't get notified. | |
| 295 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
| 296 _BroadcastSubscriptionLink link = _next; | |
| 297 while (!identical(link, this)) { | |
| 298 _BroadcastSubscription<T> subscription = link; | |
| 299 if (subscription._expectsEvent(id)) { | |
| 300 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
| 301 action(subscription); | |
| 302 subscription._toggleEventId(); | |
| 303 link = subscription._next; | |
| 304 if (subscription._removeAfterFiring) { | |
| 305 _removeListener(subscription); | |
| 306 } | |
| 307 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
| 308 } else { | |
| 309 link = subscription._next; | |
| 310 } | |
| 311 } | |
| 312 _state &= ~_STATE_FIRING; | |
| 313 | |
| 314 if (_isEmpty) { | |
| 315 _callOnCancel(); | |
| 316 } | |
| 317 } | |
| 318 | |
| 319 void _callOnCancel() { | |
| 320 assert(_isEmpty); | |
| 321 if (isClosed && _doneFuture._mayComplete) { | |
| 322 // When closed, _doneFuture is not null. | |
| 323 _doneFuture._asyncSetValue(null); | |
| 324 } | |
| 325 _runGuarded(_onCancel); | |
| 326 } | |
| 327 } | |
| 328 | |
| 329 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 330 _SyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 331 : super(onListen, onCancel); | |
| 332 | |
| 333 // EventDispatch interface. | |
| 334 | |
| 335 void _sendData(T data) { | |
| 336 if (_isEmpty) return; | |
| 337 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 338 subscription._add(data); | |
| 339 }); | |
| 340 } | |
| 341 | |
| 342 void _sendError(Object error) { | |
| 343 if (_isEmpty) return; | |
| 344 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 345 subscription._addError(error); | |
| 346 }); | |
| 347 } | |
| 348 | |
| 349 void _sendDone() { | |
| 350 if (!_isEmpty) { | |
| 351 _forEachListener((_BroadcastSubscription<T> subscription) { | |
| 352 subscription._close(); | |
| 353 }); | |
| 354 } else { | |
| 355 assert(_doneFuture != null); | |
| 356 assert(_doneFuture._mayComplete); | |
| 357 _doneFuture._asyncSetValue(null); | |
| 358 } | |
| 359 } | |
| 360 } | |
| 361 | |
| 362 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
| 363 _AsyncBroadcastStreamController(void onListen(), void onCancel()) | |
| 364 : super(onListen, onCancel); | |
| 365 | |
| 366 // EventDispatch interface. | |
| 367 | |
| 368 void _sendData(T data) { | |
| 369 for (_BroadcastSubscriptionLink link = _next; | |
| 370 !identical(link, this); | |
| 371 link = link._next) { | |
| 372 _BroadcastSubscription<T> subscription = link; | |
| 373 subscription._addPending(new _DelayedData(data)); | |
| 374 } | |
| 375 } | |
| 376 | |
| 377 void _sendError(Object error) { | |
| 378 for (_BroadcastSubscriptionLink link = _next; | |
| 379 !identical(link, this); | |
| 380 link = link._next) { | |
| 381 _BroadcastSubscription<T> subscription = link; | |
| 382 subscription._addPending(new _DelayedError(error)); | |
| 383 } | |
| 384 } | |
| 385 | |
| 386 void _sendDone() { | |
| 387 if (!_isEmpty) { | |
| 388 for (_BroadcastSubscriptionLink link = _next; | |
| 389 !identical(link, this); | |
| 390 link = link._next) { | |
| 391 _BroadcastSubscription<T> subscription = link; | |
| 392 subscription._addPending(const _DelayedDone()); | |
| 393 } | |
| 394 } else { | |
| 395 assert(_doneFuture != null); | |
| 396 assert(_doneFuture._mayComplete); | |
| 397 _doneFuture._asyncSetValue(null); | |
| 398 } | |
| 399 } | |
| 400 } | |
| 401 | |
| 402 /** | |
| 403 * Stream controller that is used by [Stream.asBroadcastStream]. | |
| 404 * | |
| 405 * This stream controller allows incoming events while it is firing | |
| 406 * other events. This is handled by delaying the events until the | |
| 407 * current event is done firing, and then fire the pending events. | |
| 408 * | |
| 409 * This class extends [_SyncBroadcastStreamController]. Events of | |
| 410 * an "asBroadcastStream" stream are always initiated by events | |
| 411 * on another stream, and it is fine to forward them synchronously. | |
| 412 */ | |
| 413 class _AsBroadcastStreamController<T> | |
| 414 extends _SyncBroadcastStreamController<T> | |
| 415 implements _EventDispatch<T> { | |
| 416 _StreamImplEvents _pending; | |
| 417 | |
| 418 _AsBroadcastStreamController(void onListen(), void onCancel()) | |
| 419 : super(onListen, onCancel); | |
| 420 | |
| 421 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
| 422 | |
| 423 void _addPendingEvent(_DelayedEvent event) { | |
| 424 if (_pending == null) { | |
| 425 _pending = new _StreamImplEvents(); | |
| 426 } | |
| 427 _pending.add(event); | |
| 428 } | |
| 429 | |
| 430 void add(T data) { | |
| 431 if (!isClosed && _isFiring) { | |
| 432 _addPendingEvent(new _DelayedData<T>(data)); | |
| 433 return; | |
| 434 } | |
| 435 super.add(data); | |
| 436 while (_hasPending) { | |
| 437 _pending.handleNext(this); | |
| 438 } | |
| 439 } | |
| 440 | |
| 441 void addError(Object error, [StackTrace stackTrace]) { | |
| 442 if (!isClosed && _isFiring) { | |
| 443 _addPendingEvent(new _DelayedError(error)); | |
| 444 return; | |
| 445 } | |
| 446 super.addError(error, stackTrace); | |
| 447 while (_hasPending) { | |
| 448 _pending.handleNext(this); | |
| 449 } | |
| 450 } | |
| 451 | |
| 452 void close() { | |
| 453 if (!isClosed && _isFiring) { | |
| 454 _addPendingEvent(const _DelayedDone()); | |
| 455 _state |= _STATE_CLOSED; | |
| 456 return; | |
| 457 } | |
| 458 super.close(); | |
| 459 assert(!_hasPending); | |
| 460 } | |
| 461 | |
| 462 void _callOnCancel() { | |
| 463 if (_hasPending) { | |
| 464 _pending.clear(); | |
| 465 _pending = null; | |
| 466 } | |
| 467 super._callOnCancel(); | |
| 468 } | |
| 469 } | |
| 470 | |
| 471 // A subscription that never receives any events. | |
| 472 // It can simulate pauses, but otherwise does nothing. | |
| 473 class _DoneSubscription<T> implements StreamSubscription<T> { | |
| 474 int _pauseCount = 0; | |
| 475 void onData(void handleData(T data)) {} | |
| 476 void onError(void handleErrr(Object error)) {} | |
| 477 void onDone(void handleDone()) {} | |
| 478 void pause([Future resumeSignal]) { | |
| 479 if (resumeSignal != null) resumeSignal.then(_resume); | |
| 480 _pauseCount++; | |
| 481 } | |
| 482 void resume() { _resume(null); } | |
| 483 void _resume(_) { | |
| 484 if (_pauseCount > 0) _pauseCount--; | |
| 485 } | |
| 486 void cancel() {} | |
| 487 bool get isPaused => _pauseCount > 0; | |
| 488 Future asFuture(Object value) => new _FutureImpl(); | |
| 489 } | |
| OLD | NEW |