| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 // Completion state of the stream. | 9 // Completion state of the stream. |
| 10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
| (...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 219 /** | 219 /** |
| 220 * The bit representing the current or last event fired. | 220 * The bit representing the current or last event fired. |
| 221 * | 221 * |
| 222 * This bit matches a bit on listeners that have received the corresponding | 222 * This bit matches a bit on listeners that have received the corresponding |
| 223 * event. It is toggled for each new event being fired. | 223 * event. It is toggled for each new event being fired. |
| 224 */ | 224 */ |
| 225 int get _currentEventIdBit => | 225 int get _currentEventIdBit => |
| 226 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; | 226 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
| 227 | 227 |
| 228 /** Whether there is currently a subscriber on this [Stream]. */ | 228 /** Whether there is currently a subscriber on this [Stream]. */ |
| 229 bool get _hasSubscribers; | 229 bool get _hasListener; |
| 230 | 230 |
| 231 | 231 |
| 232 /** Whether the state bits allow firing. */ | 232 /** Whether the state bits allow firing. */ |
| 233 bool get _mayFireState { | 233 bool get _mayFireState { |
| 234 // The state allows firing unless: | 234 // The state allows firing unless: |
| 235 // - it's currently firing | 235 // - it's currently firing |
| 236 // - it's currently in a callback | 236 // - it's currently in a callback |
| 237 // - it's paused | 237 // - it's paused |
| 238 const int mask = | 238 const int mask = |
| 239 _STREAM_FIRING | | 239 _STREAM_FIRING | |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 279 } | 279 } |
| 280 | 280 |
| 281 void _setComplete() { | 281 void _setComplete() { |
| 282 assert(_isClosed); | 282 assert(_isClosed); |
| 283 _state = _state |_STREAM_COMPLETE; | 283 _state = _state |_STREAM_COMPLETE; |
| 284 } | 284 } |
| 285 | 285 |
| 286 void _startFiring() { | 286 void _startFiring() { |
| 287 assert(!_isFiring); | 287 assert(!_isFiring); |
| 288 assert(!_isInCallback); | 288 assert(!_isInCallback); |
| 289 assert(_hasSubscribers); | 289 assert(_hasListener); |
| 290 assert(!_isPaused); | 290 assert(!_isPaused); |
| 291 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID | 291 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| 292 // bit. All current subscribers will now have a _LISTENER_EVENT_ID | 292 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| 293 // that doesn't match _STREAM_EVENT_ID, and they will receive the | 293 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| 294 // event being fired. | 294 // event being fired. |
| 295 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; | 295 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
| 296 } | 296 } |
| 297 | 297 |
| 298 void _endFiring(bool wasInputPaused) { | 298 void _endFiring(bool wasInputPaused) { |
| 299 assert(_isFiring); | 299 assert(_isFiring); |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 410 */ | 410 */ |
| 411 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); | 411 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
| 412 | 412 |
| 413 /** | 413 /** |
| 414 * Checks whether the subscription/pause state has changed. | 414 * Checks whether the subscription/pause state has changed. |
| 415 * | 415 * |
| 416 * Calls the appropriate callback if the state has changed from the | 416 * Calls the appropriate callback if the state has changed from the |
| 417 * provided one. Repeats calling callbacks as long as the call changes | 417 * provided one. Repeats calling callbacks as long as the call changes |
| 418 * the state. | 418 * the state. |
| 419 */ | 419 */ |
| 420 void _checkCallbacks(bool hadSubscribers, bool wasPaused) { | 420 void _checkCallbacks(bool hadListener, bool wasPaused) { |
| 421 assert(!_isFiring); | 421 assert(!_isFiring); |
| 422 // Will be handled after the current callback. | 422 // Will be handled after the current callback. |
| 423 if (_isInCallback) return; | 423 if (_isInCallback) return; |
| 424 if (_hasPendingResume && !_hasPendingEvent) { | 424 if (_hasPendingResume && !_hasPendingEvent) { |
| 425 _state ^= _STREAM_PENDING_RESUME; | 425 _state ^= _STREAM_PENDING_RESUME; |
| 426 } | 426 } |
| 427 _state |= _STREAM_CALLBACK; | 427 _state |= _STREAM_CALLBACK; |
| 428 while (true) { | 428 while (true) { |
| 429 bool hasSubscribers = _hasSubscribers; | 429 bool hasListener = _hasListener; |
| 430 bool isPaused = _isInputPaused; | 430 bool isPaused = _isInputPaused; |
| 431 if (hadSubscribers != hasSubscribers) { | 431 if (hadListener != hasListener) { |
| 432 _onSubscriptionStateChange(); | 432 _onSubscriptionStateChange(); |
| 433 } else if (isPaused != wasPaused) { | 433 } else if (isPaused != wasPaused) { |
| 434 _onPauseStateChange(); | 434 _onPauseStateChange(); |
| 435 } else { | 435 } else { |
| 436 _state ^= _STREAM_CALLBACK; | 436 _state ^= _STREAM_CALLBACK; |
| 437 return; | 437 return; |
| 438 } | 438 } |
| 439 wasPaused = isPaused; | 439 wasPaused = isPaused; |
| 440 hadSubscribers = hasSubscribers; | 440 hadListener = hasListener; |
| 441 } | 441 } |
| 442 } | 442 } |
| 443 | 443 |
| 444 /** | 444 /** |
| 445 * Called when the first subscriber requests a pause or the last a resume. | 445 * Called when the first subscriber requests a pause or the last a resume. |
| 446 * | 446 * |
| 447 * Read [isPaused] to see the new state. | 447 * Read [isPaused] to see the new state. |
| 448 */ | 448 */ |
| 449 void _onPauseStateChange() {} | 449 void _onPauseStateChange() {} |
| 450 | 450 |
| 451 /** | 451 /** |
| 452 * Called when the first listener subscribes or the last unsubscribes. | 452 * Called when the first listener subscribes or the last unsubscribes. |
| 453 * | 453 * |
| 454 * Read [hasSubscribers] to see what the new state is. | 454 * Read [hasListener] to see what the new state is. |
| 455 */ | 455 */ |
| 456 void _onSubscriptionStateChange() {} | 456 void _onSubscriptionStateChange() {} |
| 457 | 457 |
| 458 /** | 458 /** |
| 459 * Add a pending event at the end of the pending event queue. | 459 * Add a pending event at the end of the pending event queue. |
| 460 * | 460 * |
| 461 * Schedules events if currently not paused and inside a callback. | 461 * Schedules events if currently not paused and inside a callback. |
| 462 */ | 462 */ |
| 463 void _addPendingEvent(_DelayedEvent event) { | 463 void _addPendingEvent(_DelayedEvent event) { |
| 464 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); | 464 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
| (...skipping 17 matching lines...) Expand all Loading... |
| 482 events.handleNext(this); | 482 events.handleNext(this); |
| 483 } while (!events.isEmpty); | 483 } while (!events.isEmpty); |
| 484 } | 484 } |
| 485 | 485 |
| 486 /** | 486 /** |
| 487 * Send a data event directly to each subscriber. | 487 * Send a data event directly to each subscriber. |
| 488 */ | 488 */ |
| 489 _sendData(T value) { | 489 _sendData(T value) { |
| 490 assert(!_isPaused); | 490 assert(!_isPaused); |
| 491 assert(!_isComplete); | 491 assert(!_isComplete); |
| 492 if (!_hasSubscribers) return; | 492 if (!_hasListener) return; |
| 493 _forEachSubscriber((subscriber) { | 493 _forEachSubscriber((subscriber) { |
| 494 try { | 494 try { |
| 495 subscriber._sendData(value); | 495 subscriber._sendData(value); |
| 496 } on AsyncError catch (e) { | 496 } on AsyncError catch (e) { |
| 497 e.throwDelayed(); | 497 e.throwDelayed(); |
| 498 } catch (e, s) { | 498 } catch (e, s) { |
| 499 new AsyncError(e, s).throwDelayed(); | 499 new AsyncError(e, s).throwDelayed(); |
| 500 } | 500 } |
| 501 }); | 501 }); |
| 502 } | 502 } |
| 503 | 503 |
| 504 /** | 504 /** |
| 505 * Sends an error event directly to each subscriber. | 505 * Sends an error event directly to each subscriber. |
| 506 */ | 506 */ |
| 507 void _sendError(AsyncError error) { | 507 void _sendError(AsyncError error) { |
| 508 assert(!_isPaused); | 508 assert(!_isPaused); |
| 509 assert(!_isComplete); | 509 assert(!_isComplete); |
| 510 if (!_hasSubscribers) return; | 510 if (!_hasListener) return; |
| 511 _forEachSubscriber((subscriber) { | 511 _forEachSubscriber((subscriber) { |
| 512 try { | 512 try { |
| 513 subscriber._sendError(error); | 513 subscriber._sendError(error); |
| 514 } on AsyncError catch (e) { | 514 } on AsyncError catch (e) { |
| 515 e.throwDelayed(); | 515 e.throwDelayed(); |
| 516 } catch (e, s) { | 516 } catch (e, s) { |
| 517 new AsyncError.withCause(e, s, error).throwDelayed(); | 517 new AsyncError.withCause(e, s, error).throwDelayed(); |
| 518 } | 518 } |
| 519 }); | 519 }); |
| 520 } | 520 } |
| 521 | 521 |
| 522 /** | 522 /** |
| 523 * Sends the "done" message directly to each subscriber. | 523 * Sends the "done" message directly to each subscriber. |
| 524 * This automatically stops further subscription and | 524 * This automatically stops further subscription and |
| 525 * unsubscribes all subscribers. | 525 * unsubscribes all subscribers. |
| 526 */ | 526 */ |
| 527 void _sendDone() { | 527 void _sendDone() { |
| 528 assert(!_isPaused); | 528 assert(!_isPaused); |
| 529 assert(_isClosed); | 529 assert(_isClosed); |
| 530 _setComplete(); | 530 _setComplete(); |
| 531 if (!_hasSubscribers) return; | 531 if (!_hasListener) return; |
| 532 _forEachSubscriber((subscriber) { | 532 _forEachSubscriber((subscriber) { |
| 533 _cancel(subscriber); | 533 _cancel(subscriber); |
| 534 try { | 534 try { |
| 535 subscriber._sendDone(); | 535 subscriber._sendDone(); |
| 536 } on AsyncError catch (e) { | 536 } on AsyncError catch (e) { |
| 537 e.throwDelayed(); | 537 e.throwDelayed(); |
| 538 } catch (e, s) { | 538 } catch (e, s) { |
| 539 new AsyncError(e, s).throwDelayed(); | 539 new AsyncError(e, s).throwDelayed(); |
| 540 } | 540 } |
| 541 }); | 541 }); |
| 542 assert(!_hasSubscribers); | 542 assert(!_hasListener); |
| 543 } | 543 } |
| 544 } | 544 } |
| 545 | 545 |
| 546 // ------------------------------------------------------------------- | 546 // ------------------------------------------------------------------- |
| 547 // Default implementation of a stream with a single subscriber. | 547 // Default implementation of a stream with a single subscriber. |
| 548 // ------------------------------------------------------------------- | 548 // ------------------------------------------------------------------- |
| 549 /** | 549 /** |
| 550 * Default implementation of stream capable of sending events to one subscriber. | 550 * Default implementation of stream capable of sending events to one subscriber. |
| 551 * | 551 * |
| 552 * Any class needing to implement [Stream] can either directly extend this | 552 * Any class needing to implement [Stream] can either directly extend this |
| 553 * class, or extend [Stream] and delegate the subscribe method to an instance | 553 * class, or extend [Stream] and delegate the subscribe method to an instance |
| 554 * of this class. | 554 * of this class. |
| 555 * | 555 * |
| 556 * The only public methods are those of [Stream], so instances of | 556 * The only public methods are those of [Stream], so instances of |
| 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing | 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
| 558 * internal functionality. | 558 * internal functionality. |
| 559 * | 559 * |
| 560 * The [StreamController] is a public facing version of this class, with | 560 * The [StreamController] is a public facing version of this class, with |
| 561 * some methods made public. | 561 * some methods made public. |
| 562 * | 562 * |
| 563 * The user interface of [_SingleStreamImpl] are the following methods: | 563 * The user interface of [_SingleStreamImpl] are the following methods: |
| 564 * * [_add]: Add a data event to the stream. | 564 * * [_add]: Add a data event to the stream. |
| 565 * * [_addError]: Add an error event to the stream. | 565 * * [_addError]: Add an error event to the stream. |
| 566 * * [_close]: Request to close the stream. | 566 * * [_close]: Request to close the stream. |
| 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or | 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
| 568 * when losing the last subscriber. | 568 * when losing the last subscriber. |
| 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 570 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 570 * * [_hasListener]: Test whether there are currently any subscribers. |
| 571 * * [_isInputPaused]: Test whether the stream is currently paused. | 571 * * [_isInputPaused]: Test whether the stream is currently paused. |
| 572 * The user should not add new events while the stream is paused, but if it | 572 * The user should not add new events while the stream is paused, but if it |
| 573 * happens anyway, the stream will enqueue the events just as when new events | 573 * happens anyway, the stream will enqueue the events just as when new events |
| 574 * arrive while still firing an old event. | 574 * arrive while still firing an old event. |
| 575 */ | 575 */ |
| 576 class _SingleStreamImpl<T> extends _StreamImpl<T> { | 576 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| 577 _StreamListener _subscriber = null; | 577 _StreamListener _subscriber = null; |
| 578 | 578 |
| 579 /** Whether there is currently a subscriber on this [Stream]. */ | 579 /** Whether there is currently a subscriber on this [Stream]. */ |
| 580 bool get _hasSubscribers => _subscriber != null; | 580 bool get _hasListener => _subscriber != null; |
| 581 | 581 |
| 582 // ------------------------------------------------------------------- | 582 // ------------------------------------------------------------------- |
| 583 // Internal implementation. | 583 // Internal implementation. |
| 584 | 584 |
| 585 _SingleStreamImpl() { | 585 _SingleStreamImpl() { |
| 586 // Start out paused. | 586 // Start out paused. |
| 587 _updatePauseCount(1); | 587 _updatePauseCount(1); |
| 588 } | 588 } |
| 589 | 589 |
| 590 /** | 590 /** |
| 591 * Create the new subscription object. | 591 * Create the new subscription object. |
| 592 */ | 592 */ |
| 593 _StreamSubscriptionImpl<T> _createSubscription( | 593 _StreamSubscriptionImpl<T> _createSubscription( |
| 594 void onData(T data), | 594 void onData(T data), |
| 595 void onError(AsyncError error), | 595 void onError(AsyncError error), |
| 596 void onDone(), | 596 void onDone(), |
| 597 bool unsubscribeOnError) { | 597 bool unsubscribeOnError) { |
| 598 return new _StreamSubscriptionImpl<T>( | 598 return new _StreamSubscriptionImpl<T>( |
| 599 this, onData, onError, onDone, unsubscribeOnError); | 599 this, onData, onError, onDone, unsubscribeOnError); |
| 600 } | 600 } |
| 601 | 601 |
| 602 void _addListener(_StreamListener subscription) { | 602 void _addListener(_StreamListener subscription) { |
| 603 assert(!_isComplete); | 603 assert(!_isComplete); |
| 604 if (_hasSubscribers) { | 604 if (_hasListener) { |
| 605 throw new StateError("Stream already has subscriber."); | 605 throw new StateError("Stream already has subscriber."); |
| 606 } | 606 } |
| 607 assert(_pauseCount == 1); | 607 assert(_pauseCount == 1); |
| 608 _updatePauseCount(-1); | 608 _updatePauseCount(-1); |
| 609 _subscriber = subscription; | 609 _subscriber = subscription; |
| 610 subscription._setSubscribed(0); | 610 subscription._setSubscribed(0); |
| 611 if (_isInactive) { | 611 if (_isInactive) { |
| 612 _checkCallbacks(false, true); | 612 _checkCallbacks(false, true); |
| 613 if (!_isPaused && _hasPendingEvent) { | 613 if (!_isPaused && _hasPendingEvent) { |
| 614 _schedulePendingEvents(); | 614 _schedulePendingEvents(); |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 674 * The [StreamController] is a public facing version of this class, with | 674 * The [StreamController] is a public facing version of this class, with |
| 675 * some methods made public. | 675 * some methods made public. |
| 676 * | 676 * |
| 677 * The user interface of [_MultiStreamImpl] are the following methods: | 677 * The user interface of [_MultiStreamImpl] are the following methods: |
| 678 * * [_add]: Add a data event to the stream. | 678 * * [_add]: Add a data event to the stream. |
| 679 * * [_addError]: Add an error event to the stream. | 679 * * [_addError]: Add an error event to the stream. |
| 680 * * [_close]: Request to close the stream. | 680 * * [_close]: Request to close the stream. |
| 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or | 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
| 682 * when losing the last subscriber. | 682 * when losing the last subscriber. |
| 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 684 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 684 * * [_hasListener]: Test whether there are currently any subscribers. |
| 685 * * [_isPaused]: Test whether the stream is currently paused. | 685 * * [_isPaused]: Test whether the stream is currently paused. |
| 686 * The user should not add new events while the stream is paused, but if it | 686 * The user should not add new events while the stream is paused, but if it |
| 687 * happens anyway, the stream will enqueue the events just as when new events | 687 * happens anyway, the stream will enqueue the events just as when new events |
| 688 * arrive while still firing an old event. | 688 * arrive while still firing an old event. |
| 689 */ | 689 */ |
| 690 class _MultiStreamImpl<T> extends _StreamImpl<T> | 690 class _MultiStreamImpl<T> extends _StreamImpl<T> |
| 691 implements _InternalLinkList { | 691 implements _InternalLinkList { |
| 692 // Link list implementation (mixin when possible). | 692 // Link list implementation (mixin when possible). |
| 693 _InternalLink _nextLink; | 693 _InternalLink _nextLink; |
| 694 _InternalLink _previousLink; | 694 _InternalLink _previousLink; |
| 695 | 695 |
| 696 _MultiStreamImpl() { | 696 _MultiStreamImpl() { |
| 697 _nextLink = _previousLink = this; | 697 _nextLink = _previousLink = this; |
| 698 } | 698 } |
| 699 | 699 |
| 700 bool get isBroadcast => true; | 700 bool get isBroadcast => true; |
| 701 | 701 |
| 702 Stream<T> asBroadcastStream() => this; | 702 Stream<T> asBroadcastStream() => this; |
| 703 | 703 |
| 704 // ------------------------------------------------------------------ | 704 // ------------------------------------------------------------------ |
| 705 // Helper functions that can be overridden in subclasses. | 705 // Helper functions that can be overridden in subclasses. |
| 706 | 706 |
| 707 /** Whether there are currently any subscribers on this [Stream]. */ | 707 /** Whether there are currently any subscribers on this [Stream]. */ |
| 708 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); | 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
| 709 | 709 |
| 710 /** | 710 /** |
| 711 * Create the new subscription object. | 711 * Create the new subscription object. |
| 712 */ | 712 */ |
| 713 _StreamListener<T> _createSubscription( | 713 _StreamListener<T> _createSubscription( |
| 714 void onData(T data), | 714 void onData(T data), |
| 715 void onError(AsyncError error), | 715 void onError(AsyncError error), |
| 716 void onDone(), | 716 void onDone(), |
| 717 bool unsubscribeOnError) { | 717 bool unsubscribeOnError) { |
| 718 return new _StreamSubscriptionImpl<T>( | 718 return new _StreamSubscriptionImpl<T>( |
| (...skipping 11 matching lines...) Expand all Loading... |
| 730 * after the iteration is complete. | 730 * after the iteration is complete. |
| 731 * | 731 * |
| 732 * The [action] must not throw, or the controller will be left in an | 732 * The [action] must not throw, or the controller will be left in an |
| 733 * invalid state. | 733 * invalid state. |
| 734 * | 734 * |
| 735 * This method must not be called while [isFiring] is true. | 735 * This method must not be called while [isFiring] is true. |
| 736 */ | 736 */ |
| 737 void _forEachSubscriber( | 737 void _forEachSubscriber( |
| 738 void action(_StreamListener<T> subscription)) { | 738 void action(_StreamListener<T> subscription)) { |
| 739 assert(!_isFiring); | 739 assert(!_isFiring); |
| 740 if (!_hasSubscribers) return; | 740 if (!_hasListener) return; |
| 741 bool wasInputPaused = _isInputPaused; | 741 bool wasInputPaused = _isInputPaused; |
| 742 _startFiring(); | 742 _startFiring(); |
| 743 _InternalLink cursor = this._nextLink; | 743 _InternalLink cursor = this._nextLink; |
| 744 while (!identical(cursor, this)) { | 744 while (!identical(cursor, this)) { |
| 745 _StreamListener<T> current = cursor; | 745 _StreamListener<T> current = cursor; |
| 746 if (current._needsEvent(_currentEventIdBit)) { | 746 if (current._needsEvent(_currentEventIdBit)) { |
| 747 action(current); | 747 action(current); |
| 748 // Marks as having received the event. | 748 // Marks as having received the event. |
| 749 current._toggleEventReceived(); | 749 current._toggleEventReceived(); |
| 750 } | 750 } |
| 751 cursor = current._nextLink; | 751 cursor = current._nextLink; |
| 752 if (current._isPendingUnsubscribe) { | 752 if (current._isPendingUnsubscribe) { |
| 753 _removeListener(current); | 753 _removeListener(current); |
| 754 } | 754 } |
| 755 } | 755 } |
| 756 _endFiring(wasInputPaused); | 756 _endFiring(wasInputPaused); |
| 757 } | 757 } |
| 758 | 758 |
| 759 void _addListener(_StreamListener listener) { | 759 void _addListener(_StreamListener listener) { |
| 760 listener._setSubscribed(_currentEventIdBit); | 760 listener._setSubscribed(_currentEventIdBit); |
| 761 bool hadSubscribers = _hasSubscribers; | 761 bool hadListener = _hasListener; |
| 762 _InternalLinkList.add(this, listener); | 762 _InternalLinkList.add(this, listener); |
| 763 if (!hadSubscribers && _isInactive) { | 763 if (!hadListener && _isInactive) { |
| 764 _checkCallbacks(false, false); | 764 _checkCallbacks(false, false); |
| 765 if (!_isPaused && _hasPendingEvent) { | 765 if (!_isPaused && _hasPendingEvent) { |
| 766 _schedulePendingEvents(); | 766 _schedulePendingEvents(); |
| 767 } | 767 } |
| 768 } | 768 } |
| 769 } | 769 } |
| 770 | 770 |
| 771 /** | 771 /** |
| 772 * Handle a cancel requested from a [_StreamListener]. | 772 * Handle a cancel requested from a [_StreamListener]. |
| 773 * | 773 * |
| (...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1316 _subscription.resume(); | 1316 _subscription.resume(); |
| 1317 } | 1317 } |
| 1318 } | 1318 } |
| 1319 } | 1319 } |
| 1320 | 1320 |
| 1321 /** | 1321 /** |
| 1322 * Subscribe or unsubscribe on [_source] depending on whether | 1322 * Subscribe or unsubscribe on [_source] depending on whether |
| 1323 * [_stream] has subscribers. | 1323 * [_stream] has subscribers. |
| 1324 */ | 1324 */ |
| 1325 void _onSubscriptionStateChange() { | 1325 void _onSubscriptionStateChange() { |
| 1326 if (_hasSubscribers) { | 1326 if (_hasListener) { |
| 1327 assert(_subscription == null); | 1327 assert(_subscription == null); |
| 1328 _subscription = _source.listen(this._add, | 1328 _subscription = _source.listen(this._add, |
| 1329 onError: this._addError, | 1329 onError: this._addError, |
| 1330 onDone: this._close); | 1330 onDone: this._close); |
| 1331 } else { | 1331 } else { |
| 1332 // TODO(lrn): Check why this can happen. | 1332 // TODO(lrn): Check why this can happen. |
| 1333 if (_subscription == null) return; | 1333 if (_subscription == null) return; |
| 1334 _subscription.cancel(); | 1334 _subscription.cancel(); |
| 1335 _subscription = null; | 1335 _subscription = null; |
| 1336 } | 1336 } |
| 1337 } | 1337 } |
| 1338 } | 1338 } |
| OLD | NEW |