Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(493)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 1278873008: Add getters for callbacks on StreamController. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Fix incorrect return type on broadcast stream controller. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
11 /** 11 /**
12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
13 */
14 typedef void ControllerCallback();
15
16 /**
17 * Type of stream controller `onCancel` callbacks.
18 *
19 * The callback may return either `void` or a future.
20 */
21 typedef ControllerCancelCallback();
22
23 /**
12 * A controller with the stream it controls. 24 * A controller with the stream it controls.
13 * 25 *
14 * This controller allows sending data, error and done events on 26 * This controller allows sending data, error and done events on
15 * its [stream]. 27 * its [stream].
16 * This class can be used to create a simple stream that others 28 * This class can be used to create a simple stream that others
17 * can listen on, and to push events to that stream. 29 * can listen on, and to push events to that stream.
18 * 30 *
19 * It's possible to check whether the stream is paused or not, and whether 31 * It's possible to check whether the stream is paused or not, and whether
20 * it has subscribers or not, as well as getting a callback when either of 32 * it has subscribers or not, as well as getting a callback when either of
21 * these change. 33 * these change.
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
126 */ 138 */
127 factory StreamController.broadcast({void onListen(), 139 factory StreamController.broadcast({void onListen(),
128 void onCancel(), 140 void onCancel(),
129 bool sync: false}) { 141 bool sync: false}) {
130 return sync 142 return sync
131 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 143 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
132 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 144 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
133 } 145 }
134 146
135 /** 147 /**
136 * Sets the callback which is called when the stream is listened to. 148 * The callback which is called when the stream is listened to.
137 * 149 *
138 * This overrides the previous callback, or clears it if the [onListenHandler] 150 * May be set to `null`, in which case no callback will happen.
139 * is `null`.
140 */ 151 */
152 ControllerCallback get onListen;
153
141 void set onListen(void onListenHandler()); 154 void set onListen(void onListenHandler());
142 155
143 /** 156 /**
144 * Sets the callback which is called when the stream is paused. 157 * The callback which is called when the stream is paused.
145 * 158 *
146 * This overrides the previous callback, or clears it if the [onPauseHandler] 159 * May be set to `null`, in which case no callback will happen.
147 * is `null`.
148 * 160 *
149 * Pause related callbacks are not supported on broadcast stream controllers. 161 * Pause related callbacks are not supported on broadcast stream controllers.
150 */ 162 */
163 ControllerCallback get onPause;
164
151 void set onPause(void onPauseHandler()); 165 void set onPause(void onPauseHandler());
152 166
153 /** 167 /**
154 * Sets the callback which is called when the stream is resumed. 168 * The callback which is called when the stream is resumed.
155 * 169 *
156 * This overrides the previous callback, or clears it if the [onResumeHandler] 170 * May be set to `null`, in which case no callback will happen.
157 * is `null`.
158 * 171 *
159 * Pause related callbacks are not supported on broadcast stream controllers. 172 * Pause related callbacks are not supported on broadcast stream controllers.
160 */ 173 */
174 ControllerCallback get onResume;
175
161 void set onResume(void onResumeHandler()); 176 void set onResume(void onResumeHandler());
162 177
163 /** 178 /**
164 * Sets the callback which is called when the stream is canceled. 179 * The callback which is called when the stream is canceled.
165 * 180 *
166 * This overrides the previous callback, or clears it if the [onCancelHandler] 181 * May be set to `null`, in which case no callback will happen.
167 * is `null`.
168 */ 182 */
183 ControllerCancelCallback get onCancel;
184
169 void set onCancel(onCancelHandler()); 185 void set onCancel(onCancelHandler());
170 186
171 /** 187 /**
172 * Returns a view of this object that only exposes the [StreamSink] interface. 188 * Returns a view of this object that only exposes the [StreamSink] interface.
173 */ 189 */
174 StreamSink<T> get sink; 190 StreamSink<T> get sink;
175 191
176 /** 192 /**
177 * Whether the stream controller is closed for adding more events. 193 * Whether the stream controller is closed for adding more events.
178 * 194 *
(...skipping 227 matching lines...) Expand 10 before | Expand all | Expand 10 after
406 /** 422 /**
407 * Future completed when the stream sends its last event. 423 * Future completed when the stream sends its last event.
408 * 424 *
409 * This is also the future returned by [close]. 425 * This is also the future returned by [close].
410 */ 426 */
411 // TODO(lrn): Could this be stored in the varData field too, if it's not 427 // TODO(lrn): Could this be stored in the varData field too, if it's not
412 // accessed until the call to "close"? Then we need to special case if it's 428 // accessed until the call to "close"? Then we need to special case if it's
413 // accessed earlier, or if close is called before subscribing. 429 // accessed earlier, or if close is called before subscribing.
414 _Future _doneFuture; 430 _Future _doneFuture;
415 431
416 _NotificationHandler _onListen; 432 ControllerCallback onListen;
417 _NotificationHandler _onPause; 433 ControllerCallback onPause;
418 _NotificationHandler _onResume; 434 ControllerCallback onResume;
419 _NotificationHandler _onCancel; 435 ControllerCancelCallback onCancel;
420 436
421 _StreamController(void this._onListen(), 437 _StreamController(this.onListen,
422 void this._onPause(), 438 this.onPause,
423 void this._onResume(), 439 this.onResume,
424 this._onCancel()); 440 this.onCancel);
425
426 void set onListen(void onListenHandler()) { _onListen = onListenHandler; }
427
428 void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; }
429
430 void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; }
431
432 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
433 441
434 // Return a new stream every time. The streams are equal, but not identical. 442 // Return a new stream every time. The streams are equal, but not identical.
435 Stream<T> get stream => new _ControllerStream<T>(this); 443 Stream<T> get stream => new _ControllerStream<T>(this);
436 444
437 /** 445 /**
438 * Returns a view of this object that only exposes the [StreamSink] interface. 446 * Returns a view of this object that only exposes the [StreamSink] interface.
439 */ 447 */
440 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 448 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
441 449
442 /** 450 /**
(...skipping 203 matching lines...) Expand 10 before | Expand all | Expand 10 after
646 _state |= _STATE_SUBSCRIBED; 654 _state |= _STATE_SUBSCRIBED;
647 if (_isAddingStream) { 655 if (_isAddingStream) {
648 _StreamControllerAddStreamState addState = _varData; 656 _StreamControllerAddStreamState addState = _varData;
649 addState.varData = subscription; 657 addState.varData = subscription;
650 addState.resume(); 658 addState.resume();
651 } else { 659 } else {
652 _varData = subscription; 660 _varData = subscription;
653 } 661 }
654 subscription._setPendingEvents(pendingEvents); 662 subscription._setPendingEvents(pendingEvents);
655 subscription._guardCallback(() { 663 subscription._guardCallback(() {
656 _runGuarded(_onListen); 664 _runGuarded(onListen);
657 }); 665 });
658 666
659 return subscription; 667 return subscription;
660 } 668 }
661 669
662 Future _recordCancel(StreamSubscription<T> subscription) { 670 Future _recordCancel(StreamSubscription<T> subscription) {
663 // When we cancel, we first cancel any stream being added, 671 // When we cancel, we first cancel any stream being added,
664 // Then we call _onCancel, and finally the _doneFuture is completed. 672 // Then we call `onCancel`, and finally the _doneFuture is completed.
665 // If either of addStream's cancel or _onCancel returns a future, 673 // If either of addStream's cancel or `onCancel` returns a future,
666 // we wait for it before continuing. 674 // we wait for it before continuing.
667 // Any error during this process ends up in the returned future. 675 // Any error during this process ends up in the returned future.
668 // If more errors happen, we act as if it happens inside nested try/finallys 676 // If more errors happen, we act as if it happens inside nested try/finallys
669 // or whenComplete calls, and only the last error ends up in the 677 // or whenComplete calls, and only the last error ends up in the
670 // returned future. 678 // returned future.
671 Future result; 679 Future result;
672 if (_isAddingStream) { 680 if (_isAddingStream) {
673 _StreamControllerAddStreamState addState = _varData; 681 _StreamControllerAddStreamState addState = _varData;
674 result = addState.cancel(); 682 result = addState.cancel();
675 } 683 }
676 _varData = null; 684 _varData = null;
677 _state = 685 _state =
678 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 686 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
679 687
680 if (_onCancel != null) { 688 if (onCancel != null) {
681 if (result == null) { 689 if (result == null) {
682 // Only introduce a future if one is needed. 690 // Only introduce a future if one is needed.
683 // If _onCancel returns null, no future is needed. 691 // If _onCancel returns null, no future is needed.
684 try { 692 try {
685 result = _onCancel(); 693 result = onCancel();
686 } catch (e, s) { 694 } catch (e, s) {
687 // Return the error in the returned future. 695 // Return the error in the returned future.
688 // Complete it asynchronously, so there is time for a listener 696 // Complete it asynchronously, so there is time for a listener
689 // to handle the error. 697 // to handle the error.
690 result = new _Future().._asyncCompleteError(e, s); 698 result = new _Future().._asyncCompleteError(e, s);
691 } 699 }
692 } else { 700 } else {
693 // Simpler case when we already know that we will return a future. 701 // Simpler case when we already know that we will return a future.
694 result = result.whenComplete(_onCancel); 702 result = result.whenComplete(onCancel);
695 } 703 }
696 } 704 }
697 705
698 void complete() { 706 void complete() {
699 if (_doneFuture != null && _doneFuture._mayComplete) { 707 if (_doneFuture != null && _doneFuture._mayComplete) {
700 _doneFuture._asyncComplete(null); 708 _doneFuture._asyncComplete(null);
701 } 709 }
702 } 710 }
703 711
704 if (result != null) { 712 if (result != null) {
705 result = result.whenComplete(complete); 713 result = result.whenComplete(complete);
706 } else { 714 } else {
707 complete(); 715 complete();
708 } 716 }
709 717
710 return result; 718 return result;
711 } 719 }
712 720
713 void _recordPause(StreamSubscription<T> subscription) { 721 void _recordPause(StreamSubscription<T> subscription) {
714 if (_isAddingStream) { 722 if (_isAddingStream) {
715 _StreamControllerAddStreamState addState = _varData; 723 _StreamControllerAddStreamState addState = _varData;
716 addState.pause(); 724 addState.pause();
717 } 725 }
718 _runGuarded(_onPause); 726 _runGuarded(onPause);
719 } 727 }
720 728
721 void _recordResume(StreamSubscription<T> subscription) { 729 void _recordResume(StreamSubscription<T> subscription) {
722 if (_isAddingStream) { 730 if (_isAddingStream) {
723 _StreamControllerAddStreamState addState = _varData; 731 _StreamControllerAddStreamState addState = _varData;
724 addState.resume(); 732 addState.resume();
725 } 733 }
726 _runGuarded(_onResume); 734 _runGuarded(onResume);
727 } 735 }
728 } 736 }
729 737
730 abstract class _SyncStreamControllerDispatch<T> 738 abstract class _SyncStreamControllerDispatch<T>
731 implements _StreamController<T>, SynchronousStreamController<T> { 739 implements _StreamController<T>, SynchronousStreamController<T> {
732 int get _state; 740 int get _state;
733 void set _state(int state); 741 void set _state(int state);
734 742
735 void _sendData(T data) { 743 void _sendData(T data) {
736 _subscription._add(data); 744 _subscription._add(data);
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
907 _StreamControllerAddStreamState(_StreamController controller, 915 _StreamControllerAddStreamState(_StreamController controller,
908 this.varData, 916 this.varData,
909 Stream source, 917 Stream source,
910 bool cancelOnError) 918 bool cancelOnError)
911 : super(controller, source, cancelOnError) { 919 : super(controller, source, cancelOnError) {
912 if (controller.isPaused) { 920 if (controller.isPaused) {
913 addSubscription.pause(); 921 addSubscription.pause();
914 } 922 }
915 } 923 }
916 } 924 }
OLDNEW
« no previous file with comments | « sdk/lib/async/broadcast_stream_controller.dart ('k') | tests/lib/async/stream_controller_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698