Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
118 void onCancel(), 118 void onCancel(),
119 bool sync: false}) { 119 bool sync: false}) {
120 return sync 120 return sync
121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
123 } 123 }
124 124
125 /** 125 /**
126 * Returns a view of this object that only exposes the [EventSink] interface. 126 * Returns a view of this object that only exposes the [EventSink] interface.
127 */ 127 */
128 EventSink<T> get sink; 128 StreamSink<T> get sink;
129 129
130 /** 130 /**
131 * Whether the stream is closed for adding more events. 131 * Whether the stream is closed for adding more events.
132 * 132 *
133 * If true, the "done" event might not have fired yet, but it has been 133 * If true, the "done" event might not have fired yet, but it has been
134 * scheduled, and it is too late to add more events. 134 * scheduled, and it is too late to add more events.
135 */ 135 */
136 bool get isClosed; 136 bool get isClosed;
137 137
138 /** 138 /**
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
178 _EventDispatch<T> { 178 _EventDispatch<T> {
179 static const int _STATE_OPEN = 0; 179 static const int _STATE_OPEN = 0;
180 static const int _STATE_CANCELLED = 1; 180 static const int _STATE_CANCELLED = 1;
181 static const int _STATE_CLOSED = 2; 181 static const int _STATE_CLOSED = 2;
182 182
183 final _NotificationHandler _onListen; 183 final _NotificationHandler _onListen;
184 final _NotificationHandler _onPause; 184 final _NotificationHandler _onPause;
185 final _NotificationHandler _onResume; 185 final _NotificationHandler _onResume;
186 final _NotificationHandler _onCancel; 186 final _NotificationHandler _onCancel;
187 _StreamImpl<T> _stream; 187 _StreamImpl<T> _stream;
188 /**
189 * Cached value returned by [sink].
190 *
191 * Used to pause the stream if necessary.
192 */
193 _ControllerStreamSink _sink;
188 194
189 // An active subscription on the stream, or null if no subscripton is active. 195 // An active subscription on the stream, or null if no subscripton is active.
190 _ControllerSubscription<T> _subscription; 196 _ControllerSubscription<T> _subscription;
191 197
192 // Whether we have sent a "done" event. 198 // Whether we have sent a "done" event.
193 int _state = _STATE_OPEN; 199 int _state = _STATE_OPEN;
194 200
195 // Events added to the stream before it has an active subscription. 201 // Events added to the stream before it has an active subscription.
196 _PendingEvents _pendingEvents = null; 202 _PendingEvents _pendingEvents = null;
197 203
198 _StreamController(this._onListen, 204 _StreamController(this._onListen,
199 this._onPause, 205 this._onPause,
200 this._onResume, 206 this._onResume,
201 this._onCancel) { 207 this._onCancel) {
202 _stream = new _ControllerStream<T>(this); 208 _stream = new _ControllerStream<T>(this);
203 } 209 }
204 210
205 Stream<T> get stream => _stream; 211 Stream<T> get stream => _stream;
206 212
207 /** 213 /**
208 * Returns a view of this object that only exposes the [EventSink] interface. 214 * Returns a view of this object that only exposes the [EventSink] interface.
209 */ 215 */
210 EventSink<T> get sink => new _EventSinkView<T>(this); 216 StreamSink<T> get sink =>
217 (_sink != null) ? _sink
218 : _sink = new _ControllerStreamSink<T>(this);
211 219
212 /** 220 /**
213 * Whether a listener has existed and been cancelled. 221 * Whether a listener has existed and been cancelled.
214 * 222 *
215 * After this, adding more events will be ignored. 223 * After this, adding more events will be ignored.
216 */ 224 */
217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; 225 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
218 226
219 bool get isClosed => (_state & _STATE_CLOSED) != 0; 227 bool get isClosed => (_state & _STATE_CLOSED) != 0;
220 228
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
291 _pendingEvents = null; 299 _pendingEvents = null;
292 subscription._guardCallback(() { 300 subscription._guardCallback(() {
293 _runGuarded(_onListen); 301 _runGuarded(_onListen);
294 }); 302 });
295 } 303 }
296 304
297 void _recordCancel(StreamSubscription<T> subscription) { 305 void _recordCancel(StreamSubscription<T> subscription) {
298 assert(identical(_subscription, subscription)); 306 assert(identical(_subscription, subscription));
299 _subscription = null; 307 _subscription = null;
300 _state |= _STATE_CANCELLED; 308 _state |= _STATE_CANCELLED;
309 if (_sink != null) _sink._cancel();
301 _runGuarded(_onCancel); 310 _runGuarded(_onCancel);
302 } 311 }
303 312
304 void _recordPause(StreamSubscription<T> subscription) { 313 void _recordPause(StreamSubscription<T> subscription) {
314 if (_sink != null) _sink._pause();
305 _runGuarded(_onPause); 315 _runGuarded(_onPause);
floitsch 2013/06/06 15:08:29 Do we want to call onPause when there is an addStr
306 } 316 }
307 317
308 void _recordResume(StreamSubscription<T> subscription) { 318 void _recordResume(StreamSubscription<T> subscription) {
319 if (_sink != null) _sink._resume();
309 _runGuarded(_onResume); 320 _runGuarded(_onResume);
floitsch 2013/06/06 15:08:29 ditto.
310 } 321 }
311 } 322 }
312 323
313 class _SyncStreamController<T> extends _StreamController<T> { 324 class _SyncStreamController<T> extends _StreamController<T> {
314 _SyncStreamController(void onListen(), 325 _SyncStreamController(void onListen(),
315 void onPause(), 326 void onPause(),
316 void onResume(), 327 void onResume(),
317 void onCancel()) 328 void onCancel())
318 : super(onListen, onPause, onResume, onCancel); 329 : super(onListen, onPause, onResume, onCancel);
319 330
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after
488 final _NotificationHandler _onListen; 499 final _NotificationHandler _onListen;
489 final _NotificationHandler _onCancel; 500 final _NotificationHandler _onCancel;
490 501
491 // State of the controller. 502 // State of the controller.
492 int _state; 503 int _state;
493 504
494 // Double-linked list of active listeners. 505 // Double-linked list of active listeners.
495 _BroadcastSubscriptionLink _next; 506 _BroadcastSubscriptionLink _next;
496 _BroadcastSubscriptionLink _previous; 507 _BroadcastSubscriptionLink _previous;
497 508
509 // Cached return value of [sink]. Used to cancel a `sink.addStream`
floitsch 2013/06/06 15:08:29 cancel, pause and resume a `sink.addStream`.
510 // when the stream ends.
511 _ControllerStreamSink _sink;
512
498 _BroadcastStreamController(this._onListen, this._onCancel) 513 _BroadcastStreamController(this._onListen, this._onCancel)
499 : _state = _STATE_INITIAL { 514 : _state = _STATE_INITIAL {
500 _next = _previous = this; 515 _next = _previous = this;
501 } 516 }
502 517
503 // StreamController interface. 518 // StreamController interface.
504 519
505 Stream<T> get stream => new _BroadcastStream<T>(this); 520 Stream<T> get stream => new _BroadcastStream<T>(this);
506 521
507 EventSink<T> get sink => new _EventSinkView<T>(this); 522 StreamSink<T> get sink =>
523 (_sink != null) ? _sink : _sink = new _ControllerStreamSink<T>(this);
508 524
509 bool get isClosed => (_state & _STATE_CLOSED) != 0; 525 bool get isClosed => (_state & _STATE_CLOSED) != 0;
510 526
511 /** 527 /**
512 * A broadcast controller is never paused. 528 * A broadcast controller is never paused.
513 * 529 *
514 * Each receiving stream may be paused individually, and they handle their 530 * Each receiving stream may be paused individually, and they handle their
515 * own buffering. 531 * own buffering.
516 */ 532 */
517 bool get isPaused => false; 533 bool get isPaused => false;
(...skipping 11 matching lines...) Expand all
529 /** Adds subscription to linked list of active listeners. */ 545 /** Adds subscription to linked list of active listeners. */
530 void _addListener(_BroadcastSubscription<T> subscription) { 546 void _addListener(_BroadcastSubscription<T> subscription) {
531 _BroadcastSubscriptionLink previous = _previous; 547 _BroadcastSubscriptionLink previous = _previous;
532 previous._next = subscription; 548 previous._next = subscription;
533 _previous = subscription._previous; 549 _previous = subscription._previous;
534 subscription._previous._next = this; 550 subscription._previous._next = this;
535 subscription._previous = previous; 551 subscription._previous = previous;
536 subscription._eventState = (_state & _STATE_EVENT_ID); 552 subscription._eventState = (_state & _STATE_EVENT_ID);
537 } 553 }
538 554
539 void _removeListener(_BroadcastSubscription<T> subscription) { 555 bool _removeListener(_BroadcastSubscription<T> subscription) {
540 assert(identical(subscription._controller, this)); 556 assert(identical(subscription._controller, this));
541 assert(!identical(subscription._next, subscription)); 557 assert(!identical(subscription._next, subscription));
542 subscription._previous._next = subscription._next; 558 subscription._previous._next = subscription._next;
543 subscription._next._previous = subscription._previous; 559 subscription._next._previous = subscription._previous;
544 subscription._next = subscription._previous = subscription; 560 subscription._next = subscription._previous = subscription;
561 return true;
545 } 562 }
546 563
547 // _StreamControllerLifecycle interface. 564 // _StreamControllerLifecycle interface.
548 565
549 void _recordListen(_BroadcastSubscription<T> subscription) { 566 void _recordListen(_BroadcastSubscription<T> subscription) {
550 _addListener(subscription); 567 _addListener(subscription);
551 if (identical(_next, _previous)) { 568 if (identical(_next, _previous)) {
552 // Only one listener, so it must be the first listener. 569 // Only one listener, so it must be the first listener.
553 _runGuarded(_onListen); 570 _runGuarded(_onListen);
554 } 571 }
555 } 572 }
556 573
557 void _recordCancel(_BroadcastSubscription<T> subscription) { 574 void _recordCancel(_BroadcastSubscription<T> subscription) {
575 // If already removed by the stream, don't remove it again.
576 if (identical(subscription._next, subscription)) return;
558 if (subscription._isFiring) { 577 if (subscription._isFiring) {
559 subscription._setRemoveAfterFiring(); 578 subscription._setRemoveAfterFiring();
560 } else { 579 } else {
561 _removeListener(subscription); 580 _removeListener(subscription);
562 // If we are currently firing an event, the empty-check is performed at 581 // If we are currently firing an event, the empty-check is performed at
563 // the end of the listener loop instead of here. 582 // the end of the listener loop instead of here.
564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { 583 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
565 _callOnCancel(); 584 _callOnCancel();
566 } 585 }
567 } 586 }
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
629 } 648 }
630 } 649 }
631 _state &= ~_STATE_FIRING; 650 _state &= ~_STATE_FIRING;
632 651
633 if (_isEmpty) { 652 if (_isEmpty) {
634 _callOnCancel(); 653 _callOnCancel();
635 } 654 }
636 } 655 }
637 656
638 void _callOnCancel() { 657 void _callOnCancel() {
658 if (_sink != null && isClosed) {
floitsch 2013/06/06 15:08:29 add comment explaining why you look at `isClosed`.
659 _sink._cancel();
660 }
639 _runGuarded(_onCancel); 661 _runGuarded(_onCancel);
640 } 662 }
641 } 663 }
642 664
643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { 665 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
644 _SyncBroadcastStreamController(void onListen(), void onCancel()) 666 _SyncBroadcastStreamController(void onListen(), void onCancel())
645 : super(onListen, onCancel); 667 : super(onListen, onCancel);
646 668
647 // EventDispatch interface. 669 // EventDispatch interface.
648 670
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
765 } 787 }
766 788
767 void _callOnCancel() { 789 void _callOnCancel() {
768 if (_hasPending) { 790 if (_hasPending) {
769 _pending.clear(); 791 _pending.clear();
770 _pending = null; 792 _pending = null;
771 } 793 }
772 super._callOnCancel(); 794 super._callOnCancel();
773 } 795 }
774 } 796 }
797
798
799 /**
800 * [EventSink] wrapper that only exposes a [StreamSink] interface.
801 */
802 class _ControllerStreamSink<T> implements StreamSink<T> {
803 final EventSink<T> _sink;
804 // Future completed when then controller stream is closed.
805 _FutureImpl _doneFuture;
806 // [_FutureImpl] returned by latest call to addStream.
807 // Set to null while not processing an [addStream] stream.
808 _FutureImpl _addStreamFuture;
809 // Subscription of latest call to addStream.
810 // Set to null while not processing an [addStream] stream.
811 StreamSubscription _subscription;
812
813 _ControllerStreamSink(this._sink);
814
815 bool get _isAddStreamActive => _addStreamFuture != null;
816
817 void _pause() {
818 if (_subscription != null) _subscription.pause();
819 }
820
821 void _resume() {
822 if (_subscription != null) _subscription.resume();
823 }
824
825 void _cancel() {
826 if (_isAddStreamActive) {
827 StreamSubscription subscription = _subscription;
828 _FutureImpl future = _addStreamFuture;
829 _subscription = null;
830 _addStreamFuture = null;
831 subscription.cancel();
832 future._setValue(null);
833 }
834 if (_doneFuture != null) {
835 _doneFuture._setValue(null);
836 }
837 }
838
839 void add(T value) {
840 if (_isAddStreamActive) {
841 throw new StateError("Cannot add events while addStream is running.");
842 }
843 _sink.add(value);
844 }
845
846 void addError(error) {
847 if (_isAddStreamActive) {
848 throw new StateError("Cannot add events while addStream is running.");
849 }
850 _sink.addError(error);
851 }
852
853 Future close() {
854 if (_isAddStreamActive) {
855 throw new StateError("Cannot add events while addStream is running.");
856 }
857 if (_doneFuture == null) _doneFuture = new _FutureImpl();
858 _sink.close();
859 return _doneFuture;
860 }
861
862 Future addStream(Stream<T> stream) {
863 if (_isAddStreamActive) {
864 throw new StateError("Cannot add a new stream while "
865 "addStream is running.");
866 }
867 _addStreamFuture = new _FutureImpl();
868 _subscription = stream.listen(
869 _sink.add,
870 onError: (error) {
871 _FutureImpl future = _addStreamFuture;
872 _addStreamFuture = null;
873 _subscription = null;
874 future._setError(error);
875 },
876 onDone: () {
877 _FutureImpl future = _addStreamFuture;
878 _addStreamFuture = null;
879 _subscription = null;
880 future._setValue(null);
881 },
882 cancelOnError: true
883 );
884 return _addStreamFuture;
885 }
886
887 Future get done =>
888 (_addStreamFuture != null) ? _addStreamFuture
889 : new _FutureImpl.immediate(null);
890 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698