OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 28 matching lines...) Expand all Loading... |
39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
42 * callback is performed. | 42 * callback is performed. |
43 * | 43 * |
44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 class StreamController<T> extends EventSink<T> { | 49 abstract class StreamController<T> implements EventSink<T> { |
50 final _StreamImpl<T> stream; | 50 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; |
51 | 52 |
52 /** | 53 /** |
53 * A controller with a [stream] that supports only one single subscriber. | 54 * A controller with a [stream] that supports only one single subscriber. |
54 * | 55 * |
55 * The controller will buffer all incoming events until the subscriber is | 56 * The controller will buffer all incoming events until the subscriber is |
56 * registered. | 57 * registered. |
57 * | 58 * |
58 * The [onPause] function is called when the stream becomes | 59 * The [onPause] function is called when the stream becomes |
59 * paused. [onResume] is called when the stream resumed. | 60 * paused. [onResume] is called when the stream resumed. |
60 * | 61 * |
61 * The [onListen] callback is called when the stream | 62 * The [onListen] callback is called when the stream |
62 * receives its listener. [onCancel] when the listener cancels | 63 * receives its listener and [onCancel] when the listener ends |
63 * its subscription. | 64 * its subscription. |
64 * | 65 * |
65 * If the stream is canceled before the controller needs new data the | 66 * If the stream is canceled before the controller needs new data the |
66 * [onResume] call might not be executed. | 67 * [onResume] call might not be executed. |
67 */ | 68 */ |
68 StreamController({void onListen(), | 69 factory StreamController({void onListen(), |
69 void onPause(), | 70 void onPause(), |
70 void onResume(), | 71 void onResume(), |
71 void onCancel()}) | 72 void onCancel()}) |
72 : stream = new _SingleControllerStream<T>( | 73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); |
73 onListen, onPause, onResume, onCancel); | 74 |
| 75 /** |
| 76 * A controller where [stream] creates new stream each time it is read. |
| 77 * |
| 78 * The controller distributes any events to all currently subscribed streams. |
| 79 * |
| 80 * The [onListen] callback is called when the first listener is subscribed, |
| 81 * and the [onCancel] is called when there is no longer any active listeners. |
| 82 * If a listener is added again later, after the [onCancel] was called, |
| 83 * the [onListen] will be called again. |
| 84 */ |
| 85 factory StreamController.multiplex({void onListen(), void onCancel()}) { |
| 86 return new _MultiplexStreamController<T>(onListen, onCancel); |
| 87 } |
74 | 88 |
75 /** | 89 /** |
76 * Returns a view of this object that only exposes the [EventSink] interface. | 90 * Returns a view of this object that only exposes the [EventSink] interface. |
77 */ | 91 */ |
78 EventSink<T> get sink => new _EventSinkView<T>(this); | 92 EventSink<T> get sink; |
79 | 93 |
80 /** | 94 /** |
81 * Whether the stream is closed for adding more events. | 95 * Whether the stream is closed for adding more events. |
82 * | 96 * |
83 * If true, the "done" event might not have fired yet, but it has been | 97 * If true, the "done" event might not have fired yet, but it has been |
84 * scheduled, and it is too late to add more events. | 98 * scheduled, and it is too late to add more events. |
85 */ | 99 */ |
86 bool get isClosed => stream._isClosed; | 100 bool get isClosed; |
87 | 101 |
88 /** Whether one or more active subscribers have requested a pause. */ | 102 /** Whether the subscription is active and paused. */ |
89 bool get isPaused => stream._isInputPaused; | 103 bool get isPaused; |
90 | 104 |
91 /** Whether there are currently any subscribers on this [Stream]. */ | 105 /** Whether there is a subscriber on the [Stream]. */ |
92 bool get hasListener => stream._hasListener; | 106 bool get hasListener; |
| 107 |
| 108 /** |
| 109 * Send or enqueue an error event. |
| 110 * |
| 111 * Also allows an objection stack trace object, on top of what [EventSink] |
| 112 * allows. |
| 113 */ |
| 114 void addError(Object error, [Object stackTrace]); |
| 115 } |
| 116 |
| 117 |
| 118 abstract class _StreamControllerLifecycle<T> { |
| 119 void _recordListen(StreamSubscription<T> subscription) {} |
| 120 void _recordPause(StreamSubscription<T> subscription) {} |
| 121 void _recordResume(StreamSubscription<T> subscription) {} |
| 122 void _recordCancel(StreamSubscription<T> subscription) {} |
| 123 } |
| 124 |
| 125 /** |
| 126 * Default implementation of [StreamController]. |
| 127 * |
| 128 * Controls a stream that only supports a single controller. |
| 129 */ |
| 130 class _StreamControllerImpl<T> implements StreamController<T>, |
| 131 _StreamControllerLifecycle<T> { |
| 132 static const int _STATE_OPEN = 0; |
| 133 static const int _STATE_CANCELLED = 1; |
| 134 static const int _STATE_CLOSED = 2; |
| 135 |
| 136 final _NotificationHandler _onListen; |
| 137 final _NotificationHandler _onPause; |
| 138 final _NotificationHandler _onResume; |
| 139 final _NotificationHandler _onCancel; |
| 140 _StreamImpl<T> _stream; |
| 141 |
| 142 // An active subscription on the stream, or null if no subscripton is active. |
| 143 _ControllerSubscription<T> _subscription; |
| 144 |
| 145 // Whether we have sent a "done" event. |
| 146 int _state = _STATE_OPEN; |
| 147 |
| 148 // Events added to the stream before it has an active subscription. |
| 149 _PendingEvents _pendingEvents = null; |
| 150 |
| 151 _StreamControllerImpl(this._onListen, |
| 152 this._onPause, |
| 153 this._onResume, |
| 154 this._onCancel) { |
| 155 _stream = new _ControllerStream<T>(this); |
| 156 } |
| 157 |
| 158 Stream<T> get stream => _stream; |
| 159 |
| 160 /** |
| 161 * Returns a view of this object that only exposes the [EventSink] interface. |
| 162 */ |
| 163 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 164 |
| 165 /** |
| 166 * Whether a listener has existed and been cancelled. |
| 167 * |
| 168 * After this, adding more events will be ignored. |
| 169 */ |
| 170 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; |
| 171 |
| 172 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 173 |
| 174 bool get isPaused => _subscription != null && _subscription._isInputPaused; |
| 175 |
| 176 bool get hasListener => _subscription != null; |
93 | 177 |
94 /** | 178 /** |
95 * Send or queue a data event. | 179 * Send or queue a data event. |
96 */ | 180 */ |
97 void add(T value) => stream._add(value); | 181 void add(T value) { |
| 182 if (isClosed) throw new StateError("Adding event after close"); |
| 183 if (_subscription != null) { |
| 184 _subscription._add(value); |
| 185 } else if (!_isCancelled) { |
| 186 _addPendingEvent(new _DelayedData<T>(value)); |
| 187 } |
| 188 } |
98 | 189 |
99 /** | 190 /** |
100 * Send or enqueue an error event. | 191 * Send or enqueue an error event. |
101 * | |
102 * If a subscription has requested to be unsubscribed on errors, | |
103 * it will be unsubscribed after receiving this event. | |
104 */ | 192 */ |
105 void addError(Object error, [Object stackTrace]) { | 193 void addError(Object error, [Object stackTrace]) { |
| 194 if (isClosed) throw new StateError("Adding event after close"); |
106 if (stackTrace != null) { | 195 if (stackTrace != null) { |
107 // Force stack trace overwrite. Even if the error already contained | 196 // Force stack trace overwrite. Even if the error already contained |
108 // a stack trace. | 197 // a stack trace. |
109 _attachStackTrace(error, stackTrace); | 198 _attachStackTrace(error, stackTrace); |
110 } | 199 } |
111 stream._addError(error); | 200 if (_subscription != null) { |
112 } | 201 _subscription._addError(error); |
113 | 202 } else if (!_isCancelled) { |
114 /** | 203 _addPendingEvent(new _DelayedError(error)); |
115 * Send or enqueue a "done" message. | 204 } |
116 * | 205 } |
117 * The "done" message should be sent at most once by a stream, and it | 206 |
118 * should be the last message sent. | 207 /** |
119 */ | 208 * Closes this controller. |
120 void close() { stream._close(); } | 209 * |
| 210 * After closing, no further events may be added using [add] or [addError]. |
| 211 * |
| 212 * You are allowed to close the controller more than once, but only the first |
| 213 * call has any effect. |
| 214 * |
| 215 * The first time a controller is closed, a "done" event is sent to its |
| 216 * stream. |
| 217 */ |
| 218 void close() { |
| 219 if (isClosed) return; |
| 220 _state |= _STATE_CLOSED; |
| 221 if (_subscription != null) { |
| 222 _subscription._close(); |
| 223 } else if (!_isCancelled) { |
| 224 _addPendingEvent(const _DelayedDone()); |
| 225 } |
| 226 } |
| 227 |
| 228 void _addPendingEvent(_DelayedEvent event) { |
| 229 if (_isCancelled) return; |
| 230 _StreamImplEvents events = _pendingEvents; |
| 231 if (events == null) { |
| 232 events = _pendingEvents = new _StreamImplEvents(); |
| 233 } |
| 234 events.add(event); |
| 235 } |
| 236 |
| 237 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| 238 assert(_subscription == null); |
| 239 _subscription = subscription; |
| 240 subscription._setPendingEvents(_pendingEvents); |
| 241 _pendingEvents = null; |
| 242 subscription._guardCallback(() { |
| 243 _runGuarded(_onListen); |
| 244 }); |
| 245 } |
| 246 |
| 247 void _recordCancel(StreamSubscription<T> subscription) { |
| 248 assert(identical(_subscription, subscription)); |
| 249 _subscription = null; |
| 250 _state |= _STATE_CANCELLED; |
| 251 _runGuarded(_onCancel); |
| 252 } |
| 253 |
| 254 void _recordPause(StreamSubscription<T> subscription) { |
| 255 _runGuarded(_onPause); |
| 256 } |
| 257 |
| 258 void _recordResume(StreamSubscription<T> subscription) { |
| 259 _runGuarded(_onResume); |
| 260 } |
121 } | 261 } |
122 | 262 |
123 typedef void _NotificationHandler(); | 263 typedef void _NotificationHandler(); |
124 | 264 |
125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { | 265 void _runGuarded(_NotificationHandler notificationHandler) { |
126 _NotificationHandler _onListen; | 266 if (notificationHandler == null) return; |
127 _NotificationHandler _onPause; | 267 try { |
128 _NotificationHandler _onResume; | 268 notificationHandler(); |
129 _NotificationHandler _onCancel; | 269 } catch (e, s) { |
130 | 270 _throwDelayed(e, s); |
131 // TODO(floitsch): share this code with _MultiControllerStream. | 271 } |
132 _runGuarded(_NotificationHandler notificationHandler) { | 272 } |
133 if (notificationHandler == null) return; | 273 |
134 try { | 274 class _ControllerStream<T> extends _StreamImpl<T> { |
135 notificationHandler(); | 275 _StreamControllerLifecycle<T> _controller; |
136 } catch (e, s) { | 276 bool _hasListener = false; |
137 _throwDelayed(e, s); | 277 |
138 } | 278 _ControllerStream(this._controller); |
139 } | 279 |
140 | 280 StreamSubscription<T> _createSubscription( |
141 _SingleControllerStream(this._onListen, | 281 void onData(T data), |
142 this._onPause, | 282 void onError(Object error), |
143 this._onResume, | 283 void onDone(), |
144 this._onCancel); | 284 bool cancelOnError) { |
145 | 285 if (_hasListener) { |
146 void _onSubscriptionStateChange() { | 286 throw new StateError("The stream has already been listened to."); |
147 _runGuarded(_hasListener ? _onListen : _onCancel); | 287 } |
148 } | 288 _hasListener = true; |
149 | 289 return new _ControllerSubscription<T>( |
150 void _onPauseStateChange() { | 290 _controller, onData, onError, onDone, cancelOnError); |
151 _runGuarded(_isPaused ? _onPause : _onResume); | 291 } |
152 } | 292 |
153 } | 293 void _onListen(_BufferingStreamSubscription subscription) { |
| 294 _controller._recordListen(subscription); |
| 295 } |
| 296 } |
| 297 |
| 298 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 299 final _StreamControllerLifecycle<T> _controller; |
| 300 |
| 301 _ControllerSubscription(this._controller, |
| 302 void onData(T data), |
| 303 void onError(Object error), |
| 304 void onDone(), |
| 305 bool cancelOnError) |
| 306 : super(onData, onError, onDone, cancelOnError); |
| 307 |
| 308 void _onCancel() { |
| 309 _controller._recordCancel(this); |
| 310 } |
| 311 |
| 312 void _onPause() { |
| 313 _controller._recordPause(this); |
| 314 } |
| 315 |
| 316 void _onResume() { |
| 317 _controller._recordResume(this); |
| 318 } |
| 319 } |
| 320 |
| 321 class _MultiplexStreamController<T> implements StreamController<T>, |
| 322 _StreamControllerLifecycle<T> { |
| 323 final _NotificationHandler _onListen; |
| 324 final _NotificationHandler _onCancel; |
| 325 /** Set when the [close] method is called. */ |
| 326 bool _isClosed = false; |
| 327 |
| 328 // TODO(lrn): Make a more efficient implementation of these subscriptions, |
| 329 // e.g., the traditional double-linked list with concurrent add and remove |
| 330 // while firing. |
| 331 Set<_BufferingStreamSubscription<T>> _streams; |
| 332 |
| 333 _MultiplexStreamController(this._onListen, this._onCancel) |
| 334 : _streams = new Set<_BufferingStreamSubscription<T>>(); |
| 335 |
| 336 // StreamController interface. |
| 337 |
| 338 Stream<T> get stream => new _ControllerStream<T>(this); |
| 339 |
| 340 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 341 |
| 342 bool get isClosed => _isClosed; |
| 343 |
| 344 /** |
| 345 * A multiplex controller is never paused. |
| 346 * |
| 347 * Each receiving stream may be paused individually, and they handle their |
| 348 * own buffering. |
| 349 */ |
| 350 bool get isPaused => false; |
| 351 |
| 352 /** Whether there are currently a subscriber on the [Stream]. */ |
| 353 bool get hasListener => !_streams.isEmpty; |
| 354 |
| 355 // _StreamControllerLifecycle interface. |
| 356 |
| 357 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| 358 bool isFirst = _streams.isEmpty; |
| 359 _streams.add(subscription); |
| 360 if (isFirst) { |
| 361 _runGuarded(_onListen); |
| 362 } |
| 363 } |
| 364 |
| 365 void _recordCancel(_BufferingStreamSubscription<T> subscription) { |
| 366 _streams.remove(subscription); |
| 367 if (_streams.isEmpty) { |
| 368 _runGuarded(_onCancel); |
| 369 } |
| 370 } |
| 371 |
| 372 void _recordPause(StreamSubscription<T> subscription) {} |
| 373 void _recordResume(StreamSubscription<T> subscription) {} |
| 374 |
| 375 // EventSink interface. |
| 376 |
| 377 void add(T data) { |
| 378 if (_streams.isEmpty) return; |
| 379 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 380 subscription._add(data); |
| 381 }); |
| 382 } |
| 383 |
| 384 void addError(Object error, [Object stackTrace]) { |
| 385 if (_streams.isEmpty) return; |
| 386 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 387 subscription._addError(error); |
| 388 }); |
| 389 } |
| 390 |
| 391 void close() { |
| 392 _isClosed = true; |
| 393 if (_streams.isEmpty) return; |
| 394 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 395 _streams.remove(subscription); |
| 396 subscription._close(); |
| 397 }); |
| 398 } |
| 399 |
| 400 void _forEachListener( |
| 401 void action(_BufferingStreamSubscription<T> subscription)) { |
| 402 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList(); |
| 403 for (_BufferingStreamSubscription<T> subscription in subscriptions) { |
| 404 if (_streams.contains(subscription)) { |
| 405 action(subscription); |
| 406 } |
| 407 } |
| 408 } |
| 409 } |
| 410 |
OLD | NEW |