Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(370)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 11880019: Avoid the _onSubscriptionStateChange being called twice in some cases. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/async/stream_pipe.dart » ('j') | sdk/lib/async/stream_pipe.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 /// Initial and default state where the stream can receive and send events. 9 /// Initial and default state where the stream can receive and send events.
10 const int _STREAM_OPEN = 0; 10 const int _STREAM_OPEN = 0;
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
197 _state |= _STREAM_CLOSED; 197 _state |= _STREAM_CLOSED;
198 } 198 }
199 199
200 void _setComplete() { 200 void _setComplete() {
201 assert(_isClosed); 201 assert(_isClosed);
202 _state = _state |_STREAM_COMPLETE; 202 _state = _state |_STREAM_COMPLETE;
203 } 203 }
204 204
205 void _startFiring() { 205 void _startFiring() {
206 assert(!_isFiring); 206 assert(!_isFiring);
207 assert(_hasSubscribers);
208 assert(!_isPaused);
207 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID 209 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
208 // bit. All current subscribers will now have a _LISTENER_EVENT_ID 210 // bit. All current subscribers will now have a _LISTENER_EVENT_ID
209 // that doesn't match _STREAM_EVENT_ID, and they will receive the 211 // that doesn't match _STREAM_EVENT_ID, and they will receive the
210 // event being fired. 212 // event being fired.
211 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; 213 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
212 } 214 }
213 215
214 void _endFiring() { 216 void _endFiring() {
215 assert(_isFiring); 217 assert(_isFiring);
216 _state ^= _STREAM_FIRING; 218 _state ^= _STREAM_FIRING;
219 if (_isPaused) _onPauseStateChange();
220 if (!_hasSubscribers) _onSubscriptionStateChange();
217 } 221 }
218 222
219 /** 223 /**
220 * Record that a listener wants a pause from events. 224 * Record that a listener wants a pause from events.
221 * 225 *
222 * This methods is called from [_StreamListener.pause()]. 226 * This methods is called from [_StreamListener.pause()].
223 * Subclasses can override this method, along with [isPaused] and 227 * Subclasses can override this method, along with [isPaused] and
224 * [createSubscription], if they want to do a different handling of paused 228 * [createSubscription], if they want to do a different handling of paused
225 * subscriptions, e.g., a filtering stream pausing its own source if all its 229 * subscriptions, e.g., a filtering stream pausing its own source if all its
226 * subscribers are paused. 230 * subscribers are paused.
227 */ 231 */
228 void _pause(_StreamListener<T> listener, Future resumeSignal) { 232 void _pause(_StreamListener<T> listener, Future resumeSignal) {
229 assert(identical(listener._source, this)); 233 assert(identical(listener._source, this));
230 if (!listener._isSubscribed) { 234 if (!listener._isSubscribed) {
231 throw new StateError("Subscription has been canceled."); 235 throw new StateError("Subscription has been canceled.");
232 } 236 }
233 assert(!_isComplete); // There can be no subscribers when complete. 237 assert(!_isComplete); // There can be no subscribers when complete.
234 bool wasPaused = _isPaused; 238 bool wasPaused = _isPaused;
235 _incrementPauseCount(listener); 239 _incrementPauseCount(listener);
236 if (resumeSignal != null) { 240 if (resumeSignal != null) {
237 resumeSignal.whenComplete(() { this._resume(listener, true); }); 241 resumeSignal.whenComplete(() { this._resume(listener, true); });
238 } 242 }
239 if (!wasPaused) { 243 if (!wasPaused && !_isFiring) {
240 _onPauseStateChange(); 244 _onPauseStateChange();
241 } 245 }
242 } 246 }
243 247
244 /** Stops pausing due to one request from the given listener. */ 248 /** Stops pausing due to one request from the given listener. */
245 void _resume(_StreamListener<T> listener, bool fromEvent) { 249 void _resume(_StreamListener<T> listener, bool fromEvent) {
246 if (!listener.isPaused) return; 250 if (!listener.isPaused) return;
247 assert(listener._isSubscribed); 251 assert(listener._isSubscribed);
248 assert(_isPaused); 252 assert(_isPaused);
249 _decrementPauseCount(listener); 253 _decrementPauseCount(listener);
250 if (!_isPaused) { 254 if (!_isPaused) {
251 _onPauseStateChange(); 255 if (!_isFiring) _onPauseStateChange();
252 if (_hasPendingEvent) { 256 if (_hasPendingEvent) {
253 // If we can fire events now, fire any pending events right away. 257 // If we can fire events now, fire any pending events right away.
254 if (fromEvent && !_isFiring) { 258 if (fromEvent && !_isFiring) {
255 _handlePendingEvents(); 259 _handlePendingEvents();
256 } else { 260 } else {
257 _pendingEvents.schedule(this); 261 _schedulePendingEvents();
258 } 262 }
259 } 263 }
260 } 264 }
261 } 265 }
262 266
267 /** Schedule pending events to be executed. */
268 void _schedulePendingEvents() {
269 assert(_hasPendingEvent);
270 _pendingEvents.schedule(this);
271 }
272
263 /** Create a subscription object. Called by [subcribe]. */ 273 /** Create a subscription object. Called by [subcribe]. */
264 _StreamSubscriptionImpl<T> _createSubscription( 274 _StreamSubscriptionImpl<T> _createSubscription(
265 void onData(T data), 275 void onData(T data),
266 void onError(AsyncError error), 276 void onError(AsyncError error),
267 void onDone(), 277 void onDone(),
268 bool unsubscribeOnError); 278 bool unsubscribeOnError);
269 279
270 /** 280 /**
271 * Adds a listener to this stream. 281 * Adds a listener to this stream.
272 */ 282 */
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
376 _cancel(subscriber); 386 _cancel(subscriber);
377 try { 387 try {
378 subscriber._sendDone(); 388 subscriber._sendDone();
379 } on AsyncError catch (e) { 389 } on AsyncError catch (e) {
380 e.throwDelayed(); 390 e.throwDelayed();
381 } catch (e, s) { 391 } catch (e, s) {
382 new AsyncError(e, s).throwDelayed(); 392 new AsyncError(e, s).throwDelayed();
383 } 393 }
384 }); 394 });
385 assert(!_hasSubscribers); 395 assert(!_hasSubscribers);
386 _onSubscriptionStateChange();
387 } 396 }
388 } 397 }
389 398
390 // ------------------------------------------------------------------- 399 // -------------------------------------------------------------------
391 // Default implementation of a stream with a single subscriber. 400 // Default implementation of a stream with a single subscriber.
392 // ------------------------------------------------------------------- 401 // -------------------------------------------------------------------
393 /** 402 /**
394 * Default implementation of stream capable of sending events to one subscriber. 403 * Default implementation of stream capable of sending events to one subscriber.
395 * 404 *
396 * Any class needing to implement [Stream] can either directly extend this 405 * Any class needing to implement [Stream] can either directly extend this
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
446 } 455 }
447 456
448 void _addListener(_StreamListener subscription) { 457 void _addListener(_StreamListener subscription) {
449 if (_hasSubscribers) { 458 if (_hasSubscribers) {
450 throw new StateError("Stream already has subscriber."); 459 throw new StateError("Stream already has subscriber.");
451 } 460 }
452 _subscriber = subscription; 461 _subscriber = subscription;
453 subscription._setSubscribed(0); 462 subscription._setSubscribed(0);
454 _onSubscriptionStateChange(); 463 _onSubscriptionStateChange();
455 if (_hasPendingEvent) { 464 if (_hasPendingEvent) {
456 new Timer(0, (_) { 465 _schedulePendingEvents();
457 _handlePendingEvents();
458 });
459 } 466 }
460 } 467 }
461 468
462 /** 469 /**
463 * Handle a cancel requested from a [_StreamSubscriptionImpl]. 470 * Handle a cancel requested from a [_StreamSubscriptionImpl].
464 * 471 *
465 * This method is called from [_StreamSubscriptionImpl.cancel]. 472 * This method is called from [_StreamSubscriptionImpl.cancel].
466 * 473 *
467 * If an event is currently firing, the cancel is delayed 474 * If an event is currently firing, the cancel is delayed
468 * until after the subscriber has received the event. 475 * until after the subscriber has received the event.
469 */ 476 */
470 void _cancel(_StreamListener subscriber) { 477 void _cancel(_StreamListener subscriber) {
471 assert(identical(subscriber._source, this)); 478 assert(identical(subscriber._source, this));
472 // We allow unsubscribing the currently firing subscription during 479 // We allow unsubscribing the currently firing subscription during
473 // the event firing, because it is indistinguishable from delaying it since 480 // the event firing, because it is indistinguishable from delaying it since
474 // that event has already received the event. 481 // that event has already received the event.
475 if (!identical(_subscriber, subscriber)) { 482 if (!identical(_subscriber, subscriber)) {
476 // You may unsubscribe more than once, only the first one counts. 483 // You may unsubscribe more than once, only the first one counts.
477 return; 484 return;
478 } 485 }
479 _subscriber = null; 486 _subscriber = null;
480 int timesPaused = subscriber._setUnsubscribed(); 487 int timesPaused = subscriber._setUnsubscribed();
floitsch 2013/01/14 16:31:24 Add comment: Unsubscribing a paused subscriber can
Lasse Reichstein Nielsen 2013/01/15 08:52:54 Done.
481 _updatePauseCount(-timesPaused); 488 _updatePauseCount(-timesPaused);
482 if (timesPaused > 0) { 489 if (!_isFiring) {
483 _onPauseStateChange(); 490 if (timesPaused > 0) {
491 _onPauseStateChange();
492 }
493 _onSubscriptionStateChange();
484 } 494 }
485 _onSubscriptionStateChange();
486 } 495 }
487 496
488 void _forEachSubscriber( 497 void _forEachSubscriber(
489 void action(_StreamListener<T> subscription)) { 498 void action(_StreamListener<T> subscription)) {
499 assert(!_isPaused);
490 _StreamListener subscription = _subscriber; 500 _StreamListener subscription = _subscriber;
491 assert(subscription != null); 501 assert(subscription != null);
492 _startFiring(); 502 _startFiring();
493 action(subscription); 503 action(subscription);
494 _endFiring(); 504 _endFiring();
495 } 505 }
496 } 506 }
497 507
498 // ------------------------------------------------------------------- 508 // -------------------------------------------------------------------
499 // Default implementation of a stream with subscribers. 509 // Default implementation of a stream with subscribers.
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
583 action(current); 593 action(current);
584 // Marks as having received the event. 594 // Marks as having received the event.
585 current._toggleEventReceived(); 595 current._toggleEventReceived();
586 } 596 }
587 cursor = current._nextLink; 597 cursor = current._nextLink;
588 if (current._isPendingUnsubscribe) { 598 if (current._isPendingUnsubscribe) {
589 _removeListener(current); 599 _removeListener(current);
590 } 600 }
591 } 601 }
592 _endFiring(); 602 _endFiring();
593 if (_isPaused) _onPauseStateChange();
594 if (!_hasSubscribers) _onSubscriptionStateChange();
595 } 603 }
596 604
597 void _addListener(_StreamListener listener) { 605 void _addListener(_StreamListener listener) {
598 listener._setSubscribed(_currentEventIdBit); 606 listener._setSubscribed(_currentEventIdBit);
599 bool firstSubscriber = !_hasSubscribers; 607 bool firstSubscriber = !_hasSubscribers;
600 _InternalLinkList.add(this, listener); 608 _InternalLinkList.add(this, listener);
601 if (firstSubscriber) { 609 if (firstSubscriber) {
602 _onSubscriptionStateChange(); 610 _onSubscriptionStateChange();
603 } 611 }
604 } 612 }
(...skipping 19 matching lines...) Expand all
624 } else { 632 } else {
625 // The listener has been notified of the event (or don't need to, 633 // The listener has been notified of the event (or don't need to,
626 // if it's still pending subscription) so it's safe to remove it. 634 // if it's still pending subscription) so it's safe to remove it.
627 _removeListener(listener); 635 _removeListener(listener);
628 } 636 }
629 // Pause and subscription state changes are reported when we end 637 // Pause and subscription state changes are reported when we end
630 // firing. 638 // firing.
631 } else { 639 } else {
632 bool wasPaused = _isPaused; 640 bool wasPaused = _isPaused;
633 _removeListener(listener); 641 _removeListener(listener);
634 if (wasPaused != _isPaused) _onPauseStateChange(); 642 if (!identical(wasPaused, _isPaused)) _onPauseStateChange();
floitsch 2013/01/14 16:31:24 why?
Lasse Reichstein Nielsen 2013/01/15 08:52:54 Probably overoptimizing :) Reverted.
635 if (!_hasSubscribers) _onSubscriptionStateChange(); 643 if (!_hasSubscribers) _onSubscriptionStateChange();
636 } 644 }
637 } 645 }
638 646
639 /** 647 /**
640 * Removes a listener from this stream and cancels its pauses. 648 * Removes a listener from this stream and cancels its pauses.
641 * 649 *
642 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. 650 * This is a low-level action that doesn't call [_onSubscriptionStateChange].
643 * or [_onPauseStateChange]. 651 * or [_onPauseStateChange].
644 */ 652 */
645 void _removeListener(_StreamListener listener) { 653 void _removeListener(_StreamListener listener) {
646 int pauseCount = listener._setUnsubscribed(); 654 int pauseCount = listener._setUnsubscribed();
647 _updatePauseCount(-pauseCount); 655 _updatePauseCount(-pauseCount);
648 _InternalLinkList.remove(listener); 656 _InternalLinkList.remove(listener);
649 } 657 }
650 } 658 }
651 659
652 660
653 /** Abstract superclass for streams that generate their own events. */ 661 /** Abstract superclass for streams that generate their own events. */
654 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { 662 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
655 bool _isHandlingPendingEvents = false; 663 bool _isHandlingPendingEvents = false;
656 bool get _hasPendingEvent => !_isClosed; 664 bool get _hasPendingEvent => !_isClosed;
657 665
666 void _schedulePendingEvents() {
667 if (_pendingEvents != null) {
668 _pendingEvents.schedule(this);
669 } else {
670 // In the case where there only pending events are generated ones.
floitsch 2013/01/14 16:31:24 Don't understand comment.
Lasse Reichstein Nielsen 2013/01/15 08:52:54 It's commenting that in this particular class, it'
floitsch 2013/01/15 14:40:19 Did you commit? The comment still looks the same.
671 new Timer(0, (_) { _handlePendingEvents(); });
672 }
673 }
674
658 /** 675 /**
659 * Generate one (or possibly more) new events. 676 * Generate one (or possibly more) new events.
660 * 677 *
661 * The events should be added to the stream using [_add], [_signalError] and 678 * The events should be added to the stream using [_add], [_signalError] and
662 * [_close]. 679 * [_close].
663 */ 680 */
664 void _generateNextEvent(); 681 void _generateNextEvent();
665 682
666 void _handlePendingEvents() { 683 void _handlePendingEvents() {
667 // Avoid reentry from _add/_signalError/_close potentially called 684 // Avoid reentry from _add/_signalError/_close potentially called
(...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after
1086 if (_isComplete) { 1103 if (_isComplete) {
1087 throw new StateError("Subscription has been canceled."); 1104 throw new StateError("Subscription has been canceled.");
1088 } 1105 }
1089 if (_timer != null) { 1106 if (_timer != null) {
1090 _timer.cancel(); 1107 _timer.cancel();
1091 _timer = null; 1108 _timer = null;
1092 } 1109 }
1093 _pauseCount = 0; 1110 _pauseCount = 0;
1094 } 1111 }
1095 } 1112 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_pipe.dart » ('j') | sdk/lib/async/stream_pipe.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698