Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 29 matching lines...) Expand all Loading... | |
| 40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
| 41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
| 42 * callback is performed. | 42 * callback is performed. |
| 43 * | 43 * |
| 44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
| 45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
| 48 */ | 48 */ |
| 49 class StreamController<T> extends EventSink<T> { | 49 class StreamController<T> extends EventSink<T> { |
| 50 final _StreamImpl<T> stream; | 50 static const int _STATE_CANCELLED = 1; |
| 51 static const int _STATE_CLOSED = 2; | |
| 52 | |
| 53 final _NotificationHandler _onListen; | |
| 54 final _NotificationHandler _onPause; | |
| 55 final _NotificationHandler _onResume; | |
| 56 final _NotificationHandler _onCancel; | |
| 57 _StreamImpl<T> _stream; | |
| 58 | |
| 59 // An active subscription on the stream, or null if no subscripton is active. | |
| 60 _ControllerSubscription<T> _subscription; | |
| 61 | |
| 62 // Whether we have sent a "done" event. | |
|
floitsch
2013/05/22 16:26:29
"Wether" is not right.
Also, what does "0" mean?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I see "Whether"?
floitsch
2013/05/24 13:53:41
I meant that this doesn't look a boolean. "Whether
| |
| 63 int _state = 0; | |
| 64 | |
| 65 // Events added to the stream before it has an active subscription. | |
| 66 _PendingEvents _pendingEvents = null; | |
| 51 | 67 |
| 52 /** | 68 /** |
| 53 * | |
| 54 * If the stream is canceled before the controller needs new data the | 69 * If the stream is canceled before the controller needs new data the |
| 55 * [onResume] call might not be executed. | 70 * [onResume] call might not be executed. |
| 56 : stream = new _MultiControllerStream<T>( | |
| 57 onListen, onPause, onResume, onCancel); | |
|
floitsch
2013/05/22 16:26:29
You will have to merge this. I removed lines 53 to
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ACK.
| |
| 58 * A controller with a [stream] that supports only one single subscriber. | 71 * A controller with a [stream] that supports only one single subscriber. |
| 59 * | 72 * |
| 60 * The controller will buffer all incoming events until the subscriber is | 73 * The controller will buffer all incoming events until the subscriber is |
| 61 * registered. | 74 * registered. |
| 62 * | 75 * |
| 63 * The [onPause] function is called when the stream becomes | 76 * The [onPause] function is called when the stream becomes |
| 64 * paused. [onResume] is called when the stream resumed. | 77 * paused. [onResume] is called when the stream resumed. |
| 65 * | 78 * |
| 66 * The [onListen] callback is called when the stream | 79 * The [onListen] callback is called when the stream |
| 67 * receives its listener. [onCancel] when the listener cancels | 80 * receives its listener. [onCancel] when the listener cancels |
| 68 * its subscription. | 81 * its subscription. |
| 69 * | 82 * |
| 70 * If the stream is canceled before the controller needs new data the | 83 * If the stream is canceled before the controller needs new data the |
| 71 * [onResume] call might not be executed. | 84 * [onResume] call might not be executed. |
| 72 */ | 85 */ |
| 73 StreamController({void onListen(), | 86 StreamController({void onListen(), |
| 74 void onPause(), | 87 void onPause(), |
| 75 void onResume(), | 88 void onResume(), |
| 76 void onCancel()}) | 89 void onCancel()}) |
| 77 : stream = new _SingleControllerStream<T>( | 90 : _onListen = onListen, |
| 78 onListen, onPause, onResume, onCancel); | 91 _onPause = onPause, |
| 92 _onResume = onResume, | |
| 93 _onCancel = onCancel { | |
| 94 _stream = new _ControllerStream<T>(this); | |
| 95 } | |
| 96 | |
| 97 Stream<T> get stream => _stream; | |
| 79 | 98 |
| 80 /** | 99 /** |
| 81 * Returns a view of this object that only exposes the [EventSink] interface. | 100 * Returns a view of this object that only exposes the [EventSink] interface. |
| 82 */ | 101 */ |
| 83 EventSink<T> get sink => new _EventSinkView<T>(this); | 102 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 84 | 103 |
| 85 /** | 104 /** |
| 105 * Whether a listener has existed and been cancelled. | |
| 106 * | |
| 107 * After this, adding more events will be ignored. | |
| 108 */ | |
| 109 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | |
| 110 | |
| 111 /** | |
| 86 * Whether the stream is closed for adding more events. | 112 * Whether the stream is closed for adding more events. |
| 87 * | 113 * |
| 88 * If true, the "done" event might not have fired yet, but it has been | 114 * If true, the "done" event might not have fired yet, but it has been |
| 89 * scheduled, and it is too late to add more events. | 115 * scheduled, and it is too late to add more events. |
| 90 */ | 116 */ |
| 91 bool get isClosed => stream._isClosed; | 117 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 92 | 118 |
| 93 /** Whether one or more active subscribers have requested a pause. */ | 119 /** Whether the subscription is active and paused. */ |
| 94 bool get isPaused => stream._isInputPaused; | 120 bool get isPaused => _subscription != null && _subscription._isInputPaused; |
| 95 | 121 |
| 96 /** Whether there are currently any subscribers on this [Stream]. */ | 122 /** Whether there are currently any subscribers on the [Stream]. */ |
|
floitsch
2013/05/22 16:26:29
there is currently a subscriber on the [Stream].
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
| 97 bool get hasListener => stream._hasListener; | 123 bool get hasListener => _subscription != null; |
| 98 | 124 |
| 99 /** | 125 /** |
| 100 * Send or queue a data event. | 126 * Send or queue a data event. |
| 101 */ | 127 */ |
| 102 void add(T value) => stream._add(value); | 128 void add(T value) { |
| 129 if (isClosed) throw new StateError("Adding event after close"); | |
| 130 if (_subscription != null) { | |
| 131 _subscription._add(value); | |
| 132 } else if (!_isCancelled) { | |
| 133 _addPendingEvent(new _DelayedData<T>(value)); | |
| 134 } | |
| 135 } | |
| 103 | 136 |
| 104 /** | 137 /** |
| 105 * Send or enqueue an error event. | 138 * Send or enqueue an error event. |
| 106 * | 139 * |
| 107 * If a subscription has requested to be unsubscribed on errors, | 140 * If the subscription has requested to be unsubscribed on errors, |
|
floitsch
2013/05/22 16:26:29
Should we keep this here? It's not really a Contro
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I'm fine with removing it.
| |
| 108 * it will be unsubscribed after receiving this event. | 141 * it will be unsubscribed after receiving this event. |
| 109 */ | 142 */ |
| 110 void addError(Object error, [Object stackTrace]) { | 143 void addError(Object error, [Object stackTrace]) { |
| 144 if (isClosed) throw new StateError("Adding event after close"); | |
| 111 if (stackTrace != null) { | 145 if (stackTrace != null) { |
| 112 // Force stack trace overwrite. Even if the error already contained | 146 // Force stack trace overwrite. Even if the error already contained |
| 113 // a stack trace. | 147 // a stack trace. |
| 114 _attachStackTrace(error, stackTrace); | 148 _attachStackTrace(error, stackTrace); |
| 115 } | 149 } |
| 116 stream._addError(error); | 150 if (_subscription != null) { |
| 151 _subscription._addError(error); | |
| 152 } else if (!_isCancelled) { | |
| 153 _addPendingEvent(new _DelayedError(error)); | |
| 154 } | |
| 117 } | 155 } |
| 118 | 156 |
| 119 /** | 157 /** |
| 120 * Send or enqueue a "done" message. | 158 * Send or enqueue a "done" message. |
| 121 * | 159 * |
| 122 * The "done" message should be sent at most once by a stream, and it | 160 * The "done" message should be sent at most once by a stream, and it |
|
floitsch
2013/05/22 16:26:29
Too many "should"s.
Maybe:
Closes this controller.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
sounds fine.
| |
| 123 * should be the last message sent. | 161 * should be the last message sent. |
| 124 */ | 162 */ |
| 125 void close() { stream._close(); } | 163 void close() { |
| 164 if (isClosed) return; | |
| 165 _state |= _STATE_CLOSED; | |
| 166 if (_subscription != null) { | |
| 167 _subscription._close(); | |
| 168 } else if (!_isCancelled) { | |
| 169 _addPendingEvent(const _DelayedDone()); | |
| 170 } | |
| 171 } | |
| 172 | |
| 173 void _addPendingEvent(_DelayedEvent event) { | |
| 174 if (_isCancelled) return; | |
| 175 _StreamImplEvents events = _pendingEvents; | |
| 176 if (events == null) { | |
| 177 events = _pendingEvents = new _StreamImplEvents(); | |
| 178 } | |
| 179 events.add(event); | |
| 180 } | |
| 181 | |
| 182 void _recordListen(_BufferingStreamSubscription subscription) { | |
| 183 assert(_subscription == null); | |
| 184 _subscription = subscription; | |
| 185 _pendingEvents = null; // These have been taken over by the stream. | |
|
floitsch
2013/05/22 16:26:29
by the subscription.
I would prefer if we transfe
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ok, will change to not pass them in the constructo
| |
| 186 _subscription._guardCallback(() { | |
| 187 _runGuarded(_onListen); | |
| 188 }); | |
| 189 } | |
| 190 | |
| 191 void _recordCancel() { | |
| 192 _subscription = null; | |
| 193 _state |= _STATE_CANCELLED; | |
| 194 _runGuarded(_onCancel); | |
| 195 } | |
| 196 | |
| 197 void _recordPause() { | |
| 198 _runGuarded(_onPause); | |
| 199 } | |
| 200 | |
| 201 void _recordResume() { | |
| 202 _runGuarded(_onResume); | |
| 203 } | |
| 126 } | 204 } |
| 127 | 205 |
| 128 typedef void _NotificationHandler(); | 206 typedef void _NotificationHandler(); |
| 129 | 207 |
| 130 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { | 208 void _runGuarded(_NotificationHandler notificationHandler) { |
| 131 _NotificationHandler _onListen; | 209 if (notificationHandler == null) return; |
| 132 _NotificationHandler _onPause; | 210 try { |
| 133 _NotificationHandler _onResume; | 211 notificationHandler(); |
| 134 _NotificationHandler _onCancel; | 212 } catch (e, s) { |
| 213 _throwDelayed(e, s); | |
| 214 } | |
| 215 } | |
| 135 | 216 |
| 136 // TODO(floitsch): share this code with _MultiControllerStream. | 217 class _ControllerStream<T> extends _StreamImpl<T> { |
| 137 _runGuarded(_NotificationHandler notificationHandler) { | 218 StreamController _controller; |
| 138 if (notificationHandler == null) return; | 219 bool _hasListener = false; |
| 139 try { | 220 |
| 140 notificationHandler(); | 221 _ControllerStream(this._controller); |
| 141 } catch (e, s) { | 222 |
| 142 _throwDelayed(e, s); | 223 StreamSubscription<T> _createSubscription( |
| 224 void onData(T data), | |
| 225 void onError(Object error), | |
| 226 void onDone(), | |
| 227 bool cancelOnError) { | |
| 228 if (_hasListener) { | |
| 229 try { | |
| 230 throw 0; | |
| 231 } catch (e, s) { | |
| 232 print("LISTEN TWICE(#$hashCode)\n$s"); | |
| 233 } | |
| 234 throw new StateError("The stream has already been listened to."); | |
| 143 } | 235 } |
| 236 //try { throw 0; } catch (e, s) { print("LISTEN ONCE(#$hashCode):\n$s"); } | |
| 237 _hasListener = true; | |
| 238 return new _ControllerSubscription<T>( | |
| 239 _controller, onData, onError, onDone, cancelOnError); | |
| 144 } | 240 } |
| 145 | 241 |
| 146 _SingleControllerStream(this._onListen, | 242 void _onListen(_BufferingStreamSubscription subscription) { |
| 147 this._onPause, | 243 _controller._recordListen(subscription); |
| 148 this._onResume, | 244 } |
| 149 this._onCancel); | 245 } |
| 150 | 246 |
| 151 void _onSubscriptionStateChange() { | 247 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 152 _runGuarded(_hasListener ? _onListen : _onCancel); | 248 final StreamController _controller; |
| 249 | |
| 250 _ControllerSubscription(StreamController controller, | |
| 251 void onData(T data), | |
| 252 void onError(Object error), | |
| 253 void onDone(), | |
| 254 bool cancelOnError) | |
| 255 : _controller = controller, | |
| 256 super(onData, onError, onDone, cancelOnError, | |
| 257 controller._pendingEvents); | |
| 258 | |
| 259 void _onCancel() { | |
| 260 _controller._recordCancel(); | |
| 153 } | 261 } |
| 154 | 262 |
| 155 void _onPauseStateChange() { | 263 void _onPause() { |
| 156 _runGuarded(_isPaused ? _onPause : _onResume); | 264 _controller._recordPause(); |
| 265 } | |
| 266 | |
| 267 void _onResume() { | |
| 268 _controller._recordResume(); | |
| 157 } | 269 } |
| 158 } | 270 } |
| OLD | NEW |