| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // part of dart.async; | 5 // part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 /// Initial and default state where the stream can receive and send events. | 9 /// Initial and default state where the stream can receive and send events. |
| 10 const int _STREAM_OPEN = 0; | 10 const int _STREAM_OPEN = 0; |
| (...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 212 * | 212 * |
| 213 * This methods is called from [_StreamListener.pause()]. | 213 * This methods is called from [_StreamListener.pause()]. |
| 214 * Subclasses can override this method, along with [isPaused] and | 214 * Subclasses can override this method, along with [isPaused] and |
| 215 * [createSubscription], if they want to do a different handling of paused | 215 * [createSubscription], if they want to do a different handling of paused |
| 216 * subscriptions, e.g., a filtering stream pausing its own source if all its | 216 * subscriptions, e.g., a filtering stream pausing its own source if all its |
| 217 * subscribers are paused. | 217 * subscribers are paused. |
| 218 */ | 218 */ |
| 219 void _pause(_StreamListener<T> listener, Signal resumeSignal) { | 219 void _pause(_StreamListener<T> listener, Signal resumeSignal) { |
| 220 assert(identical(listener._source, this)); | 220 assert(identical(listener._source, this)); |
| 221 if (!listener._isSubscribed) { | 221 if (!listener._isSubscribed) { |
| 222 throw new StateError("Subscription has been unsubscribed."); | 222 throw new StateError("Subscription has been canceled."); |
| 223 } | 223 } |
| 224 assert(!_isComplete); // There can be no subscribers when complete. | 224 assert(!_isComplete); // There can be no subscribers when complete. |
| 225 bool wasPaused = _isPaused; | 225 bool wasPaused = _isPaused; |
| 226 _incrementPauseCount(listener); | 226 _incrementPauseCount(listener); |
| 227 if (resumeSignal != null) { | 227 if (resumeSignal != null) { |
| 228 resumeSignal.then(() { this._resume(listener, true); }); | 228 resumeSignal.then(() { this._resume(listener, true); }); |
| 229 } | 229 } |
| 230 if (!wasPaused) { | 230 if (!wasPaused) { |
| 231 _onPauseStateChange(); | 231 _onPauseStateChange(); |
| 232 } | 232 } |
| (...skipping 24 matching lines...) Expand all Loading... |
| 257 void onError(AsyncError error), | 257 void onError(AsyncError error), |
| 258 void onDone(), | 258 void onDone(), |
| 259 bool unsubscribeOnError); | 259 bool unsubscribeOnError); |
| 260 | 260 |
| 261 /** | 261 /** |
| 262 * Adds a listener to this stream. | 262 * Adds a listener to this stream. |
| 263 */ | 263 */ |
| 264 void _addListener(_StreamSubscriptionImpl subscription); | 264 void _addListener(_StreamSubscriptionImpl subscription); |
| 265 | 265 |
| 266 /** | 266 /** |
| 267 * Handle an unsubscription requested from a [_StreamSubscriptionImpl]. | 267 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 268 * | 268 * |
| 269 * This method is called from [_StreamSubscriptionImpl.unsubscribe]. | 269 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 270 * | 270 * |
| 271 * If an event is currently firing, the subscription is delayed | 271 * If an event is currently firing, the cancel is delayed |
| 272 * until after the event has been sent to all subscribers. | 272 * until after the subscribers have received the event. |
| 273 */ | 273 */ |
| 274 void _unsubscribe(_StreamSubscriptionImpl subscriber); | 274 void _cancel(_StreamSubscriptionImpl subscriber); |
| 275 | 275 |
| 276 /** | 276 /** |
| 277 * Iterate over all current subscribers and perform an action on each. | 277 * Iterate over all current subscribers and perform an action on each. |
| 278 * | 278 * |
| 279 * Subscribers added during the iteration will not be visited. | 279 * Subscribers added during the iteration will not be visited. |
| 280 * Subscribers unsubscribed during the iteration will only be removed | 280 * Subscribers unsubscribed during the iteration will only be removed |
| 281 * after they have been acted on. | 281 * after they have been acted on. |
| 282 * | 282 * |
| 283 * Any change in the pause state is only reported after all subscribers have | 283 * Any change in the pause state is only reported after all subscribers have |
| 284 * received the event. | 284 * received the event. |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 353 * Sends the "done" message directly to each subscriber. | 353 * Sends the "done" message directly to each subscriber. |
| 354 * This automatically stops further subscription and | 354 * This automatically stops further subscription and |
| 355 * unsubscribes all subscribers. | 355 * unsubscribes all subscribers. |
| 356 */ | 356 */ |
| 357 void _sendDone() { | 357 void _sendDone() { |
| 358 assert(!_isPaused); | 358 assert(!_isPaused); |
| 359 assert(_isClosed); | 359 assert(_isClosed); |
| 360 _setComplete(); | 360 _setComplete(); |
| 361 if (!_hasSubscribers) return; | 361 if (!_hasSubscribers) return; |
| 362 _forEachSubscriber((subscriber) { | 362 _forEachSubscriber((subscriber) { |
| 363 _unsubscribe(subscriber); | 363 _cancel(subscriber); |
| 364 try { | 364 try { |
| 365 subscriber._sendDone(); | 365 subscriber._sendDone(); |
| 366 } catch (e, s) { | 366 } catch (e, s) { |
| 367 new AsyncError(e, s).throwDelayed(); | 367 new AsyncError(e, s).throwDelayed(); |
| 368 } | 368 } |
| 369 }); | 369 }); |
| 370 assert(!_hasSubscribers); | 370 assert(!_hasSubscribers); |
| 371 _onSubscriptionStateChange(); | 371 _onSubscriptionStateChange(); |
| 372 } | 372 } |
| 373 } | 373 } |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 431 throw new StateError("Stream has already subscriber."); | 431 throw new StateError("Stream has already subscriber."); |
| 432 } | 432 } |
| 433 _subscriber = subscription; | 433 _subscriber = subscription; |
| 434 subscription._setSubscribed(0); | 434 subscription._setSubscribed(0); |
| 435 _onSubscriptionStateChange(); | 435 _onSubscriptionStateChange(); |
| 436 // TODO(floitsch): Should this be delayed? | 436 // TODO(floitsch): Should this be delayed? |
| 437 _handlePendingEvents(); | 437 _handlePendingEvents(); |
| 438 } | 438 } |
| 439 | 439 |
| 440 /** | 440 /** |
| 441 * Handle an unsubscription requested from a [_StreamSubscriptionImpl]. | 441 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 442 * | 442 * |
| 443 * This method is called from [_StreamSubscriptionImpl.unsubscribe]. | 443 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 444 * | 444 * |
| 445 * If an event is currently firing, the subscription is delayed | 445 * If an event is currently firing, the cancel is delayed |
| 446 * until after the event has been sent to all subscribers. | 446 * until after the subscriber has received the event. |
| 447 */ | 447 */ |
| 448 void _unsubscribe(_StreamSubscriptionImpl subscriber) { | 448 void _cancel(_StreamSubscriptionImpl subscriber) { |
| 449 assert(identical(subscriber._source, this)); | 449 assert(identical(subscriber._source, this)); |
| 450 // We allow unsubscribing the currently firing subscription during | 450 // We allow unsubscribing the currently firing subscription during |
| 451 // the event firing, because it is indistinguishable from delaying it since | 451 // the event firing, because it is indistinguishable from delaying it since |
| 452 // that event has already received the event. | 452 // that event has already received the event. |
| 453 if (!identical(_subscriber, subscriber)) { | 453 if (!identical(_subscriber, subscriber)) { |
| 454 // You may unsubscribe more than once, only the first one counts. | 454 // You may unsubscribe more than once, only the first one counts. |
| 455 return; | 455 return; |
| 456 } | 456 } |
| 457 _subscriber = null; | 457 _subscriber = null; |
| 458 int timesPaused = subscriber._setUnsubscribed(); | 458 int timesPaused = subscriber._setUnsubscribed(); |
| (...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 573 void _addListener(_StreamListener listener) { | 573 void _addListener(_StreamListener listener) { |
| 574 listener._setSubscribed(_currentEventIdBit); | 574 listener._setSubscribed(_currentEventIdBit); |
| 575 bool firstSubscriber = !_hasSubscribers; | 575 bool firstSubscriber = !_hasSubscribers; |
| 576 _InternalLinkList.add(this, listener); | 576 _InternalLinkList.add(this, listener); |
| 577 if (firstSubscriber) { | 577 if (firstSubscriber) { |
| 578 _onSubscriptionStateChange(); | 578 _onSubscriptionStateChange(); |
| 579 } | 579 } |
| 580 } | 580 } |
| 581 | 581 |
| 582 /** | 582 /** |
| 583 * Handle an unsubscription requested from a [_StreamListener]. | 583 * Handle a cancel requested from a [_StreamListener]. |
| 584 * | 584 * |
| 585 * This method is called from [_StreamListener.unsubscribe]. | 585 * This method is called from [_StreamListener.cancel]. |
| 586 * | 586 * |
| 587 * If an event is currently firing, the unsubscription is delayed | 587 * If an event is currently firing, the cancel is delayed |
| 588 * until after the event has been sent to all subscribers. | 588 * until after the subscribers have received the event. |
| 589 */ | 589 */ |
| 590 void _unsubscribe(_StreamListener listener) { | 590 void _cancel(_StreamListener listener) { |
| 591 assert(identical(listener._source, this)); | 591 assert(identical(listener._source, this)); |
| 592 if (_InternalLink.isUnlinked(listener)) { | 592 if (_InternalLink.isUnlinked(listener)) { |
| 593 // You may unsubscribe more than once, only the first one counts. | 593 // You may unsubscribe more than once, only the first one counts. |
| 594 return; | 594 return; |
| 595 } | 595 } |
| 596 if (_isFiring) { | 596 if (_isFiring) { |
| 597 if (listener._needsEvent(_currentEventIdBit)) { | 597 if (listener._needsEvent(_currentEventIdBit)) { |
| 598 assert(listener._isSubscribed); | 598 assert(listener._isSubscribed); |
| 599 listener._setPendingUnsubscribe(); | 599 listener._setPendingUnsubscribe(); |
| 600 } else { | 600 } else { |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 666 if (handleDone == null) handleDone = _nullDoneHandler; | 666 if (handleDone == null) handleDone = _nullDoneHandler; |
| 667 _onDone = handleDone; | 667 _onDone = handleDone; |
| 668 } | 668 } |
| 669 | 669 |
| 670 void _sendData(T data) { | 670 void _sendData(T data) { |
| 671 _onData(data); | 671 _onData(data); |
| 672 } | 672 } |
| 673 | 673 |
| 674 void _sendError(AsyncError error) { | 674 void _sendError(AsyncError error) { |
| 675 _onError(error); | 675 _onError(error); |
| 676 if (_unsubscribeOnError) _source._unsubscribe(this); | 676 if (_unsubscribeOnError) _source._cancel(this); |
| 677 } | 677 } |
| 678 | 678 |
| 679 void _sendDone() { | 679 void _sendDone() { |
| 680 _onDone(); | 680 _onDone(); |
| 681 } | 681 } |
| 682 | 682 |
| 683 void unsubscribe() { | 683 void cancel() { |
| 684 _source._unsubscribe(this); | 684 _source._cancel(this); |
| 685 } | 685 } |
| 686 | 686 |
| 687 void pause([Signal resumeSignal]) { | 687 void pause([Signal resumeSignal]) { |
| 688 _source._pause(this, resumeSignal); | 688 _source._pause(this, resumeSignal); |
| 689 } | 689 } |
| 690 | 690 |
| 691 void resume() { | 691 void resume() { |
| 692 if (!isPaused) { | 692 if (!isPaused) { |
| 693 throw new StateError("Resuming unpaused subscription"); | 693 throw new StateError("Resuming unpaused subscription"); |
| 694 } | 694 } |
| (...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 978 bool get _isComplete => _timer == null && _pauseCount == 0; | 978 bool get _isComplete => _timer == null && _pauseCount == 0; |
| 979 | 979 |
| 980 void onData(void handleAction(T value)) {} | 980 void onData(void handleAction(T value)) {} |
| 981 void onError(void handleError(StateError error)) {} | 981 void onError(void handleError(StateError error)) {} |
| 982 void onDone(void handleDone(T value)) { | 982 void onDone(void handleDone(T value)) { |
| 983 _handler = handleDone; | 983 _handler = handleDone; |
| 984 } | 984 } |
| 985 | 985 |
| 986 void pause([Signal signal]) { | 986 void pause([Signal signal]) { |
| 987 if (_isComplete) { | 987 if (_isComplete) { |
| 988 throw new StateError("Subscription has been unsubscribed."); | 988 throw new StateError("Subscription has been canceled."); |
| 989 } | 989 } |
| 990 if (_timer != null) _timer.cancel(); | 990 if (_timer != null) _timer.cancel(); |
| 991 _pauseCount++; | 991 _pauseCount++; |
| 992 } | 992 } |
| 993 | 993 |
| 994 void resume() { | 994 void resume() { |
| 995 if (_isComplete) { | 995 if (_isComplete) { |
| 996 throw new StateError("Subscription has been unsubscribed."); | 996 throw new StateError("Subscription has been canceled."); |
| 997 } | 997 } |
| 998 if (_pauseCount == 0) return; | 998 if (_pauseCount == 0) return; |
| 999 _pauseCount--; | 999 _pauseCount--; |
| 1000 if (_pauseCount == 0) { | 1000 if (_pauseCount == 0) { |
| 1001 _delayDone(); | 1001 _delayDone(); |
| 1002 } | 1002 } |
| 1003 } | 1003 } |
| 1004 | 1004 |
| 1005 bool get isPaused => _pauseCount > 0; | 1005 bool get isPaused => _pauseCount > 0; |
| 1006 | 1006 |
| 1007 void unsubscribe() { | 1007 void cancel() { |
| 1008 if (_isComplete) { | 1008 if (_isComplete) { |
| 1009 throw new StateError("Subscription has been unsubscribed."); | 1009 throw new StateError("Subscription has been canceled."); |
| 1010 } | 1010 } |
| 1011 if (_timer != null) { | 1011 if (_timer != null) { |
| 1012 _timer.cancel(); | 1012 _timer.cancel(); |
| 1013 _timer = null; | 1013 _timer = null; |
| 1014 } | 1014 } |
| 1015 _pauseCount = 0; | 1015 _pauseCount = 0; |
| 1016 } | 1016 } |
| 1017 } | 1017 } |
| OLD | NEW |