| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 class _BroadcastStream<T> extends _ControllerStream<T> { | 7 class _BroadcastStream<T> extends _ControllerStream<T> { |
| 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); | 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); |
| 9 | 9 |
| 10 bool get isBroadcast => true; | 10 bool get isBroadcast => true; |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 72 _StreamControllerLifecycle<T>, | 72 _StreamControllerLifecycle<T>, |
| 73 _BroadcastSubscriptionLink, | 73 _BroadcastSubscriptionLink, |
| 74 _EventSink<T>, | 74 _EventSink<T>, |
| 75 _EventDispatch<T> { | 75 _EventDispatch<T> { |
| 76 static const int _STATE_INITIAL = 0; | 76 static const int _STATE_INITIAL = 0; |
| 77 static const int _STATE_EVENT_ID = 1; | 77 static const int _STATE_EVENT_ID = 1; |
| 78 static const int _STATE_FIRING = 2; | 78 static const int _STATE_FIRING = 2; |
| 79 static const int _STATE_CLOSED = 4; | 79 static const int _STATE_CLOSED = 4; |
| 80 static const int _STATE_ADDSTREAM = 8; | 80 static const int _STATE_ADDSTREAM = 8; |
| 81 | 81 |
| 82 _NotificationHandler _onListen; | 82 ControllerCallback onListen; |
| 83 _NotificationHandler _onCancel; | 83 ControllerCancelCallback onCancel; |
| 84 | 84 |
| 85 // State of the controller. | 85 // State of the controller. |
| 86 int _state; | 86 int _state; |
| 87 | 87 |
| 88 // Double-linked list of active listeners. | 88 // Double-linked list of active listeners. |
| 89 _BroadcastSubscriptionLink _next; | 89 _BroadcastSubscriptionLink _next; |
| 90 _BroadcastSubscriptionLink _previous; | 90 _BroadcastSubscriptionLink _previous; |
| 91 | 91 |
| 92 // Extra state used during an [addStream] call. | 92 // Extra state used during an [addStream] call. |
| 93 _AddStreamState<T> _addStreamState; | 93 _AddStreamState<T> _addStreamState; |
| 94 | 94 |
| 95 /** | 95 /** |
| 96 * Future returned by [close] and [done]. | 96 * Future returned by [close] and [done]. |
| 97 * | 97 * |
| 98 * The future is completed whenever the done event has been sent to all | 98 * The future is completed whenever the done event has been sent to all |
| 99 * relevant listeners. | 99 * relevant listeners. |
| 100 * The relevant listeners are the ones that were listening when [close] was | 100 * The relevant listeners are the ones that were listening when [close] was |
| 101 * called. When all of these have been canceled (sending the done event makes | 101 * called. When all of these have been canceled (sending the done event makes |
| 102 * them cancel, but they can also be canceled before sending the event), | 102 * them cancel, but they can also be canceled before sending the event), |
| 103 * this future completes. | 103 * this future completes. |
| 104 * | 104 * |
| 105 * Any attempt to listen after calling [close] will throw, so there won't | 105 * Any attempt to listen after calling [close] will throw, so there won't |
| 106 * be any further listeners. | 106 * be any further listeners. |
| 107 */ | 107 */ |
| 108 _Future _doneFuture; | 108 _Future _doneFuture; |
| 109 | 109 |
| 110 _BroadcastStreamController(this._onListen, this._onCancel) | 110 _BroadcastStreamController(this.onListen, this.onCancel) |
| 111 : _state = _STATE_INITIAL { | 111 : _state = _STATE_INITIAL { |
| 112 _next = _previous = this; | 112 _next = _previous = this; |
| 113 } | 113 } |
| 114 | 114 |
| 115 void set onListen(void onListenHandler()) { _onListen = onListenHandler; } | 115 ControllerCallback get onPause { |
| 116 throw new UnsupportedError( |
| 117 "Broadcast stream controllers do not support pause callbacks"); |
| 118 } |
| 116 | 119 |
| 117 void set onPause(void onPauseHandler()) { | 120 void set onPause(void onPauseHandler()) { |
| 118 throw new UnsupportedError( | 121 throw new UnsupportedError( |
| 119 "Broadcast stream controllers do not support pause callbacks"); | 122 "Broadcast stream controllers do not support pause callbacks"); |
| 120 } | 123 } |
| 121 | 124 |
| 125 ControllerCallback get onResume { |
| 126 throw new UnsupportedError( |
| 127 "Broadcast stream controllers do not support pause callbacks"); |
| 128 } |
| 129 |
| 122 void set onResume(void onResumeHandler()) { | 130 void set onResume(void onResumeHandler()) { |
| 123 throw new UnsupportedError( | 131 throw new UnsupportedError( |
| 124 "Broadcast stream controllers do not support pause callbacks"); | 132 "Broadcast stream controllers do not support pause callbacks"); |
| 125 } | 133 } |
| 126 | 134 |
| 127 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; } | |
| 128 | |
| 129 // StreamController interface. | 135 // StreamController interface. |
| 130 | 136 |
| 131 Stream<T> get stream => new _BroadcastStream<T>(this); | 137 Stream<T> get stream => new _BroadcastStream<T>(this); |
| 132 | 138 |
| 133 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 139 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 134 | 140 |
| 135 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 141 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 136 | 142 |
| 137 /** | 143 /** |
| 138 * A broadcast controller is never paused. | 144 * A broadcast controller is never paused. |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 202 if (isClosed) { | 208 if (isClosed) { |
| 203 if (onDone == null) onDone = _nullDoneHandler; | 209 if (onDone == null) onDone = _nullDoneHandler; |
| 204 return new _DoneStreamSubscription<T>(onDone); | 210 return new _DoneStreamSubscription<T>(onDone); |
| 205 } | 211 } |
| 206 StreamSubscription subscription = | 212 StreamSubscription subscription = |
| 207 new _BroadcastSubscription<T>(this, onData, onError, onDone, | 213 new _BroadcastSubscription<T>(this, onData, onError, onDone, |
| 208 cancelOnError); | 214 cancelOnError); |
| 209 _addListener(subscription); | 215 _addListener(subscription); |
| 210 if (identical(_next, _previous)) { | 216 if (identical(_next, _previous)) { |
| 211 // Only one listener, so it must be the first listener. | 217 // Only one listener, so it must be the first listener. |
| 212 _runGuarded(_onListen); | 218 _runGuarded(onListen); |
| 213 } | 219 } |
| 214 return subscription; | 220 return subscription; |
| 215 } | 221 } |
| 216 | 222 |
| 217 Future _recordCancel(_BroadcastSubscription<T> subscription) { | 223 Future _recordCancel(_BroadcastSubscription<T> subscription) { |
| 218 // If already removed by the stream, don't remove it again. | 224 // If already removed by the stream, don't remove it again. |
| 219 if (identical(subscription._next, subscription)) return null; | 225 if (identical(subscription._next, subscription)) return null; |
| 220 assert(!identical(subscription._next, subscription)); | 226 assert(!identical(subscription._next, subscription)); |
| 221 if (subscription._isFiring) { | 227 if (subscription._isFiring) { |
| 222 subscription._setRemoveAfterFiring(); | 228 subscription._setRemoveAfterFiring(); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 303 void _forEachListener( | 309 void _forEachListener( |
| 304 void action(_BufferingStreamSubscription<T> subscription)) { | 310 void action(_BufferingStreamSubscription<T> subscription)) { |
| 305 if (_isFiring) { | 311 if (_isFiring) { |
| 306 throw new StateError( | 312 throw new StateError( |
| 307 "Cannot fire new event. Controller is already firing an event"); | 313 "Cannot fire new event. Controller is already firing an event"); |
| 308 } | 314 } |
| 309 if (_isEmpty) return; | 315 if (_isEmpty) return; |
| 310 | 316 |
| 311 // Get event id of this event. | 317 // Get event id of this event. |
| 312 int id = (_state & _STATE_EVENT_ID); | 318 int id = (_state & _STATE_EVENT_ID); |
| 313 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 319 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel] |
| 314 // callbacks while firing, and we prevent reentrancy of this function. | 320 // callbacks while firing, and we prevent reentrancy of this function. |
| 315 // | 321 // |
| 316 // Set [_state]'s event id to the next event's id. | 322 // Set [_state]'s event id to the next event's id. |
| 317 // Any listeners added while firing this event will expect the next event, | 323 // Any listeners added while firing this event will expect the next event, |
| 318 // not this one, and won't get notified. | 324 // not this one, and won't get notified. |
| 319 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | 325 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| 320 _BroadcastSubscriptionLink link = _next; | 326 _BroadcastSubscriptionLink link = _next; |
| 321 while (!identical(link, this)) { | 327 while (!identical(link, this)) { |
| 322 _BroadcastSubscription<T> subscription = link; | 328 _BroadcastSubscription<T> subscription = link; |
| 323 if (subscription._expectsEvent(id)) { | 329 if (subscription._expectsEvent(id)) { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 339 _callOnCancel(); | 345 _callOnCancel(); |
| 340 } | 346 } |
| 341 } | 347 } |
| 342 | 348 |
| 343 void _callOnCancel() { | 349 void _callOnCancel() { |
| 344 assert(_isEmpty); | 350 assert(_isEmpty); |
| 345 if (isClosed && _doneFuture._mayComplete) { | 351 if (isClosed && _doneFuture._mayComplete) { |
| 346 // When closed, _doneFuture is not null. | 352 // When closed, _doneFuture is not null. |
| 347 _doneFuture._asyncComplete(null); | 353 _doneFuture._asyncComplete(null); |
| 348 } | 354 } |
| 349 _runGuarded(_onCancel); | 355 _runGuarded(onCancel); |
| 350 } | 356 } |
| 351 } | 357 } |
| 352 | 358 |
| 353 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> | 359 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> |
| 354 implements SynchronousStreamController<T> { | 360 implements SynchronousStreamController<T> { |
| 355 _SyncBroadcastStreamController(void onListen(), void onCancel()) | 361 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| 356 : super(onListen, onCancel); | 362 : super(onListen, onCancel); |
| 357 | 363 |
| 358 // EventDispatch interface. | 364 // EventDispatch interface. |
| 359 | 365 |
| (...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 527 _pauseCount++; | 533 _pauseCount++; |
| 528 } | 534 } |
| 529 void resume() { _resume(null); } | 535 void resume() { _resume(null); } |
| 530 void _resume(_) { | 536 void _resume(_) { |
| 531 if (_pauseCount > 0) _pauseCount--; | 537 if (_pauseCount > 0) _pauseCount--; |
| 532 } | 538 } |
| 533 Future cancel() { return new _Future.immediate(null); } | 539 Future cancel() { return new _Future.immediate(null); } |
| 534 bool get isPaused => _pauseCount > 0; | 540 bool get isPaused => _pauseCount > 0; |
| 535 Future asFuture([Object value]) => new _Future(); | 541 Future asFuture([Object value]) => new _Future(); |
| 536 } | 542 } |
| OLD | NEW |