Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 /// Initial and default state where the stream can receive and send events. | 9 /// Initial and default state where the stream can receive and send events. |
| 10 const int _STREAM_OPEN = 0; | 10 const int _STREAM_OPEN = 0; |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 197 _state |= _STREAM_CLOSED; | 197 _state |= _STREAM_CLOSED; |
| 198 } | 198 } |
| 199 | 199 |
| 200 void _setComplete() { | 200 void _setComplete() { |
| 201 assert(_isClosed); | 201 assert(_isClosed); |
| 202 _state = _state |_STREAM_COMPLETE; | 202 _state = _state |_STREAM_COMPLETE; |
| 203 } | 203 } |
| 204 | 204 |
| 205 void _startFiring() { | 205 void _startFiring() { |
| 206 assert(!_isFiring); | 206 assert(!_isFiring); |
| 207 assert(_hasSubscribers); | |
| 208 assert(!_isPaused); | |
| 207 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID | 209 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| 208 // bit. All current subscribers will now have a _LISTENER_EVENT_ID | 210 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| 209 // that doesn't match _STREAM_EVENT_ID, and they will receive the | 211 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| 210 // event being fired. | 212 // event being fired. |
| 211 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; | 213 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
| 212 } | 214 } |
| 213 | 215 |
| 214 void _endFiring() { | 216 void _endFiring() { |
| 215 assert(_isFiring); | 217 assert(_isFiring); |
| 216 _state ^= _STREAM_FIRING; | 218 _state ^= _STREAM_FIRING; |
| 219 if (_isPaused) _onPauseStateChange(); | |
| 220 if (!_hasSubscribers) _onSubscriptionStateChange(); | |
| 217 } | 221 } |
| 218 | 222 |
| 219 /** | 223 /** |
| 220 * Record that a listener wants a pause from events. | 224 * Record that a listener wants a pause from events. |
| 221 * | 225 * |
| 222 * This methods is called from [_StreamListener.pause()]. | 226 * This methods is called from [_StreamListener.pause()]. |
| 223 * Subclasses can override this method, along with [isPaused] and | 227 * Subclasses can override this method, along with [isPaused] and |
| 224 * [createSubscription], if they want to do a different handling of paused | 228 * [createSubscription], if they want to do a different handling of paused |
| 225 * subscriptions, e.g., a filtering stream pausing its own source if all its | 229 * subscriptions, e.g., a filtering stream pausing its own source if all its |
| 226 * subscribers are paused. | 230 * subscribers are paused. |
| 227 */ | 231 */ |
| 228 void _pause(_StreamListener<T> listener, Future resumeSignal) { | 232 void _pause(_StreamListener<T> listener, Future resumeSignal) { |
| 229 assert(identical(listener._source, this)); | 233 assert(identical(listener._source, this)); |
| 230 if (!listener._isSubscribed) { | 234 if (!listener._isSubscribed) { |
| 231 throw new StateError("Subscription has been canceled."); | 235 throw new StateError("Subscription has been canceled."); |
| 232 } | 236 } |
| 233 assert(!_isComplete); // There can be no subscribers when complete. | 237 assert(!_isComplete); // There can be no subscribers when complete. |
| 234 bool wasPaused = _isPaused; | 238 bool wasPaused = _isPaused; |
| 235 _incrementPauseCount(listener); | 239 _incrementPauseCount(listener); |
| 236 if (resumeSignal != null) { | 240 if (resumeSignal != null) { |
| 237 resumeSignal.whenComplete(() { this._resume(listener, true); }); | 241 resumeSignal.whenComplete(() { this._resume(listener, true); }); |
| 238 } | 242 } |
| 239 if (!wasPaused) { | 243 if (!wasPaused && !_isFiring) { |
| 240 _onPauseStateChange(); | 244 _onPauseStateChange(); |
| 241 } | 245 } |
| 242 } | 246 } |
| 243 | 247 |
| 244 /** Stops pausing due to one request from the given listener. */ | 248 /** Stops pausing due to one request from the given listener. */ |
| 245 void _resume(_StreamListener<T> listener, bool fromEvent) { | 249 void _resume(_StreamListener<T> listener, bool fromEvent) { |
| 246 if (!listener.isPaused) return; | 250 if (!listener.isPaused) return; |
| 247 assert(listener._isSubscribed); | 251 assert(listener._isSubscribed); |
| 248 assert(_isPaused); | 252 assert(_isPaused); |
| 249 _decrementPauseCount(listener); | 253 _decrementPauseCount(listener); |
| 250 if (!_isPaused) { | 254 if (!_isPaused) { |
| 251 _onPauseStateChange(); | 255 if (!_isFiring) _onPauseStateChange(); |
| 252 if (_hasPendingEvent) { | 256 if (_hasPendingEvent) { |
| 253 // If we can fire events now, fire any pending events right away. | 257 // If we can fire events now, fire any pending events right away. |
| 254 if (fromEvent && !_isFiring) { | 258 if (fromEvent && !_isFiring) { |
| 255 _handlePendingEvents(); | 259 _handlePendingEvents(); |
| 256 } else { | 260 } else { |
| 257 _pendingEvents.schedule(this); | 261 _schedulePendingEvents(); |
| 258 } | 262 } |
| 259 } | 263 } |
| 260 } | 264 } |
| 261 } | 265 } |
| 262 | 266 |
| 267 /** Schedule pending events to be executed. */ | |
| 268 void _schedulePendingEvents() { | |
| 269 assert(_hasPendingEvent); | |
| 270 _pendingEvents.schedule(this); | |
| 271 } | |
| 272 | |
| 263 /** Create a subscription object. Called by [subcribe]. */ | 273 /** Create a subscription object. Called by [subcribe]. */ |
| 264 _StreamSubscriptionImpl<T> _createSubscription( | 274 _StreamSubscriptionImpl<T> _createSubscription( |
| 265 void onData(T data), | 275 void onData(T data), |
| 266 void onError(AsyncError error), | 276 void onError(AsyncError error), |
| 267 void onDone(), | 277 void onDone(), |
| 268 bool unsubscribeOnError); | 278 bool unsubscribeOnError); |
| 269 | 279 |
| 270 /** | 280 /** |
| 271 * Adds a listener to this stream. | 281 * Adds a listener to this stream. |
| 272 */ | 282 */ |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 376 _cancel(subscriber); | 386 _cancel(subscriber); |
| 377 try { | 387 try { |
| 378 subscriber._sendDone(); | 388 subscriber._sendDone(); |
| 379 } on AsyncError catch (e) { | 389 } on AsyncError catch (e) { |
| 380 e.throwDelayed(); | 390 e.throwDelayed(); |
| 381 } catch (e, s) { | 391 } catch (e, s) { |
| 382 new AsyncError(e, s).throwDelayed(); | 392 new AsyncError(e, s).throwDelayed(); |
| 383 } | 393 } |
| 384 }); | 394 }); |
| 385 assert(!_hasSubscribers); | 395 assert(!_hasSubscribers); |
| 386 _onSubscriptionStateChange(); | |
| 387 } | 396 } |
| 388 } | 397 } |
| 389 | 398 |
| 390 // ------------------------------------------------------------------- | 399 // ------------------------------------------------------------------- |
| 391 // Default implementation of a stream with a single subscriber. | 400 // Default implementation of a stream with a single subscriber. |
| 392 // ------------------------------------------------------------------- | 401 // ------------------------------------------------------------------- |
| 393 /** | 402 /** |
| 394 * Default implementation of stream capable of sending events to one subscriber. | 403 * Default implementation of stream capable of sending events to one subscriber. |
| 395 * | 404 * |
| 396 * Any class needing to implement [Stream] can either directly extend this | 405 * Any class needing to implement [Stream] can either directly extend this |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 446 } | 455 } |
| 447 | 456 |
| 448 void _addListener(_StreamListener subscription) { | 457 void _addListener(_StreamListener subscription) { |
| 449 if (_hasSubscribers) { | 458 if (_hasSubscribers) { |
| 450 throw new StateError("Stream already has subscriber."); | 459 throw new StateError("Stream already has subscriber."); |
| 451 } | 460 } |
| 452 _subscriber = subscription; | 461 _subscriber = subscription; |
| 453 subscription._setSubscribed(0); | 462 subscription._setSubscribed(0); |
| 454 _onSubscriptionStateChange(); | 463 _onSubscriptionStateChange(); |
| 455 if (_hasPendingEvent) { | 464 if (_hasPendingEvent) { |
| 456 new Timer(0, (_) { | 465 _schedulePendingEvents(); |
| 457 _handlePendingEvents(); | |
| 458 }); | |
| 459 } | 466 } |
| 460 } | 467 } |
| 461 | 468 |
| 462 /** | 469 /** |
| 463 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 470 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 464 * | 471 * |
| 465 * This method is called from [_StreamSubscriptionImpl.cancel]. | 472 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 466 * | 473 * |
| 467 * If an event is currently firing, the cancel is delayed | 474 * If an event is currently firing, the cancel is delayed |
| 468 * until after the subscriber has received the event. | 475 * until after the subscriber has received the event. |
| 469 */ | 476 */ |
| 470 void _cancel(_StreamListener subscriber) { | 477 void _cancel(_StreamListener subscriber) { |
| 471 assert(identical(subscriber._source, this)); | 478 assert(identical(subscriber._source, this)); |
| 472 // We allow unsubscribing the currently firing subscription during | 479 // We allow unsubscribing the currently firing subscription during |
| 473 // the event firing, because it is indistinguishable from delaying it since | 480 // the event firing, because it is indistinguishable from delaying it since |
| 474 // that event has already received the event. | 481 // that event has already received the event. |
| 475 if (!identical(_subscriber, subscriber)) { | 482 if (!identical(_subscriber, subscriber)) { |
| 476 // You may unsubscribe more than once, only the first one counts. | 483 // You may unsubscribe more than once, only the first one counts. |
| 477 return; | 484 return; |
| 478 } | 485 } |
| 479 _subscriber = null; | 486 _subscriber = null; |
| 480 int timesPaused = subscriber._setUnsubscribed(); | 487 int timesPaused = subscriber._setUnsubscribed(); |
|
floitsch
2013/01/14 16:31:24
Add comment: Unsubscribing a paused subscriber can
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Done.
| |
| 481 _updatePauseCount(-timesPaused); | 488 _updatePauseCount(-timesPaused); |
| 482 if (timesPaused > 0) { | 489 if (!_isFiring) { |
| 483 _onPauseStateChange(); | 490 if (timesPaused > 0) { |
| 491 _onPauseStateChange(); | |
| 492 } | |
| 493 _onSubscriptionStateChange(); | |
| 484 } | 494 } |
| 485 _onSubscriptionStateChange(); | |
| 486 } | 495 } |
| 487 | 496 |
| 488 void _forEachSubscriber( | 497 void _forEachSubscriber( |
| 489 void action(_StreamListener<T> subscription)) { | 498 void action(_StreamListener<T> subscription)) { |
| 499 assert(!_isPaused); | |
| 490 _StreamListener subscription = _subscriber; | 500 _StreamListener subscription = _subscriber; |
| 491 assert(subscription != null); | 501 assert(subscription != null); |
| 492 _startFiring(); | 502 _startFiring(); |
| 493 action(subscription); | 503 action(subscription); |
| 494 _endFiring(); | 504 _endFiring(); |
| 495 } | 505 } |
| 496 } | 506 } |
| 497 | 507 |
| 498 // ------------------------------------------------------------------- | 508 // ------------------------------------------------------------------- |
| 499 // Default implementation of a stream with subscribers. | 509 // Default implementation of a stream with subscribers. |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 583 action(current); | 593 action(current); |
| 584 // Marks as having received the event. | 594 // Marks as having received the event. |
| 585 current._toggleEventReceived(); | 595 current._toggleEventReceived(); |
| 586 } | 596 } |
| 587 cursor = current._nextLink; | 597 cursor = current._nextLink; |
| 588 if (current._isPendingUnsubscribe) { | 598 if (current._isPendingUnsubscribe) { |
| 589 _removeListener(current); | 599 _removeListener(current); |
| 590 } | 600 } |
| 591 } | 601 } |
| 592 _endFiring(); | 602 _endFiring(); |
| 593 if (_isPaused) _onPauseStateChange(); | |
| 594 if (!_hasSubscribers) _onSubscriptionStateChange(); | |
| 595 } | 603 } |
| 596 | 604 |
| 597 void _addListener(_StreamListener listener) { | 605 void _addListener(_StreamListener listener) { |
| 598 listener._setSubscribed(_currentEventIdBit); | 606 listener._setSubscribed(_currentEventIdBit); |
| 599 bool firstSubscriber = !_hasSubscribers; | 607 bool firstSubscriber = !_hasSubscribers; |
| 600 _InternalLinkList.add(this, listener); | 608 _InternalLinkList.add(this, listener); |
| 601 if (firstSubscriber) { | 609 if (firstSubscriber) { |
| 602 _onSubscriptionStateChange(); | 610 _onSubscriptionStateChange(); |
| 603 } | 611 } |
| 604 } | 612 } |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 624 } else { | 632 } else { |
| 625 // The listener has been notified of the event (or don't need to, | 633 // The listener has been notified of the event (or don't need to, |
| 626 // if it's still pending subscription) so it's safe to remove it. | 634 // if it's still pending subscription) so it's safe to remove it. |
| 627 _removeListener(listener); | 635 _removeListener(listener); |
| 628 } | 636 } |
| 629 // Pause and subscription state changes are reported when we end | 637 // Pause and subscription state changes are reported when we end |
| 630 // firing. | 638 // firing. |
| 631 } else { | 639 } else { |
| 632 bool wasPaused = _isPaused; | 640 bool wasPaused = _isPaused; |
| 633 _removeListener(listener); | 641 _removeListener(listener); |
| 634 if (wasPaused != _isPaused) _onPauseStateChange(); | 642 if (!identical(wasPaused, _isPaused)) _onPauseStateChange(); |
|
floitsch
2013/01/14 16:31:24
why?
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Probably overoptimizing :)
Reverted.
| |
| 635 if (!_hasSubscribers) _onSubscriptionStateChange(); | 643 if (!_hasSubscribers) _onSubscriptionStateChange(); |
| 636 } | 644 } |
| 637 } | 645 } |
| 638 | 646 |
| 639 /** | 647 /** |
| 640 * Removes a listener from this stream and cancels its pauses. | 648 * Removes a listener from this stream and cancels its pauses. |
| 641 * | 649 * |
| 642 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. | 650 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
| 643 * or [_onPauseStateChange]. | 651 * or [_onPauseStateChange]. |
| 644 */ | 652 */ |
| 645 void _removeListener(_StreamListener listener) { | 653 void _removeListener(_StreamListener listener) { |
| 646 int pauseCount = listener._setUnsubscribed(); | 654 int pauseCount = listener._setUnsubscribed(); |
| 647 _updatePauseCount(-pauseCount); | 655 _updatePauseCount(-pauseCount); |
| 648 _InternalLinkList.remove(listener); | 656 _InternalLinkList.remove(listener); |
| 649 } | 657 } |
| 650 } | 658 } |
| 651 | 659 |
| 652 | 660 |
| 653 /** Abstract superclass for streams that generate their own events. */ | 661 /** Abstract superclass for streams that generate their own events. */ |
| 654 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { | 662 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
| 655 bool _isHandlingPendingEvents = false; | 663 bool _isHandlingPendingEvents = false; |
| 656 bool get _hasPendingEvent => !_isClosed; | 664 bool get _hasPendingEvent => !_isClosed; |
| 657 | 665 |
| 666 void _schedulePendingEvents() { | |
| 667 if (_pendingEvents != null) { | |
| 668 _pendingEvents.schedule(this); | |
| 669 } else { | |
| 670 // In the case where there only pending events are generated ones. | |
|
floitsch
2013/01/14 16:31:24
Don't understand comment.
Lasse Reichstein Nielsen
2013/01/15 08:52:54
It's commenting that in this particular class, it'
floitsch
2013/01/15 14:40:19
Did you commit? The comment still looks the same.
| |
| 671 new Timer(0, (_) { _handlePendingEvents(); }); | |
| 672 } | |
| 673 } | |
| 674 | |
| 658 /** | 675 /** |
| 659 * Generate one (or possibly more) new events. | 676 * Generate one (or possibly more) new events. |
| 660 * | 677 * |
| 661 * The events should be added to the stream using [_add], [_signalError] and | 678 * The events should be added to the stream using [_add], [_signalError] and |
| 662 * [_close]. | 679 * [_close]. |
| 663 */ | 680 */ |
| 664 void _generateNextEvent(); | 681 void _generateNextEvent(); |
| 665 | 682 |
| 666 void _handlePendingEvents() { | 683 void _handlePendingEvents() { |
| 667 // Avoid reentry from _add/_signalError/_close potentially called | 684 // Avoid reentry from _add/_signalError/_close potentially called |
| (...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1086 if (_isComplete) { | 1103 if (_isComplete) { |
| 1087 throw new StateError("Subscription has been canceled."); | 1104 throw new StateError("Subscription has been canceled."); |
| 1088 } | 1105 } |
| 1089 if (_timer != null) { | 1106 if (_timer != null) { |
| 1090 _timer.cancel(); | 1107 _timer.cancel(); |
| 1091 _timer = null; | 1108 _timer = null; |
| 1092 } | 1109 } |
| 1093 _pauseCount = 0; | 1110 _pauseCount = 0; |
| 1094 } | 1111 } |
| 1095 } | 1112 } |
| OLD | NEW |