OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 /// Initial and default state where the stream can receive and send events. | 9 /// Initial and default state where the stream can receive and send events. |
10 const int _STREAM_OPEN = 0; | 10 const int _STREAM_OPEN = 0; |
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
197 _state |= _STREAM_CLOSED; | 197 _state |= _STREAM_CLOSED; |
198 } | 198 } |
199 | 199 |
200 void _setComplete() { | 200 void _setComplete() { |
201 assert(_isClosed); | 201 assert(_isClosed); |
202 _state = _state |_STREAM_COMPLETE; | 202 _state = _state |_STREAM_COMPLETE; |
203 } | 203 } |
204 | 204 |
205 void _startFiring() { | 205 void _startFiring() { |
206 assert(!_isFiring); | 206 assert(!_isFiring); |
| 207 assert(_hasSubscribers); |
| 208 assert(!_isPaused); |
207 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID | 209 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
208 // bit. All current subscribers will now have a _LISTENER_EVENT_ID | 210 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
209 // that doesn't match _STREAM_EVENT_ID, and they will receive the | 211 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
210 // event being fired. | 212 // event being fired. |
211 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; | 213 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
212 } | 214 } |
213 | 215 |
214 void _endFiring() { | 216 void _endFiring() { |
215 assert(_isFiring); | 217 assert(_isFiring); |
216 _state ^= _STREAM_FIRING; | 218 _state ^= _STREAM_FIRING; |
| 219 if (_isPaused) _onPauseStateChange(); |
| 220 if (!_hasSubscribers) _onSubscriptionStateChange(); |
217 } | 221 } |
218 | 222 |
219 /** | 223 /** |
220 * Record that a listener wants a pause from events. | 224 * Record that a listener wants a pause from events. |
221 * | 225 * |
222 * This methods is called from [_StreamListener.pause()]. | 226 * This methods is called from [_StreamListener.pause()]. |
223 * Subclasses can override this method, along with [isPaused] and | 227 * Subclasses can override this method, along with [isPaused] and |
224 * [createSubscription], if they want to do a different handling of paused | 228 * [createSubscription], if they want to do a different handling of paused |
225 * subscriptions, e.g., a filtering stream pausing its own source if all its | 229 * subscriptions, e.g., a filtering stream pausing its own source if all its |
226 * subscribers are paused. | 230 * subscribers are paused. |
227 */ | 231 */ |
228 void _pause(_StreamListener<T> listener, Future resumeSignal) { | 232 void _pause(_StreamListener<T> listener, Future resumeSignal) { |
229 assert(identical(listener._source, this)); | 233 assert(identical(listener._source, this)); |
230 if (!listener._isSubscribed) { | 234 if (!listener._isSubscribed) { |
231 throw new StateError("Subscription has been canceled."); | 235 throw new StateError("Subscription has been canceled."); |
232 } | 236 } |
233 assert(!_isComplete); // There can be no subscribers when complete. | 237 assert(!_isComplete); // There can be no subscribers when complete. |
234 bool wasPaused = _isPaused; | 238 bool wasPaused = _isPaused; |
235 _incrementPauseCount(listener); | 239 _incrementPauseCount(listener); |
236 if (resumeSignal != null) { | 240 if (resumeSignal != null) { |
237 resumeSignal.whenComplete(() { this._resume(listener, true); }); | 241 resumeSignal.whenComplete(() { this._resume(listener, true); }); |
238 } | 242 } |
239 if (!wasPaused) { | 243 if (!wasPaused && !_isFiring) { |
240 _onPauseStateChange(); | 244 _onPauseStateChange(); |
241 } | 245 } |
242 } | 246 } |
243 | 247 |
244 /** Stops pausing due to one request from the given listener. */ | 248 /** Stops pausing due to one request from the given listener. */ |
245 void _resume(_StreamListener<T> listener, bool fromEvent) { | 249 void _resume(_StreamListener<T> listener, bool fromEvent) { |
246 if (!listener.isPaused) return; | 250 if (!listener.isPaused) return; |
247 assert(listener._isSubscribed); | 251 assert(listener._isSubscribed); |
248 assert(_isPaused); | 252 assert(_isPaused); |
249 _decrementPauseCount(listener); | 253 _decrementPauseCount(listener); |
250 if (!_isPaused) { | 254 if (!_isPaused) { |
251 _onPauseStateChange(); | 255 if (!_isFiring) _onPauseStateChange(); |
252 if (_hasPendingEvent) { | 256 if (_hasPendingEvent) { |
253 // If we can fire events now, fire any pending events right away. | 257 // If we can fire events now, fire any pending events right away. |
254 if (fromEvent && !_isFiring) { | 258 if (fromEvent && !_isFiring) { |
255 _handlePendingEvents(); | 259 _handlePendingEvents(); |
256 } else { | 260 } else { |
257 _pendingEvents.schedule(this); | 261 _schedulePendingEvents(); |
258 } | 262 } |
259 } | 263 } |
260 } | 264 } |
261 } | 265 } |
262 | 266 |
| 267 /** Schedule pending events to be executed. */ |
| 268 void _schedulePendingEvents() { |
| 269 assert(_hasPendingEvent); |
| 270 _pendingEvents.schedule(this); |
| 271 } |
| 272 |
263 /** Create a subscription object. Called by [subcribe]. */ | 273 /** Create a subscription object. Called by [subcribe]. */ |
264 _StreamSubscriptionImpl<T> _createSubscription( | 274 _StreamSubscriptionImpl<T> _createSubscription( |
265 void onData(T data), | 275 void onData(T data), |
266 void onError(AsyncError error), | 276 void onError(AsyncError error), |
267 void onDone(), | 277 void onDone(), |
268 bool unsubscribeOnError); | 278 bool unsubscribeOnError); |
269 | 279 |
270 /** | 280 /** |
271 * Adds a listener to this stream. | 281 * Adds a listener to this stream. |
272 */ | 282 */ |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
376 _cancel(subscriber); | 386 _cancel(subscriber); |
377 try { | 387 try { |
378 subscriber._sendDone(); | 388 subscriber._sendDone(); |
379 } on AsyncError catch (e) { | 389 } on AsyncError catch (e) { |
380 e.throwDelayed(); | 390 e.throwDelayed(); |
381 } catch (e, s) { | 391 } catch (e, s) { |
382 new AsyncError(e, s).throwDelayed(); | 392 new AsyncError(e, s).throwDelayed(); |
383 } | 393 } |
384 }); | 394 }); |
385 assert(!_hasSubscribers); | 395 assert(!_hasSubscribers); |
386 _onSubscriptionStateChange(); | |
387 } | 396 } |
388 } | 397 } |
389 | 398 |
390 // ------------------------------------------------------------------- | 399 // ------------------------------------------------------------------- |
391 // Default implementation of a stream with a single subscriber. | 400 // Default implementation of a stream with a single subscriber. |
392 // ------------------------------------------------------------------- | 401 // ------------------------------------------------------------------- |
393 /** | 402 /** |
394 * Default implementation of stream capable of sending events to one subscriber. | 403 * Default implementation of stream capable of sending events to one subscriber. |
395 * | 404 * |
396 * Any class needing to implement [Stream] can either directly extend this | 405 * Any class needing to implement [Stream] can either directly extend this |
(...skipping 23 matching lines...) Expand all Loading... |
420 class _SingleStreamImpl<T> extends _StreamImpl<T> { | 429 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
421 _StreamListener _subscriber = null; | 430 _StreamListener _subscriber = null; |
422 | 431 |
423 Stream<T> asMultiSubscriberStream() { | 432 Stream<T> asMultiSubscriberStream() { |
424 return new _SingleStreamMultiplexer<T>(this); | 433 return new _SingleStreamMultiplexer<T>(this); |
425 } | 434 } |
426 | 435 |
427 bool get isSingleSubscription => true; | 436 bool get isSingleSubscription => true; |
428 | 437 |
429 /** Whether one or more active subscribers have requested a pause. */ | 438 /** Whether one or more active subscribers have requested a pause. */ |
430 bool get _isPaused => !_hasSubscribers || super._isPaused; | 439 bool get _isPaused => (!_hasSubscribers && !_isClosed) || super._isPaused; |
431 | 440 |
432 /** Whether there is currently a subscriber on this [Stream]. */ | 441 /** Whether there is currently a subscriber on this [Stream]. */ |
433 bool get _hasSubscribers => _subscriber != null; | 442 bool get _hasSubscribers => _subscriber != null; |
434 | 443 |
435 // ------------------------------------------------------------------- | 444 // ------------------------------------------------------------------- |
436 // Internal implementation. | 445 // Internal implementation. |
437 | 446 |
438 /** | 447 /** |
439 * Create the new subscription object. | 448 * Create the new subscription object. |
440 */ | 449 */ |
441 _StreamSubscriptionImpl<T> _createSubscription( | 450 _StreamSubscriptionImpl<T> _createSubscription( |
442 void onData(T data), | 451 void onData(T data), |
443 void onError(AsyncError error), | 452 void onError(AsyncError error), |
444 void onDone(), | 453 void onDone(), |
445 bool unsubscribeOnError) { | 454 bool unsubscribeOnError) { |
446 return new _StreamSubscriptionImpl<T>( | 455 return new _StreamSubscriptionImpl<T>( |
447 this, onData, onError, onDone, unsubscribeOnError); | 456 this, onData, onError, onDone, unsubscribeOnError); |
448 } | 457 } |
449 | 458 |
450 void _addListener(_StreamListener subscription) { | 459 void _addListener(_StreamListener subscription) { |
451 if (_hasSubscribers) { | 460 if (_hasSubscribers) { |
452 throw new StateError("Stream already has subscriber."); | 461 throw new StateError("Stream already has subscriber."); |
453 } | 462 } |
454 _subscriber = subscription; | 463 _subscriber = subscription; |
455 subscription._setSubscribed(0); | 464 subscription._setSubscribed(0); |
456 _onSubscriptionStateChange(); | 465 _onSubscriptionStateChange(); |
457 if (_hasPendingEvent) { | 466 if (_hasPendingEvent) { |
458 new Timer(0, (_) { | 467 _schedulePendingEvents(); |
459 _handlePendingEvents(); | |
460 }); | |
461 } | 468 } |
462 } | 469 } |
463 | 470 |
464 /** | 471 /** |
465 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 472 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
466 * | 473 * |
467 * This method is called from [_StreamSubscriptionImpl.cancel]. | 474 * This method is called from [_StreamSubscriptionImpl.cancel]. |
468 * | 475 * |
469 * If an event is currently firing, the cancel is delayed | 476 * If an event is currently firing, the cancel is delayed |
470 * until after the subscriber has received the event. | 477 * until after the subscriber has received the event. |
471 */ | 478 */ |
472 void _cancel(_StreamListener subscriber) { | 479 void _cancel(_StreamListener subscriber) { |
473 assert(identical(subscriber._source, this)); | 480 assert(identical(subscriber._source, this)); |
474 // We allow unsubscribing the currently firing subscription during | 481 // We allow unsubscribing the currently firing subscription during |
475 // the event firing, because it is indistinguishable from delaying it since | 482 // the event firing, because it is indistinguishable from delaying it since |
476 // that event has already received the event. | 483 // that event has already received the event. |
477 if (!identical(_subscriber, subscriber)) { | 484 if (!identical(_subscriber, subscriber)) { |
478 // You may unsubscribe more than once, only the first one counts. | 485 // You may unsubscribe more than once, only the first one counts. |
479 return; | 486 return; |
480 } | 487 } |
481 _subscriber = null; | 488 _subscriber = null; |
482 int timesPaused = subscriber._setUnsubscribed(); | 489 // Unsubscribing a paused subscription also cancels its pauses. |
483 _updatePauseCount(-timesPaused); | 490 int subscriptionPauseCount = subscriber._setUnsubscribed(); |
484 if (timesPaused > 0) { | 491 _updatePauseCount(-subscriptionPauseCount); |
485 _onPauseStateChange(); | 492 if (!_isFiring) { |
| 493 if (subscriptionPauseCount > 0) { |
| 494 _onPauseStateChange(); |
| 495 } |
| 496 _onSubscriptionStateChange(); |
486 } | 497 } |
487 _onSubscriptionStateChange(); | |
488 } | 498 } |
489 | 499 |
490 void _forEachSubscriber( | 500 void _forEachSubscriber( |
491 void action(_StreamListener<T> subscription)) { | 501 void action(_StreamListener<T> subscription)) { |
| 502 assert(!_isPaused); |
492 _StreamListener subscription = _subscriber; | 503 _StreamListener subscription = _subscriber; |
493 assert(subscription != null); | 504 assert(subscription != null); |
494 _startFiring(); | 505 _startFiring(); |
495 action(subscription); | 506 action(subscription); |
496 _endFiring(); | 507 _endFiring(); |
497 } | 508 } |
498 } | 509 } |
499 | 510 |
500 // ------------------------------------------------------------------- | 511 // ------------------------------------------------------------------- |
501 // Default implementation of a stream with subscribers. | 512 // Default implementation of a stream with subscribers. |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
587 action(current); | 598 action(current); |
588 // Marks as having received the event. | 599 // Marks as having received the event. |
589 current._toggleEventReceived(); | 600 current._toggleEventReceived(); |
590 } | 601 } |
591 cursor = current._nextLink; | 602 cursor = current._nextLink; |
592 if (current._isPendingUnsubscribe) { | 603 if (current._isPendingUnsubscribe) { |
593 _removeListener(current); | 604 _removeListener(current); |
594 } | 605 } |
595 } | 606 } |
596 _endFiring(); | 607 _endFiring(); |
597 if (_isPaused) _onPauseStateChange(); | |
598 if (!_hasSubscribers) _onSubscriptionStateChange(); | |
599 } | 608 } |
600 | 609 |
601 void _addListener(_StreamListener listener) { | 610 void _addListener(_StreamListener listener) { |
602 listener._setSubscribed(_currentEventIdBit); | 611 listener._setSubscribed(_currentEventIdBit); |
603 bool firstSubscriber = !_hasSubscribers; | 612 bool firstSubscriber = !_hasSubscribers; |
604 _InternalLinkList.add(this, listener); | 613 _InternalLinkList.add(this, listener); |
605 if (firstSubscriber) { | 614 if (firstSubscriber) { |
606 _onSubscriptionStateChange(); | 615 _onSubscriptionStateChange(); |
607 } | 616 } |
608 } | 617 } |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
652 _InternalLinkList.remove(listener); | 661 _InternalLinkList.remove(listener); |
653 } | 662 } |
654 } | 663 } |
655 | 664 |
656 | 665 |
657 /** Abstract superclass for streams that generate their own events. */ | 666 /** Abstract superclass for streams that generate their own events. */ |
658 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { | 667 abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
659 bool _isHandlingPendingEvents = false; | 668 bool _isHandlingPendingEvents = false; |
660 bool get _hasPendingEvent => !_isClosed; | 669 bool get _hasPendingEvent => !_isClosed; |
661 | 670 |
| 671 void _schedulePendingEvents() { |
| 672 if (_pendingEvents != null) { |
| 673 _pendingEvents.schedule(this); |
| 674 } else { |
| 675 // In the case where there only pending events are generated ones. |
| 676 new Timer(0, (_) { _handlePendingEvents(); }); |
| 677 } |
| 678 } |
| 679 |
662 /** | 680 /** |
663 * Generate one (or possibly more) new events. | 681 * Generate one (or possibly more) new events. |
664 * | 682 * |
665 * The events should be added to the stream using [_add], [_signalError] and | 683 * The events should be added to the stream using [_add], [_signalError] and |
666 * [_close]. | 684 * [_close]. |
667 */ | 685 */ |
668 void _generateNextEvent(); | 686 void _generateNextEvent(); |
669 | 687 |
670 void _handlePendingEvents() { | 688 void _handlePendingEvents() { |
671 // Avoid reentry from _add/_signalError/_close potentially called | 689 // Avoid reentry from _add/_signalError/_close potentially called |
(...skipping 455 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1127 onError: this._signalError, | 1145 onError: this._signalError, |
1128 onDone: this._close); | 1146 onDone: this._close); |
1129 } else { | 1147 } else { |
1130 // TODO(lrn): Check why this can happen. | 1148 // TODO(lrn): Check why this can happen. |
1131 if (_subscription == null) return; | 1149 if (_subscription == null) return; |
1132 _subscription.cancel(); | 1150 _subscription.cancel(); |
1133 _subscription = null; | 1151 _subscription = null; |
1134 } | 1152 } |
1135 } | 1153 } |
1136 } | 1154 } |
OLD | NEW |