| OLD | NEW |
| (Empty) |
| 1 part of dart.async; | |
| 2 class _BroadcastStream<T> extends _ControllerStream<T> {_BroadcastStream(_Strea
mControllerLifecycle controller) : super(DEVC$RT.cast(controller, DEVC$RT.type((
_StreamControllerLifecycle<dynamic> _) { | |
| 3 } | |
| 4 ), DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
| 5 } | |
| 6 ), "CompositeCast", """line 8, column 67 of dart:async/broadcast_stream_controll
er.dart: """, controller is _StreamControllerLifecycle<T>, false)); | |
| 7 bool get isBroadcast => true; | |
| 8 } | |
| 9 abstract class _BroadcastSubscriptionLink {_BroadcastSubscriptionLink _next; | |
| 10 _BroadcastSubscriptionLink _previous; | |
| 11 } | |
| 12 class _BroadcastSubscription<T> extends _ControllerSubscription<T> implements _
BroadcastSubscriptionLink {static const int _STATE_EVENT_ID = 1; | |
| 13 static const int _STATE_FIRING = 2; | |
| 14 static const int _STATE_REMOVE_AFTER_FIRING = 4; | |
| 15 int _eventState; | |
| 16 _BroadcastSubscriptionLink _next; | |
| 17 _BroadcastSubscriptionLink _previous; | |
| 18 _BroadcastSubscription(_StreamControllerLifecycle controller, void onData(T dat
a), Function onError, void onDone(), bool cancelOnError) : super(DEVC$RT.cast(co
ntroller, DEVC$RT.type((_StreamControllerLifecycle<dynamic> _) { | |
| 19 } | |
| 20 ), DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
| 21 } | |
| 22 ), "CompositeCast", """line 36, column 15 of dart:async/broadcast_stream_control
ler.dart: """, controller is _StreamControllerLifecycle<T>, false), onData, onEr
ror, onDone, cancelOnError) { | |
| 23 _next = _previous = this; | |
| 24 } | |
| 25 _BroadcastStreamController<T> get _controller => ((__x2) => DEVC$RT.cast(__x2,
DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
| 26 } | |
| 27 ), DEVC$RT.type((_BroadcastStreamController<T> _) { | |
| 28 } | |
| 29 ), "CompositeCast", """line 40, column 52 of dart:async/broadcast_stream_control
ler.dart: """, __x2 is _BroadcastStreamController<T>, false))(super._controller)
; | |
| 30 bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId; | |
| 31 void _toggleEventId() { | |
| 32 _eventState ^= _STATE_EVENT_ID; | |
| 33 } | |
| 34 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
| 35 void _setRemoveAfterFiring() { | |
| 36 assert (_isFiring); _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
| 37 } | |
| 38 bool get _removeAfterFiring => (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
| 39 void _onPause() { | |
| 40 } | |
| 41 void _onResume() { | |
| 42 } | |
| 43 } | |
| 44 abstract class _BroadcastStreamController<T> implements StreamController<T>, _S
treamControllerLifecycle<T>, _BroadcastSubscriptionLink, _EventSink<T>, _EventDi
spatch<T> {static const int _STATE_INITIAL = 0; | |
| 45 static const int _STATE_EVENT_ID = 1; | |
| 46 static const int _STATE_FIRING = 2; | |
| 47 static const int _STATE_CLOSED = 4; | |
| 48 static const int _STATE_ADDSTREAM = 8; | |
| 49 final _NotificationHandler _onListen; | |
| 50 final _NotificationHandler _onCancel; | |
| 51 int _state; | |
| 52 _BroadcastSubscriptionLink _next; | |
| 53 _BroadcastSubscriptionLink _previous; | |
| 54 _AddStreamState<T> _addStreamState; | |
| 55 _Future _doneFuture; | |
| 56 _BroadcastStreamController(this._onListen, this._onCancel) : _state = _STATE_IN
ITIAL { | |
| 57 _next = _previous = this; | |
| 58 } | |
| 59 Stream<T> get stream => new _BroadcastStream<T>(this); | |
| 60 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
| 61 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 62 bool get isPaused => false; | |
| 63 bool get hasListener => !_isEmpty; | |
| 64 bool get _hasOneListener { | |
| 65 assert (!_isEmpty); return identical(_next._next, this); | |
| 66 } | |
| 67 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
| 68 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
| 69 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
| 70 _Future _ensureDoneFuture() { | |
| 71 if (_doneFuture != null) return _doneFuture; | |
| 72 return _doneFuture = new _Future(); | |
| 73 } | |
| 74 bool get _isEmpty => identical(_next, this); | |
| 75 void _addListener(_BroadcastSubscription<T> subscription) { | |
| 76 assert (identical(subscription._next, subscription)); subscription._previous = _
previous; | |
| 77 subscription._next = this; | |
| 78 this._previous._next = subscription; | |
| 79 this._previous = subscription; | |
| 80 subscription._eventState = (_state & _STATE_EVENT_ID); | |
| 81 } | |
| 82 void _removeListener(_BroadcastSubscription<T> subscription) { | |
| 83 assert (identical(subscription._controller, this)); assert (!identical(subscript
ion._next, subscription)); _BroadcastSubscriptionLink previous = subscription._p
revious; | |
| 84 _BroadcastSubscriptionLink next = subscription._next; | |
| 85 previous._next = next; | |
| 86 next._previous = previous; | |
| 87 subscription._next = subscription._previous = subscription; | |
| 88 } | |
| 89 StreamSubscription<T> _subscribe(void onData(T data), Function onError, void on
Done(), bool cancelOnError) { | |
| 90 if (isClosed) { | |
| 91 if (onDone == null) onDone = _nullDoneHandler; | |
| 92 return new _DoneStreamSubscription<T>(onDone); | |
| 93 } | |
| 94 StreamSubscription subscription = new _BroadcastSubscription<T>(this, onData, o
nError, onDone, cancelOnError); | |
| 95 _addListener(DEVC$RT.cast(subscription, DEVC$RT.type((StreamSubscription<dynami
c> _) { | |
| 96 } | |
| 97 ), DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 98 } | |
| 99 ), "CompositeCast", """line 196, column 18 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is _BroadcastSubscription<T>, false)); | |
| 100 if (identical(_next, _previous)) { | |
| 101 _runGuarded(_onListen); | |
| 102 } | |
| 103 return DEVC$RT.cast(subscription, DEVC$RT.type((StreamSubscription<dynamic> _)
{ | |
| 104 } | |
| 105 ), DEVC$RT.type((StreamSubscription<T> _) { | |
| 106 } | |
| 107 ), "CompositeCast", """line 201, column 12 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is StreamSubscription<T>, false); | |
| 108 } | |
| 109 Future _recordCancel(StreamSubscription<T> subscription) { | |
| 110 if (identical(subscription._next, subscription)) return null; | |
| 111 assert (!identical(subscription._next, subscription)); if (subscription._isFiri
ng) { | |
| 112 subscription._setRemoveAfterFiring(); | |
| 113 } | |
| 114 else { | |
| 115 assert (!identical(subscription._next, subscription)); _removeListener(DEVC$RT.c
ast(subscription, DEVC$RT.type((StreamSubscription<T> _) { | |
| 116 } | |
| 117 ), DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 118 } | |
| 119 ), "CompositeCast", """line 212, column 23 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is _BroadcastSubscription<T>, false)); | |
| 120 if (!_isFiring && _isEmpty) { | |
| 121 _callOnCancel(); | |
| 122 } | |
| 123 } | |
| 124 return null; | |
| 125 } | |
| 126 void _recordPause(StreamSubscription<T> subscription) { | |
| 127 } | |
| 128 void _recordResume(StreamSubscription<T> subscription) { | |
| 129 } | |
| 130 Error _addEventError() { | |
| 131 if (isClosed) { | |
| 132 return new StateError("Cannot add new events after calling close"); | |
| 133 } | |
| 134 assert (_isAddingStream); return new StateError("Cannot add new events while do
ing an addStream"); | |
| 135 } | |
| 136 void add(T data) { | |
| 137 if (!_mayAddEvent) throw _addEventError(); | |
| 138 _sendData(data); | |
| 139 } | |
| 140 void addError(Object error, [StackTrace stackTrace]) { | |
| 141 error = _nonNullError(error); | |
| 142 if (!_mayAddEvent) throw _addEventError(); | |
| 143 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 144 if (replacement != null) { | |
| 145 error = _nonNullError(replacement.error); | |
| 146 stackTrace = replacement.stackTrace; | |
| 147 } | |
| 148 _sendError(error, stackTrace); | |
| 149 } | |
| 150 Future close() { | |
| 151 if (isClosed) { | |
| 152 assert (_doneFuture != null); return _doneFuture; | |
| 153 } | |
| 154 if (!_mayAddEvent) throw _addEventError(); | |
| 155 _state |= _STATE_CLOSED; | |
| 156 Future doneFuture = _ensureDoneFuture(); | |
| 157 _sendDone(); | |
| 158 return doneFuture; | |
| 159 } | |
| 160 Future get done => _ensureDoneFuture(); | |
| 161 Future addStream(Stream<T> stream, { | |
| 162 bool cancelOnError : true} | |
| 163 ) { | |
| 164 if (!_mayAddEvent) throw _addEventError(); | |
| 165 _state |= _STATE_ADDSTREAM; | |
| 166 _addStreamState = new _AddStreamState<T>(this, stream, cancelOnError); | |
| 167 return _addStreamState.addStreamFuture; | |
| 168 } | |
| 169 void _add(T data) { | |
| 170 _sendData(data); | |
| 171 } | |
| 172 void _addError(Object error, StackTrace stackTrace) { | |
| 173 _sendError(error, stackTrace); | |
| 174 } | |
| 175 void _close() { | |
| 176 assert (_isAddingStream); _AddStreamState addState = _addStreamState; | |
| 177 _addStreamState = null; | |
| 178 _state &= ~_STATE_ADDSTREAM; | |
| 179 addState.complete(); | |
| 180 } | |
| 181 void _forEachListener(void action(_BufferingStreamSubscription<T> subscription)
) { | |
| 182 if (_isFiring) { | |
| 183 throw new StateError("Cannot fire new event. Controller is already firing an eve
nt"); | |
| 184 } | |
| 185 if (_isEmpty) return; int id = (_state & _STATE_EVENT_ID); | |
| 186 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
| 187 _BroadcastSubscriptionLink link = _next; | |
| 188 while (!identical(link, this)) { | |
| 189 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 190 } | |
| 191 ), "CompositeCast", """line 309, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
| 192 if (subscription._expectsEvent(id)) { | |
| 193 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
| 194 action(subscription); | |
| 195 subscription._toggleEventId(); | |
| 196 link = subscription._next; | |
| 197 if (subscription._removeAfterFiring) { | |
| 198 _removeListener(subscription); | |
| 199 } | |
| 200 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
| 201 } | |
| 202 else { | |
| 203 link = subscription._next; | |
| 204 } | |
| 205 } | |
| 206 _state &= ~_STATE_FIRING; | |
| 207 if (_isEmpty) { | |
| 208 _callOnCancel(); | |
| 209 } | |
| 210 } | |
| 211 void _callOnCancel() { | |
| 212 assert (_isEmpty); if (isClosed && _doneFuture._mayComplete) { | |
| 213 _doneFuture._asyncComplete(null); | |
| 214 } | |
| 215 _runGuarded(_onCancel); | |
| 216 } | |
| 217 } | |
| 218 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
_SyncBroadcastStreamController(void onListen(), void onCancel()) : super(onListe
n, onCancel); | |
| 219 void _sendData(T data) { | |
| 220 if (_isEmpty) return; if (_hasOneListener) { | |
| 221 _state |= _BroadcastStreamController._STATE_FIRING; | |
| 222 _BroadcastSubscription subscription = DEVC$RT.cast(_next, _BroadcastSubscriptio
nLink, DEVC$RT.type((_BroadcastSubscription<dynamic> _) { | |
| 223 } | |
| 224 ), "AssignmentCast", """line 350, column 45 of dart:async/broadcast_stream_contr
oller.dart: """, _next is _BroadcastSubscription<dynamic>, true); | |
| 225 subscription._add(data); | |
| 226 _state &= ~_BroadcastStreamController._STATE_FIRING; | |
| 227 if (_isEmpty) { | |
| 228 _callOnCancel(); | |
| 229 } | |
| 230 return;} | |
| 231 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
| 232 subscription._add(data); | |
| 233 } | |
| 234 ); | |
| 235 } | |
| 236 void _sendError(Object error, StackTrace stackTrace) { | |
| 237 if (_isEmpty) return; _forEachListener((_BufferingStreamSubscription<T> subscrip
tion) { | |
| 238 subscription._addError(error, stackTrace); | |
| 239 } | |
| 240 ); | |
| 241 } | |
| 242 void _sendDone() { | |
| 243 if (!_isEmpty) { | |
| 244 _forEachListener(((__x7) => DEVC$RT.cast(__x7, DEVC$RT.type((__CastType5<T> _) { | |
| 245 } | |
| 246 ), DEVC$RT.type((__CastType3<T> _) { | |
| 247 } | |
| 248 ), "InferableClosure", """line 372, column 24 of dart:async/broadcast_stream_con
troller.dart: """, __x7 is __CastType3<T>, false))((_BroadcastSubscription<T> su
bscription) { | |
| 249 subscription._close(); | |
| 250 } | |
| 251 )); | |
| 252 } | |
| 253 else { | |
| 254 assert (_doneFuture != null); assert (_doneFuture._mayComplete); _doneFuture._as
yncComplete(null); | |
| 255 } | |
| 256 } | |
| 257 } | |
| 258 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
{_AsyncBroadcastStreamController(void onListen(), void onCancel()) : super(onLis
ten, onCancel); | |
| 259 void _sendData(T data) { | |
| 260 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
| 261 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 262 } | |
| 263 ), "CompositeCast", """line 393, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
| 264 subscription._addPending(new _DelayedData(data)); | |
| 265 } | |
| 266 } | |
| 267 void _sendError(Object error, StackTrace stackTrace) { | |
| 268 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
| 269 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 270 } | |
| 271 ), "CompositeCast", """line 402, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
| 272 subscription._addPending(new _DelayedError(error, stackTrace)); | |
| 273 } | |
| 274 } | |
| 275 void _sendDone() { | |
| 276 if (!_isEmpty) { | |
| 277 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
| 278 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
| 279 } | |
| 280 ), "CompositeCast", """line 412, column 50 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
| 281 subscription._addPending(const _DelayedDone()); | |
| 282 } | |
| 283 } | |
| 284 else { | |
| 285 assert (_doneFuture != null); assert (_doneFuture._mayComplete); _doneFuture._as
yncComplete(null); | |
| 286 } | |
| 287 } | |
| 288 } | |
| 289 class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {_StreamImplEvents _pending; | |
| 290 _AsBroadcastStreamController(void onListen(), void onCancel()) : super(onListen
, onCancel); | |
| 291 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
| 292 void _addPendingEvent(_DelayedEvent event) { | |
| 293 if (_pending == null) { | |
| 294 _pending = new _StreamImplEvents(); | |
| 295 } | |
| 296 _pending.add(event); | |
| 297 } | |
| 298 void add(T data) { | |
| 299 if (!isClosed && _isFiring) { | |
| 300 _addPendingEvent(new _DelayedData<T>(data)); | |
| 301 return;} | |
| 302 super.add(data); | |
| 303 while (_hasPending) { | |
| 304 _pending.handleNext(this); | |
| 305 } | |
| 306 } | |
| 307 void addError(Object error, [StackTrace stackTrace]) { | |
| 308 if (!isClosed && _isFiring) { | |
| 309 _addPendingEvent(new _DelayedError(error, stackTrace)); | |
| 310 return;} | |
| 311 if (!_mayAddEvent) throw _addEventError(); | |
| 312 _sendError(error, stackTrace); | |
| 313 while (_hasPending) { | |
| 314 _pending.handleNext(this); | |
| 315 } | |
| 316 } | |
| 317 Future close() { | |
| 318 if (!isClosed && _isFiring) { | |
| 319 _addPendingEvent(const _DelayedDone()); | |
| 320 _state |= _BroadcastStreamController._STATE_CLOSED; | |
| 321 return super.done; | |
| 322 } | |
| 323 Future result = super.close(); | |
| 324 assert (!_hasPending); return result; | |
| 325 } | |
| 326 void _callOnCancel() { | |
| 327 if (_hasPending) { | |
| 328 _pending.clear(); | |
| 329 _pending = null; | |
| 330 } | |
| 331 super._callOnCancel(); | |
| 332 } | |
| 333 } | |
| 334 class _DoneSubscription<T> implements StreamSubscription<T> {int _pauseCount =
0; | |
| 335 void onData(void handleData(T data)) { | |
| 336 } | |
| 337 void onError(Function handleError) { | |
| 338 } | |
| 339 void onDone(void handleDone()) { | |
| 340 } | |
| 341 void pause([Future resumeSignal]) { | |
| 342 if (resumeSignal != null) resumeSignal.then(_resume); | |
| 343 _pauseCount++; | |
| 344 } | |
| 345 void resume() { | |
| 346 _resume(null); | |
| 347 } | |
| 348 void _resume(_) { | |
| 349 if (_pauseCount > 0) _pauseCount--; | |
| 350 } | |
| 351 Future cancel() { | |
| 352 return new _Future.immediate(null); | |
| 353 } | |
| 354 bool get isPaused => _pauseCount > 0; | |
| 355 Future asFuture([Object value]) => new _Future(); | |
| 356 } | |
| 357 typedef void __CastType3<T>(_BufferingStreamSubscription<T> __u4); | |
| 358 typedef dynamic __CastType5<T>(_BroadcastSubscription<T> __u6); | |
| OLD | NEW |