OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // part of dart.async; |
| 6 |
| 7 // ------------------------------------------------------------------- |
| 8 // Default implementation of a stream with a controller for adding |
| 9 // events to the stream. |
| 10 // ------------------------------------------------------------------- |
| 11 |
| 12 /** |
| 13 * A controller and the stream it controls. |
| 14 * |
| 15 * This controller allows sending data, error and done events on |
| 16 * its [stream]. |
| 17 * This class can be used to create a simple stream that others |
| 18 * can listen on, and to push events to that stream. |
| 19 * For more specialized streams, the [createStream] method can be |
| 20 * overridden to return a specialization of [ControllerStream], and |
| 21 * other public methods can be overridden too (but it's recommended |
| 22 * that the overriding method calls its super method). |
| 23 * |
| 24 * A [StreamController] may have zero or more subscribers. |
| 25 * |
| 26 * If it has subscribers, it may also be paused by any number of its |
| 27 * subscribers. When paused, all incoming events are queued. It is the |
| 28 * responsibility of the user of this stream to prevent incoming events when |
| 29 * the controller is paused. When there are no pausing subscriptions left, |
| 30 * either due to them resuming, or due to the pausing subscriptions |
| 31 * unsubscribing, events are resumed. |
| 32 * |
| 33 * When "close" is invoked (but not necessarily when the done event is fired, |
| 34 * depending on pause state) the stream controller is closed. |
| 35 * When the done event is fired to a subscriber, the subscriber is automatically |
| 36 * unsubscribed. |
| 37 */ |
| 38 class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| 39 _StreamImpl<T> _stream; |
| 40 Stream<T> get stream => _stream; |
| 41 |
| 42 /** |
| 43 * A controller with a [stream] that supports multiple subscribers. |
| 44 */ |
| 45 StreamController() { |
| 46 _stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
| 47 onPauseStateChange); |
| 48 } |
| 49 /** |
| 50 * A controller with a [stream] that supports only one single subscriber. |
| 51 * The controller will buffer all incoming events until the subscriber is |
| 52 * registered. |
| 53 */ |
| 54 StreamController.singleSubscription() { |
| 55 _stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
| 56 onPauseStateChange); |
| 57 } |
| 58 |
| 59 StreamSubscription listen(void onData(T data), |
| 60 { void onError(AsyncError error), |
| 61 void onDone(), |
| 62 bool unsubscribeOnError}) { |
| 63 return _stream.listen(onData, |
| 64 onError: onError, |
| 65 onDone: onDone, |
| 66 unsubscribeOnError: unsubscribeOnError); |
| 67 } |
| 68 |
| 69 /** |
| 70 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 71 */ |
| 72 StreamSink<T> get sink => new StreamSinkView<T>(this); |
| 73 |
| 74 /** Whether one or more active subscribers have requested a pause. */ |
| 75 bool get isPaused => _stream._isPaused; |
| 76 |
| 77 /** Whether there are currently any subscribers on this [Stream]. */ |
| 78 bool get hasSubscribers => _stream._hasSubscribers; |
| 79 |
| 80 /** |
| 81 * Send or queue a data event. |
| 82 */ |
| 83 Signal add(T value) => _stream._add(value); |
| 84 |
| 85 /** |
| 86 * Send or enqueue an error event. |
| 87 * |
| 88 * If a subscription has requested to be unsubscribed on errors, |
| 89 * it will be unsubscribed after receiving this event. |
| 90 */ |
| 91 void signalError(AsyncError error) { _stream._signalError(error); } |
| 92 |
| 93 /** |
| 94 * Send or enqueue a "done" message. |
| 95 * |
| 96 * The "done" message should be sent at most once by a stream, and it |
| 97 * should be the last message sent. |
| 98 */ |
| 99 void close() { _stream._close(); } |
| 100 |
| 101 /** |
| 102 * Called when the first subscriber requests a pause or the last a resume. |
| 103 * |
| 104 * Read [isPaused] to see the new state. |
| 105 */ |
| 106 void onPauseStateChange() {} |
| 107 |
| 108 /** |
| 109 * Called when the first listener subscribes or the last unsubscribes. |
| 110 * |
| 111 * Read [hasSubscribers] to see what the new state is. |
| 112 */ |
| 113 void onSubscriptionStateChange() {} |
| 114 |
| 115 void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) { |
| 116 _stream._forEachSubscriber(() { |
| 117 try { |
| 118 action(); |
| 119 } catch (e, s) { |
| 120 new AsyncError(e, s).throwDelayed(); |
| 121 } |
| 122 }); |
| 123 } |
| 124 } |
| 125 |
| 126 typedef void _NotificationHandler(); |
| 127 |
| 128 class _MultiControllerStream<T> extends _MultiStreamImpl<T> { |
| 129 _NotificationHandler _subscriptionHandler; |
| 130 _NotificationHandler _pauseHandler; |
| 131 |
| 132 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); |
| 133 |
| 134 void _onSubscriptionStateChange() { |
| 135 _subscriptionHandler(); |
| 136 } |
| 137 |
| 138 void _onPauseStateChange() { |
| 139 _pauseHandler(); |
| 140 } |
| 141 } |
| 142 |
| 143 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { |
| 144 _NotificationHandler _subscriptionHandler; |
| 145 _NotificationHandler _pauseHandler; |
| 146 |
| 147 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); |
| 148 |
| 149 void _onSubscriptionStateChange() { |
| 150 _subscriptionHandler(); |
| 151 } |
| 152 |
| 153 void _onPauseStateChange() { |
| 154 _pauseHandler(); |
| 155 } |
| 156 } |
OLD | NEW |