Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 118 void onCancel(), | 118 void onCancel(), |
| 119 bool sync: false}) { | 119 bool sync: false}) { |
| 120 return sync | 120 return sync |
| 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 123 } | 123 } |
| 124 | 124 |
| 125 /** | 125 /** |
| 126 * Returns a view of this object that only exposes the [EventSink] interface. | 126 * Returns a view of this object that only exposes the [EventSink] interface. |
| 127 */ | 127 */ |
| 128 EventSink<T> get sink; | 128 StreamSink<T> get sink; |
| 129 | 129 |
| 130 /** | 130 /** |
| 131 * Whether the stream is closed for adding more events. | 131 * Whether the stream is closed for adding more events. |
| 132 * | 132 * |
| 133 * If true, the "done" event might not have fired yet, but it has been | 133 * If true, the "done" event might not have fired yet, but it has been |
| 134 * scheduled, and it is too late to add more events. | 134 * scheduled, and it is too late to add more events. |
| 135 */ | 135 */ |
| 136 bool get isClosed; | 136 bool get isClosed; |
| 137 | 137 |
| 138 /** | 138 /** |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 178 _EventDispatch<T> { | 178 _EventDispatch<T> { |
| 179 static const int _STATE_OPEN = 0; | 179 static const int _STATE_OPEN = 0; |
| 180 static const int _STATE_CANCELLED = 1; | 180 static const int _STATE_CANCELLED = 1; |
| 181 static const int _STATE_CLOSED = 2; | 181 static const int _STATE_CLOSED = 2; |
| 182 | 182 |
| 183 final _NotificationHandler _onListen; | 183 final _NotificationHandler _onListen; |
| 184 final _NotificationHandler _onPause; | 184 final _NotificationHandler _onPause; |
| 185 final _NotificationHandler _onResume; | 185 final _NotificationHandler _onResume; |
| 186 final _NotificationHandler _onCancel; | 186 final _NotificationHandler _onCancel; |
| 187 _StreamImpl<T> _stream; | 187 _StreamImpl<T> _stream; |
| 188 /** | |
| 189 * Cached value returned by [sink]. | |
| 190 * | |
| 191 * Used to pause the stream if necessary. | |
| 192 */ | |
| 193 _ControllerStreamSink _sink; | |
| 188 | 194 |
| 189 // An active subscription on the stream, or null if no subscripton is active. | 195 // An active subscription on the stream, or null if no subscripton is active. |
| 190 _ControllerSubscription<T> _subscription; | 196 _ControllerSubscription<T> _subscription; |
| 191 | 197 |
| 192 // Whether we have sent a "done" event. | 198 // Whether we have sent a "done" event. |
| 193 int _state = _STATE_OPEN; | 199 int _state = _STATE_OPEN; |
| 194 | 200 |
| 195 // Events added to the stream before it has an active subscription. | 201 // Events added to the stream before it has an active subscription. |
| 196 _PendingEvents _pendingEvents = null; | 202 _PendingEvents _pendingEvents = null; |
| 197 | 203 |
| 198 _StreamController(this._onListen, | 204 _StreamController(this._onListen, |
| 199 this._onPause, | 205 this._onPause, |
| 200 this._onResume, | 206 this._onResume, |
| 201 this._onCancel) { | 207 this._onCancel) { |
| 202 _stream = new _ControllerStream<T>(this); | 208 _stream = new _ControllerStream<T>(this); |
| 203 } | 209 } |
| 204 | 210 |
| 205 Stream<T> get stream => _stream; | 211 Stream<T> get stream => _stream; |
| 206 | 212 |
| 207 /** | 213 /** |
| 208 * Returns a view of this object that only exposes the [EventSink] interface. | 214 * Returns a view of this object that only exposes the [EventSink] interface. |
| 209 */ | 215 */ |
| 210 EventSink<T> get sink => new _EventSinkView<T>(this); | 216 StreamSink<T> get sink => |
| 217 (_sink != null) ? _sink | |
| 218 : _sink = new _ControllerStreamSink<T>(this); | |
| 211 | 219 |
| 212 /** | 220 /** |
| 213 * Whether a listener has existed and been cancelled. | 221 * Whether a listener has existed and been cancelled. |
| 214 * | 222 * |
| 215 * After this, adding more events will be ignored. | 223 * After this, adding more events will be ignored. |
| 216 */ | 224 */ |
| 217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | 225 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; |
| 218 | 226 |
| 219 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 227 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 220 | 228 |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 291 _pendingEvents = null; | 299 _pendingEvents = null; |
| 292 subscription._guardCallback(() { | 300 subscription._guardCallback(() { |
| 293 _runGuarded(_onListen); | 301 _runGuarded(_onListen); |
| 294 }); | 302 }); |
| 295 } | 303 } |
| 296 | 304 |
| 297 void _recordCancel(StreamSubscription<T> subscription) { | 305 void _recordCancel(StreamSubscription<T> subscription) { |
| 298 assert(identical(_subscription, subscription)); | 306 assert(identical(_subscription, subscription)); |
| 299 _subscription = null; | 307 _subscription = null; |
| 300 _state |= _STATE_CANCELLED; | 308 _state |= _STATE_CANCELLED; |
| 309 if (_sink != null) _sink._cancel(); | |
| 301 _runGuarded(_onCancel); | 310 _runGuarded(_onCancel); |
| 302 } | 311 } |
| 303 | 312 |
| 304 void _recordPause(StreamSubscription<T> subscription) { | 313 void _recordPause(StreamSubscription<T> subscription) { |
| 314 if (_sink != null) _sink._pause(); | |
| 305 _runGuarded(_onPause); | 315 _runGuarded(_onPause); |
|
floitsch
2013/06/06 15:08:29
Do we want to call onPause when there is an addStr
| |
| 306 } | 316 } |
| 307 | 317 |
| 308 void _recordResume(StreamSubscription<T> subscription) { | 318 void _recordResume(StreamSubscription<T> subscription) { |
| 319 if (_sink != null) _sink._resume(); | |
| 309 _runGuarded(_onResume); | 320 _runGuarded(_onResume); |
|
floitsch
2013/06/06 15:08:29
ditto.
| |
| 310 } | 321 } |
| 311 } | 322 } |
| 312 | 323 |
| 313 class _SyncStreamController<T> extends _StreamController<T> { | 324 class _SyncStreamController<T> extends _StreamController<T> { |
| 314 _SyncStreamController(void onListen(), | 325 _SyncStreamController(void onListen(), |
| 315 void onPause(), | 326 void onPause(), |
| 316 void onResume(), | 327 void onResume(), |
| 317 void onCancel()) | 328 void onCancel()) |
| 318 : super(onListen, onPause, onResume, onCancel); | 329 : super(onListen, onPause, onResume, onCancel); |
| 319 | 330 |
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 488 final _NotificationHandler _onListen; | 499 final _NotificationHandler _onListen; |
| 489 final _NotificationHandler _onCancel; | 500 final _NotificationHandler _onCancel; |
| 490 | 501 |
| 491 // State of the controller. | 502 // State of the controller. |
| 492 int _state; | 503 int _state; |
| 493 | 504 |
| 494 // Double-linked list of active listeners. | 505 // Double-linked list of active listeners. |
| 495 _BroadcastSubscriptionLink _next; | 506 _BroadcastSubscriptionLink _next; |
| 496 _BroadcastSubscriptionLink _previous; | 507 _BroadcastSubscriptionLink _previous; |
| 497 | 508 |
| 509 // Cached return value of [sink]. Used to cancel a `sink.addStream` | |
|
floitsch
2013/06/06 15:08:29
cancel, pause and resume a `sink.addStream`.
| |
| 510 // when the stream ends. | |
| 511 _ControllerStreamSink _sink; | |
| 512 | |
| 498 _BroadcastStreamController(this._onListen, this._onCancel) | 513 _BroadcastStreamController(this._onListen, this._onCancel) |
| 499 : _state = _STATE_INITIAL { | 514 : _state = _STATE_INITIAL { |
| 500 _next = _previous = this; | 515 _next = _previous = this; |
| 501 } | 516 } |
| 502 | 517 |
| 503 // StreamController interface. | 518 // StreamController interface. |
| 504 | 519 |
| 505 Stream<T> get stream => new _BroadcastStream<T>(this); | 520 Stream<T> get stream => new _BroadcastStream<T>(this); |
| 506 | 521 |
| 507 EventSink<T> get sink => new _EventSinkView<T>(this); | 522 StreamSink<T> get sink => |
| 523 (_sink != null) ? _sink : _sink = new _ControllerStreamSink<T>(this); | |
| 508 | 524 |
| 509 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 525 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 510 | 526 |
| 511 /** | 527 /** |
| 512 * A broadcast controller is never paused. | 528 * A broadcast controller is never paused. |
| 513 * | 529 * |
| 514 * Each receiving stream may be paused individually, and they handle their | 530 * Each receiving stream may be paused individually, and they handle their |
| 515 * own buffering. | 531 * own buffering. |
| 516 */ | 532 */ |
| 517 bool get isPaused => false; | 533 bool get isPaused => false; |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 529 /** Adds subscription to linked list of active listeners. */ | 545 /** Adds subscription to linked list of active listeners. */ |
| 530 void _addListener(_BroadcastSubscription<T> subscription) { | 546 void _addListener(_BroadcastSubscription<T> subscription) { |
| 531 _BroadcastSubscriptionLink previous = _previous; | 547 _BroadcastSubscriptionLink previous = _previous; |
| 532 previous._next = subscription; | 548 previous._next = subscription; |
| 533 _previous = subscription._previous; | 549 _previous = subscription._previous; |
| 534 subscription._previous._next = this; | 550 subscription._previous._next = this; |
| 535 subscription._previous = previous; | 551 subscription._previous = previous; |
| 536 subscription._eventState = (_state & _STATE_EVENT_ID); | 552 subscription._eventState = (_state & _STATE_EVENT_ID); |
| 537 } | 553 } |
| 538 | 554 |
| 539 void _removeListener(_BroadcastSubscription<T> subscription) { | 555 bool _removeListener(_BroadcastSubscription<T> subscription) { |
| 540 assert(identical(subscription._controller, this)); | 556 assert(identical(subscription._controller, this)); |
| 541 assert(!identical(subscription._next, subscription)); | 557 assert(!identical(subscription._next, subscription)); |
| 542 subscription._previous._next = subscription._next; | 558 subscription._previous._next = subscription._next; |
| 543 subscription._next._previous = subscription._previous; | 559 subscription._next._previous = subscription._previous; |
| 544 subscription._next = subscription._previous = subscription; | 560 subscription._next = subscription._previous = subscription; |
| 561 return true; | |
| 545 } | 562 } |
| 546 | 563 |
| 547 // _StreamControllerLifecycle interface. | 564 // _StreamControllerLifecycle interface. |
| 548 | 565 |
| 549 void _recordListen(_BroadcastSubscription<T> subscription) { | 566 void _recordListen(_BroadcastSubscription<T> subscription) { |
| 550 _addListener(subscription); | 567 _addListener(subscription); |
| 551 if (identical(_next, _previous)) { | 568 if (identical(_next, _previous)) { |
| 552 // Only one listener, so it must be the first listener. | 569 // Only one listener, so it must be the first listener. |
| 553 _runGuarded(_onListen); | 570 _runGuarded(_onListen); |
| 554 } | 571 } |
| 555 } | 572 } |
| 556 | 573 |
| 557 void _recordCancel(_BroadcastSubscription<T> subscription) { | 574 void _recordCancel(_BroadcastSubscription<T> subscription) { |
| 575 // If already removed by the stream, don't remove it again. | |
| 576 if (identical(subscription._next, subscription)) return; | |
| 558 if (subscription._isFiring) { | 577 if (subscription._isFiring) { |
| 559 subscription._setRemoveAfterFiring(); | 578 subscription._setRemoveAfterFiring(); |
| 560 } else { | 579 } else { |
| 561 _removeListener(subscription); | 580 _removeListener(subscription); |
| 562 // If we are currently firing an event, the empty-check is performed at | 581 // If we are currently firing an event, the empty-check is performed at |
| 563 // the end of the listener loop instead of here. | 582 // the end of the listener loop instead of here. |
| 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | 583 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
| 565 _callOnCancel(); | 584 _callOnCancel(); |
| 566 } | 585 } |
| 567 } | 586 } |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 629 } | 648 } |
| 630 } | 649 } |
| 631 _state &= ~_STATE_FIRING; | 650 _state &= ~_STATE_FIRING; |
| 632 | 651 |
| 633 if (_isEmpty) { | 652 if (_isEmpty) { |
| 634 _callOnCancel(); | 653 _callOnCancel(); |
| 635 } | 654 } |
| 636 } | 655 } |
| 637 | 656 |
| 638 void _callOnCancel() { | 657 void _callOnCancel() { |
| 658 if (_sink != null && isClosed) { | |
|
floitsch
2013/06/06 15:08:29
add comment explaining why you look at `isClosed`.
| |
| 659 _sink._cancel(); | |
| 660 } | |
| 639 _runGuarded(_onCancel); | 661 _runGuarded(_onCancel); |
| 640 } | 662 } |
| 641 } | 663 } |
| 642 | 664 |
| 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | 665 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 644 _SyncBroadcastStreamController(void onListen(), void onCancel()) | 666 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| 645 : super(onListen, onCancel); | 667 : super(onListen, onCancel); |
| 646 | 668 |
| 647 // EventDispatch interface. | 669 // EventDispatch interface. |
| 648 | 670 |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 765 } | 787 } |
| 766 | 788 |
| 767 void _callOnCancel() { | 789 void _callOnCancel() { |
| 768 if (_hasPending) { | 790 if (_hasPending) { |
| 769 _pending.clear(); | 791 _pending.clear(); |
| 770 _pending = null; | 792 _pending = null; |
| 771 } | 793 } |
| 772 super._callOnCancel(); | 794 super._callOnCancel(); |
| 773 } | 795 } |
| 774 } | 796 } |
| 797 | |
| 798 | |
| 799 /** | |
| 800 * [EventSink] wrapper that only exposes a [StreamSink] interface. | |
| 801 */ | |
| 802 class _ControllerStreamSink<T> implements StreamSink<T> { | |
| 803 final EventSink<T> _sink; | |
| 804 // Future completed when then controller stream is closed. | |
| 805 _FutureImpl _doneFuture; | |
| 806 // [_FutureImpl] returned by latest call to addStream. | |
| 807 // Set to null while not processing an [addStream] stream. | |
| 808 _FutureImpl _addStreamFuture; | |
| 809 // Subscription of latest call to addStream. | |
| 810 // Set to null while not processing an [addStream] stream. | |
| 811 StreamSubscription _subscription; | |
| 812 | |
| 813 _ControllerStreamSink(this._sink); | |
| 814 | |
| 815 bool get _isAddStreamActive => _addStreamFuture != null; | |
| 816 | |
| 817 void _pause() { | |
| 818 if (_subscription != null) _subscription.pause(); | |
| 819 } | |
| 820 | |
| 821 void _resume() { | |
| 822 if (_subscription != null) _subscription.resume(); | |
| 823 } | |
| 824 | |
| 825 void _cancel() { | |
| 826 if (_isAddStreamActive) { | |
| 827 StreamSubscription subscription = _subscription; | |
| 828 _FutureImpl future = _addStreamFuture; | |
| 829 _subscription = null; | |
| 830 _addStreamFuture = null; | |
| 831 subscription.cancel(); | |
| 832 future._setValue(null); | |
| 833 } | |
| 834 if (_doneFuture != null) { | |
| 835 _doneFuture._setValue(null); | |
| 836 } | |
| 837 } | |
| 838 | |
| 839 void add(T value) { | |
| 840 if (_isAddStreamActive) { | |
| 841 throw new StateError("Cannot add events while addStream is running."); | |
| 842 } | |
| 843 _sink.add(value); | |
| 844 } | |
| 845 | |
| 846 void addError(error) { | |
| 847 if (_isAddStreamActive) { | |
| 848 throw new StateError("Cannot add events while addStream is running."); | |
| 849 } | |
| 850 _sink.addError(error); | |
| 851 } | |
| 852 | |
| 853 Future close() { | |
| 854 if (_isAddStreamActive) { | |
| 855 throw new StateError("Cannot add events while addStream is running."); | |
| 856 } | |
| 857 if (_doneFuture == null) _doneFuture = new _FutureImpl(); | |
| 858 _sink.close(); | |
| 859 return _doneFuture; | |
| 860 } | |
| 861 | |
| 862 Future addStream(Stream<T> stream) { | |
| 863 if (_isAddStreamActive) { | |
| 864 throw new StateError("Cannot add a new stream while " | |
| 865 "addStream is running."); | |
| 866 } | |
| 867 _addStreamFuture = new _FutureImpl(); | |
| 868 _subscription = stream.listen( | |
| 869 _sink.add, | |
| 870 onError: (error) { | |
| 871 _FutureImpl future = _addStreamFuture; | |
| 872 _addStreamFuture = null; | |
| 873 _subscription = null; | |
| 874 future._setError(error); | |
| 875 }, | |
| 876 onDone: () { | |
| 877 _FutureImpl future = _addStreamFuture; | |
| 878 _addStreamFuture = null; | |
| 879 _subscription = null; | |
| 880 future._setValue(null); | |
| 881 }, | |
| 882 cancelOnError: true | |
| 883 ); | |
| 884 return _addStreamFuture; | |
| 885 } | |
| 886 | |
| 887 Future get done => | |
| 888 (_addStreamFuture != null) ? _addStreamFuture | |
| 889 : new _FutureImpl.immediate(null); | |
| 890 } | |
| OLD | NEW |