Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
| 40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
| 41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
| 42 * callback is performed. | 42 * callback is performed. |
| 43 * | 43 * |
| 44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
| 45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
| 48 */ | 48 */ |
| 49 class StreamController<T> extends EventSink<T> { | 49 abstract class StreamController<T> implements EventSink<T> { |
| 50 /** The stream that this controller is controlling. */ | |
| 51 Stream<T> get stream; | |
| 52 | |
| 53 /** | |
| 54 * A controller with a [stream] that supports only one single subscriber. | |
| 55 * | |
| 56 * The controller will buffer all incoming events until the subscriber is | |
| 57 * registered. | |
| 58 * | |
| 59 * The [onPause] function is called when the stream becomes | |
| 60 * paused. [onResume] is called when the stream resumed. | |
| 61 * | |
| 62 * The [onListen] callback is called when the stream | |
| 63 * receives its listener and [onCancel] when the listener ends | |
| 64 * its subscription. | |
| 65 * | |
| 66 * If the stream is canceled before the controller needs new data the | |
| 67 * [onResume] call might not be executed. | |
| 68 */ | |
| 69 factory StreamController({void onListen(), | |
| 70 void onPause(), | |
| 71 void onResume(), | |
| 72 void onCancel()}) | |
| 73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); | |
| 74 | |
| 75 /** | |
| 76 * A controller where [stream] creates new stream each time it is read. | |
| 77 * | |
| 78 * The controller distributes any events to all currently subscribed streams. | |
| 79 * | |
| 80 * The [onListen] callback is called when the first listener is subscribed, | |
| 81 * and the [onCancel] is called when there is no longer any active listeners. | |
| 82 * If a listener is added again later, after the [onCancel] was called, | |
| 83 * the [onListen] will be called again. | |
| 84 */ | |
| 85 factory StreamController.multiplex({void onListen(), void onCancel()}) { | |
| 86 return new _MultiplexStreamController<T>(onListen, onCancel); | |
| 87 } | |
| 88 | |
| 89 /** | |
| 90 * Returns a view of this object that only exposes the [EventSink] interface. | |
| 91 */ | |
| 92 EventSink<T> get sink; | |
| 93 } | |
| 94 | |
| 95 | |
| 96 abstract class _StreamControllerLifecycle<T> { | |
| 97 void _recordListen(StreamSubscription<T> subscription) {} | |
| 98 void _recordPause(StreamSubscription<T> subscription) {} | |
| 99 void _recordResume(StreamSubscription<T> subscription) {} | |
| 100 void _recordCancel(StreamSubscription<T> subscription) {} | |
| 101 } | |
| 102 | |
| 103 /** | |
| 104 * Default implementation of [StreamController]. | |
| 105 * | |
| 106 * Controls a stream that only supports a single controller. | |
| 107 */ | |
| 108 class _StreamControllerImpl<T> implements StreamController<T>, | |
| 109 _StreamControllerLifecycle<T> { | |
| 50 static const int _STATE_OPEN = 0; | 110 static const int _STATE_OPEN = 0; |
| 51 static const int _STATE_CANCELLED = 1; | 111 static const int _STATE_CANCELLED = 1; |
| 52 static const int _STATE_CLOSED = 2; | 112 static const int _STATE_CLOSED = 2; |
| 53 | 113 |
| 54 final _NotificationHandler _onListen; | 114 final _NotificationHandler _onListen; |
| 55 final _NotificationHandler _onPause; | 115 final _NotificationHandler _onPause; |
| 56 final _NotificationHandler _onResume; | 116 final _NotificationHandler _onResume; |
| 57 final _NotificationHandler _onCancel; | 117 final _NotificationHandler _onCancel; |
| 58 _StreamImpl<T> _stream; | 118 _StreamImpl<T> _stream; |
| 59 | 119 |
| 60 // An active subscription on the stream, or null if no subscripton is active. | 120 // An active subscription on the stream, or null if no subscripton is active. |
| 61 _ControllerSubscription<T> _subscription; | 121 _ControllerSubscription<T> _subscription; |
| 62 | 122 |
| 63 // Whether we have sent a "done" event. | 123 // Whether we have sent a "done" event. |
| 64 int _state = _STATE_OPEN; | 124 int _state = _STATE_OPEN; |
| 65 | 125 |
| 66 // Events added to the stream before it has an active subscription. | 126 // Events added to the stream before it has an active subscription. |
| 67 _PendingEvents _pendingEvents = null; | 127 _PendingEvents _pendingEvents = null; |
| 68 | 128 |
| 69 /** | 129 _StreamControllerImpl(this._onListen, |
| 70 * A controller with a [stream] that supports only one single subscriber. | 130 this._onPause, |
| 71 * | 131 this._onResume, |
| 72 * The controller will buffer all incoming events until the subscriber is | 132 this._onCancel) { |
| 73 * registered. | |
| 74 * | |
| 75 * The [onPause] function is called when the stream becomes | |
| 76 * paused. [onResume] is called when the stream resumed. | |
| 77 * | |
| 78 * The [onListen] callback is called when the stream | |
| 79 * receives its listener and [onCancel] when the listener ends | |
| 80 * its subscription. | |
| 81 * | |
| 82 * If the stream is canceled before the controller needs new data the | |
| 83 * [onResume] call might not be executed. | |
| 84 */ | |
| 85 StreamController({void onListen(), | |
| 86 void onPause(), | |
| 87 void onResume(), | |
| 88 void onCancel()}) | |
| 89 : _onListen = onListen, | |
| 90 _onPause = onPause, | |
| 91 _onResume = onResume, | |
| 92 _onCancel = onCancel { | |
| 93 _stream = new _ControllerStream<T>(this); | 133 _stream = new _ControllerStream<T>(this); |
| 94 } | 134 } |
| 95 | 135 |
| 96 Stream<T> get stream => _stream; | 136 Stream<T> get stream => _stream; |
| 97 | 137 |
| 98 /** | 138 /** |
| 99 * Returns a view of this object that only exposes the [EventSink] interface. | 139 * Returns a view of this object that only exposes the [EventSink] interface. |
| 100 */ | 140 */ |
| 101 EventSink<T> get sink => new _EventSinkView<T>(this); | 141 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 102 | 142 |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 173 | 213 |
| 174 void _addPendingEvent(_DelayedEvent event) { | 214 void _addPendingEvent(_DelayedEvent event) { |
| 175 if (_isCancelled) return; | 215 if (_isCancelled) return; |
| 176 _StreamImplEvents events = _pendingEvents; | 216 _StreamImplEvents events = _pendingEvents; |
| 177 if (events == null) { | 217 if (events == null) { |
| 178 events = _pendingEvents = new _StreamImplEvents(); | 218 events = _pendingEvents = new _StreamImplEvents(); |
| 179 } | 219 } |
| 180 events.add(event); | 220 events.add(event); |
| 181 } | 221 } |
| 182 | 222 |
| 183 void _recordListen(_BufferingStreamSubscription subscription) { | 223 void _recordListen(StreamSubscription<T> subscription) { |
|
floitsch
2013/05/24 20:51:42
Why not keep the type?
Lasse Reichstein Nielsen
2013/05/27 08:04:12
True, it is true for this subclass, and I avoid a
| |
| 224 _BufferingStreamSubscription bufferingSubscription = subscription; | |
| 184 assert(_subscription == null); | 225 assert(_subscription == null); |
| 185 _subscription = subscription; | 226 _subscription = bufferingSubscription; |
| 186 subscription._setPendingEvents(_pendingEvents); | 227 bufferingSubscription._setPendingEvents(_pendingEvents); |
| 187 _pendingEvents = null; | 228 _pendingEvents = null; |
| 188 subscription._guardCallback(() { | 229 bufferingSubscription._guardCallback(() { |
| 189 _runGuarded(_onListen); | 230 _runGuarded(_onListen); |
| 190 }); | 231 }); |
| 191 } | 232 } |
| 192 | 233 |
| 193 void _recordCancel() { | 234 void _recordCancel(StreamSubscription<T> subscription) { |
| 194 _subscription = null; | 235 _subscription = null; |
|
floitsch
2013/05/24 20:51:42
assert that the subscription is the same?
Lasse Reichstein Nielsen
2013/05/27 08:04:12
Done.
| |
| 195 _state |= _STATE_CANCELLED; | 236 _state |= _STATE_CANCELLED; |
| 196 _runGuarded(_onCancel); | 237 _runGuarded(_onCancel); |
| 197 } | 238 } |
| 198 | 239 |
| 199 void _recordPause() { | 240 void _recordPause(StreamSubscription<T> subscription) { |
| 200 _runGuarded(_onPause); | 241 _runGuarded(_onPause); |
| 201 } | 242 } |
| 202 | 243 |
| 203 void _recordResume() { | 244 void _recordResume(StreamSubscription<T> subscription) { |
| 204 _runGuarded(_onResume); | 245 _runGuarded(_onResume); |
| 205 } | 246 } |
| 206 } | 247 } |
| 207 | 248 |
| 208 typedef void _NotificationHandler(); | 249 typedef void _NotificationHandler(); |
| 209 | 250 |
| 210 void _runGuarded(_NotificationHandler notificationHandler) { | 251 void _runGuarded(_NotificationHandler notificationHandler) { |
| 211 if (notificationHandler == null) return; | 252 if (notificationHandler == null) return; |
| 212 try { | 253 try { |
| 213 notificationHandler(); | 254 notificationHandler(); |
| 214 } catch (e, s) { | 255 } catch (e, s) { |
| 215 _throwDelayed(e, s); | 256 _throwDelayed(e, s); |
| 216 } | 257 } |
| 217 } | 258 } |
| 218 | 259 |
| 219 class _ControllerStream<T> extends _StreamImpl<T> { | 260 class _ControllerStream<T> extends _StreamImpl<T> { |
| 220 StreamController _controller; | 261 _StreamControllerLifecycle<T> _controller; |
| 221 bool _hasListener = false; | 262 bool _hasListener = false; |
| 222 | 263 |
| 223 _ControllerStream(this._controller); | 264 _ControllerStream(this._controller); |
| 224 | 265 |
| 225 StreamSubscription<T> _createSubscription( | 266 StreamSubscription<T> _createSubscription( |
| 226 void onData(T data), | 267 void onData(T data), |
| 227 void onError(Object error), | 268 void onError(Object error), |
| 228 void onDone(), | 269 void onDone(), |
| 229 bool cancelOnError) { | 270 bool cancelOnError) { |
| 230 if (_hasListener) { | 271 if (_hasListener) { |
| 231 throw new StateError("The stream has already been listened to."); | 272 throw new StateError("The stream has already been listened to."); |
| 232 } | 273 } |
| 233 _hasListener = true; | 274 _hasListener = true; |
| 234 return new _ControllerSubscription<T>( | 275 return new _ControllerSubscription<T>( |
| 235 _controller, onData, onError, onDone, cancelOnError); | 276 _controller, onData, onError, onDone, cancelOnError); |
| 236 } | 277 } |
| 237 | 278 |
| 238 void _onListen(_BufferingStreamSubscription subscription) { | 279 void _onListen(_BufferingStreamSubscription subscription) { |
| 239 _controller._recordListen(subscription); | 280 _controller._recordListen(subscription); |
| 240 } | 281 } |
| 241 } | 282 } |
| 242 | 283 |
| 243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 284 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 244 final StreamController _controller; | 285 final _StreamControllerLifecycle<T> _controller; |
| 245 | 286 |
| 246 _ControllerSubscription(StreamController controller, | 287 _ControllerSubscription(this._controller, |
| 247 void onData(T data), | 288 void onData(T data), |
| 248 void onError(Object error), | 289 void onError(Object error), |
| 249 void onDone(), | 290 void onDone(), |
| 250 bool cancelOnError) | 291 bool cancelOnError) |
| 251 : _controller = controller, | 292 : super(onData, onError, onDone, cancelOnError); |
| 252 super(onData, onError, onDone, cancelOnError); | |
| 253 | 293 |
| 254 void _onCancel() { | 294 void _onCancel() { |
| 255 _controller._recordCancel(); | 295 _controller._recordCancel(this); |
| 256 } | 296 } |
| 257 | 297 |
| 258 void _onPause() { | 298 void _onPause() { |
| 259 _controller._recordPause(); | 299 _controller._recordPause(this); |
| 260 } | 300 } |
| 261 | 301 |
| 262 void _onResume() { | 302 void _onResume() { |
| 263 _controller._recordResume(); | 303 _controller._recordResume(this); |
| 264 } | 304 } |
| 265 } | 305 } |
| 306 | |
| 307 class _MultiplexStreamController<T> implements StreamController<T>, | |
| 308 _StreamControllerLifecycle<T> { | |
| 309 final _NotificationHandler _onListen; | |
| 310 final _NotificationHandler _onCancel; | |
| 311 // TODO(lrn): Make a more efficient implementation of these subscriptions, | |
| 312 // e.g., the traditional double-linked list with concurrent add and remove | |
| 313 // while firing. | |
| 314 Set<_BufferingStreamSubscription<T>> _streams; | |
| 315 | |
| 316 _MultiplexStreamController(this._onListen, this._onCancel) | |
| 317 : _streams = new Set<_BufferingStreamSubscription<T>>(); | |
| 318 | |
| 319 // StreamController interface. | |
| 320 | |
| 321 Stream<T> get stream => new _ControllerStream<T>(this); | |
| 322 | |
| 323 EventSink<T> get sink => new _EventSinkView<T>(this); | |
| 324 | |
| 325 // _StreamControllerLifecycle interface. | |
| 326 | |
| 327 void _recordListen(_BufferingStreamSubscription<T> subscription) { | |
| 328 bool isFirst = _streams.isEmpty; | |
| 329 _streams.add(subscription); | |
| 330 if (isFirst) { | |
| 331 _runGuarded(_onListen); | |
| 332 } | |
| 333 } | |
| 334 | |
| 335 void _recordCancel(_BufferingStreamSubscription<T> subscription) { | |
| 336 _streams.remove(subscription); | |
| 337 if (_streams.isEmpty) { | |
| 338 _runGuarded(_onCancel); | |
| 339 } | |
| 340 } | |
| 341 | |
| 342 void _recordPause(StreamSubscription<T> subscription) {} | |
| 343 void _recordResume(StreamSubscription<T> subscription) {} | |
| 344 | |
| 345 // EventSink interface. | |
| 346 | |
| 347 void add(T data) { | |
| 348 if (_streams.isEmpty) return; | |
| 349 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 350 subscription._add(data); | |
| 351 }); | |
| 352 } | |
| 353 | |
| 354 void addError(Object error) { | |
| 355 if (_streams.isEmpty) return; | |
| 356 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 357 subscription._addError(error); | |
| 358 }); | |
| 359 } | |
| 360 | |
| 361 void close() { | |
| 362 if (_streams.isEmpty) return; | |
| 363 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 364 _streams.remove(subscription); | |
| 365 subscription._close(); | |
| 366 }); | |
| 367 } | |
| 368 | |
| 369 void _forEachListener( | |
| 370 void action(_BufferingStreamSubscription<T> subscription)) { | |
| 371 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList(); | |
| 372 for (_BufferingStreamSubscription<T> subscription in subscriptions) { | |
| 373 if (_streams.contains(subscription)) { | |
| 374 action(subscription); | |
| 375 } | |
| 376 } | |
| 377 } | |
| 378 } | |
| 379 | |
| OLD | NEW |