OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // part of dart.async; |
| 6 |
| 7 // States shared by single/multi stream implementations. |
| 8 |
| 9 /// Initial and default state where the stream can receive and send events. |
| 10 const int _STREAM_OPEN = 0; |
| 11 /// The stream has received a request to complete, but hasn't done so yet. |
| 12 /// No further events can be aded to the stream. |
| 13 const int _STREAM_CLOSED = 1; |
| 14 /// The stream has completed and will no longer receive or send events. |
| 15 /// Also counts as closed. The stream must not be paused when it's completed. |
| 16 /// Always used in conjunction with [_STREAM_CLOSED]. |
| 17 const int _STREAM_COMPLETE = 2; |
| 18 /// Bit that alternates between events, and listeners are updated to the |
| 19 /// current value when they are notified of the event. |
| 20 const int _STREAM_EVENT_ID = 4; |
| 21 const int _STREAM_EVENT_ID_SHIFT = 2; |
| 22 /// Bit set while firing and clear while not. |
| 23 const int _STREAM_FIRING = 8; |
| 24 /// The count of times a stream has paused is stored in the |
| 25 /// state, shifted by this amount. |
| 26 const int _STREAM_PAUSE_COUNT_SHIFT = 4; |
| 27 |
| 28 // States for listeners. |
| 29 |
| 30 /// The listener is currently not subscribed to its source stream. |
| 31 const int _LISTENER_UNSUBSCRIBED = 0; |
| 32 /// The listener is actively subscribed to its source stream. |
| 33 const int _LISTENER_SUBSCRIBED = 1; |
| 34 /// The listener is subscribed until it has been notified of the current event. |
| 35 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
| 36 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
| 37 /// Bit that contains the last sent event's "id bit". |
| 38 const int _LISTENER_EVENT_ID = 4; |
| 39 const int _LISTENER_EVENT_ID_SHIFT = 2; |
| 40 /// The count of times a listener has paused is stored in the |
| 41 /// state, shifted by this amount. |
| 42 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
| 43 |
| 44 |
| 45 // ------------------------------------------------------------------- |
| 46 // Common base class for single and multi-subscription streams. |
| 47 // ------------------------------------------------------------------- |
| 48 abstract class _StreamImpl<T> extends Stream<T> { |
| 49 /** Current state of the stream. */ |
| 50 int _state = _STREAM_OPEN; |
| 51 |
| 52 /** |
| 53 * List of pending events. |
| 54 * |
| 55 * If events are added to the stream (using [_add], [_signalError] or [_done]) |
| 56 * while the stream is paused, or while another event is firing, events will |
| 57 * stored here. |
| 58 * Also supports scheduling the events for later execution. |
| 59 */ |
| 60 _StreamImplEvents _pendingEvents; |
| 61 |
| 62 // ------------------------------------------------------------------ |
| 63 // Stream interface. |
| 64 |
| 65 StreamSubscription listen(void onData(T data), |
| 66 { void onError(AsyncError error), |
| 67 void onDone(), |
| 68 bool unsubscribeOnError }) { |
| 69 if (_isComplete) { |
| 70 return new _DoneSubscription(onDone); |
| 71 } |
| 72 if (onData == null) onData = _nullDataHandler; |
| 73 if (onError == null) onError = _nullErrorHandler; |
| 74 if (onDone == null) onDone = _nullDoneHandler; |
| 75 unsubscribeOnError = identical(true, unsubscribeOnError); |
| 76 _StreamListener subscription = |
| 77 _createSubscription(onData, onError, onDone, unsubscribeOnError); |
| 78 _addListener(subscription); |
| 79 return subscription; |
| 80 } |
| 81 |
| 82 // ------------------------------------------------------------------ |
| 83 // StreamSink interface-like methods for sending events into the stream. |
| 84 // It's the responsibility of the caller to ensure that the stream is not |
| 85 // paused when adding events. If the stream is paused, the events will be |
| 86 // queued, but it's better to not send events at all. |
| 87 |
| 88 /** |
| 89 * Send or queue a data event. |
| 90 */ |
| 91 void _add(T value) { |
| 92 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 93 if (!_canFireEvent) { |
| 94 _addPendingEvent(new _DelayedData<T>(value)); |
| 95 return; |
| 96 } |
| 97 _sendData(value); |
| 98 _handlePendingEvents(); |
| 99 } |
| 100 |
| 101 /** |
| 102 * Send or enqueue an error event. |
| 103 * |
| 104 * If a subscription has requested to be unsubscribed on errors, |
| 105 * it will be unsubscribed after receiving this event. |
| 106 */ |
| 107 void _signalError(AsyncError error) { |
| 108 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 109 if (!_canFireEvent) { |
| 110 _addPendingEvent(new _DelayedError(error)); |
| 111 return; |
| 112 } |
| 113 _sendError(error); |
| 114 _handlePendingEvents(); |
| 115 } |
| 116 |
| 117 /** |
| 118 * Send or enqueue a "done" message. |
| 119 * |
| 120 * The "done" message should be sent at most once by a stream, and it |
| 121 * should be the last message sent. |
| 122 */ |
| 123 void _close() { |
| 124 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 125 _state |= _STREAM_CLOSED; |
| 126 if (!_canFireEvent) { |
| 127 // You can't enqueue an event after the Done, so make it const. |
| 128 _addPendingEvent(const _DelayedDone()); |
| 129 return; |
| 130 } |
| 131 _sendDone(); |
| 132 assert(!_hasPendingEvent); |
| 133 } |
| 134 |
| 135 // ------------------------------------------------------------------- |
| 136 // Internal implementation. |
| 137 |
| 138 // State prediates. |
| 139 |
| 140 /** Whether the stream has been closed (a done event requested). */ |
| 141 bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
| 142 |
| 143 /** Whether the stream is completed. */ |
| 144 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
| 145 |
| 146 /** Whether one or more active subscribers have requested a pause. */ |
| 147 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
| 148 |
| 149 /** Check whether the pending event queue is non-empty */ |
| 150 bool get _hasPendingEvent => |
| 151 _pendingEvents != null && !_pendingEvents.isEmpty; |
| 152 |
| 153 /** Whether we are currently firing an event. */ |
| 154 bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
| 155 |
| 156 int get _currentEventIdBit => |
| 157 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
| 158 |
| 159 /** Whether there is currently a subscriber on this [Stream]. */ |
| 160 bool get _hasSubscribers; |
| 161 |
| 162 /** Whether the stream can fire a new event. */ |
| 163 bool get _canFireEvent => !_isFiring && !_isPaused && !_hasPendingEvent; |
| 164 |
| 165 // State modification. |
| 166 |
| 167 /** Record an increases in the number of times the listener has paused. */ |
| 168 void _incrementPauseCount(_StreamListener<T> listener) { |
| 169 listener._incrementPauseCount(); |
| 170 _updatePauseCount(1); |
| 171 } |
| 172 |
| 173 /** Record a decrease in the number of times the listener has paused. */ |
| 174 void _decrementPauseCount(_StreamListener<T> listener) { |
| 175 assert(_isPaused); |
| 176 listener._decrementPauseCount(); |
| 177 _updatePauseCount(-1); |
| 178 } |
| 179 |
| 180 /** Update the stream's own pause count only. */ |
| 181 void _updatePauseCount(int by) { |
| 182 _state += by << _STREAM_PAUSE_COUNT_SHIFT; |
| 183 assert(_state >= 0); |
| 184 } |
| 185 |
| 186 void _setClosed() { |
| 187 assert(!_isClosed); |
| 188 _state |= _STREAM_CLOSED; |
| 189 } |
| 190 |
| 191 void _setComplete() { |
| 192 assert(_isClosed); |
| 193 _state = _state |_STREAM_COMPLETE; |
| 194 } |
| 195 |
| 196 void _startFiring() { |
| 197 assert(!_isFiring); |
| 198 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| 199 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| 200 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| 201 // event being fired. |
| 202 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
| 203 } |
| 204 |
| 205 void _endFiring() { |
| 206 assert(_isFiring); |
| 207 _state ^= _STREAM_FIRING; |
| 208 } |
| 209 |
| 210 /** |
| 211 * Record that a listener wants a pause from events. |
| 212 * |
| 213 * This methods is called from [_StreamListener.pause()]. |
| 214 * Subclasses can override this method, along with [isPaused] and |
| 215 * [createSubscription], if they want to do a different handling of paused |
| 216 * subscriptions, e.g., a filtering stream pausing its own source if all its |
| 217 * subscribers are paused. |
| 218 */ |
| 219 void _pause(_StreamListener<T> listener, Signal resumeSignal) { |
| 220 assert(identical(listener._source, this)); |
| 221 if (!listener._isSubscribed) { |
| 222 throw new StateError("Subscription has been canceled."); |
| 223 } |
| 224 assert(!_isComplete); // There can be no subscribers when complete. |
| 225 bool wasPaused = _isPaused; |
| 226 _incrementPauseCount(listener); |
| 227 if (resumeSignal != null) { |
| 228 resumeSignal.then(() { this._resume(listener, true); }); |
| 229 } |
| 230 if (!wasPaused) { |
| 231 _onPauseStateChange(); |
| 232 } |
| 233 } |
| 234 |
| 235 /** Stops pausing due to one request from the given listener. */ |
| 236 void _resume(_StreamListener<T> listener, bool fromEvent) { |
| 237 if (!listener.isPaused) return; |
| 238 assert(listener._isSubscribed); |
| 239 assert(_isPaused); |
| 240 _decrementPauseCount(listener); |
| 241 if (!_isPaused) { |
| 242 _onPauseStateChange(); |
| 243 if (_hasPendingEvent) { |
| 244 // If we can fire events now, fire any pending events right away. |
| 245 if (fromEvent && !_isFiring) { |
| 246 _handlePendingEvents(); |
| 247 } else { |
| 248 _pendingEvents.schedule(this); |
| 249 } |
| 250 } |
| 251 } |
| 252 } |
| 253 |
| 254 /** Create a subscription object. Called by [subcribe]. */ |
| 255 _StreamSubscriptionImpl<T> _createSubscription( |
| 256 void onData(T data), |
| 257 void onError(AsyncError error), |
| 258 void onDone(), |
| 259 bool unsubscribeOnError); |
| 260 |
| 261 /** |
| 262 * Adds a listener to this stream. |
| 263 */ |
| 264 void _addListener(_StreamSubscriptionImpl subscription); |
| 265 |
| 266 /** |
| 267 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 268 * |
| 269 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 270 * |
| 271 * If an event is currently firing, the cancel is delayed |
| 272 * until after the subscribers have received the event. |
| 273 */ |
| 274 void _cancel(_StreamSubscriptionImpl subscriber); |
| 275 |
| 276 /** |
| 277 * Iterate over all current subscribers and perform an action on each. |
| 278 * |
| 279 * Subscribers added during the iteration will not be visited. |
| 280 * Subscribers unsubscribed during the iteration will only be removed |
| 281 * after they have been acted on. |
| 282 * |
| 283 * Any change in the pause state is only reported after all subscribers have |
| 284 * received the event. |
| 285 * |
| 286 * The [action] must not throw, or the controller will be left in an |
| 287 * invalid state. |
| 288 * |
| 289 * This method must not be called while [isFiring] is true. |
| 290 */ |
| 291 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
| 292 |
| 293 /** |
| 294 * Called when the first subscriber requests a pause or the last a resume. |
| 295 * |
| 296 * Read [isPaused] to see the new state. |
| 297 */ |
| 298 void _onPauseStateChange() {} |
| 299 |
| 300 /** |
| 301 * Called when the first listener subscribes or the last unsubscribes. |
| 302 * |
| 303 * Read [hasSubscribers] to see what the new state is. |
| 304 */ |
| 305 void _onSubscriptionStateChange() {} |
| 306 |
| 307 /** Add a pending event at the end of the pending event queue. */ |
| 308 void _addPendingEvent(_DelayedEvent event) { |
| 309 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
| 310 _pendingEvents.add(event); |
| 311 } |
| 312 |
| 313 /** Fire any pending events until the pending event queue. */ |
| 314 void _handlePendingEvents() { |
| 315 _StreamImplEvents events = _pendingEvents; |
| 316 if (events == null) return; |
| 317 while (!events.isEmpty && !_isPaused) { |
| 318 events.removeFirst().perform(this); |
| 319 } |
| 320 } |
| 321 |
| 322 /** |
| 323 * Send a data event directly to each subscriber. |
| 324 */ |
| 325 _sendData(T value) { |
| 326 assert(!_isPaused); |
| 327 assert(!_isComplete); |
| 328 _forEachSubscriber((subscriber) { |
| 329 try { |
| 330 subscriber._sendData(value); |
| 331 } catch (e, s) { |
| 332 new AsyncError(e, s).throwDelayed(); |
| 333 } |
| 334 }); |
| 335 } |
| 336 |
| 337 /** |
| 338 * Sends an error event directly to each subscriber. |
| 339 */ |
| 340 void _sendError(AsyncError error) { |
| 341 assert(!_isPaused); |
| 342 assert(!_isComplete); |
| 343 _forEachSubscriber((subscriber) { |
| 344 try { |
| 345 subscriber._sendError(error); |
| 346 } catch (e, s) { |
| 347 new AsyncError.withCause(e, s, error).throwDelayed(); |
| 348 } |
| 349 }); |
| 350 } |
| 351 |
| 352 /** |
| 353 * Sends the "done" message directly to each subscriber. |
| 354 * This automatically stops further subscription and |
| 355 * unsubscribes all subscribers. |
| 356 */ |
| 357 void _sendDone() { |
| 358 assert(!_isPaused); |
| 359 assert(_isClosed); |
| 360 _setComplete(); |
| 361 if (!_hasSubscribers) return; |
| 362 _forEachSubscriber((subscriber) { |
| 363 _cancel(subscriber); |
| 364 try { |
| 365 subscriber._sendDone(); |
| 366 } catch (e, s) { |
| 367 new AsyncError(e, s).throwDelayed(); |
| 368 } |
| 369 }); |
| 370 assert(!_hasSubscribers); |
| 371 _onSubscriptionStateChange(); |
| 372 } |
| 373 } |
| 374 |
| 375 // ------------------------------------------------------------------- |
| 376 // Default implementation of a stream with a single subscriber. |
| 377 // ------------------------------------------------------------------- |
| 378 /** |
| 379 * Default implementation of stream capable of sending events to one subscriber. |
| 380 * |
| 381 * Any class needing to implement [Stream] can either directly extend this |
| 382 * class, or extend [Stream] and delegate the subscribe method to an instance |
| 383 * of this class. |
| 384 * |
| 385 * The only public methods are those of [Stream], so instances of |
| 386 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
| 387 * internal functionality. |
| 388 * |
| 389 * The [StreamController] is a public facing version of this class, with |
| 390 * some methods made public. |
| 391 * |
| 392 * The user interface of [_SingleStreamImpl] are the following methods: |
| 393 * * [_add]: Add a data event to the stream. |
| 394 * * [_signalError]: Add an error event to the stream. |
| 395 * * [_close]: Request to close the stream. |
| 396 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
| 397 * when losing the last subscriber. |
| 398 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 399 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
| 400 * * [_isPaused]: Test whether the stream is currently paused. |
| 401 * The user should not add new events while the stream is paused, but if it |
| 402 * happens anyway, the stream will enqueue the events just as when new events |
| 403 * arrive while still firing an old event. |
| 404 */ |
| 405 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| 406 _StreamSubscriptionImpl _subscriber = null; |
| 407 |
| 408 /** Whether one or more active subscribers have requested a pause. */ |
| 409 bool get _isPaused => !_hasSubscribers || super._isPaused; |
| 410 |
| 411 /** Whether there is currently a subscriber on this [Stream]. */ |
| 412 bool get _hasSubscribers => _subscriber != null; |
| 413 |
| 414 // ------------------------------------------------------------------- |
| 415 // Internal implementation. |
| 416 |
| 417 /** |
| 418 * Create the new subscription object. |
| 419 */ |
| 420 _StreamSubscriptionImpl<T> _createSubscription( |
| 421 void onData(T data), |
| 422 void onError(AsyncError error), |
| 423 void onDone(), |
| 424 bool unsubscribeOnError) { |
| 425 return new _StreamSubscriptionImpl<T>( |
| 426 this, onData, onError, onDone, unsubscribeOnError); |
| 427 } |
| 428 |
| 429 void _addListener(_StreamSubscriptionImpl subscription) { |
| 430 if (_hasSubscribers) { |
| 431 throw new StateError("Stream has already subscriber."); |
| 432 } |
| 433 _subscriber = subscription; |
| 434 subscription._setSubscribed(0); |
| 435 _onSubscriptionStateChange(); |
| 436 // TODO(floitsch): Should this be delayed? |
| 437 _handlePendingEvents(); |
| 438 } |
| 439 |
| 440 /** |
| 441 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 442 * |
| 443 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 444 * |
| 445 * If an event is currently firing, the cancel is delayed |
| 446 * until after the subscriber has received the event. |
| 447 */ |
| 448 void _cancel(_StreamSubscriptionImpl subscriber) { |
| 449 assert(identical(subscriber._source, this)); |
| 450 // We allow unsubscribing the currently firing subscription during |
| 451 // the event firing, because it is indistinguishable from delaying it since |
| 452 // that event has already received the event. |
| 453 if (!identical(_subscriber, subscriber)) { |
| 454 // You may unsubscribe more than once, only the first one counts. |
| 455 return; |
| 456 } |
| 457 _subscriber = null; |
| 458 int timesPaused = subscriber._setUnsubscribed(); |
| 459 _updatePauseCount(-timesPaused); |
| 460 if (timesPaused > 0) { |
| 461 _onPauseStateChange(); |
| 462 } |
| 463 _onSubscriptionStateChange(); |
| 464 } |
| 465 |
| 466 void _forEachSubscriber( |
| 467 void action(_StreamSubscriptionImpl<T> subscription)) { |
| 468 _StreamSubscriptionImpl subscription = _subscriber; |
| 469 assert(subscription != null); |
| 470 _startFiring(); |
| 471 action(subscription); |
| 472 _endFiring(); |
| 473 } |
| 474 } |
| 475 |
| 476 // ------------------------------------------------------------------- |
| 477 // Default implementation of a stream with subscribers. |
| 478 // ------------------------------------------------------------------- |
| 479 |
| 480 /** |
| 481 * Default implementation of stream capable of sending events to subscribers. |
| 482 * |
| 483 * Any class needing to implement [Stream] can either directly extend this |
| 484 * class, or extend [Stream] and delegate the subscribe method to an instance |
| 485 * of this class. |
| 486 * |
| 487 * The only public methods are those of [Stream], so instances of |
| 488 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
| 489 * internal functionality. |
| 490 * |
| 491 * The [StreamController] is a public facing version of this class, with |
| 492 * some methods made public. |
| 493 * |
| 494 * The user interface of [_MultiStreamImpl] are the following methods: |
| 495 * * [_add]: Add a data event to the stream. |
| 496 * * [_signalError]: Add an error event to the stream. |
| 497 * * [_close]: Request to close the stream. |
| 498 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
| 499 * when losing the last subscriber. |
| 500 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 501 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
| 502 * * [_isPaused]: Test whether the stream is currently paused. |
| 503 * The user should not add new events while the stream is paused, but if it |
| 504 * happens anyway, the stream will enqueue the events just as when new events |
| 505 * arrive while still firing an old event. |
| 506 */ |
| 507 class _MultiStreamImpl<T> extends _StreamImpl<T> |
| 508 implements _InternalLinkList { |
| 509 // Link list implementation (mixin when possible). |
| 510 _InternalLink _nextLink; |
| 511 _InternalLink _previousLink; |
| 512 |
| 513 _MultiStreamImpl() { |
| 514 _nextLink = _previousLink = this; |
| 515 } |
| 516 |
| 517 // ------------------------------------------------------------------ |
| 518 // Helper functions that can be overridden in subclasses. |
| 519 |
| 520 /** Whether there are currently any subscribers on this [Stream]. */ |
| 521 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); |
| 522 |
| 523 /** |
| 524 * Create the new subscription object. |
| 525 */ |
| 526 _StreamListener<T> _createSubscription( |
| 527 void onData(T data), |
| 528 void onError(AsyncError error), |
| 529 void onDone(), |
| 530 bool unsubscribeOnError) { |
| 531 return new _StreamSubscriptionImpl<T>( |
| 532 this, onData, onError, onDone, unsubscribeOnError); |
| 533 } |
| 534 |
| 535 // ------------------------------------------------------------------- |
| 536 // Internal implementation. |
| 537 |
| 538 /** |
| 539 * Iterate over all current subscribers and perform an action on each. |
| 540 * |
| 541 * The set of subscribers cannot be modified during this iteration. |
| 542 * All attempts to add or unsubscribe subscribers will be delayed until |
| 543 * after the iteration is complete. |
| 544 * |
| 545 * The [action] must not throw, or the controller will be left in an |
| 546 * invalid state. |
| 547 * |
| 548 * This method must not be called while [isFiring] is true. |
| 549 */ |
| 550 void _forEachSubscriber( |
| 551 void action(_StreamListener<T> subscription)) { |
| 552 assert(!_isFiring); |
| 553 if (!_hasSubscribers) return; |
| 554 _startFiring(); |
| 555 _InternalLink cursor = this._nextLink; |
| 556 while (!identical(cursor, this)) { |
| 557 _StreamListener<T> current = cursor; |
| 558 if (current._needsEvent(_currentEventIdBit)) { |
| 559 action(current); |
| 560 // Marks as having received the event. |
| 561 current._toggleEventReceived(); |
| 562 } |
| 563 cursor = current._nextLink; |
| 564 if (current._isPendingUnsubscribe) { |
| 565 _removeListener(current); |
| 566 } |
| 567 } |
| 568 _endFiring(); |
| 569 if (_isPaused) _onPauseStateChange(); |
| 570 if (!_hasSubscribers) _onSubscriptionStateChange(); |
| 571 } |
| 572 |
| 573 void _addListener(_StreamListener listener) { |
| 574 listener._setSubscribed(_currentEventIdBit); |
| 575 bool firstSubscriber = !_hasSubscribers; |
| 576 _InternalLinkList.add(this, listener); |
| 577 if (firstSubscriber) { |
| 578 _onSubscriptionStateChange(); |
| 579 } |
| 580 } |
| 581 |
| 582 /** |
| 583 * Handle a cancel requested from a [_StreamListener]. |
| 584 * |
| 585 * This method is called from [_StreamListener.cancel]. |
| 586 * |
| 587 * If an event is currently firing, the cancel is delayed |
| 588 * until after the subscribers have received the event. |
| 589 */ |
| 590 void _cancel(_StreamListener listener) { |
| 591 assert(identical(listener._source, this)); |
| 592 if (_InternalLink.isUnlinked(listener)) { |
| 593 // You may unsubscribe more than once, only the first one counts. |
| 594 return; |
| 595 } |
| 596 if (_isFiring) { |
| 597 if (listener._needsEvent(_currentEventIdBit)) { |
| 598 assert(listener._isSubscribed); |
| 599 listener._setPendingUnsubscribe(); |
| 600 } else { |
| 601 // The listener has been notified of the event (or don't need to, |
| 602 // if it's still pending subscription) so it's safe to remove it. |
| 603 _removeListener(listener); |
| 604 } |
| 605 // Pause and subscription state changes are reported when we end |
| 606 // firing. |
| 607 } else { |
| 608 bool wasPaused = _isPaused; |
| 609 _removeListener(listener); |
| 610 if (wasPaused != _isPaused) _onPauseStateChange(); |
| 611 if (!_hasSubscribers) _onSubscriptionStateChange(); |
| 612 } |
| 613 } |
| 614 |
| 615 /** |
| 616 * Removes a listener from this stream and cancels its pauses. |
| 617 * |
| 618 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
| 619 * or [_onPauseStateChange]. |
| 620 */ |
| 621 void _removeListener(_StreamListener listener) { |
| 622 int pauseCount = listener._setUnsubscribed(); |
| 623 _updatePauseCount(-pauseCount); |
| 624 _InternalLinkList.remove(listener); |
| 625 } |
| 626 } |
| 627 |
| 628 /** |
| 629 * The subscription class that the [StreamController] uses. |
| 630 * |
| 631 * The [StreamController.createSubscription] method should |
| 632 * create an object of this type, or another subclass of [_StreamListener]. |
| 633 * A subclass of [StreamController] can specify which subclass |
| 634 * of [_StreamSubscriptionImpl] it uses by overriding |
| 635 * [StreamController.createSubscription]. |
| 636 * |
| 637 * The subscription is in one of three states: |
| 638 * * Subscribed. |
| 639 * * Paused-and-subscribed. |
| 640 * * Unsubscribed. |
| 641 * Unsubscribing also unpauses. |
| 642 */ |
| 643 class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| 644 implements StreamSubscription<T> { |
| 645 final bool _unsubscribeOnError; |
| 646 _DataHandler _onData; |
| 647 _ErrorHandler _onError; |
| 648 _DoneHandler _onDone; |
| 649 _StreamSubscriptionImpl(_StreamImpl source, |
| 650 this._onData, |
| 651 this._onError, |
| 652 this._onDone, |
| 653 this._unsubscribeOnError) : super(source); |
| 654 |
| 655 void onData(void handleData(T event)) { |
| 656 if (handleData == null) handleData = _nullDataHandler; |
| 657 _onData = handleData; |
| 658 } |
| 659 |
| 660 void onError(void handleError(AsyncError error)) { |
| 661 if (handleError == null) handleError = _nullErrorHandler; |
| 662 _onError = handleError; |
| 663 } |
| 664 |
| 665 void onDone(void handleDone()) { |
| 666 if (handleDone == null) handleDone = _nullDoneHandler; |
| 667 _onDone = handleDone; |
| 668 } |
| 669 |
| 670 void _sendData(T data) { |
| 671 _onData(data); |
| 672 } |
| 673 |
| 674 void _sendError(AsyncError error) { |
| 675 _onError(error); |
| 676 if (_unsubscribeOnError) _source._cancel(this); |
| 677 } |
| 678 |
| 679 void _sendDone() { |
| 680 _onDone(); |
| 681 } |
| 682 |
| 683 void cancel() { |
| 684 _source._cancel(this); |
| 685 } |
| 686 |
| 687 void pause([Signal resumeSignal]) { |
| 688 _source._pause(this, resumeSignal); |
| 689 } |
| 690 |
| 691 void resume() { |
| 692 if (!isPaused) { |
| 693 throw new StateError("Resuming unpaused subscription"); |
| 694 } |
| 695 _source._resume(this, false); |
| 696 } |
| 697 } |
| 698 |
| 699 // Internal helpers. |
| 700 |
| 701 // Types of the different handlers on a stream. Types used to type fields. |
| 702 typedef void _DataHandler<T>(T value); |
| 703 typedef void _ErrorHandler(AsyncError error); |
| 704 typedef void _DoneHandler(); |
| 705 |
| 706 |
| 707 /** Default data handler, does nothing. */ |
| 708 void _nullDataHandler(var value) {} |
| 709 |
| 710 /** Default error handler, reports the error to the global handler. */ |
| 711 void _nullErrorHandler(AsyncError error) { |
| 712 error.throwDelayed(); |
| 713 } |
| 714 |
| 715 /** Default done handler, does nothing. */ |
| 716 void _nullDoneHandler() {} |
| 717 |
| 718 |
| 719 /** A delayed event on a stream implementation. */ |
| 720 abstract class _DelayedEvent { |
| 721 /** Added as a linked list on the [StreamController]. */ |
| 722 _DelayedEvent next; |
| 723 /** Execute the delayed event on the [StreamController]. */ |
| 724 void perform(_StreamImpl stream); |
| 725 } |
| 726 |
| 727 /** A delayed data event. */ |
| 728 class _DelayedData<T> extends _DelayedEvent{ |
| 729 T value; |
| 730 _DelayedData(this.value); |
| 731 void perform(_StreamImpl<T> stream) { |
| 732 stream._sendData(value); |
| 733 } |
| 734 } |
| 735 |
| 736 /** A delayed error event. */ |
| 737 class _DelayedError extends _DelayedEvent { |
| 738 AsyncError error; |
| 739 _DelayedError(this.error); |
| 740 void perform(_StreamImpl stream) { |
| 741 stream._sendError(error); |
| 742 } |
| 743 } |
| 744 |
| 745 /** A delayed done event. */ |
| 746 class _DelayedDone implements _DelayedEvent { |
| 747 const _DelayedDone(); |
| 748 void perform(_StreamImpl stream) { |
| 749 stream._sendDone(); |
| 750 } |
| 751 |
| 752 _DelayedEvent get next => null; |
| 753 |
| 754 void set next(_DelayedEvent _) { |
| 755 throw new StateError("No events after a done."); |
| 756 } |
| 757 } |
| 758 |
| 759 /** |
| 760 * Simple internal doubly-linked list implementation. |
| 761 * |
| 762 * In an internal linked list, the links are in the data objects themselves, |
| 763 * instead of in a separate object. That means each element can be in at most |
| 764 * one list at a time. |
| 765 * |
| 766 * All links are always members of an element cycle. At creation it's a |
| 767 * singleton cycle. |
| 768 */ |
| 769 abstract class _InternalLink { |
| 770 _InternalLink _nextLink; |
| 771 _InternalLink _previousLink; |
| 772 |
| 773 _InternalLink() { |
| 774 this._previousLink = this._nextLink = this; |
| 775 } |
| 776 |
| 777 /* Removes a link from any list it may be part of, and links it to itself. */ |
| 778 static void unlink(_InternalLink element) { |
| 779 _InternalLink next = element._nextLink; |
| 780 _InternalLink previous = element._previousLink; |
| 781 next._previousLink = previous; |
| 782 previous._nextLink = next; |
| 783 element._nextLink = element._previousLink = element; |
| 784 } |
| 785 |
| 786 /** Check whether an element is unattached to other elements. */ |
| 787 static bool isUnlinked(_InternalLink element) { |
| 788 return identical(element, element._nextLink); |
| 789 } |
| 790 } |
| 791 |
| 792 /** |
| 793 * Marker interface for "list" links. |
| 794 * |
| 795 * An "InternalLinkList" is an abstraction on top of a link cycle, where the |
| 796 * "list" object itself is not considered an element (it's just a header link |
| 797 * created to avoid edge cases). |
| 798 * An element is considered part of a list if it is in the list's cycle. |
| 799 * There should never be more than one "list" object in a cycle. |
| 800 */ |
| 801 abstract class _InternalLinkList extends _InternalLink { |
| 802 /** |
| 803 * Adds an element to a list, just before the header link. |
| 804 * |
| 805 * This effectively adds it at the end of the list. |
| 806 */ |
| 807 static void add(_InternalLinkList list, _InternalLink element) { |
| 808 if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); |
| 809 _InternalLink listEnd = list._previousLink; |
| 810 listEnd._nextLink = element; |
| 811 list._previousLink = element; |
| 812 element._previousLink = listEnd; |
| 813 element._nextLink = list; |
| 814 } |
| 815 |
| 816 /** Removes an element from its list. */ |
| 817 static void remove(_InternalLink element) { |
| 818 _InternalLink.unlink(element); |
| 819 } |
| 820 |
| 821 /** Check whether a list contains no elements, only the header link. */ |
| 822 static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); |
| 823 |
| 824 /** Moves all elements from the list [other] to [list]. */ |
| 825 static void addAll(_InternalLinkList list, _InternalLinkList other) { |
| 826 if (isEmpty(other)) return; |
| 827 _InternalLink listLast = list._previousLink; |
| 828 _InternalLink otherNext = other._nextLink; |
| 829 listLast._nextLink = otherNext; |
| 830 otherNext._previousLink = listLast; |
| 831 _InternalLink otherLast = other._previousLink; |
| 832 list._previousLink = otherLast; |
| 833 otherLast._nextLink = list; |
| 834 // Clean up [other]. |
| 835 other._nextLink = other._previousLink = other; |
| 836 } |
| 837 } |
| 838 |
| 839 abstract class _StreamListener<T> extends _InternalLink { |
| 840 final _StreamImpl _source; |
| 841 int _state = _LISTENER_UNSUBSCRIBED; |
| 842 |
| 843 _StreamListener(this._source); |
| 844 |
| 845 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
| 846 |
| 847 bool get _isPendingUnsubscribe => |
| 848 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
| 849 |
| 850 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
| 851 |
| 852 /** |
| 853 * Whether the listener still needs to receive the currently firing event. |
| 854 * |
| 855 * The currently firing event is identified by a single bit, which alternates |
| 856 * between events. The [_state] contains the previously sent event's bit in |
| 857 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
| 858 * still need the current event. |
| 859 */ |
| 860 bool _needsEvent(int currentEventIdBit) { |
| 861 int lastEventIdBit = |
| 862 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
| 863 return lastEventIdBit != currentEventIdBit; |
| 864 } |
| 865 |
| 866 /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
| 867 /// we are currently firing an event and the subscriber still need to receive |
| 868 /// the event. |
| 869 void _toggleEventReceived() { |
| 870 _state ^= _LISTENER_EVENT_ID; |
| 871 } |
| 872 |
| 873 void _setSubscribed(int eventIdBit) { |
| 874 assert(eventIdBit == 0 || eventIdBit == 1); |
| 875 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
| 876 } |
| 877 |
| 878 void _setPendingUnsubscribe() { |
| 879 assert(_isSubscribed); |
| 880 _state |= _LISTENER_PENDING_UNSUBSCRIBE; |
| 881 } |
| 882 |
| 883 /** |
| 884 * Marks the listener as unsubscibed. |
| 885 * |
| 886 * Returns the number of unresumed pauses for the listener. |
| 887 */ |
| 888 int _setUnsubscribed() { |
| 889 assert(_isSubscribed); |
| 890 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
| 891 _state = _LISTENER_UNSUBSCRIBED; |
| 892 return timesPaused; |
| 893 } |
| 894 |
| 895 void _incrementPauseCount() { |
| 896 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 897 } |
| 898 |
| 899 void _decrementPauseCount() { |
| 900 assert(isPaused); |
| 901 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 902 } |
| 903 |
| 904 _sendData(T data); |
| 905 _sendError(AsyncError error); |
| 906 _sendDone(); |
| 907 } |
| 908 |
| 909 /** Class holding pending events for a [_StreamImpl]. */ |
| 910 class _StreamImplEvents { |
| 911 /// Single linked list of [_DelayedEvent] objects. |
| 912 _DelayedEvent firstPendingEvent = null; |
| 913 /// Last element in the list of pending events. New events are added after it. |
| 914 _DelayedEvent lastPendingEvent = null; |
| 915 /** |
| 916 * Timer set when pending events are scheduled for execution. |
| 917 * |
| 918 * When scheduling pending events for execution in a later cycle, the timer |
| 919 * is stored here. If pending events are executed earlier than that, e.g., |
| 920 * due to a second event in the current cycle, the timer is canceled again. |
| 921 */ |
| 922 Timer scheduleTimer = null; |
| 923 |
| 924 bool get isEmpty => lastPendingEvent == null; |
| 925 |
| 926 bool get isScheduled => scheduleTimer != null; |
| 927 |
| 928 void schedule(_StreamImpl stream) { |
| 929 if (isScheduled) return; |
| 930 scheduleTimer = new Timer(0, (_) { |
| 931 scheduleTimer = null; |
| 932 stream._handlePendingEvents(); |
| 933 }); |
| 934 } |
| 935 |
| 936 void cancelSchedule() { |
| 937 assert(isScheduled); |
| 938 scheduleTimer.cancel(); |
| 939 scheduleTimer = null; |
| 940 } |
| 941 |
| 942 void add(_DelayedEvent event) { |
| 943 if (lastPendingEvent == null) { |
| 944 firstPendingEvent = lastPendingEvent = event; |
| 945 } else { |
| 946 lastPendingEvent = lastPendingEvent.next = event; |
| 947 } |
| 948 } |
| 949 |
| 950 _DelayedEvent removeFirst() { |
| 951 if (isScheduled) cancelSchedule(); |
| 952 _DelayedEvent event = firstPendingEvent; |
| 953 firstPendingEvent = event.next; |
| 954 if (firstPendingEvent == null) { |
| 955 lastPendingEvent = null; |
| 956 } |
| 957 return event; |
| 958 } |
| 959 } |
| 960 |
| 961 |
| 962 class _DoneSubscription<T> implements StreamSubscription<T> { |
| 963 _DoneHandler _handler; |
| 964 Timer _timer; |
| 965 int _pauseCount = 0; |
| 966 |
| 967 _DoneSubscription(this._handler) { |
| 968 _delayDone(); |
| 969 } |
| 970 |
| 971 void _delayDone() { |
| 972 assert(_timer == null && _pauseCount == 0); |
| 973 _timer = new Timer(0, (_) { |
| 974 if (_handler != null) _handler(); |
| 975 }); |
| 976 } |
| 977 |
| 978 bool get _isComplete => _timer == null && _pauseCount == 0; |
| 979 |
| 980 void onData(void handleAction(T value)) {} |
| 981 void onError(void handleError(StateError error)) {} |
| 982 void onDone(void handleDone(T value)) { |
| 983 _handler = handleDone; |
| 984 } |
| 985 |
| 986 void pause([Signal signal]) { |
| 987 if (_isComplete) { |
| 988 throw new StateError("Subscription has been canceled."); |
| 989 } |
| 990 if (_timer != null) _timer.cancel(); |
| 991 _pauseCount++; |
| 992 } |
| 993 |
| 994 void resume() { |
| 995 if (_isComplete) { |
| 996 throw new StateError("Subscription has been canceled."); |
| 997 } |
| 998 if (_pauseCount == 0) return; |
| 999 _pauseCount--; |
| 1000 if (_pauseCount == 0) { |
| 1001 _delayDone(); |
| 1002 } |
| 1003 } |
| 1004 |
| 1005 bool get isPaused => _pauseCount > 0; |
| 1006 |
| 1007 void cancel() { |
| 1008 if (_isComplete) { |
| 1009 throw new StateError("Subscription has been canceled."); |
| 1010 } |
| 1011 if (_timer != null) { |
| 1012 _timer.cancel(); |
| 1013 _timer = null; |
| 1014 } |
| 1015 _pauseCount = 0; |
| 1016 } |
| 1017 } |
OLD | NEW |