OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
70 * is done. | 70 * is done. |
71 * | 71 * |
72 * If the stream is canceled before the controller needs new data the | 72 * If the stream is canceled before the controller needs new data the |
73 * [onResume] call might not be executed. | 73 * [onResume] call might not be executed. |
74 */ | 74 */ |
75 factory StreamController({void onListen(), | 75 factory StreamController({void onListen(), |
76 void onPause(), | 76 void onPause(), |
77 void onResume(), | 77 void onResume(), |
78 onCancel(), | 78 onCancel(), |
79 bool sync: false}) { | 79 bool sync: false}) { |
80 if (onListen == null && onPause == null && | |
81 onResume == null && onCancel == null) { | |
82 return sync | |
83 ? new _NoCallbackSyncStreamController<T>() | |
84 : new _NoCallbackAsyncStreamController<T>(); | |
85 } | |
86 return sync | 80 return sync |
87 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 81 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
88 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 82 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
89 } | 83 } |
90 | 84 |
91 /** | 85 /** |
92 * A controller where [stream] can be listened to more than once. | 86 * A controller where [stream] can be listened to more than once. |
93 * | 87 * |
94 * The [Stream] returned by [stream] is a broadcast stream. | 88 * The [Stream] returned by [stream] is a broadcast stream. |
95 * It can be listened to more than once. | 89 * It can be listened to more than once. |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
132 */ | 126 */ |
133 factory StreamController.broadcast({void onListen(), | 127 factory StreamController.broadcast({void onListen(), |
134 void onCancel(), | 128 void onCancel(), |
135 bool sync: false}) { | 129 bool sync: false}) { |
136 return sync | 130 return sync |
137 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 131 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
138 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 132 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
139 } | 133 } |
140 | 134 |
141 /** | 135 /** |
136 * Sets the callback which is called when the stream is listened to. | |
137 * | |
138 * This overrides the previous callback, or clears it if the [onListenHandler] | |
139 * is `null`. | |
140 */ | |
141 void set onListen(void onListenHandler()); | |
nweiz
2015/07/17 20:40:16
It's kind of weird that these are setters, whereas
Lasse Reichstein Nielsen
2015/08/05 09:08:45
It is annoying that we didn't go for setters in St
nweiz
2015/08/06 01:02:13
I talked to Bob about this, and he pointed out tha
Lasse Reichstein Nielsen
2015/08/06 07:00:41
Well, it's quite symmetric - you have read-only fi
floitsch
2015/08/06 09:28:12
In this case I wouldn't onListen to setOnListen. I
floitsch
2015/08/06 09:28:12
I actually disagree. Sometimes a "setX" method is
Lasse Reichstein Nielsen
2015/08/06 14:09:40
I think we did screw up that particular type of na
nweiz
2015/08/06 19:56:12
I'm fine with these being setters as long as there
| |
142 | |
143 /** | |
144 * Sets the callback which is called when the stream is paused. | |
145 * | |
146 * This overrides the previous callback, or clears it if the [onPauseHandler] | |
147 * is `null`. | |
148 * | |
149 * Pause related callbacks are not supported on broadcast stream controllers. | |
150 */ | |
151 void set onPause(void onPauseHandler()); | |
152 | |
153 /** | |
154 * Sets the callback which is called when the stream is resumed. | |
155 * | |
156 * This overrides the previous callback, or clears it if the [onResumeHandler] | |
157 * is `null`. | |
158 * | |
159 * Pause related callbacks are not supported on broadcast stream controllers. | |
160 */ | |
161 void set onResume(void onResumeHandler()); | |
162 | |
163 /** | |
164 * Sets the callback which is called when the stream is canceled. | |
165 * | |
166 * This overrides the previous callback, or clears it if the [onCancelHandler] | |
167 * is `null`. | |
168 */ | |
169 void set onCancel(onCancelHandler()); | |
170 | |
171 /** | |
142 * Returns a view of this object that only exposes the [StreamSink] interface. | 172 * Returns a view of this object that only exposes the [StreamSink] interface. |
143 */ | 173 */ |
144 StreamSink<T> get sink; | 174 StreamSink<T> get sink; |
145 | 175 |
146 /** | 176 /** |
147 * Whether the stream controller is closed for adding more events. | 177 * Whether the stream controller is closed for adding more events. |
148 * | 178 * |
149 * The controller becomes closed by calling the [close] method. | 179 * The controller becomes closed by calling the [close] method. |
150 * New events cannot be added, by calling [add] or [addError], | 180 * New events cannot be added, by calling [add] or [addError], |
151 * to a closed controller. | 181 * to a closed controller. |
(...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
376 /** | 406 /** |
377 * Future completed when the stream sends its last event. | 407 * Future completed when the stream sends its last event. |
378 * | 408 * |
379 * This is also the future returned by [close]. | 409 * This is also the future returned by [close]. |
380 */ | 410 */ |
381 // TODO(lrn): Could this be stored in the varData field too, if it's not | 411 // TODO(lrn): Could this be stored in the varData field too, if it's not |
382 // accessed until the call to "close"? Then we need to special case if it's | 412 // accessed until the call to "close"? Then we need to special case if it's |
383 // accessed earlier, or if close is called before subscribing. | 413 // accessed earlier, or if close is called before subscribing. |
384 _Future _doneFuture; | 414 _Future _doneFuture; |
385 | 415 |
386 _StreamController(); | 416 _NotificationHandler _onListen; |
417 _NotificationHandler _onPause; | |
418 _NotificationHandler _onResume; | |
419 _NotificationHandler _onCancel; | |
387 | 420 |
388 _NotificationHandler get _onListen; | 421 _StreamController(void this._onListen(), |
389 _NotificationHandler get _onPause; | 422 void this._onPause(), |
390 _NotificationHandler get _onResume; | 423 void this._onResume(), |
391 _NotificationHandler get _onCancel; | 424 this._onCancel()); |
425 | |
426 void set onListen(void onListenHandler()) { _onListen = onListenHandler; } | |
427 | |
428 void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; } | |
429 | |
430 void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; } | |
431 | |
432 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; } | |
392 | 433 |
393 // Return a new stream every time. The streams are equal, but not identical. | 434 // Return a new stream every time. The streams are equal, but not identical. |
394 Stream<T> get stream => new _ControllerStream<T>(this); | 435 Stream<T> get stream => new _ControllerStream<T>(this); |
395 | 436 |
396 /** | 437 /** |
397 * Returns a view of this object that only exposes the [StreamSink] interface. | 438 * Returns a view of this object that only exposes the [StreamSink] interface. |
398 */ | 439 */ |
399 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 440 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
400 | 441 |
401 /** | 442 /** |
(...skipping 313 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
715 } | 756 } |
716 | 757 |
717 void _sendDone() { | 758 void _sendDone() { |
718 _subscription._addPending(const _DelayedDone()); | 759 _subscription._addPending(const _DelayedDone()); |
719 } | 760 } |
720 } | 761 } |
721 | 762 |
722 // TODO(lrn): Use common superclass for callback-controllers when VM supports | 763 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
723 // constructors in mixin superclasses. | 764 // constructors in mixin superclasses. |
724 | 765 |
725 class _AsyncStreamController<T> extends _StreamController<T> | 766 class _AsyncStreamController<T> = _StreamController<T> |
726 with _AsyncStreamControllerDispatch<T> { | 767 with _AsyncStreamControllerDispatch<T>; |
727 final _NotificationHandler _onListen; | |
728 final _NotificationHandler _onPause; | |
729 final _NotificationHandler _onResume; | |
730 final _NotificationHandler _onCancel; | |
731 | 768 |
732 _AsyncStreamController(void this._onListen(), | 769 class _SyncStreamController<T> = _StreamController<T> |
733 void this._onPause(), | 770 with _SyncStreamControllerDispatch<T>; |
734 void this._onResume(), | |
735 this._onCancel()); | |
736 } | |
737 | |
738 class _SyncStreamController<T> extends _StreamController<T> | |
739 with _SyncStreamControllerDispatch<T> { | |
740 final _NotificationHandler _onListen; | |
741 final _NotificationHandler _onPause; | |
742 final _NotificationHandler _onResume; | |
743 final _NotificationHandler _onCancel; | |
744 | |
745 _SyncStreamController(void this._onListen(), | |
746 void this._onPause(), | |
747 void this._onResume(), | |
748 this._onCancel()); | |
749 } | |
750 | |
751 abstract class _NoCallbacks { | |
752 _NotificationHandler get _onListen => null; | |
753 _NotificationHandler get _onPause => null; | |
754 _NotificationHandler get _onResume => null; | |
755 _NotificationHandler get _onCancel => null; | |
756 } | |
757 | |
758 class _NoCallbackAsyncStreamController<T> = _StreamController<T> | |
759 with _AsyncStreamControllerDispatch<T>, _NoCallbacks; | |
760 | |
761 class _NoCallbackSyncStreamController<T> = _StreamController<T> | |
762 with _SyncStreamControllerDispatch<T>, _NoCallbacks; | |
763 | 771 |
764 typedef _NotificationHandler(); | 772 typedef _NotificationHandler(); |
765 | 773 |
766 Future _runGuarded(_NotificationHandler notificationHandler) { | 774 Future _runGuarded(_NotificationHandler notificationHandler) { |
767 if (notificationHandler == null) return null; | 775 if (notificationHandler == null) return null; |
768 try { | 776 try { |
769 var result = notificationHandler(); | 777 var result = notificationHandler(); |
770 if (result is Future) return result; | 778 if (result is Future) return result; |
771 return null; | 779 return null; |
772 } catch (e, s) { | 780 } catch (e, s) { |
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
899 _StreamControllerAddStreamState(_StreamController controller, | 907 _StreamControllerAddStreamState(_StreamController controller, |
900 this.varData, | 908 this.varData, |
901 Stream source, | 909 Stream source, |
902 bool cancelOnError) | 910 bool cancelOnError) |
903 : super(controller, source, cancelOnError) { | 911 : super(controller, source, cancelOnError) { |
904 if (controller.isPaused) { | 912 if (controller.isPaused) { |
905 addSubscription.pause(); | 913 addSubscription.pause(); |
906 } | 914 } |
907 } | 915 } |
908 } | 916 } |
OLD | NEW |