OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 // Completion state of the stream. | 9 // Completion state of the stream. |
10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
219 /** | 219 /** |
220 * The bit representing the current or last event fired. | 220 * The bit representing the current or last event fired. |
221 * | 221 * |
222 * This bit matches a bit on listeners that have received the corresponding | 222 * This bit matches a bit on listeners that have received the corresponding |
223 * event. It is toggled for each new event being fired. | 223 * event. It is toggled for each new event being fired. |
224 */ | 224 */ |
225 int get _currentEventIdBit => | 225 int get _currentEventIdBit => |
226 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; | 226 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
227 | 227 |
228 /** Whether there is currently a subscriber on this [Stream]. */ | 228 /** Whether there is currently a subscriber on this [Stream]. */ |
229 bool get _hasSubscribers; | 229 bool get _hasListener; |
230 | 230 |
231 | 231 |
232 /** Whether the state bits allow firing. */ | 232 /** Whether the state bits allow firing. */ |
233 bool get _mayFireState { | 233 bool get _mayFireState { |
234 // The state allows firing unless: | 234 // The state allows firing unless: |
235 // - it's currently firing | 235 // - it's currently firing |
236 // - it's currently in a callback | 236 // - it's currently in a callback |
237 // - it's paused | 237 // - it's paused |
238 const int mask = | 238 const int mask = |
239 _STREAM_FIRING | | 239 _STREAM_FIRING | |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 } | 279 } |
280 | 280 |
281 void _setComplete() { | 281 void _setComplete() { |
282 assert(_isClosed); | 282 assert(_isClosed); |
283 _state = _state |_STREAM_COMPLETE; | 283 _state = _state |_STREAM_COMPLETE; |
284 } | 284 } |
285 | 285 |
286 void _startFiring() { | 286 void _startFiring() { |
287 assert(!_isFiring); | 287 assert(!_isFiring); |
288 assert(!_isInCallback); | 288 assert(!_isInCallback); |
289 assert(_hasSubscribers); | 289 assert(_hasListener); |
290 assert(!_isPaused); | 290 assert(!_isPaused); |
291 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID | 291 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
292 // bit. All current subscribers will now have a _LISTENER_EVENT_ID | 292 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
293 // that doesn't match _STREAM_EVENT_ID, and they will receive the | 293 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
294 // event being fired. | 294 // event being fired. |
295 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; | 295 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
296 } | 296 } |
297 | 297 |
298 void _endFiring(bool wasInputPaused) { | 298 void _endFiring(bool wasInputPaused) { |
299 assert(_isFiring); | 299 assert(_isFiring); |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
410 */ | 410 */ |
411 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); | 411 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
412 | 412 |
413 /** | 413 /** |
414 * Checks whether the subscription/pause state has changed. | 414 * Checks whether the subscription/pause state has changed. |
415 * | 415 * |
416 * Calls the appropriate callback if the state has changed from the | 416 * Calls the appropriate callback if the state has changed from the |
417 * provided one. Repeats calling callbacks as long as the call changes | 417 * provided one. Repeats calling callbacks as long as the call changes |
418 * the state. | 418 * the state. |
419 */ | 419 */ |
420 void _checkCallbacks(bool hadSubscribers, bool wasPaused) { | 420 void _checkCallbacks(bool hadListener, bool wasPaused) { |
421 assert(!_isFiring); | 421 assert(!_isFiring); |
422 // Will be handled after the current callback. | 422 // Will be handled after the current callback. |
423 if (_isInCallback) return; | 423 if (_isInCallback) return; |
424 if (_hasPendingResume && !_hasPendingEvent) { | 424 if (_hasPendingResume && !_hasPendingEvent) { |
425 _state ^= _STREAM_PENDING_RESUME; | 425 _state ^= _STREAM_PENDING_RESUME; |
426 } | 426 } |
427 _state |= _STREAM_CALLBACK; | 427 _state |= _STREAM_CALLBACK; |
428 while (true) { | 428 while (true) { |
429 bool hasSubscribers = _hasSubscribers; | 429 bool hasListener = _hasListener; |
430 bool isPaused = _isInputPaused; | 430 bool isPaused = _isInputPaused; |
431 if (hadSubscribers != hasSubscribers) { | 431 if (hadListener != hasListener) { |
432 _onSubscriptionStateChange(); | 432 _onSubscriptionStateChange(); |
433 } else if (isPaused != wasPaused) { | 433 } else if (isPaused != wasPaused) { |
434 _onPauseStateChange(); | 434 _onPauseStateChange(); |
435 } else { | 435 } else { |
436 _state ^= _STREAM_CALLBACK; | 436 _state ^= _STREAM_CALLBACK; |
437 return; | 437 return; |
438 } | 438 } |
439 wasPaused = isPaused; | 439 wasPaused = isPaused; |
440 hadSubscribers = hasSubscribers; | 440 hadListener = hasListener; |
441 } | 441 } |
442 } | 442 } |
443 | 443 |
444 /** | 444 /** |
445 * Called when the first subscriber requests a pause or the last a resume. | 445 * Called when the first subscriber requests a pause or the last a resume. |
446 * | 446 * |
447 * Read [isPaused] to see the new state. | 447 * Read [isPaused] to see the new state. |
448 */ | 448 */ |
449 void _onPauseStateChange() {} | 449 void _onPauseStateChange() {} |
450 | 450 |
451 /** | 451 /** |
452 * Called when the first listener subscribes or the last unsubscribes. | 452 * Called when the first listener subscribes or the last unsubscribes. |
453 * | 453 * |
454 * Read [hasSubscribers] to see what the new state is. | 454 * Read [hasListener] to see what the new state is. |
455 */ | 455 */ |
456 void _onSubscriptionStateChange() {} | 456 void _onSubscriptionStateChange() {} |
457 | 457 |
458 /** | 458 /** |
459 * Add a pending event at the end of the pending event queue. | 459 * Add a pending event at the end of the pending event queue. |
460 * | 460 * |
461 * Schedules events if currently not paused and inside a callback. | 461 * Schedules events if currently not paused and inside a callback. |
462 */ | 462 */ |
463 void _addPendingEvent(_DelayedEvent event) { | 463 void _addPendingEvent(_DelayedEvent event) { |
464 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); | 464 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
(...skipping 17 matching lines...) Expand all Loading... |
482 events.handleNext(this); | 482 events.handleNext(this); |
483 } while (!events.isEmpty); | 483 } while (!events.isEmpty); |
484 } | 484 } |
485 | 485 |
486 /** | 486 /** |
487 * Send a data event directly to each subscriber. | 487 * Send a data event directly to each subscriber. |
488 */ | 488 */ |
489 _sendData(T value) { | 489 _sendData(T value) { |
490 assert(!_isPaused); | 490 assert(!_isPaused); |
491 assert(!_isComplete); | 491 assert(!_isComplete); |
492 if (!_hasSubscribers) return; | 492 if (!_hasListener) return; |
493 _forEachSubscriber((subscriber) { | 493 _forEachSubscriber((subscriber) { |
494 try { | 494 try { |
495 subscriber._sendData(value); | 495 subscriber._sendData(value); |
496 } on AsyncError catch (e) { | 496 } on AsyncError catch (e) { |
497 e.throwDelayed(); | 497 e.throwDelayed(); |
498 } catch (e, s) { | 498 } catch (e, s) { |
499 new AsyncError(e, s).throwDelayed(); | 499 new AsyncError(e, s).throwDelayed(); |
500 } | 500 } |
501 }); | 501 }); |
502 } | 502 } |
503 | 503 |
504 /** | 504 /** |
505 * Sends an error event directly to each subscriber. | 505 * Sends an error event directly to each subscriber. |
506 */ | 506 */ |
507 void _sendError(AsyncError error) { | 507 void _sendError(AsyncError error) { |
508 assert(!_isPaused); | 508 assert(!_isPaused); |
509 assert(!_isComplete); | 509 assert(!_isComplete); |
510 if (!_hasSubscribers) return; | 510 if (!_hasListener) return; |
511 _forEachSubscriber((subscriber) { | 511 _forEachSubscriber((subscriber) { |
512 try { | 512 try { |
513 subscriber._sendError(error); | 513 subscriber._sendError(error); |
514 } on AsyncError catch (e) { | 514 } on AsyncError catch (e) { |
515 e.throwDelayed(); | 515 e.throwDelayed(); |
516 } catch (e, s) { | 516 } catch (e, s) { |
517 new AsyncError.withCause(e, s, error).throwDelayed(); | 517 new AsyncError.withCause(e, s, error).throwDelayed(); |
518 } | 518 } |
519 }); | 519 }); |
520 } | 520 } |
521 | 521 |
522 /** | 522 /** |
523 * Sends the "done" message directly to each subscriber. | 523 * Sends the "done" message directly to each subscriber. |
524 * This automatically stops further subscription and | 524 * This automatically stops further subscription and |
525 * unsubscribes all subscribers. | 525 * unsubscribes all subscribers. |
526 */ | 526 */ |
527 void _sendDone() { | 527 void _sendDone() { |
528 assert(!_isPaused); | 528 assert(!_isPaused); |
529 assert(_isClosed); | 529 assert(_isClosed); |
530 _setComplete(); | 530 _setComplete(); |
531 if (!_hasSubscribers) return; | 531 if (!_hasListener) return; |
532 _forEachSubscriber((subscriber) { | 532 _forEachSubscriber((subscriber) { |
533 _cancel(subscriber); | 533 _cancel(subscriber); |
534 try { | 534 try { |
535 subscriber._sendDone(); | 535 subscriber._sendDone(); |
536 } on AsyncError catch (e) { | 536 } on AsyncError catch (e) { |
537 e.throwDelayed(); | 537 e.throwDelayed(); |
538 } catch (e, s) { | 538 } catch (e, s) { |
539 new AsyncError(e, s).throwDelayed(); | 539 new AsyncError(e, s).throwDelayed(); |
540 } | 540 } |
541 }); | 541 }); |
542 assert(!_hasSubscribers); | 542 assert(!_hasListener); |
543 } | 543 } |
544 } | 544 } |
545 | 545 |
546 // ------------------------------------------------------------------- | 546 // ------------------------------------------------------------------- |
547 // Default implementation of a stream with a single subscriber. | 547 // Default implementation of a stream with a single subscriber. |
548 // ------------------------------------------------------------------- | 548 // ------------------------------------------------------------------- |
549 /** | 549 /** |
550 * Default implementation of stream capable of sending events to one subscriber. | 550 * Default implementation of stream capable of sending events to one subscriber. |
551 * | 551 * |
552 * Any class needing to implement [Stream] can either directly extend this | 552 * Any class needing to implement [Stream] can either directly extend this |
553 * class, or extend [Stream] and delegate the subscribe method to an instance | 553 * class, or extend [Stream] and delegate the subscribe method to an instance |
554 * of this class. | 554 * of this class. |
555 * | 555 * |
556 * The only public methods are those of [Stream], so instances of | 556 * The only public methods are those of [Stream], so instances of |
557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing | 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
558 * internal functionality. | 558 * internal functionality. |
559 * | 559 * |
560 * The [StreamController] is a public facing version of this class, with | 560 * The [StreamController] is a public facing version of this class, with |
561 * some methods made public. | 561 * some methods made public. |
562 * | 562 * |
563 * The user interface of [_SingleStreamImpl] are the following methods: | 563 * The user interface of [_SingleStreamImpl] are the following methods: |
564 * * [_add]: Add a data event to the stream. | 564 * * [_add]: Add a data event to the stream. |
565 * * [_addError]: Add an error event to the stream. | 565 * * [_addError]: Add an error event to the stream. |
566 * * [_close]: Request to close the stream. | 566 * * [_close]: Request to close the stream. |
567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or | 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
568 * when losing the last subscriber. | 568 * when losing the last subscriber. |
569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
570 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 570 * * [_hasListener]: Test whether there are currently any subscribers. |
571 * * [_isInputPaused]: Test whether the stream is currently paused. | 571 * * [_isInputPaused]: Test whether the stream is currently paused. |
572 * The user should not add new events while the stream is paused, but if it | 572 * The user should not add new events while the stream is paused, but if it |
573 * happens anyway, the stream will enqueue the events just as when new events | 573 * happens anyway, the stream will enqueue the events just as when new events |
574 * arrive while still firing an old event. | 574 * arrive while still firing an old event. |
575 */ | 575 */ |
576 class _SingleStreamImpl<T> extends _StreamImpl<T> { | 576 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
577 _StreamListener _subscriber = null; | 577 _StreamListener _subscriber = null; |
578 | 578 |
579 /** Whether there is currently a subscriber on this [Stream]. */ | 579 /** Whether there is currently a subscriber on this [Stream]. */ |
580 bool get _hasSubscribers => _subscriber != null; | 580 bool get _hasListener => _subscriber != null; |
581 | 581 |
582 // ------------------------------------------------------------------- | 582 // ------------------------------------------------------------------- |
583 // Internal implementation. | 583 // Internal implementation. |
584 | 584 |
585 _SingleStreamImpl() { | 585 _SingleStreamImpl() { |
586 // Start out paused. | 586 // Start out paused. |
587 _updatePauseCount(1); | 587 _updatePauseCount(1); |
588 } | 588 } |
589 | 589 |
590 /** | 590 /** |
591 * Create the new subscription object. | 591 * Create the new subscription object. |
592 */ | 592 */ |
593 _StreamSubscriptionImpl<T> _createSubscription( | 593 _StreamSubscriptionImpl<T> _createSubscription( |
594 void onData(T data), | 594 void onData(T data), |
595 void onError(AsyncError error), | 595 void onError(AsyncError error), |
596 void onDone(), | 596 void onDone(), |
597 bool unsubscribeOnError) { | 597 bool unsubscribeOnError) { |
598 return new _StreamSubscriptionImpl<T>( | 598 return new _StreamSubscriptionImpl<T>( |
599 this, onData, onError, onDone, unsubscribeOnError); | 599 this, onData, onError, onDone, unsubscribeOnError); |
600 } | 600 } |
601 | 601 |
602 void _addListener(_StreamListener subscription) { | 602 void _addListener(_StreamListener subscription) { |
603 assert(!_isComplete); | 603 assert(!_isComplete); |
604 if (_hasSubscribers) { | 604 if (_hasListener) { |
605 throw new StateError("Stream already has subscriber."); | 605 throw new StateError("Stream already has subscriber."); |
606 } | 606 } |
607 assert(_pauseCount == 1); | 607 assert(_pauseCount == 1); |
608 _updatePauseCount(-1); | 608 _updatePauseCount(-1); |
609 _subscriber = subscription; | 609 _subscriber = subscription; |
610 subscription._setSubscribed(0); | 610 subscription._setSubscribed(0); |
611 if (_isInactive) { | 611 if (_isInactive) { |
612 _checkCallbacks(false, true); | 612 _checkCallbacks(false, true); |
613 if (!_isPaused && _hasPendingEvent) { | 613 if (!_isPaused && _hasPendingEvent) { |
614 _schedulePendingEvents(); | 614 _schedulePendingEvents(); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
674 * The [StreamController] is a public facing version of this class, with | 674 * The [StreamController] is a public facing version of this class, with |
675 * some methods made public. | 675 * some methods made public. |
676 * | 676 * |
677 * The user interface of [_MultiStreamImpl] are the following methods: | 677 * The user interface of [_MultiStreamImpl] are the following methods: |
678 * * [_add]: Add a data event to the stream. | 678 * * [_add]: Add a data event to the stream. |
679 * * [_addError]: Add an error event to the stream. | 679 * * [_addError]: Add an error event to the stream. |
680 * * [_close]: Request to close the stream. | 680 * * [_close]: Request to close the stream. |
681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or | 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
682 * when losing the last subscriber. | 682 * when losing the last subscriber. |
683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
684 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 684 * * [_hasListener]: Test whether there are currently any subscribers. |
685 * * [_isPaused]: Test whether the stream is currently paused. | 685 * * [_isPaused]: Test whether the stream is currently paused. |
686 * The user should not add new events while the stream is paused, but if it | 686 * The user should not add new events while the stream is paused, but if it |
687 * happens anyway, the stream will enqueue the events just as when new events | 687 * happens anyway, the stream will enqueue the events just as when new events |
688 * arrive while still firing an old event. | 688 * arrive while still firing an old event. |
689 */ | 689 */ |
690 class _MultiStreamImpl<T> extends _StreamImpl<T> | 690 class _MultiStreamImpl<T> extends _StreamImpl<T> |
691 implements _InternalLinkList { | 691 implements _InternalLinkList { |
692 // Link list implementation (mixin when possible). | 692 // Link list implementation (mixin when possible). |
693 _InternalLink _nextLink; | 693 _InternalLink _nextLink; |
694 _InternalLink _previousLink; | 694 _InternalLink _previousLink; |
695 | 695 |
696 _MultiStreamImpl() { | 696 _MultiStreamImpl() { |
697 _nextLink = _previousLink = this; | 697 _nextLink = _previousLink = this; |
698 } | 698 } |
699 | 699 |
700 bool get isBroadcast => true; | 700 bool get isBroadcast => true; |
701 | 701 |
702 Stream<T> asBroadcastStream() => this; | 702 Stream<T> asBroadcastStream() => this; |
703 | 703 |
704 // ------------------------------------------------------------------ | 704 // ------------------------------------------------------------------ |
705 // Helper functions that can be overridden in subclasses. | 705 // Helper functions that can be overridden in subclasses. |
706 | 706 |
707 /** Whether there are currently any subscribers on this [Stream]. */ | 707 /** Whether there are currently any subscribers on this [Stream]. */ |
708 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); | 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
709 | 709 |
710 /** | 710 /** |
711 * Create the new subscription object. | 711 * Create the new subscription object. |
712 */ | 712 */ |
713 _StreamListener<T> _createSubscription( | 713 _StreamListener<T> _createSubscription( |
714 void onData(T data), | 714 void onData(T data), |
715 void onError(AsyncError error), | 715 void onError(AsyncError error), |
716 void onDone(), | 716 void onDone(), |
717 bool unsubscribeOnError) { | 717 bool unsubscribeOnError) { |
718 return new _StreamSubscriptionImpl<T>( | 718 return new _StreamSubscriptionImpl<T>( |
(...skipping 11 matching lines...) Expand all Loading... |
730 * after the iteration is complete. | 730 * after the iteration is complete. |
731 * | 731 * |
732 * The [action] must not throw, or the controller will be left in an | 732 * The [action] must not throw, or the controller will be left in an |
733 * invalid state. | 733 * invalid state. |
734 * | 734 * |
735 * This method must not be called while [isFiring] is true. | 735 * This method must not be called while [isFiring] is true. |
736 */ | 736 */ |
737 void _forEachSubscriber( | 737 void _forEachSubscriber( |
738 void action(_StreamListener<T> subscription)) { | 738 void action(_StreamListener<T> subscription)) { |
739 assert(!_isFiring); | 739 assert(!_isFiring); |
740 if (!_hasSubscribers) return; | 740 if (!_hasListener) return; |
741 bool wasInputPaused = _isInputPaused; | 741 bool wasInputPaused = _isInputPaused; |
742 _startFiring(); | 742 _startFiring(); |
743 _InternalLink cursor = this._nextLink; | 743 _InternalLink cursor = this._nextLink; |
744 while (!identical(cursor, this)) { | 744 while (!identical(cursor, this)) { |
745 _StreamListener<T> current = cursor; | 745 _StreamListener<T> current = cursor; |
746 if (current._needsEvent(_currentEventIdBit)) { | 746 if (current._needsEvent(_currentEventIdBit)) { |
747 action(current); | 747 action(current); |
748 // Marks as having received the event. | 748 // Marks as having received the event. |
749 current._toggleEventReceived(); | 749 current._toggleEventReceived(); |
750 } | 750 } |
751 cursor = current._nextLink; | 751 cursor = current._nextLink; |
752 if (current._isPendingUnsubscribe) { | 752 if (current._isPendingUnsubscribe) { |
753 _removeListener(current); | 753 _removeListener(current); |
754 } | 754 } |
755 } | 755 } |
756 _endFiring(wasInputPaused); | 756 _endFiring(wasInputPaused); |
757 } | 757 } |
758 | 758 |
759 void _addListener(_StreamListener listener) { | 759 void _addListener(_StreamListener listener) { |
760 listener._setSubscribed(_currentEventIdBit); | 760 listener._setSubscribed(_currentEventIdBit); |
761 bool hadSubscribers = _hasSubscribers; | 761 bool hadListener = _hasListener; |
762 _InternalLinkList.add(this, listener); | 762 _InternalLinkList.add(this, listener); |
763 if (!hadSubscribers && _isInactive) { | 763 if (!hadListener && _isInactive) { |
764 _checkCallbacks(false, false); | 764 _checkCallbacks(false, false); |
765 if (!_isPaused && _hasPendingEvent) { | 765 if (!_isPaused && _hasPendingEvent) { |
766 _schedulePendingEvents(); | 766 _schedulePendingEvents(); |
767 } | 767 } |
768 } | 768 } |
769 } | 769 } |
770 | 770 |
771 /** | 771 /** |
772 * Handle a cancel requested from a [_StreamListener]. | 772 * Handle a cancel requested from a [_StreamListener]. |
773 * | 773 * |
(...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1316 _subscription.resume(); | 1316 _subscription.resume(); |
1317 } | 1317 } |
1318 } | 1318 } |
1319 } | 1319 } |
1320 | 1320 |
1321 /** | 1321 /** |
1322 * Subscribe or unsubscribe on [_source] depending on whether | 1322 * Subscribe or unsubscribe on [_source] depending on whether |
1323 * [_stream] has subscribers. | 1323 * [_stream] has subscribers. |
1324 */ | 1324 */ |
1325 void _onSubscriptionStateChange() { | 1325 void _onSubscriptionStateChange() { |
1326 if (_hasSubscribers) { | 1326 if (_hasListener) { |
1327 assert(_subscription == null); | 1327 assert(_subscription == null); |
1328 _subscription = _source.listen(this._add, | 1328 _subscription = _source.listen(this._add, |
1329 onError: this._addError, | 1329 onError: this._addError, |
1330 onDone: this._close); | 1330 onDone: this._close); |
1331 } else { | 1331 } else { |
1332 // TODO(lrn): Check why this can happen. | 1332 // TODO(lrn): Check why this can happen. |
1333 if (_subscription == null) return; | 1333 if (_subscription == null) return; |
1334 _subscription.cancel(); | 1334 _subscription.cancel(); |
1335 _subscription = null; | 1335 _subscription = null; |
1336 } | 1336 } |
1337 } | 1337 } |
1338 } | 1338 } |
OLD | NEW |