OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 class _BroadcastStream<T> extends _ControllerStream<T> { | 7 class _BroadcastStream<T> extends _ControllerStream<T> { |
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); | 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); |
9 | 9 |
10 bool get isBroadcast => true; | 10 bool get isBroadcast => true; |
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
194 new _BroadcastSubscription<T>(this, onData, onError, onDone, | 194 new _BroadcastSubscription<T>(this, onData, onError, onDone, |
195 cancelOnError); | 195 cancelOnError); |
196 _addListener(subscription); | 196 _addListener(subscription); |
197 if (identical(_next, _previous)) { | 197 if (identical(_next, _previous)) { |
198 // Only one listener, so it must be the first listener. | 198 // Only one listener, so it must be the first listener. |
199 _runGuarded(_onListen); | 199 _runGuarded(_onListen); |
200 } | 200 } |
201 return subscription; | 201 return subscription; |
202 } | 202 } |
203 | 203 |
204 Future _recordCancel(StreamSubscription<T> subscription) { | 204 Future _recordCancel(StreamSubscription<T> sub) { |
| 205 var subscription = sub as _BroadcastSubscription<T>; |
205 // If already removed by the stream, don't remove it again. | 206 // If already removed by the stream, don't remove it again. |
206 if (identical(subscription._next, subscription)) return null; | 207 if (identical(subscription._next, subscription)) return null; |
207 assert(!identical(subscription._next, subscription)); | 208 assert(!identical(subscription._next, subscription)); |
208 if (subscription._isFiring) { | 209 if (subscription._isFiring) { |
209 subscription._setRemoveAfterFiring(); | 210 subscription._setRemoveAfterFiring(); |
210 } else { | 211 } else { |
211 assert(!identical(subscription._next, subscription)); | 212 assert(!identical(subscription._next, subscription)); |
212 _removeListener(subscription); | 213 _removeListener(subscription); |
213 // If we are currently firing an event, the empty-check is performed at | 214 // If we are currently firing an event, the empty-check is performed at |
214 // the end of the listener loop instead of here. | 215 // the end of the listener loop instead of here. |
(...skipping 288 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
503 _pauseCount++; | 504 _pauseCount++; |
504 } | 505 } |
505 void resume() { _resume(null); } | 506 void resume() { _resume(null); } |
506 void _resume(_) { | 507 void _resume(_) { |
507 if (_pauseCount > 0) _pauseCount--; | 508 if (_pauseCount > 0) _pauseCount--; |
508 } | 509 } |
509 Future cancel() { return new _Future.immediate(null); } | 510 Future cancel() { return new _Future.immediate(null); } |
510 bool get isPaused => _pauseCount > 0; | 511 bool get isPaused => _pauseCount > 0; |
511 Future asFuture([Object value]) => new _Future(); | 512 Future asFuture([Object value]) => new _Future(); |
512 } | 513 } |
OLD | NEW |