| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 // Completion state of the stream. | 9 // Completion state of the stream. |
| 10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 72 * Also supports scheduling the events for later execution. | 72 * Also supports scheduling the events for later execution. |
| 73 */ | 73 */ |
| 74 _PendingEvents _pendingEvents; | 74 _PendingEvents _pendingEvents; |
| 75 | 75 |
| 76 // ------------------------------------------------------------------ | 76 // ------------------------------------------------------------------ |
| 77 // Stream interface. | 77 // Stream interface. |
| 78 | 78 |
| 79 StreamSubscription<T> listen(void onData(T data), | 79 StreamSubscription<T> listen(void onData(T data), |
| 80 { void onError(AsyncError error), | 80 { void onError(AsyncError error), |
| 81 void onDone(), | 81 void onDone(), |
| 82 bool unsubscribeOnError }) { | 82 bool cancelOnError }) { |
| 83 if (_isComplete) { | 83 if (_isComplete) { |
| 84 return new _DoneSubscription(onDone); | 84 return new _DoneSubscription(onDone); |
| 85 } | 85 } |
| 86 if (onData == null) onData = _nullDataHandler; | 86 if (onData == null) onData = _nullDataHandler; |
| 87 if (onError == null) onError = _nullErrorHandler; | 87 if (onError == null) onError = _nullErrorHandler; |
| 88 if (onDone == null) onDone = _nullDoneHandler; | 88 if (onDone == null) onDone = _nullDoneHandler; |
| 89 unsubscribeOnError = identical(true, unsubscribeOnError); | 89 cancelOnError = identical(true, cancelOnError); |
| 90 _StreamSubscriptionImpl subscription = | 90 _StreamSubscriptionImpl subscription = |
| 91 _createSubscription(onData, onError, onDone, unsubscribeOnError); | 91 _createSubscription(onData, onError, onDone, cancelOnError); |
| 92 _addListener(subscription); | 92 _addListener(subscription); |
| 93 return subscription; | 93 return subscription; |
| 94 } | 94 } |
| 95 | 95 |
| 96 // ------------------------------------------------------------------ | 96 // ------------------------------------------------------------------ |
| 97 // EventSink interface-like methods for sending events into the stream. | 97 // EventSink interface-like methods for sending events into the stream. |
| 98 // It's the responsibility of the caller to ensure that the stream is not | 98 // It's the responsibility of the caller to ensure that the stream is not |
| 99 // paused when adding events. If the stream is paused, the events will be | 99 // paused when adding events. If the stream is paused, the events will be |
| 100 // queued, but it's better to not send events at all. | 100 // queued, but it's better to not send events at all. |
| 101 | 101 |
| (...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 369 void _schedulePendingEvents() { | 369 void _schedulePendingEvents() { |
| 370 assert(_hasPendingEvent); | 370 assert(_hasPendingEvent); |
| 371 _pendingEvents.schedule(this); | 371 _pendingEvents.schedule(this); |
| 372 } | 372 } |
| 373 | 373 |
| 374 /** Create a subscription object. Called by [subcribe]. */ | 374 /** Create a subscription object. Called by [subcribe]. */ |
| 375 _StreamSubscriptionImpl<T> _createSubscription( | 375 _StreamSubscriptionImpl<T> _createSubscription( |
| 376 void onData(T data), | 376 void onData(T data), |
| 377 void onError(AsyncError error), | 377 void onError(AsyncError error), |
| 378 void onDone(), | 378 void onDone(), |
| 379 bool unsubscribeOnError); | 379 bool cancelOnError); |
| 380 | 380 |
| 381 /** | 381 /** |
| 382 * Adds a listener to this stream. | 382 * Adds a listener to this stream. |
| 383 */ | 383 */ |
| 384 void _addListener(_StreamSubscriptionImpl subscription); | 384 void _addListener(_StreamSubscriptionImpl subscription); |
| 385 | 385 |
| 386 /** | 386 /** |
| 387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 388 * | 388 * |
| 389 * This method is called from [_StreamSubscriptionImpl.cancel]. | 389 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| (...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 587 _updatePauseCount(1); | 587 _updatePauseCount(1); |
| 588 } | 588 } |
| 589 | 589 |
| 590 /** | 590 /** |
| 591 * Create the new subscription object. | 591 * Create the new subscription object. |
| 592 */ | 592 */ |
| 593 _StreamSubscriptionImpl<T> _createSubscription( | 593 _StreamSubscriptionImpl<T> _createSubscription( |
| 594 void onData(T data), | 594 void onData(T data), |
| 595 void onError(AsyncError error), | 595 void onError(AsyncError error), |
| 596 void onDone(), | 596 void onDone(), |
| 597 bool unsubscribeOnError) { | 597 bool cancelOnError) { |
| 598 return new _StreamSubscriptionImpl<T>( | 598 return new _StreamSubscriptionImpl<T>( |
| 599 this, onData, onError, onDone, unsubscribeOnError); | 599 this, onData, onError, onDone, cancelOnError); |
| 600 } | 600 } |
| 601 | 601 |
| 602 void _addListener(_StreamListener subscription) { | 602 void _addListener(_StreamListener subscription) { |
| 603 assert(!_isComplete); | 603 assert(!_isComplete); |
| 604 if (_hasListener) { | 604 if (_hasListener) { |
| 605 throw new StateError("Stream already has subscriber."); | 605 throw new StateError("Stream already has subscriber."); |
| 606 } | 606 } |
| 607 assert(_pauseCount == 1); | 607 assert(_pauseCount == 1); |
| 608 _updatePauseCount(-1); | 608 _updatePauseCount(-1); |
| 609 _subscriber = subscription; | 609 _subscriber = subscription; |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 707 /** Whether there are currently any subscribers on this [Stream]. */ | 707 /** Whether there are currently any subscribers on this [Stream]. */ |
| 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); | 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
| 709 | 709 |
| 710 /** | 710 /** |
| 711 * Create the new subscription object. | 711 * Create the new subscription object. |
| 712 */ | 712 */ |
| 713 _StreamListener<T> _createSubscription( | 713 _StreamListener<T> _createSubscription( |
| 714 void onData(T data), | 714 void onData(T data), |
| 715 void onError(AsyncError error), | 715 void onError(AsyncError error), |
| 716 void onDone(), | 716 void onDone(), |
| 717 bool unsubscribeOnError) { | 717 bool cancelOnError) { |
| 718 return new _StreamSubscriptionImpl<T>( | 718 return new _StreamSubscriptionImpl<T>( |
| 719 this, onData, onError, onDone, unsubscribeOnError); | 719 this, onData, onError, onDone, cancelOnError); |
| 720 } | 720 } |
| 721 | 721 |
| 722 // ------------------------------------------------------------------- | 722 // ------------------------------------------------------------------- |
| 723 // Internal implementation. | 723 // Internal implementation. |
| 724 | 724 |
| 725 /** | 725 /** |
| 726 * Iterate over all current subscribers and perform an action on each. | 726 * Iterate over all current subscribers and perform an action on each. |
| 727 * | 727 * |
| 728 * The set of subscribers cannot be modified during this iteration. | 728 * The set of subscribers cannot be modified during this iteration. |
| 729 * All attempts to add or unsubscribe subscribers will be delayed until | 729 * All attempts to add or unsubscribe subscribers will be delayed until |
| (...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 894 * [_StreamImpl.createSubscription]. | 894 * [_StreamImpl.createSubscription]. |
| 895 * | 895 * |
| 896 * The subscription is in one of three states: | 896 * The subscription is in one of three states: |
| 897 * * Subscribed. | 897 * * Subscribed. |
| 898 * * Paused-and-subscribed. | 898 * * Paused-and-subscribed. |
| 899 * * Unsubscribed. | 899 * * Unsubscribed. |
| 900 * Unsubscribing also resumes any pauses started by the subscription. | 900 * Unsubscribing also resumes any pauses started by the subscription. |
| 901 */ | 901 */ |
| 902 class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 902 class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| 903 implements StreamSubscription<T> { | 903 implements StreamSubscription<T> { |
| 904 final bool _unsubscribeOnError; | 904 final bool _cancelOnError; |
| 905 // TODO(ahe): Restore type when feature is implemented in dart2js | 905 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 906 // checked mode. http://dartbug.com/7733 | 906 // checked mode. http://dartbug.com/7733 |
| 907 var /* _DataHandler<T> */ _onData; | 907 var /* _DataHandler<T> */ _onData; |
| 908 _ErrorHandler _onError; | 908 _ErrorHandler _onError; |
| 909 _DoneHandler _onDone; | 909 _DoneHandler _onDone; |
| 910 _StreamSubscriptionImpl(_StreamImpl source, | 910 _StreamSubscriptionImpl(_StreamImpl source, |
| 911 this._onData, | 911 this._onData, |
| 912 this._onError, | 912 this._onError, |
| 913 this._onDone, | 913 this._onDone, |
| 914 this._unsubscribeOnError) : super(source); | 914 this._cancelOnError) : super(source); |
| 915 | 915 |
| 916 void onData(void handleData(T event)) { | 916 void onData(void handleData(T event)) { |
| 917 if (handleData == null) handleData = _nullDataHandler; | 917 if (handleData == null) handleData = _nullDataHandler; |
| 918 _onData = handleData; | 918 _onData = handleData; |
| 919 } | 919 } |
| 920 | 920 |
| 921 void onError(void handleError(AsyncError error)) { | 921 void onError(void handleError(AsyncError error)) { |
| 922 if (handleError == null) handleError = _nullErrorHandler; | 922 if (handleError == null) handleError = _nullErrorHandler; |
| 923 _onError = handleError; | 923 _onError = handleError; |
| 924 } | 924 } |
| 925 | 925 |
| 926 void onDone(void handleDone()) { | 926 void onDone(void handleDone()) { |
| 927 if (handleDone == null) handleDone = _nullDoneHandler; | 927 if (handleDone == null) handleDone = _nullDoneHandler; |
| 928 _onDone = handleDone; | 928 _onDone = handleDone; |
| 929 } | 929 } |
| 930 | 930 |
| 931 void _sendData(T data) { | 931 void _sendData(T data) { |
| 932 _onData(data); | 932 _onData(data); |
| 933 } | 933 } |
| 934 | 934 |
| 935 void _sendError(AsyncError error) { | 935 void _sendError(AsyncError error) { |
| 936 _onError(error); | 936 _onError(error); |
| 937 if (_unsubscribeOnError) _source._cancel(this); | 937 if (_cancelOnError) _source._cancel(this); |
| 938 } | 938 } |
| 939 | 939 |
| 940 void _sendDone() { | 940 void _sendDone() { |
| 941 _onDone(); | 941 _onDone(); |
| 942 } | 942 } |
| 943 | 943 |
| 944 void cancel() { | 944 void cancel() { |
| 945 if (!_isSubscribed) return; | 945 if (!_isSubscribed) return; |
| 946 _source._cancel(this); | 946 _source._cancel(this); |
| 947 } | 947 } |
| (...skipping 408 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1356 onError: this._addError, | 1356 onError: this._addError, |
| 1357 onDone: this._close); | 1357 onDone: this._close); |
| 1358 } else { | 1358 } else { |
| 1359 // TODO(lrn): Check why this can happen. | 1359 // TODO(lrn): Check why this can happen. |
| 1360 if (_subscription == null) return; | 1360 if (_subscription == null) return; |
| 1361 _subscription.cancel(); | 1361 _subscription.cancel(); |
| 1362 _subscription = null; | 1362 _subscription = null; |
| 1363 } | 1363 } |
| 1364 } | 1364 } |
| 1365 } | 1365 } |
| OLD | NEW |