OLD | NEW |
| (Empty) |
1 part of dart.async; | |
2 abstract class StreamController<T> implements StreamSink<T> {Stream<T> get stre
am; | |
3 factory StreamController({ | |
4 void onListen(), void onPause(), void onResume(), onCancel(), bool sync : fals
e} | |
5 ) { | |
6 if (onListen == null && onPause == null && onResume == null && onCancel == nul
l) { | |
7 return ((__x42) => DEVC$RT.cast(__x42, DEVC$RT.type((_StreamController<dynam
ic> _) { | |
8 } | |
9 ), DEVC$RT.type((StreamController<T> _) { | |
10 } | |
11 ), "CompositeCast", """line 83, column 14 of dart:async/stream_controller.da
rt: """, __x42 is StreamController<T>, false))(sync ? new _NoCallbackSyncStreamC
ontroller() : new _NoCallbackAsyncStreamController()); | |
12 } | |
13 return sync ? new _SyncStreamController<T>(onListen, onPause, onResume, onCan
cel) : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | |
14 } | |
15 factory StreamController.broadcast({ | |
16 void onListen(), void onCancel(), bool sync : false} | |
17 ) { | |
18 return sync ? new _SyncBroadcastStreamController<T>(onListen, onCancel) : new
_AsyncBroadcastStreamController<T>(onListen, onCancel); | |
19 } | |
20 StreamSink<T> get sink; | |
21 bool get isClosed; | |
22 bool get isPaused; | |
23 bool get hasListener; | |
24 void addError(Object error, [StackTrace stackTrace]); | |
25 Future addStream(Stream<T> source, { | |
26 bool cancelOnError : true} | |
27 ); | |
28 } | |
29 abstract class _StreamControllerLifecycle<T> {StreamSubscription<T> _subscribe(
void onData(T data), Function onError, void onDone(), bool cancelOnError); | |
30 void _recordPause(StreamSubscription<T> subscription) { | |
31 } | |
32 void _recordResume(StreamSubscription<T> subscription) { | |
33 } | |
34 Future _recordCancel(StreamSubscription<T> subscription) => null; | |
35 } | |
36 abstract class _StreamController<T> implements StreamController<T>, _StreamCont
rollerLifecycle<T>, _EventSink<T>, _EventDispatch<T> {static const int _STATE_IN
ITIAL = 0; | |
37 static const int _STATE_SUBSCRIBED = 1; | |
38 static const int _STATE_CANCELED = 2; | |
39 static const int _STATE_SUBSCRIPTION_MASK = 3; | |
40 static const int _STATE_CLOSED = 4; | |
41 static const int _STATE_ADDSTREAM = 8; | |
42 var _varData; | |
43 int _state = _STATE_INITIAL; | |
44 _Future _doneFuture; | |
45 _StreamController(); | |
46 _NotificationHandler get _onListen; | |
47 _NotificationHandler get _onPause; | |
48 _NotificationHandler get _onResume; | |
49 _NotificationHandler get _onCancel; | |
50 Stream<T> get stream => new _ControllerStream<T>(this); | |
51 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
52 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
53 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | |
54 bool get _isInitialState => (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITI
AL; | |
55 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
56 bool get isPaused => hasListener ? _subscription._isInputPaused : !_isCanceled; | |
57 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
58 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
59 _PendingEvents get _pendingEvents { | |
60 assert (_isInitialState); if (!_isAddingStream) { | |
61 return DEVC$RT.cast(_varData, dynamic, _PendingEvents, "DynamicCast", """line 33
4, column 14 of dart:async/stream_controller.dart: """, _varData is _PendingEven
ts, true); | |
62 } | |
63 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT
.type((_StreamControllerAddStreamState<dynamic> _) { | |
64 } | |
65 ), "DynamicCast", """line 336, column 45 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
66 return DEVC$RT.cast(state.varData, dynamic, _PendingEvents, "DynamicCast", """l
ine 337, column 12 of dart:async/stream_controller.dart: """, state.varData is _
PendingEvents, true); | |
67 } | |
68 _StreamImplEvents _ensurePendingEvents() { | |
69 assert (_isInitialState); if (!_isAddingStream) { | |
70 if (_varData == null) _varData = new _StreamImplEvents(); | |
71 return DEVC$RT.cast(_varData, dynamic, _StreamImplEvents, "DynamicCast", """lin
e 345, column 14 of dart:async/stream_controller.dart: """, _varData is _StreamI
mplEvents, true); | |
72 } | |
73 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT
.type((_StreamControllerAddStreamState<dynamic> _) { | |
74 } | |
75 ), "DynamicCast", """line 347, column 45 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
76 if (state.varData == null) state.varData = new _StreamImplEvents(); | |
77 return DEVC$RT.cast(state.varData, dynamic, _StreamImplEvents, "DynamicCast", "
""line 349, column 12 of dart:async/stream_controller.dart: """, state.varData i
s _StreamImplEvents, true); | |
78 } | |
79 _ControllerSubscription get _subscription { | |
80 assert (hasListener); if (_isAddingStream) { | |
81 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
82 } | |
83 ), "DynamicCast", """line 358, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
84 return DEVC$RT.cast(addState.varData, dynamic, DEVC$RT.type((_ControllerSubscri
ption<dynamic> _) { | |
85 } | |
86 ), "DynamicCast", """line 359, column 14 of dart:async/stream_controller.dart: "
"", addState.varData is _ControllerSubscription<dynamic>, true); | |
87 } | |
88 return DEVC$RT.cast(_varData, dynamic, DEVC$RT.type((_ControllerSubscription<dy
namic> _) { | |
89 } | |
90 ), "DynamicCast", """line 361, column 12 of dart:async/stream_controller.dart: "
"", _varData is _ControllerSubscription<dynamic>, true); | |
91 } | |
92 Error _badEventState() { | |
93 if (isClosed) { | |
94 return new StateError("Cannot add event after closing"); | |
95 } | |
96 assert (_isAddingStream); return new StateError("Cannot add event while adding
a stream"); | |
97 } | |
98 Future addStream(Stream<T> source, { | |
99 bool cancelOnError : true} | |
100 ) { | |
101 if (!_mayAddEvent) throw _badEventState(); | |
102 if (_isCanceled) return new _Future.immediate(null); | |
103 _StreamControllerAddStreamState addState = new _StreamControllerAddStreamState(
this, _varData, source, cancelOnError); | |
104 _varData = addState; | |
105 _state |= _STATE_ADDSTREAM; | |
106 return addState.addStreamFuture; | |
107 } | |
108 Future get done => _ensureDoneFuture(); | |
109 Future _ensureDoneFuture() { | |
110 if (_doneFuture == null) { | |
111 _doneFuture = _isCanceled ? Future._nullFuture : new _Future(); | |
112 } | |
113 return _doneFuture; | |
114 } | |
115 void add(T value) { | |
116 if (!_mayAddEvent) throw _badEventState(); | |
117 _add(value); | |
118 } | |
119 void addError(Object error, [StackTrace stackTrace]) { | |
120 error = _nonNullError(error); | |
121 if (!_mayAddEvent) throw _badEventState(); | |
122 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
123 if (replacement != null) { | |
124 error = _nonNullError(replacement.error); | |
125 stackTrace = replacement.stackTrace; | |
126 } | |
127 _addError(error, stackTrace); | |
128 } | |
129 Future close() { | |
130 if (isClosed) { | |
131 return _ensureDoneFuture(); | |
132 } | |
133 if (!_mayAddEvent) throw _badEventState(); | |
134 _closeUnchecked(); | |
135 return _ensureDoneFuture(); | |
136 } | |
137 void _closeUnchecked() { | |
138 _state |= _STATE_CLOSED; | |
139 if (hasListener) { | |
140 _sendDone(); | |
141 } | |
142 else if (_isInitialState) { | |
143 _ensurePendingEvents().add(const _DelayedDone()); | |
144 } | |
145 } | |
146 void _add(T value) { | |
147 if (hasListener) { | |
148 _sendData(value); | |
149 } | |
150 else if (_isInitialState) { | |
151 _ensurePendingEvents().add(new _DelayedData<T>(value)); | |
152 } | |
153 } | |
154 void _addError(Object error, StackTrace stackTrace) { | |
155 if (hasListener) { | |
156 _sendError(error, stackTrace); | |
157 } | |
158 else if (_isInitialState) { | |
159 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | |
160 } | |
161 } | |
162 void _close() { | |
163 assert (_isAddingStream); _StreamControllerAddStreamState addState = DEVC$RT.cas
t(_varData, dynamic, DEVC$RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
164 } | |
165 ), "DynamicCast", """line 482, column 48 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
166 _varData = addState.varData; | |
167 _state &= ~_STATE_ADDSTREAM; | |
168 addState.complete(); | |
169 } | |
170 StreamSubscription<T> _subscribe(void onData(T data), Function onError, void on
Done(), bool cancelOnError) { | |
171 if (!_isInitialState) { | |
172 throw new StateError("Stream has already been listened to."); | |
173 } | |
174 _ControllerSubscription subscription = new _ControllerSubscription(this, onData
, onError, onDone, cancelOnError); | |
175 _PendingEvents pendingEvents = _pendingEvents; | |
176 _state |= _STATE_SUBSCRIBED; | |
177 if (_isAddingStream) { | |
178 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
179 } | |
180 ), "DynamicCast", """line 505, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
181 addState.varData = subscription; | |
182 addState.resume(); | |
183 } | |
184 else { | |
185 _varData = subscription; | |
186 } | |
187 subscription._setPendingEvents(pendingEvents); | |
188 subscription._guardCallback(() { | |
189 _runGuarded(_onListen); | |
190 } | |
191 ); | |
192 return DEVC$RT.cast(subscription, DEVC$RT.type((_ControllerSubscription<dynamic
> _) { | |
193 } | |
194 ), DEVC$RT.type((StreamSubscription<T> _) { | |
195 } | |
196 ), "CompositeCast", """line 516, column 12 of dart:async/stream_controller.dart:
""", subscription is StreamSubscription<T>, false); | |
197 } | |
198 Future _recordCancel(StreamSubscription<T> subscription) { | |
199 Future result; | |
200 if (_isAddingStream) { | |
201 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
202 } | |
203 ), "DynamicCast", """line 530, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
204 result = addState.cancel(); | |
205 } | |
206 _varData = null; | |
207 _state = (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | |
208 if (_onCancel != null) { | |
209 if (result == null) { | |
210 try { | |
211 result = ((__x43) => DEVC$RT.cast(__x43, dynamic, DEVC$RT.type((Future<dynam
ic> _) { | |
212 } | |
213 ), "DynamicCast", """line 542, column 20 of dart:async/stream_controller.dar
t: """, __x43 is Future<dynamic>, true))(_onCancel()); | |
214 } | |
215 catch (e, s) { | |
216 result = new _Future().._asyncCompleteError(e, s); | |
217 } | |
218 } | |
219 else { | |
220 result = result.whenComplete(_onCancel); | |
221 } | |
222 } | |
223 void complete() { | |
224 if (_doneFuture != null && _doneFuture._mayComplete) { | |
225 _doneFuture._asyncComplete(null); | |
226 } | |
227 } | |
228 if (result != null) { | |
229 result = result.whenComplete(complete); | |
230 } | |
231 else { | |
232 complete(); | |
233 } | |
234 return result; | |
235 } | |
236 void _recordPause(StreamSubscription<T> subscription) { | |
237 if (_isAddingStream) { | |
238 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
239 } | |
240 ), "DynamicCast", """line 572, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
241 addState.pause(); | |
242 } | |
243 _runGuarded(_onPause); | |
244 } | |
245 void _recordResume(StreamSubscription<T> subscription) { | |
246 if (_isAddingStream) { | |
247 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$
RT.type((_StreamControllerAddStreamState<dynamic> _) { | |
248 } | |
249 ), "DynamicCast", """line 580, column 50 of dart:async/stream_controller.dart: "
"", _varData is _StreamControllerAddStreamState<dynamic>, true); | |
250 addState.resume(); | |
251 } | |
252 _runGuarded(_onResume); | |
253 } | |
254 } | |
255 abstract class _SyncStreamControllerDispatch<T> implements _StreamController<T>
{void _sendData(T data) { | |
256 _subscription._add(data); | |
257 } | |
258 void _sendError(Object error, StackTrace stackTrace) { | |
259 _subscription._addError(error, stackTrace); | |
260 } | |
261 void _sendDone() { | |
262 _subscription._close(); | |
263 } | |
264 } | |
265 abstract class _AsyncStreamControllerDispatch<T> implements _StreamController<T
> {void _sendData(T data) { | |
266 _subscription._addPending(new _DelayedData(data)); | |
267 } | |
268 void _sendError(Object error, StackTrace stackTrace) { | |
269 _subscription._addPending(new _DelayedError(error, stackTrace)); | |
270 } | |
271 void _sendDone() { | |
272 _subscription._addPending(const _DelayedDone()); | |
273 } | |
274 } | |
275 class _AsyncStreamController<T> extends _StreamController<T> with _AsyncStreamC
ontrollerDispatch<T> {final _NotificationHandler _onListen; | |
276 final _NotificationHandler _onPause; | |
277 final _NotificationHandler _onResume; | |
278 final _NotificationHandler _onCancel; | |
279 _AsyncStreamController(void this._onListen(), void this._onPause(), void this._
onResume(), this._onCancel()); | |
280 } | |
281 class _SyncStreamController<T> extends _StreamController<T> with _SyncStreamCon
trollerDispatch<T> {final _NotificationHandler _onListen; | |
282 final _NotificationHandler _onPause; | |
283 final _NotificationHandler _onResume; | |
284 final _NotificationHandler _onCancel; | |
285 _SyncStreamController(void this._onListen(), void this._onPause(), void this._o
nResume(), this._onCancel()); | |
286 } | |
287 abstract class _NoCallbacks {_NotificationHandler get _onListen => null; | |
288 _NotificationHandler get _onPause => null; | |
289 _NotificationHandler get _onResume => null; | |
290 _NotificationHandler get _onCancel => null; | |
291 } | |
292 class _NoCallbackAsyncStreamController = _StreamController with _AsyncStreamCon
trollerDispatch, _NoCallbacks; | |
293 class _NoCallbackSyncStreamController = _StreamController with _SyncStreamContr
ollerDispatch, _NoCallbacks; | |
294 typedef _NotificationHandler(); | |
295 Future _runGuarded(_NotificationHandler notificationHandler) { | |
296 if (notificationHandler == null) return null; | |
297 try { | |
298 var result = notificationHandler(); | |
299 if (result is Future) return DEVC$RT.cast(result, dynamic, DEVC$RT.type((Future
<dynamic> _) { | |
300 } | |
301 ), "DynamicCast", """line 665, column 34 of dart:async/stream_controller.dart: "
"", result is Future<dynamic>, true); | |
302 return null; | |
303 } | |
304 catch (e, s) { | |
305 Zone.current.handleUncaughtError(e, s); | |
306 } | |
307 } | |
308 class _ControllerStream<T> extends _StreamImpl<T> {_StreamControllerLifecycle<T
> _controller; | |
309 _ControllerStream(this._controller); | |
310 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) => _controller._subscribe(onData, onError,
onDone, cancelOnError); | |
311 int get hashCode => _controller.hashCode ^ 0x35323532; | |
312 bool operator ==(Object other) { | |
313 if (identical(this, other)) return true; | |
314 if (other is! _ControllerStream) return false; | |
315 _ControllerStream otherStream = DEVC$RT.cast(other, Object, DEVC$RT.type((_Cont
rollerStream<dynamic> _) { | |
316 } | |
317 ), "AssignmentCast", """line 693, column 37 of dart:async/stream_controller.dart
: """, other is _ControllerStream<dynamic>, true); | |
318 return identical(otherStream._controller, this._controller); | |
319 } | |
320 } | |
321 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {final
_StreamControllerLifecycle<T> _controller; | |
322 _ControllerSubscription(this._controller, void onData(T data), Function onError
, void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOnEr
ror); | |
323 Future _onCancel() { | |
324 return _controller._recordCancel(this); | |
325 } | |
326 void _onPause() { | |
327 _controller._recordPause(this); | |
328 } | |
329 void _onResume() { | |
330 _controller._recordResume(this); | |
331 } | |
332 } | |
333 class _StreamSinkWrapper<T> implements StreamSink<T> {final StreamController _t
arget; | |
334 _StreamSinkWrapper(this._target); | |
335 void add(T data) { | |
336 _target.add(data); | |
337 } | |
338 void addError(Object error, [StackTrace stackTrace]) { | |
339 _target.addError(error, stackTrace); | |
340 } | |
341 Future close() => _target.close(); | |
342 Future addStream(Stream<T> source, { | |
343 bool cancelOnError : true} | |
344 ) => _target.addStream(source, cancelOnError: cancelOnError); | |
345 Future get done => _target.done; | |
346 } | |
347 class _AddStreamState<T> {final _Future addStreamFuture; | |
348 final StreamSubscription addSubscription; | |
349 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) :
addStreamFuture = new _Future(), addSubscription = source.listen(controller._add
, onError: cancelOnError ? ((__x44) => DEVC$RT.cast(__x44, dynamic, Function, "D
ynamicCast", """line 747, column 48 of dart:async/stream_controller.dart: """, _
_x44 is Function, true))(makeErrorHandler(controller)) : controller._addError, o
nDone: controller._close, cancelOnError: cancelOnError); | |
350 static makeErrorHandler(_EventSink controller) => (e, StackTrace s) { | |
351 controller._addError(e, s); | |
352 controller._close(); | |
353 } | |
354 ; | |
355 void pause() { | |
356 addSubscription.pause(); | |
357 } | |
358 void resume() { | |
359 addSubscription.resume(); | |
360 } | |
361 Future cancel() { | |
362 var cancel = addSubscription.cancel(); | |
363 if (cancel == null) { | |
364 addStreamFuture._asyncComplete(null); | |
365 return null; | |
366 } | |
367 return cancel.whenComplete(() { | |
368 addStreamFuture._asyncComplete(null); | |
369 } | |
370 ); | |
371 } | |
372 void complete() { | |
373 addStreamFuture._asyncComplete(null); | |
374 } | |
375 } | |
376 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {var varDat
a; | |
377 _StreamControllerAddStreamState(_StreamController controller, this.varData, Str
eam source, bool cancelOnError) : super(DEVC$RT.cast(controller, DEVC$RT.type((_
StreamController<dynamic> _) { | |
378 } | |
379 ), DEVC$RT.type((_EventSink<T> _) { | |
380 } | |
381 ), "CompositeCast", """line 798, column 15 of dart:async/stream_controller.dart:
""", controller is _EventSink<T>, false), source, cancelOnError) { | |
382 if (controller.isPaused) { | |
383 addSubscription.pause(); | |
384 } | |
385 } | |
386 } | |
OLD | NEW |