| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| 11 /** | 11 /** |
| 12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. |
| 13 */ |
| 14 typedef void ControllerCallback(); |
| 15 |
| 16 /** |
| 17 * Type of stream controller `onCancel` callbacks. |
| 18 * |
| 19 * The callback may return either `void` or a future. |
| 20 */ |
| 21 typedef ControllerCancelCallback(); |
| 22 |
| 23 /** |
| 12 * A controller with the stream it controls. | 24 * A controller with the stream it controls. |
| 13 * | 25 * |
| 14 * This controller allows sending data, error and done events on | 26 * This controller allows sending data, error and done events on |
| 15 * its [stream]. | 27 * its [stream]. |
| 16 * This class can be used to create a simple stream that others | 28 * This class can be used to create a simple stream that others |
| 17 * can listen on, and to push events to that stream. | 29 * can listen on, and to push events to that stream. |
| 18 * | 30 * |
| 19 * It's possible to check whether the stream is paused or not, and whether | 31 * It's possible to check whether the stream is paused or not, and whether |
| 20 * it has subscribers or not, as well as getting a callback when either of | 32 * it has subscribers or not, as well as getting a callback when either of |
| 21 * these change. | 33 * these change. |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 126 */ | 138 */ |
| 127 factory StreamController.broadcast({void onListen(), | 139 factory StreamController.broadcast({void onListen(), |
| 128 void onCancel(), | 140 void onCancel(), |
| 129 bool sync: false}) { | 141 bool sync: false}) { |
| 130 return sync | 142 return sync |
| 131 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 143 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 132 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 144 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 133 } | 145 } |
| 134 | 146 |
| 135 /** | 147 /** |
| 136 * Sets the callback which is called when the stream is listened to. | 148 * The callback which is called when the stream is listened to. |
| 137 * | 149 * |
| 138 * This overrides the previous callback, or clears it if the [onListenHandler] | 150 * May be set to `null`, in which case no callback will happen. |
| 139 * is `null`. | |
| 140 */ | 151 */ |
| 152 ControllerCallback get onListen; |
| 153 |
| 141 void set onListen(void onListenHandler()); | 154 void set onListen(void onListenHandler()); |
| 142 | 155 |
| 143 /** | 156 /** |
| 144 * Sets the callback which is called when the stream is paused. | 157 * The callback which is called when the stream is paused. |
| 145 * | 158 * |
| 146 * This overrides the previous callback, or clears it if the [onPauseHandler] | 159 * May be set to `null`, in which case no callback will happen. |
| 147 * is `null`. | |
| 148 * | 160 * |
| 149 * Pause related callbacks are not supported on broadcast stream controllers. | 161 * Pause related callbacks are not supported on broadcast stream controllers. |
| 150 */ | 162 */ |
| 163 ControllerCallback get onPause; |
| 164 |
| 151 void set onPause(void onPauseHandler()); | 165 void set onPause(void onPauseHandler()); |
| 152 | 166 |
| 153 /** | 167 /** |
| 154 * Sets the callback which is called when the stream is resumed. | 168 * The callback which is called when the stream is resumed. |
| 155 * | 169 * |
| 156 * This overrides the previous callback, or clears it if the [onResumeHandler] | 170 * May be set to `null`, in which case no callback will happen. |
| 157 * is `null`. | |
| 158 * | 171 * |
| 159 * Pause related callbacks are not supported on broadcast stream controllers. | 172 * Pause related callbacks are not supported on broadcast stream controllers. |
| 160 */ | 173 */ |
| 174 ControllerCallback get onResume; |
| 175 |
| 161 void set onResume(void onResumeHandler()); | 176 void set onResume(void onResumeHandler()); |
| 162 | 177 |
| 163 /** | 178 /** |
| 164 * Sets the callback which is called when the stream is canceled. | 179 * The callback which is called when the stream is canceled. |
| 165 * | 180 * |
| 166 * This overrides the previous callback, or clears it if the [onCancelHandler] | 181 * May be set to `null`, in which case no callback will happen. |
| 167 * is `null`. | |
| 168 */ | 182 */ |
| 183 ControllerCancelCallback get onCancel; |
| 184 |
| 169 void set onCancel(onCancelHandler()); | 185 void set onCancel(onCancelHandler()); |
| 170 | 186 |
| 171 /** | 187 /** |
| 172 * Returns a view of this object that only exposes the [StreamSink] interface. | 188 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 173 */ | 189 */ |
| 174 StreamSink<T> get sink; | 190 StreamSink<T> get sink; |
| 175 | 191 |
| 176 /** | 192 /** |
| 177 * Whether the stream controller is closed for adding more events. | 193 * Whether the stream controller is closed for adding more events. |
| 178 * | 194 * |
| (...skipping 227 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 406 /** | 422 /** |
| 407 * Future completed when the stream sends its last event. | 423 * Future completed when the stream sends its last event. |
| 408 * | 424 * |
| 409 * This is also the future returned by [close]. | 425 * This is also the future returned by [close]. |
| 410 */ | 426 */ |
| 411 // TODO(lrn): Could this be stored in the varData field too, if it's not | 427 // TODO(lrn): Could this be stored in the varData field too, if it's not |
| 412 // accessed until the call to "close"? Then we need to special case if it's | 428 // accessed until the call to "close"? Then we need to special case if it's |
| 413 // accessed earlier, or if close is called before subscribing. | 429 // accessed earlier, or if close is called before subscribing. |
| 414 _Future _doneFuture; | 430 _Future _doneFuture; |
| 415 | 431 |
| 416 _NotificationHandler _onListen; | 432 ControllerCallback onListen; |
| 417 _NotificationHandler _onPause; | 433 ControllerCallback onPause; |
| 418 _NotificationHandler _onResume; | 434 ControllerCallback onResume; |
| 419 _NotificationHandler _onCancel; | 435 ControllerCancelCallback onCancel; |
| 420 | 436 |
| 421 _StreamController(void this._onListen(), | 437 _StreamController(this.onListen, |
| 422 void this._onPause(), | 438 this.onPause, |
| 423 void this._onResume(), | 439 this.onResume, |
| 424 this._onCancel()); | 440 this.onCancel); |
| 425 | |
| 426 void set onListen(void onListenHandler()) { _onListen = onListenHandler; } | |
| 427 | |
| 428 void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; } | |
| 429 | |
| 430 void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; } | |
| 431 | |
| 432 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; } | |
| 433 | 441 |
| 434 // Return a new stream every time. The streams are equal, but not identical. | 442 // Return a new stream every time. The streams are equal, but not identical. |
| 435 Stream<T> get stream => new _ControllerStream<T>(this); | 443 Stream<T> get stream => new _ControllerStream<T>(this); |
| 436 | 444 |
| 437 /** | 445 /** |
| 438 * Returns a view of this object that only exposes the [StreamSink] interface. | 446 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 439 */ | 447 */ |
| 440 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 448 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 441 | 449 |
| 442 /** | 450 /** |
| (...skipping 203 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 646 _state |= _STATE_SUBSCRIBED; | 654 _state |= _STATE_SUBSCRIBED; |
| 647 if (_isAddingStream) { | 655 if (_isAddingStream) { |
| 648 _StreamControllerAddStreamState addState = _varData; | 656 _StreamControllerAddStreamState addState = _varData; |
| 649 addState.varData = subscription; | 657 addState.varData = subscription; |
| 650 addState.resume(); | 658 addState.resume(); |
| 651 } else { | 659 } else { |
| 652 _varData = subscription; | 660 _varData = subscription; |
| 653 } | 661 } |
| 654 subscription._setPendingEvents(pendingEvents); | 662 subscription._setPendingEvents(pendingEvents); |
| 655 subscription._guardCallback(() { | 663 subscription._guardCallback(() { |
| 656 _runGuarded(_onListen); | 664 _runGuarded(onListen); |
| 657 }); | 665 }); |
| 658 | 666 |
| 659 return subscription; | 667 return subscription; |
| 660 } | 668 } |
| 661 | 669 |
| 662 Future _recordCancel(StreamSubscription<T> subscription) { | 670 Future _recordCancel(StreamSubscription<T> subscription) { |
| 663 // When we cancel, we first cancel any stream being added, | 671 // When we cancel, we first cancel any stream being added, |
| 664 // Then we call _onCancel, and finally the _doneFuture is completed. | 672 // Then we call `onCancel`, and finally the _doneFuture is completed. |
| 665 // If either of addStream's cancel or _onCancel returns a future, | 673 // If either of addStream's cancel or `onCancel` returns a future, |
| 666 // we wait for it before continuing. | 674 // we wait for it before continuing. |
| 667 // Any error during this process ends up in the returned future. | 675 // Any error during this process ends up in the returned future. |
| 668 // If more errors happen, we act as if it happens inside nested try/finallys | 676 // If more errors happen, we act as if it happens inside nested try/finallys |
| 669 // or whenComplete calls, and only the last error ends up in the | 677 // or whenComplete calls, and only the last error ends up in the |
| 670 // returned future. | 678 // returned future. |
| 671 Future result; | 679 Future result; |
| 672 if (_isAddingStream) { | 680 if (_isAddingStream) { |
| 673 _StreamControllerAddStreamState addState = _varData; | 681 _StreamControllerAddStreamState addState = _varData; |
| 674 result = addState.cancel(); | 682 result = addState.cancel(); |
| 675 } | 683 } |
| 676 _varData = null; | 684 _varData = null; |
| 677 _state = | 685 _state = |
| 678 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 686 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 679 | 687 |
| 680 if (_onCancel != null) { | 688 if (onCancel != null) { |
| 681 if (result == null) { | 689 if (result == null) { |
| 682 // Only introduce a future if one is needed. | 690 // Only introduce a future if one is needed. |
| 683 // If _onCancel returns null, no future is needed. | 691 // If _onCancel returns null, no future is needed. |
| 684 try { | 692 try { |
| 685 result = _onCancel(); | 693 result = onCancel(); |
| 686 } catch (e, s) { | 694 } catch (e, s) { |
| 687 // Return the error in the returned future. | 695 // Return the error in the returned future. |
| 688 // Complete it asynchronously, so there is time for a listener | 696 // Complete it asynchronously, so there is time for a listener |
| 689 // to handle the error. | 697 // to handle the error. |
| 690 result = new _Future().._asyncCompleteError(e, s); | 698 result = new _Future().._asyncCompleteError(e, s); |
| 691 } | 699 } |
| 692 } else { | 700 } else { |
| 693 // Simpler case when we already know that we will return a future. | 701 // Simpler case when we already know that we will return a future. |
| 694 result = result.whenComplete(_onCancel); | 702 result = result.whenComplete(onCancel); |
| 695 } | 703 } |
| 696 } | 704 } |
| 697 | 705 |
| 698 void complete() { | 706 void complete() { |
| 699 if (_doneFuture != null && _doneFuture._mayComplete) { | 707 if (_doneFuture != null && _doneFuture._mayComplete) { |
| 700 _doneFuture._asyncComplete(null); | 708 _doneFuture._asyncComplete(null); |
| 701 } | 709 } |
| 702 } | 710 } |
| 703 | 711 |
| 704 if (result != null) { | 712 if (result != null) { |
| 705 result = result.whenComplete(complete); | 713 result = result.whenComplete(complete); |
| 706 } else { | 714 } else { |
| 707 complete(); | 715 complete(); |
| 708 } | 716 } |
| 709 | 717 |
| 710 return result; | 718 return result; |
| 711 } | 719 } |
| 712 | 720 |
| 713 void _recordPause(StreamSubscription<T> subscription) { | 721 void _recordPause(StreamSubscription<T> subscription) { |
| 714 if (_isAddingStream) { | 722 if (_isAddingStream) { |
| 715 _StreamControllerAddStreamState addState = _varData; | 723 _StreamControllerAddStreamState addState = _varData; |
| 716 addState.pause(); | 724 addState.pause(); |
| 717 } | 725 } |
| 718 _runGuarded(_onPause); | 726 _runGuarded(onPause); |
| 719 } | 727 } |
| 720 | 728 |
| 721 void _recordResume(StreamSubscription<T> subscription) { | 729 void _recordResume(StreamSubscription<T> subscription) { |
| 722 if (_isAddingStream) { | 730 if (_isAddingStream) { |
| 723 _StreamControllerAddStreamState addState = _varData; | 731 _StreamControllerAddStreamState addState = _varData; |
| 724 addState.resume(); | 732 addState.resume(); |
| 725 } | 733 } |
| 726 _runGuarded(_onResume); | 734 _runGuarded(onResume); |
| 727 } | 735 } |
| 728 } | 736 } |
| 729 | 737 |
| 730 abstract class _SyncStreamControllerDispatch<T> | 738 abstract class _SyncStreamControllerDispatch<T> |
| 731 implements _StreamController<T>, SynchronousStreamController<T> { | 739 implements _StreamController<T>, SynchronousStreamController<T> { |
| 732 int get _state; | 740 int get _state; |
| 733 void set _state(int state); | 741 void set _state(int state); |
| 734 | 742 |
| 735 void _sendData(T data) { | 743 void _sendData(T data) { |
| 736 _subscription._add(data); | 744 _subscription._add(data); |
| (...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 907 _StreamControllerAddStreamState(_StreamController controller, | 915 _StreamControllerAddStreamState(_StreamController controller, |
| 908 this.varData, | 916 this.varData, |
| 909 Stream source, | 917 Stream source, |
| 910 bool cancelOnError) | 918 bool cancelOnError) |
| 911 : super(controller, source, cancelOnError) { | 919 : super(controller, source, cancelOnError) { |
| 912 if (controller.isPaused) { | 920 if (controller.isPaused) { |
| 913 addSubscription.pause(); | 921 addSubscription.pause(); |
| 914 } | 922 } |
| 915 } | 923 } |
| 916 } | 924 } |
| OLD | NEW |