Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(628)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 1242023007: Add setters for callbacks on StreamController. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 * is done. 70 * is done.
71 * 71 *
72 * If the stream is canceled before the controller needs new data the 72 * If the stream is canceled before the controller needs new data the
73 * [onResume] call might not be executed. 73 * [onResume] call might not be executed.
74 */ 74 */
75 factory StreamController({void onListen(), 75 factory StreamController({void onListen(),
76 void onPause(), 76 void onPause(),
77 void onResume(), 77 void onResume(),
78 onCancel(), 78 onCancel(),
79 bool sync: false}) { 79 bool sync: false}) {
80 if (onListen == null && onPause == null &&
81 onResume == null && onCancel == null) {
82 return sync
83 ? new _NoCallbackSyncStreamController<T>()
84 : new _NoCallbackAsyncStreamController<T>();
85 }
86 return sync 80 return sync
87 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 81 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
88 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 82 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
89 } 83 }
90 84
91 /** 85 /**
92 * A controller where [stream] can be listened to more than once. 86 * A controller where [stream] can be listened to more than once.
93 * 87 *
94 * The [Stream] returned by [stream] is a broadcast stream. 88 * The [Stream] returned by [stream] is a broadcast stream.
95 * It can be listened to more than once. 89 * It can be listened to more than once.
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
132 */ 126 */
133 factory StreamController.broadcast({void onListen(), 127 factory StreamController.broadcast({void onListen(),
134 void onCancel(), 128 void onCancel(),
135 bool sync: false}) { 129 bool sync: false}) {
136 return sync 130 return sync
137 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 131 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
138 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 132 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
139 } 133 }
140 134
141 /** 135 /**
136 * Sets the callback which is called when the stream is listened to.
137 *
138 * This overrides the previous callback, or clears it if the [onListenHandler]
139 * is `null`.
140 */
141 void set onListen(void onListenHandler());
nweiz 2015/07/17 20:40:16 It's kind of weird that these are setters, whereas
Lasse Reichstein Nielsen 2015/08/05 09:08:45 It is annoying that we didn't go for setters in St
nweiz 2015/08/06 01:02:13 I talked to Bob about this, and he pointed out tha
Lasse Reichstein Nielsen 2015/08/06 07:00:41 Well, it's quite symmetric - you have read-only fi
floitsch 2015/08/06 09:28:12 In this case I wouldn't onListen to setOnListen. I
floitsch 2015/08/06 09:28:12 I actually disagree. Sometimes a "setX" method is
Lasse Reichstein Nielsen 2015/08/06 14:09:40 I think we did screw up that particular type of na
nweiz 2015/08/06 19:56:12 I'm fine with these being setters as long as there
142
143 /**
144 * Sets the callback which is called when the stream is paused.
145 *
146 * This overrides the previous callback, or clears it if the [onPauseHandler]
147 * is `null`.
148 *
149 * Pause related callbacks are not supported on broadcast stream controllers.
150 */
151 void set onPause(void onPauseHandler());
152
153 /**
154 * Sets the callback which is called when the stream is resumed.
155 *
156 * This overrides the previous callback, or clears it if the [onResumeHandler]
157 * is `null`.
158 *
159 * Pause related callbacks are not supported on broadcast stream controllers.
160 */
161 void set onResume(void onResumeHandler());
162
163 /**
164 * Sets the callback which is called when the stream is canceled.
165 *
166 * This overrides the previous callback, or clears it if the [onCancelHandler]
167 * is `null`.
168 */
169 void set onCancel(onCancelHandler());
170
171 /**
142 * Returns a view of this object that only exposes the [StreamSink] interface. 172 * Returns a view of this object that only exposes the [StreamSink] interface.
143 */ 173 */
144 StreamSink<T> get sink; 174 StreamSink<T> get sink;
145 175
146 /** 176 /**
147 * Whether the stream controller is closed for adding more events. 177 * Whether the stream controller is closed for adding more events.
148 * 178 *
149 * The controller becomes closed by calling the [close] method. 179 * The controller becomes closed by calling the [close] method.
150 * New events cannot be added, by calling [add] or [addError], 180 * New events cannot be added, by calling [add] or [addError],
151 * to a closed controller. 181 * to a closed controller.
(...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after
376 /** 406 /**
377 * Future completed when the stream sends its last event. 407 * Future completed when the stream sends its last event.
378 * 408 *
379 * This is also the future returned by [close]. 409 * This is also the future returned by [close].
380 */ 410 */
381 // TODO(lrn): Could this be stored in the varData field too, if it's not 411 // TODO(lrn): Could this be stored in the varData field too, if it's not
382 // accessed until the call to "close"? Then we need to special case if it's 412 // accessed until the call to "close"? Then we need to special case if it's
383 // accessed earlier, or if close is called before subscribing. 413 // accessed earlier, or if close is called before subscribing.
384 _Future _doneFuture; 414 _Future _doneFuture;
385 415
386 _StreamController(); 416 _NotificationHandler _onListen;
417 _NotificationHandler _onPause;
418 _NotificationHandler _onResume;
419 _NotificationHandler _onCancel;
387 420
388 _NotificationHandler get _onListen; 421 _StreamController(void this._onListen(),
389 _NotificationHandler get _onPause; 422 void this._onPause(),
390 _NotificationHandler get _onResume; 423 void this._onResume(),
391 _NotificationHandler get _onCancel; 424 this._onCancel());
425
426 void set onListen(void onListenHandler()) { _onListen = onListenHandler; }
427
428 void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; }
429
430 void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; }
431
432 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
392 433
393 // Return a new stream every time. The streams are equal, but not identical. 434 // Return a new stream every time. The streams are equal, but not identical.
394 Stream<T> get stream => new _ControllerStream<T>(this); 435 Stream<T> get stream => new _ControllerStream<T>(this);
395 436
396 /** 437 /**
397 * Returns a view of this object that only exposes the [StreamSink] interface. 438 * Returns a view of this object that only exposes the [StreamSink] interface.
398 */ 439 */
399 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 440 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
400 441
401 /** 442 /**
(...skipping 313 matching lines...) Expand 10 before | Expand all | Expand 10 after
715 } 756 }
716 757
717 void _sendDone() { 758 void _sendDone() {
718 _subscription._addPending(const _DelayedDone()); 759 _subscription._addPending(const _DelayedDone());
719 } 760 }
720 } 761 }
721 762
722 // TODO(lrn): Use common superclass for callback-controllers when VM supports 763 // TODO(lrn): Use common superclass for callback-controllers when VM supports
723 // constructors in mixin superclasses. 764 // constructors in mixin superclasses.
724 765
725 class _AsyncStreamController<T> extends _StreamController<T> 766 class _AsyncStreamController<T> = _StreamController<T>
726 with _AsyncStreamControllerDispatch<T> { 767 with _AsyncStreamControllerDispatch<T>;
727 final _NotificationHandler _onListen;
728 final _NotificationHandler _onPause;
729 final _NotificationHandler _onResume;
730 final _NotificationHandler _onCancel;
731 768
732 _AsyncStreamController(void this._onListen(), 769 class _SyncStreamController<T> = _StreamController<T>
733 void this._onPause(), 770 with _SyncStreamControllerDispatch<T>;
734 void this._onResume(),
735 this._onCancel());
736 }
737
738 class _SyncStreamController<T> extends _StreamController<T>
739 with _SyncStreamControllerDispatch<T> {
740 final _NotificationHandler _onListen;
741 final _NotificationHandler _onPause;
742 final _NotificationHandler _onResume;
743 final _NotificationHandler _onCancel;
744
745 _SyncStreamController(void this._onListen(),
746 void this._onPause(),
747 void this._onResume(),
748 this._onCancel());
749 }
750
751 abstract class _NoCallbacks {
752 _NotificationHandler get _onListen => null;
753 _NotificationHandler get _onPause => null;
754 _NotificationHandler get _onResume => null;
755 _NotificationHandler get _onCancel => null;
756 }
757
758 class _NoCallbackAsyncStreamController<T> = _StreamController<T>
759 with _AsyncStreamControllerDispatch<T>, _NoCallbacks;
760
761 class _NoCallbackSyncStreamController<T> = _StreamController<T>
762 with _SyncStreamControllerDispatch<T>, _NoCallbacks;
763 771
764 typedef _NotificationHandler(); 772 typedef _NotificationHandler();
765 773
766 Future _runGuarded(_NotificationHandler notificationHandler) { 774 Future _runGuarded(_NotificationHandler notificationHandler) {
767 if (notificationHandler == null) return null; 775 if (notificationHandler == null) return null;
768 try { 776 try {
769 var result = notificationHandler(); 777 var result = notificationHandler();
770 if (result is Future) return result; 778 if (result is Future) return result;
771 return null; 779 return null;
772 } catch (e, s) { 780 } catch (e, s) {
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
899 _StreamControllerAddStreamState(_StreamController controller, 907 _StreamControllerAddStreamState(_StreamController controller,
900 this.varData, 908 this.varData,
901 Stream source, 909 Stream source,
902 bool cancelOnError) 910 bool cancelOnError)
903 : super(controller, source, cancelOnError) { 911 : super(controller, source, cancelOnError) {
904 if (controller.isPaused) { 912 if (controller.isPaused) {
905 addSubscription.pause(); 913 addSubscription.pause();
906 } 914 }
907 } 915 }
908 } 916 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698