| OLD | NEW |
| (Empty) |
| 1 part of dart.async; | |
| 2 abstract class StreamController<T> implements StreamSink<T> {Stream<T> get stre
am; | |
| 3 factory StreamController({ | |
| 4 void onListen(), void onPause(), void onResume(), onCancel(), bool sync : fals
e} | |
| 5 ) { | |
| 6 if (onListen == null && onPause == null && onResume == null && onCancel == nul
l) { | |
| 7 return ((__x42) => DEVC$RT.cast(__x42, DEVC$RT.type((_StreamController<dynam
ic> _) { | |
| 8 } | |
| 9 ), DEVC$RT.type((StreamController<T> _) { | |
| 10 } | |
| 11 ), "CompositeCast", """line 83, column 14 of dart:async/stream_controller.da
rt: """, __x42 is StreamController<T>, false))(sync ? new _NoCallbackSyncStreamC
ontroller() : new _NoCallbackAsyncStreamController()); | |
| 12 } | |
| 13 return sync ? new _SyncStreamController<T>(onListen, onPause, onResume, onCan
cel) : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | |
| 14 } | |
| 15 factory StreamController.broadcast({ | |
| 16 void onListen(), void onCancel(), bool sync : false} | |
| 17 ) { | |
| 18 return sync ? new _SyncBroadcastStreamController<T>(onListen, onCancel) : new
_AsyncBroadcastStreamController<T>(onListen, onCancel); | |
| 19 } | |
| 20 StreamSink<T> get sink; | |
| 21 bool get isClosed; | |
| 22 bool get isPaused; | |
| 23 bool get hasListener; | |
| 24 void addError(Object error, [StackTrace stackTrace]); | |
| 25 Future addStream(Stream<T> source, { | |
| 26 bool cancelOnError : true} | |
| 27 ); | |
| 28 } | |
| 29 abstract class _StreamControllerLifecycle<T> {StreamSubscription<T> _subscribe(
void onData(T data), Function onError, void onDone(), bool cancelOnError); | |
| 30 void _recordPause(StreamSubscription<T> subscription) { | |
| 31 } | |
| 32 void _recordResume(StreamSubscription<T> subscription) { | |
| 33 } | |
| 34 Future _recordCancel(StreamSubscription<T> subscription) => null; | |
| 35 } | |
| 36 abstract class _StreamController<T> implements StreamController<T>, _StreamCont
rollerLifecycle<T>, _EventSink<T>, _EventDispatch<T> {static const int _STATE_IN
ITIAL = 0; | |
| 37 static const int _STATE_SUBSCRIBED = 1; | |
| 38 static const int _STATE_CANCELED = 2; | |
| 39 static const int _STATE_SUBSCRIPTION_MASK = 3; | |
| 40 static const int _STATE_CLOSED = 4; | |
| 41 static const int _STATE_ADDSTREAM = 8; | |
| 42 var _varData; | |
| 43 int _state = _STATE_INITIAL; | |
| 44 _Future _doneFuture; | |
| 45 _StreamController(); | |
| 46 _NotificationHandler get _onListen; | |
| 47 _NotificationHandler get _onPause; | |
| 48 _NotificationHandler get _onResume; | |
| 49 _NotificationHandler get _onCancel; | |
| 50 Stream<T> get stream => new _ControllerStream<T>(this); | |
| 51 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
| 52 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
| 53 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | |
| 54 bool get _isInitialState => (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITI
AL; | |
| 55 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
| 56 bool get isPaused => hasListener ? _subscription._isInputPaused : !_isCanceled; | |
| 57 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
| 58 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
| 59 _PendingEvents get _pendingEvents { | |
| 60 assert (_isInitialState); if (!_isAddingStream) { | |
| 61 return DEVC$RT.cast(_varData, dynamic, _PendingEvents, "DynamicCast", """line 33
4, column 14 of dart:async/stream_controller.dart: """, _varData is _PendingEven
ts, true); | |
| 62 } | |
| 63 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT
.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 64 } | |
| 65 ), "DynamicCast", """line 336, column 45 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 66 return DEVC$RT.cast(state.varData, dynamic, _PendingEvents, "DynamicCast", """l
ine 337, column 12 of dart:async/stream_controller.dart: """, state.varData is _
PendingEvents, true); | |
| 67 } | |
| 68 _StreamImplEvents _ensurePendingEvents() { | |
| 69 assert (_isInitialState); if (!_isAddingStream) { | |
| 70 if (_varData == null) _varData = new _StreamImplEvents(); | |
| 71 return DEVC$RT.cast(_varData, dynamic, _StreamImplEvents, "DynamicCast", """lin
e 345, column 14 of dart:async/stream_controller.dart: """, _varData is _StreamI
mplEvents, true); | |
| 72 } | |
| 73 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT
.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 74 } | |
| 75 ), "DynamicCast", """line 347, column 45 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 76 if (state.varData == null) state.varData = new _StreamImplEvents(); | |
| 77 return DEVC$RT.cast(state.varData, dynamic, _StreamImplEvents, "DynamicCast", "
""line 349, column 12 of dart:async/stream_controller.dart: """, state.varData i
s _StreamImplEvents, true); | |
| 78 } | |
| 79 _ControllerSubscription get _subscription { | |
| 80 assert (hasListener); if (_isAddingStream) { | |
| 81 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 82 } | |
| 83 ), "DynamicCast", """line 358, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 84 return DEVC$RT.cast(addState.varData, dynamic, DEVC$RT.type((_ControllerSubscri
ption<dynamic> _) { | |
| 85 } | |
| 86 ), "DynamicCast", """line 359, column 14 of dart:async/stream_controller.dart: "
"", addState.varData is _ControllerSubscription<dynamic>, true); | |
| 87 } | |
| 88 return DEVC$RT.cast(_varData, dynamic, DEVC$RT.type((_ControllerSubscription<dy
namic> _) { | |
| 89 } | |
| 90 ), "DynamicCast", """line 361, column 12 of dart:async/stream_controller.dart: "
"", _varData is _ControllerSubscription<dynamic>, true); | |
| 91 } | |
| 92 Error _badEventState() { | |
| 93 if (isClosed) { | |
| 94 return new StateError("Cannot add event after closing"); | |
| 95 } | |
| 96 assert (_isAddingStream); return new StateError("Cannot add event while adding
a stream"); | |
| 97 } | |
| 98 Future addStream(Stream<T> source, { | |
| 99 bool cancelOnError : true} | |
| 100 ) { | |
| 101 if (!_mayAddEvent) throw _badEventState(); | |
| 102 if (_isCanceled) return new _Future.immediate(null); | |
| 103 _StreamControllerAddStreamState addState = new _StreamControllerAddStreamState(
this, _varData, source, cancelOnError); | |
| 104 _varData = addState; | |
| 105 _state |= _STATE_ADDSTREAM; | |
| 106 return addState.addStreamFuture; | |
| 107 } | |
| 108 Future get done => _ensureDoneFuture(); | |
| 109 Future _ensureDoneFuture() { | |
| 110 if (_doneFuture == null) { | |
| 111 _doneFuture = _isCanceled ? Future._nullFuture : new _Future(); | |
| 112 } | |
| 113 return _doneFuture; | |
| 114 } | |
| 115 void add(T value) { | |
| 116 if (!_mayAddEvent) throw _badEventState(); | |
| 117 _add(value); | |
| 118 } | |
| 119 void addError(Object error, [StackTrace stackTrace]) { | |
| 120 error = _nonNullError(error); | |
| 121 if (!_mayAddEvent) throw _badEventState(); | |
| 122 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 123 if (replacement != null) { | |
| 124 error = _nonNullError(replacement.error); | |
| 125 stackTrace = replacement.stackTrace; | |
| 126 } | |
| 127 _addError(error, stackTrace); | |
| 128 } | |
| 129 Future close() { | |
| 130 if (isClosed) { | |
| 131 return _ensureDoneFuture(); | |
| 132 } | |
| 133 if (!_mayAddEvent) throw _badEventState(); | |
| 134 _closeUnchecked(); | |
| 135 return _ensureDoneFuture(); | |
| 136 } | |
| 137 void _closeUnchecked() { | |
| 138 _state |= _STATE_CLOSED; | |
| 139 if (hasListener) { | |
| 140 _sendDone(); | |
| 141 } | |
| 142 else if (_isInitialState) { | |
| 143 _ensurePendingEvents().add(const _DelayedDone()); | |
| 144 } | |
| 145 } | |
| 146 void _add(T value) { | |
| 147 if (hasListener) { | |
| 148 _sendData(value); | |
| 149 } | |
| 150 else if (_isInitialState) { | |
| 151 _ensurePendingEvents().add(new _DelayedData<T>(value)); | |
| 152 } | |
| 153 } | |
| 154 void _addError(Object error, StackTrace stackTrace) { | |
| 155 if (hasListener) { | |
| 156 _sendError(error, stackTrace); | |
| 157 } | |
| 158 else if (_isInitialState) { | |
| 159 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | |
| 160 } | |
| 161 } | |
| 162 void _close() { | |
| 163 assert (_isAddingStream); _StreamControllerAddStreamState addState = DEVC$RT.cas
t(_varData, dynamic, DEVC$RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 164 } | |
| 165 ), "DynamicCast", """line 482, column 48 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 166 _varData = addState.varData; | |
| 167 _state &= ~_STATE_ADDSTREAM; | |
| 168 addState.complete(); | |
| 169 } | |
| 170 StreamSubscription<T> _subscribe(void onData(T data), Function onError, void on
Done(), bool cancelOnError) { | |
| 171 if (!_isInitialState) { | |
| 172 throw new StateError("Stream has already been listened to."); | |
| 173 } | |
| 174 _ControllerSubscription subscription = new _ControllerSubscription(this, onData
, onError, onDone, cancelOnError); | |
| 175 _PendingEvents pendingEvents = _pendingEvents; | |
| 176 _state |= _STATE_SUBSCRIBED; | |
| 177 if (_isAddingStream) { | |
| 178 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 179 } | |
| 180 ), "DynamicCast", """line 505, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 181 addState.varData = subscription; | |
| 182 addState.resume(); | |
| 183 } | |
| 184 else { | |
| 185 _varData = subscription; | |
| 186 } | |
| 187 subscription._setPendingEvents(pendingEvents); | |
| 188 subscription._guardCallback(() { | |
| 189 _runGuarded(_onListen); | |
| 190 } | |
| 191 ); | |
| 192 return DEVC$RT.cast(subscription, DEVC$RT.type((_ControllerSubscription<dynamic
> _) { | |
| 193 } | |
| 194 ), DEVC$RT.type((StreamSubscription<T> _) { | |
| 195 } | |
| 196 ), "CompositeCast", """line 516, column 12 of dart:async/stream_controller.dart:
""", subscription is StreamSubscription<T>, false); | |
| 197 } | |
| 198 Future _recordCancel(StreamSubscription<T> subscription) { | |
| 199 Future result; | |
| 200 if (_isAddingStream) { | |
| 201 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 202 } | |
| 203 ), "DynamicCast", """line 530, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 204 result = addState.cancel(); | |
| 205 } | |
| 206 _varData = null; | |
| 207 _state = (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | |
| 208 if (_onCancel != null) { | |
| 209 if (result == null) { | |
| 210 try { | |
| 211 result = ((__x43) => DEVC$RT.cast(__x43, dynamic, DEVC$RT.type((Future<dynam
ic> _) { | |
| 212 } | |
| 213 ), "DynamicCast", """line 542, column 20 of dart:async/stream_controller.dar
t: """, __x43 is Future<dynamic>, true))(_onCancel()); | |
| 214 } | |
| 215 catch (e, s) { | |
| 216 result = new _Future().._asyncCompleteError(e, s); | |
| 217 } | |
| 218 } | |
| 219 else { | |
| 220 result = result.whenComplete(_onCancel); | |
| 221 } | |
| 222 } | |
| 223 void complete() { | |
| 224 if (_doneFuture != null && _doneFuture._mayComplete) { | |
| 225 _doneFuture._asyncComplete(null); | |
| 226 } | |
| 227 } | |
| 228 if (result != null) { | |
| 229 result = result.whenComplete(complete); | |
| 230 } | |
| 231 else { | |
| 232 complete(); | |
| 233 } | |
| 234 return result; | |
| 235 } | |
| 236 void _recordPause(StreamSubscription<T> subscription) { | |
| 237 if (_isAddingStream) { | |
| 238 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 239 } | |
| 240 ), "DynamicCast", """line 572, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 241 addState.pause(); | |
| 242 } | |
| 243 _runGuarded(_onPause); | |
| 244 } | |
| 245 void _recordResume(StreamSubscription<T> subscription) { | |
| 246 if (_isAddingStream) { | |
| 247 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
| 248 } | |
| 249 ), "DynamicCast", """line 580, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
| 250 addState.resume(); | |
| 251 } | |
| 252 _runGuarded(_onResume); | |
| 253 } | |
| 254 } | |
| 255 abstract class _SyncStreamControllerDispatch<T> implements _StreamController<T>
{void _sendData(T data) { | |
| 256 _subscription._add(data); | |
| 257 } | |
| 258 void _sendError(Object error, StackTrace stackTrace) { | |
| 259 _subscription._addError(error, stackTrace); | |
| 260 } | |
| 261 void _sendDone() { | |
| 262 _subscription._close(); | |
| 263 } | |
| 264 } | |
| 265 abstract class _AsyncStreamControllerDispatch<T> implements _StreamController<T
> {void _sendData(T data) { | |
| 266 _subscription._addPending(new _DelayedData(data)); | |
| 267 } | |
| 268 void _sendError(Object error, StackTrace stackTrace) { | |
| 269 _subscription._addPending(new _DelayedError(error, stackTrace)); | |
| 270 } | |
| 271 void _sendDone() { | |
| 272 _subscription._addPending(const _DelayedDone()); | |
| 273 } | |
| 274 } | |
| 275 class _AsyncStreamController<T> extends _StreamController<T> with _AsyncStreamC
ontrollerDispatch<T> {final _NotificationHandler _onListen; | |
| 276 final _NotificationHandler _onPause; | |
| 277 final _NotificationHandler _onResume; | |
| 278 final _NotificationHandler _onCancel; | |
| 279 _AsyncStreamController(void this._onListen(), void this._onPause(), void this._
onResume(), this._onCancel()); | |
| 280 } | |
| 281 class _SyncStreamController<T> extends _StreamController<T> with _SyncStreamCon
trollerDispatch<T> {final _NotificationHandler _onListen; | |
| 282 final _NotificationHandler _onPause; | |
| 283 final _NotificationHandler _onResume; | |
| 284 final _NotificationHandler _onCancel; | |
| 285 _SyncStreamController(void this._onListen(), void this._onPause(), void this._o
nResume(), this._onCancel()); | |
| 286 } | |
| 287 abstract class _NoCallbacks {_NotificationHandler get _onListen => null; | |
| 288 _NotificationHandler get _onPause => null; | |
| 289 _NotificationHandler get _onResume => null; | |
| 290 _NotificationHandler get _onCancel => null; | |
| 291 } | |
| 292 class _NoCallbackAsyncStreamController = _StreamController with _AsyncStreamCon
trollerDispatch, _NoCallbacks; | |
| 293 class _NoCallbackSyncStreamController = _StreamController with _SyncStreamContr
ollerDispatch, _NoCallbacks; | |
| 294 typedef _NotificationHandler(); | |
| 295 Future _runGuarded(_NotificationHandler notificationHandler) { | |
| 296 if (notificationHandler == null) return null; | |
| 297 try { | |
| 298 var result = notificationHandler(); | |
| 299 if (result is Future) return DEVC$RT.cast(result, dynamic, DEVC$RT.type((Future
<dynamic> _) { | |
| 300 } | |
| 301 ), "DynamicCast", """line 665, column 34 of dart:async/stream_controller.dart: "
"", result is Future<dynamic>, true); | |
| 302 return null; | |
| 303 } | |
| 304 catch (e, s) { | |
| 305 Zone.current.handleUncaughtError(e, s); | |
| 306 } | |
| 307 } | |
| 308 class _ControllerStream<T> extends _StreamImpl<T> {_StreamControllerLifecycle<T
> _controller; | |
| 309 _ControllerStream(this._controller); | |
| 310 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) => _controller._subscribe(onData, onError,
onDone, cancelOnError); | |
| 311 int get hashCode => _controller.hashCode ^ 0x35323532; | |
| 312 bool operator ==(Object other) { | |
| 313 if (identical(this, other)) return true; | |
| 314 if (other is! _ControllerStream) return false; | |
| 315 _ControllerStream otherStream = DEVC$RT.cast(other, Object, DEVC$RT.type((_Cont
rollerStream<dynamic> _) { | |
| 316 } | |
| 317 ), "AssignmentCast", """line 693, column 37 of dart:async/stream_controller.dart
: """, other is _ControllerStream<dynamic>, true); | |
| 318 return identical(otherStream._controller, this._controller); | |
| 319 } | |
| 320 } | |
| 321 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {final
_StreamControllerLifecycle<T> _controller; | |
| 322 _ControllerSubscription(this._controller, void onData(T data), Function onError
, void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOnEr
ror); | |
| 323 Future _onCancel() { | |
| 324 return _controller._recordCancel(this); | |
| 325 } | |
| 326 void _onPause() { | |
| 327 _controller._recordPause(this); | |
| 328 } | |
| 329 void _onResume() { | |
| 330 _controller._recordResume(this); | |
| 331 } | |
| 332 } | |
| 333 class _StreamSinkWrapper<T> implements StreamSink<T> {final StreamController _t
arget; | |
| 334 _StreamSinkWrapper(this._target); | |
| 335 void add(T data) { | |
| 336 _target.add(data); | |
| 337 } | |
| 338 void addError(Object error, [StackTrace stackTrace]) { | |
| 339 _target.addError(error, stackTrace); | |
| 340 } | |
| 341 Future close() => _target.close(); | |
| 342 Future addStream(Stream<T> source, { | |
| 343 bool cancelOnError : true} | |
| 344 ) => _target.addStream(source, cancelOnError: cancelOnError); | |
| 345 Future get done => _target.done; | |
| 346 } | |
| 347 class _AddStreamState<T> {final _Future addStreamFuture; | |
| 348 final StreamSubscription addSubscription; | |
| 349 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) :
addStreamFuture = new _Future(), addSubscription = source.listen(controller._add
, onError: cancelOnError ? ((__x44) => DEVC$RT.cast(__x44, dynamic, Function, "D
ynamicCast", """line 747, column 48 of dart:async/stream_controller.dart: """, _
_x44 is Function, true))(makeErrorHandler(controller)) : controller._addError, o
nDone: controller._close, cancelOnError: cancelOnError); | |
| 350 static makeErrorHandler(_EventSink controller) => (e, StackTrace s) { | |
| 351 controller._addError(e, s); | |
| 352 controller._close(); | |
| 353 } | |
| 354 ; | |
| 355 void pause() { | |
| 356 addSubscription.pause(); | |
| 357 } | |
| 358 void resume() { | |
| 359 addSubscription.resume(); | |
| 360 } | |
| 361 Future cancel() { | |
| 362 var cancel = addSubscription.cancel(); | |
| 363 if (cancel == null) { | |
| 364 addStreamFuture._asyncComplete(null); | |
| 365 return null; | |
| 366 } | |
| 367 return cancel.whenComplete(() { | |
| 368 addStreamFuture._asyncComplete(null); | |
| 369 } | |
| 370 ); | |
| 371 } | |
| 372 void complete() { | |
| 373 addStreamFuture._asyncComplete(null); | |
| 374 } | |
| 375 } | |
| 376 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {var varDat
a; | |
| 377 _StreamControllerAddStreamState(_StreamController controller, this.varData, Str
eam source, bool cancelOnError) : super(DEVC$RT.cast(controller, DEVC$RT.type((_
StreamController<dynamic> _) { | |
| 378 } | |
| 379 ), DEVC$RT.type((_EventSink<T> _) { | |
| 380 } | |
| 381 ), "CompositeCast", """line 798, column 15 of dart:async/stream_controller.dart:
""", controller is _EventSink<T>, false), source, cancelOnError) { | |
| 382 if (controller.isPaused) { | |
| 383 addSubscription.pause(); | |
| 384 } | |
| 385 } | |
| 386 } | |
| OLD | NEW |