OLD | NEW |
| (Empty) |
1 part of dart.async; | |
2 class _BroadcastStream<T> extends _ControllerStream<T> {_BroadcastStream(_Strea
mControllerLifecycle controller) : super(DEVC$RT.cast(controller, DEVC$RT.type((
_StreamControllerLifecycle<dynamic> _) { | |
3 } | |
4 ), DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
5 } | |
6 ), "CompositeCast", """line 8, column 67 of dart:async/broadcast_stream_controll
er.dart: """, controller is _StreamControllerLifecycle<T>, false)); | |
7 bool get isBroadcast => true; | |
8 } | |
9 abstract class _BroadcastSubscriptionLink {_BroadcastSubscriptionLink _next; | |
10 _BroadcastSubscriptionLink _previous; | |
11 } | |
12 class _BroadcastSubscription<T> extends _ControllerSubscription<T> implements _
BroadcastSubscriptionLink {static const int _STATE_EVENT_ID = 1; | |
13 static const int _STATE_FIRING = 2; | |
14 static const int _STATE_REMOVE_AFTER_FIRING = 4; | |
15 int _eventState; | |
16 _BroadcastSubscriptionLink _next; | |
17 _BroadcastSubscriptionLink _previous; | |
18 _BroadcastSubscription(_StreamControllerLifecycle controller, void onData(T dat
a), Function onError, void onDone(), bool cancelOnError) : super(DEVC$RT.cast(co
ntroller, DEVC$RT.type((_StreamControllerLifecycle<dynamic> _) { | |
19 } | |
20 ), DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
21 } | |
22 ), "CompositeCast", """line 36, column 15 of dart:async/broadcast_stream_control
ler.dart: """, controller is _StreamControllerLifecycle<T>, false), onData, onEr
ror, onDone, cancelOnError) { | |
23 _next = _previous = this; | |
24 } | |
25 _BroadcastStreamController<T> get _controller => ((__x2) => DEVC$RT.cast(__x2,
DEVC$RT.type((_StreamControllerLifecycle<T> _) { | |
26 } | |
27 ), DEVC$RT.type((_BroadcastStreamController<T> _) { | |
28 } | |
29 ), "CompositeCast", """line 40, column 52 of dart:async/broadcast_stream_control
ler.dart: """, __x2 is _BroadcastStreamController<T>, false))(super._controller)
; | |
30 bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId; | |
31 void _toggleEventId() { | |
32 _eventState ^= _STATE_EVENT_ID; | |
33 } | |
34 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
35 void _setRemoveAfterFiring() { | |
36 assert (_isFiring); _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
37 } | |
38 bool get _removeAfterFiring => (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
39 void _onPause() { | |
40 } | |
41 void _onResume() { | |
42 } | |
43 } | |
44 abstract class _BroadcastStreamController<T> implements StreamController<T>, _S
treamControllerLifecycle<T>, _BroadcastSubscriptionLink, _EventSink<T>, _EventDi
spatch<T> {static const int _STATE_INITIAL = 0; | |
45 static const int _STATE_EVENT_ID = 1; | |
46 static const int _STATE_FIRING = 2; | |
47 static const int _STATE_CLOSED = 4; | |
48 static const int _STATE_ADDSTREAM = 8; | |
49 final _NotificationHandler _onListen; | |
50 final _NotificationHandler _onCancel; | |
51 int _state; | |
52 _BroadcastSubscriptionLink _next; | |
53 _BroadcastSubscriptionLink _previous; | |
54 _AddStreamState<T> _addStreamState; | |
55 _Future _doneFuture; | |
56 _BroadcastStreamController(this._onListen, this._onCancel) : _state = _STATE_IN
ITIAL { | |
57 _next = _previous = this; | |
58 } | |
59 Stream<T> get stream => new _BroadcastStream<T>(this); | |
60 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
61 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
62 bool get isPaused => false; | |
63 bool get hasListener => !_isEmpty; | |
64 bool get _hasOneListener { | |
65 assert (!_isEmpty); return identical(_next._next, this); | |
66 } | |
67 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
68 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
69 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
70 _Future _ensureDoneFuture() { | |
71 if (_doneFuture != null) return _doneFuture; | |
72 return _doneFuture = new _Future(); | |
73 } | |
74 bool get _isEmpty => identical(_next, this); | |
75 void _addListener(_BroadcastSubscription<T> subscription) { | |
76 assert (identical(subscription._next, subscription)); subscription._previous = _
previous; | |
77 subscription._next = this; | |
78 this._previous._next = subscription; | |
79 this._previous = subscription; | |
80 subscription._eventState = (_state & _STATE_EVENT_ID); | |
81 } | |
82 void _removeListener(_BroadcastSubscription<T> subscription) { | |
83 assert (identical(subscription._controller, this)); assert (!identical(subscript
ion._next, subscription)); _BroadcastSubscriptionLink previous = subscription._p
revious; | |
84 _BroadcastSubscriptionLink next = subscription._next; | |
85 previous._next = next; | |
86 next._previous = previous; | |
87 subscription._next = subscription._previous = subscription; | |
88 } | |
89 StreamSubscription<T> _subscribe(void onData(T data), Function onError, void on
Done(), bool cancelOnError) { | |
90 if (isClosed) { | |
91 if (onDone == null) onDone = _nullDoneHandler; | |
92 return new _DoneStreamSubscription<T>(onDone); | |
93 } | |
94 StreamSubscription subscription = new _BroadcastSubscription<T>(this, onData, o
nError, onDone, cancelOnError); | |
95 _addListener(DEVC$RT.cast(subscription, DEVC$RT.type((StreamSubscription<dynami
c> _) { | |
96 } | |
97 ), DEVC$RT.type((_BroadcastSubscription<T> _) { | |
98 } | |
99 ), "CompositeCast", """line 196, column 18 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is _BroadcastSubscription<T>, false)); | |
100 if (identical(_next, _previous)) { | |
101 _runGuarded(_onListen); | |
102 } | |
103 return DEVC$RT.cast(subscription, DEVC$RT.type((StreamSubscription<dynamic> _)
{ | |
104 } | |
105 ), DEVC$RT.type((StreamSubscription<T> _) { | |
106 } | |
107 ), "CompositeCast", """line 201, column 12 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is StreamSubscription<T>, false); | |
108 } | |
109 Future _recordCancel(StreamSubscription<T> subscription) { | |
110 if (identical(subscription._next, subscription)) return null; | |
111 assert (!identical(subscription._next, subscription)); if (subscription._isFiri
ng) { | |
112 subscription._setRemoveAfterFiring(); | |
113 } | |
114 else { | |
115 assert (!identical(subscription._next, subscription)); _removeListener(DEVC$RT.c
ast(subscription, DEVC$RT.type((StreamSubscription<T> _) { | |
116 } | |
117 ), DEVC$RT.type((_BroadcastSubscription<T> _) { | |
118 } | |
119 ), "CompositeCast", """line 212, column 23 of dart:async/broadcast_stream_contro
ller.dart: """, subscription is _BroadcastSubscription<T>, false)); | |
120 if (!_isFiring && _isEmpty) { | |
121 _callOnCancel(); | |
122 } | |
123 } | |
124 return null; | |
125 } | |
126 void _recordPause(StreamSubscription<T> subscription) { | |
127 } | |
128 void _recordResume(StreamSubscription<T> subscription) { | |
129 } | |
130 Error _addEventError() { | |
131 if (isClosed) { | |
132 return new StateError("Cannot add new events after calling close"); | |
133 } | |
134 assert (_isAddingStream); return new StateError("Cannot add new events while do
ing an addStream"); | |
135 } | |
136 void add(T data) { | |
137 if (!_mayAddEvent) throw _addEventError(); | |
138 _sendData(data); | |
139 } | |
140 void addError(Object error, [StackTrace stackTrace]) { | |
141 error = _nonNullError(error); | |
142 if (!_mayAddEvent) throw _addEventError(); | |
143 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
144 if (replacement != null) { | |
145 error = _nonNullError(replacement.error); | |
146 stackTrace = replacement.stackTrace; | |
147 } | |
148 _sendError(error, stackTrace); | |
149 } | |
150 Future close() { | |
151 if (isClosed) { | |
152 assert (_doneFuture != null); return _doneFuture; | |
153 } | |
154 if (!_mayAddEvent) throw _addEventError(); | |
155 _state |= _STATE_CLOSED; | |
156 Future doneFuture = _ensureDoneFuture(); | |
157 _sendDone(); | |
158 return doneFuture; | |
159 } | |
160 Future get done => _ensureDoneFuture(); | |
161 Future addStream(Stream<T> stream, { | |
162 bool cancelOnError : true} | |
163 ) { | |
164 if (!_mayAddEvent) throw _addEventError(); | |
165 _state |= _STATE_ADDSTREAM; | |
166 _addStreamState = new _AddStreamState<T>(this, stream, cancelOnError); | |
167 return _addStreamState.addStreamFuture; | |
168 } | |
169 void _add(T data) { | |
170 _sendData(data); | |
171 } | |
172 void _addError(Object error, StackTrace stackTrace) { | |
173 _sendError(error, stackTrace); | |
174 } | |
175 void _close() { | |
176 assert (_isAddingStream); _AddStreamState addState = _addStreamState; | |
177 _addStreamState = null; | |
178 _state &= ~_STATE_ADDSTREAM; | |
179 addState.complete(); | |
180 } | |
181 void _forEachListener(void action(_BufferingStreamSubscription<T> subscription)
) { | |
182 if (_isFiring) { | |
183 throw new StateError("Cannot fire new event. Controller is already firing an eve
nt"); | |
184 } | |
185 if (_isEmpty) return; int id = (_state & _STATE_EVENT_ID); | |
186 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
187 _BroadcastSubscriptionLink link = _next; | |
188 while (!identical(link, this)) { | |
189 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
190 } | |
191 ), "CompositeCast", """line 309, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
192 if (subscription._expectsEvent(id)) { | |
193 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
194 action(subscription); | |
195 subscription._toggleEventId(); | |
196 link = subscription._next; | |
197 if (subscription._removeAfterFiring) { | |
198 _removeListener(subscription); | |
199 } | |
200 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
201 } | |
202 else { | |
203 link = subscription._next; | |
204 } | |
205 } | |
206 _state &= ~_STATE_FIRING; | |
207 if (_isEmpty) { | |
208 _callOnCancel(); | |
209 } | |
210 } | |
211 void _callOnCancel() { | |
212 assert (_isEmpty); if (isClosed && _doneFuture._mayComplete) { | |
213 _doneFuture._asyncComplete(null); | |
214 } | |
215 _runGuarded(_onCancel); | |
216 } | |
217 } | |
218 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
_SyncBroadcastStreamController(void onListen(), void onCancel()) : super(onListe
n, onCancel); | |
219 void _sendData(T data) { | |
220 if (_isEmpty) return; if (_hasOneListener) { | |
221 _state |= _BroadcastStreamController._STATE_FIRING; | |
222 _BroadcastSubscription subscription = DEVC$RT.cast(_next, _BroadcastSubscriptio
nLink, DEVC$RT.type((_BroadcastSubscription<dynamic> _) { | |
223 } | |
224 ), "AssignmentCast", """line 350, column 45 of dart:async/broadcast_stream_contr
oller.dart: """, _next is _BroadcastSubscription<dynamic>, true); | |
225 subscription._add(data); | |
226 _state &= ~_BroadcastStreamController._STATE_FIRING; | |
227 if (_isEmpty) { | |
228 _callOnCancel(); | |
229 } | |
230 return;} | |
231 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
232 subscription._add(data); | |
233 } | |
234 ); | |
235 } | |
236 void _sendError(Object error, StackTrace stackTrace) { | |
237 if (_isEmpty) return; _forEachListener((_BufferingStreamSubscription<T> subscrip
tion) { | |
238 subscription._addError(error, stackTrace); | |
239 } | |
240 ); | |
241 } | |
242 void _sendDone() { | |
243 if (!_isEmpty) { | |
244 _forEachListener(((__x7) => DEVC$RT.cast(__x7, DEVC$RT.type((__CastType5<T> _) { | |
245 } | |
246 ), DEVC$RT.type((__CastType3<T> _) { | |
247 } | |
248 ), "InferableClosure", """line 372, column 24 of dart:async/broadcast_stream_con
troller.dart: """, __x7 is __CastType3<T>, false))((_BroadcastSubscription<T> su
bscription) { | |
249 subscription._close(); | |
250 } | |
251 )); | |
252 } | |
253 else { | |
254 assert (_doneFuture != null); assert (_doneFuture._mayComplete); _doneFuture._as
yncComplete(null); | |
255 } | |
256 } | |
257 } | |
258 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
{_AsyncBroadcastStreamController(void onListen(), void onCancel()) : super(onLis
ten, onCancel); | |
259 void _sendData(T data) { | |
260 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
261 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
262 } | |
263 ), "CompositeCast", """line 393, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
264 subscription._addPending(new _DelayedData(data)); | |
265 } | |
266 } | |
267 void _sendError(Object error, StackTrace stackTrace) { | |
268 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
269 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
270 } | |
271 ), "CompositeCast", """line 402, column 48 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
272 subscription._addPending(new _DelayedError(error, stackTrace)); | |
273 } | |
274 } | |
275 void _sendDone() { | |
276 if (!_isEmpty) { | |
277 for (_BroadcastSubscriptionLink link = _next; !identical(link, this); link = lin
k._next) { | |
278 _BroadcastSubscription<T> subscription = DEVC$RT.cast(link, _BroadcastSubscripti
onLink, DEVC$RT.type((_BroadcastSubscription<T> _) { | |
279 } | |
280 ), "CompositeCast", """line 412, column 50 of dart:async/broadcast_stream_contro
ller.dart: """, link is _BroadcastSubscription<T>, false); | |
281 subscription._addPending(const _DelayedDone()); | |
282 } | |
283 } | |
284 else { | |
285 assert (_doneFuture != null); assert (_doneFuture._mayComplete); _doneFuture._as
yncComplete(null); | |
286 } | |
287 } | |
288 } | |
289 class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {_StreamImplEvents _pending; | |
290 _AsBroadcastStreamController(void onListen(), void onCancel()) : super(onListen
, onCancel); | |
291 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
292 void _addPendingEvent(_DelayedEvent event) { | |
293 if (_pending == null) { | |
294 _pending = new _StreamImplEvents(); | |
295 } | |
296 _pending.add(event); | |
297 } | |
298 void add(T data) { | |
299 if (!isClosed && _isFiring) { | |
300 _addPendingEvent(new _DelayedData<T>(data)); | |
301 return;} | |
302 super.add(data); | |
303 while (_hasPending) { | |
304 _pending.handleNext(this); | |
305 } | |
306 } | |
307 void addError(Object error, [StackTrace stackTrace]) { | |
308 if (!isClosed && _isFiring) { | |
309 _addPendingEvent(new _DelayedError(error, stackTrace)); | |
310 return;} | |
311 if (!_mayAddEvent) throw _addEventError(); | |
312 _sendError(error, stackTrace); | |
313 while (_hasPending) { | |
314 _pending.handleNext(this); | |
315 } | |
316 } | |
317 Future close() { | |
318 if (!isClosed && _isFiring) { | |
319 _addPendingEvent(const _DelayedDone()); | |
320 _state |= _BroadcastStreamController._STATE_CLOSED; | |
321 return super.done; | |
322 } | |
323 Future result = super.close(); | |
324 assert (!_hasPending); return result; | |
325 } | |
326 void _callOnCancel() { | |
327 if (_hasPending) { | |
328 _pending.clear(); | |
329 _pending = null; | |
330 } | |
331 super._callOnCancel(); | |
332 } | |
333 } | |
334 class _DoneSubscription<T> implements StreamSubscription<T> {int _pauseCount =
0; | |
335 void onData(void handleData(T data)) { | |
336 } | |
337 void onError(Function handleError) { | |
338 } | |
339 void onDone(void handleDone()) { | |
340 } | |
341 void pause([Future resumeSignal]) { | |
342 if (resumeSignal != null) resumeSignal.then(_resume); | |
343 _pauseCount++; | |
344 } | |
345 void resume() { | |
346 _resume(null); | |
347 } | |
348 void _resume(_) { | |
349 if (_pauseCount > 0) _pauseCount--; | |
350 } | |
351 Future cancel() { | |
352 return new _Future.immediate(null); | |
353 } | |
354 bool get isPaused => _pauseCount > 0; | |
355 Future asFuture([Object value]) => new _Future(); | |
356 } | |
357 typedef void __CastType3<T>(_BufferingStreamSubscription<T> __u4); | |
358 typedef dynamic __CastType5<T>(_BroadcastSubscription<T> __u6); | |
OLD | NEW |