OLD | NEW |
| (Empty) |
1 part of dart.async; | |
2 abstract class _EventSink<T> {void _add(T data); | |
3 void _addError(Object error, StackTrace stackTrace); | |
4 void _close(); | |
5 } | |
6 abstract class _EventDispatch<T> {void _sendData(T data); | |
7 void _sendError(Object error, StackTrace stackTrace); | |
8 void _sendDone(); | |
9 } | |
10 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, _EventS
ink<T>, _EventDispatch<T> {static const int _STATE_CANCEL_ON_ERROR = 1; | |
11 static const int _STATE_CLOSED = 2; | |
12 static const int _STATE_INPUT_PAUSED = 4; | |
13 static const int _STATE_CANCELED = 8; | |
14 static const int _STATE_WAIT_FOR_CANCEL = 16; | |
15 static const int _STATE_IN_CALLBACK = 32; | |
16 static const int _STATE_HAS_PENDING = 64; | |
17 static const int _STATE_PAUSE_COUNT = 128; | |
18 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
19 _DataHandler<T> _onData; | |
20 Function _onError; | |
21 _DoneHandler _onDone; | |
22 final Zone _zone = Zone.current; | |
23 int _state; | |
24 Future _cancelFuture; | |
25 _PendingEvents _pending; | |
26 _BufferingStreamSubscription(void onData(T data), Function onError, void onDone
(), bool cancelOnError) : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0)
{ | |
27 this.onData(onData); | |
28 this.onError(onError); | |
29 this.onDone(onDone); | |
30 } | |
31 void _setPendingEvents(_PendingEvents pendingEvents) { | |
32 assert (_pending == null); if (pendingEvents == null) return; _pending = pending
Events; | |
33 if (!pendingEvents.isEmpty) { | |
34 _state |= _STATE_HAS_PENDING; | |
35 _pending.schedule(this); | |
36 } | |
37 } | |
38 _PendingEvents _extractPending() { | |
39 assert (_isCanceled); _PendingEvents events = _pending; | |
40 _pending = null; | |
41 return events; | |
42 } | |
43 void onData(void handleData(T event)) { | |
44 if (handleData == null) handleData = DEVC$RT.cast(_nullDataHandler, __CastType47
, DEVC$RT.type((__CastType45<T> _) { | |
45 } | |
46 ), "CompositeCast", """line 153, column 42 of dart:async/stream_impl.dart: """,
_nullDataHandler is __CastType45<T>, false); | |
47 _onData = ((__x49) => DEVC$RT.cast(__x49, ZoneUnaryCallback, DEVC$RT.type((_Dat
aHandler<T> _) { | |
48 } | |
49 ), "CompositeCast", """line 154, column 15 of dart:async/stream_impl.dart: """,
__x49 is _DataHandler<T>, false))(_zone.registerUnaryCallback(handleData)); | |
50 } | |
51 void onError(Function handleError) { | |
52 if (handleError == null) handleError = _nullErrorHandler; | |
53 _onError = _registerErrorHandler(handleError, _zone); | |
54 } | |
55 void onDone(void handleDone()) { | |
56 if (handleDone == null) handleDone = _nullDoneHandler; | |
57 _onDone = _zone.registerCallback(handleDone); | |
58 } | |
59 void pause([Future resumeSignal]) { | |
60 if (_isCanceled) return; bool wasPaused = _isPaused; | |
61 bool wasInputPaused = _isInputPaused; | |
62 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
63 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
64 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | |
65 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | |
66 } | |
67 void resume() { | |
68 if (_isCanceled) return; if (_isPaused) { | |
69 _decrementPauseCount(); | |
70 if (!_isPaused) { | |
71 if (_hasPending && !_pending.isEmpty) { | |
72 _pending.schedule(this); | |
73 } | |
74 else { | |
75 assert (_mayResumeInput); _state &= ~_STATE_INPUT_PAUSED; | |
76 if (!_inCallback) _guardCallback(_onResume); | |
77 } | |
78 } | |
79 } | |
80 } | |
81 Future cancel() { | |
82 _state &= ~_STATE_WAIT_FOR_CANCEL; | |
83 if (_isCanceled) return _cancelFuture; | |
84 _cancel(); | |
85 return _cancelFuture; | |
86 } | |
87 Future asFuture([var futureValue]) { | |
88 _Future<T> result = new _Future<T>(); | |
89 _onDone = () { | |
90 result._complete(futureValue); | |
91 } | |
92 ; | |
93 _onError = (error, stackTrace) { | |
94 cancel(); | |
95 result._completeError(error, DEVC$RT.cast(stackTrace, dynamic, StackTrace, "Dyn
amicCast", """line 212, column 36 of dart:async/stream_impl.dart: """, stackTrac
e is StackTrace, true)); | |
96 } | |
97 ; | |
98 return result; | |
99 } | |
100 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | |
101 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | |
102 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
103 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; | |
104 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | |
105 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | |
106 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | |
107 bool get _canFire => _state < _STATE_IN_CALLBACK; | |
108 bool get _mayResumeInput => !_isPaused && (_pending == null || _pending.isEmpty
); | |
109 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | |
110 bool get isPaused => _isPaused; | |
111 void _cancel() { | |
112 _state |= _STATE_CANCELED; | |
113 if (_hasPending) { | |
114 _pending.cancelSchedule(); | |
115 } | |
116 if (!_inCallback) _pending = null; | |
117 _cancelFuture = _onCancel(); | |
118 } | |
119 void _incrementPauseCount() { | |
120 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
121 } | |
122 void _decrementPauseCount() { | |
123 assert (_isPaused); _state -= _STATE_PAUSE_COUNT; | |
124 } | |
125 void _add(T data) { | |
126 assert (!_isClosed); if (_isCanceled) return; if (_canFire) { | |
127 _sendData(data); | |
128 } | |
129 else { | |
130 _addPending(new _DelayedData(data)); | |
131 } | |
132 } | |
133 void _addError(Object error, StackTrace stackTrace) { | |
134 if (_isCanceled) return; if (_canFire) { | |
135 _sendError(error, stackTrace); | |
136 } | |
137 else { | |
138 _addPending(new _DelayedError(error, stackTrace)); | |
139 } | |
140 } | |
141 void _close() { | |
142 assert (!_isClosed); if (_isCanceled) return; _state |= _STATE_CLOSED; | |
143 if (_canFire) { | |
144 _sendDone(); | |
145 } | |
146 else { | |
147 _addPending(const _DelayedDone()); | |
148 } | |
149 } | |
150 void _onPause() { | |
151 assert (_isInputPaused);} | |
152 void _onResume() { | |
153 assert (!_isInputPaused);} | |
154 Future _onCancel() { | |
155 assert (_isCanceled); return null; | |
156 } | |
157 void _addPending(_DelayedEvent event) { | |
158 _StreamImplEvents pending = DEVC$RT.cast(_pending, _PendingEvents, _StreamImplEv
ents, "AssignmentCast", """line 322, column 33 of dart:async/stream_impl.dart: "
"", _pending is _StreamImplEvents, true); | |
159 if (_pending == null) pending = _pending = new _StreamImplEvents(); | |
160 pending.add(event); | |
161 if (!_hasPending) { | |
162 _state |= _STATE_HAS_PENDING; | |
163 if (!_isPaused) { | |
164 _pending.schedule(this); | |
165 } | |
166 } | |
167 } | |
168 void _sendData(T data) { | |
169 assert (!_isCanceled); assert (!_isPaused); assert (!_inCallback); bool wasInput
Paused = _isInputPaused; | |
170 _state |= _STATE_IN_CALLBACK; | |
171 _zone.runUnaryGuarded(_onData, data); | |
172 _state &= ~_STATE_IN_CALLBACK; | |
173 _checkState(wasInputPaused); | |
174 } | |
175 void _sendError(Object error, StackTrace stackTrace) { | |
176 assert (!_isCanceled); assert (!_isPaused); assert (!_inCallback); bool wasInput
Paused = _isInputPaused; | |
177 void sendError() { | |
178 if (_isCanceled && !_waitsForCancel) return; _state |= _STATE_IN_CALLBACK; | |
179 if (_onError is ZoneBinaryCallback) { | |
180 _zone.runBinaryGuarded(DEVC$RT.cast(_onError, Function, __CastType50, "Implici
tCast", """line 358, column 32 of dart:async/stream_impl.dart: """, _onError is
__CastType50, true), error, stackTrace); | |
181 } | |
182 else { | |
183 _zone.runUnaryGuarded(DEVC$RT.cast(_onError, Function, __CastType53, "Implicit
Cast", """line 360, column 31 of dart:async/stream_impl.dart: """, _onError is _
_CastType53, true), error); | |
184 } | |
185 _state &= ~_STATE_IN_CALLBACK; | |
186 } | |
187 if (_cancelOnError) { | |
188 _state |= _STATE_WAIT_FOR_CANCEL; | |
189 _cancel(); | |
190 if (_cancelFuture is Future) { | |
191 _cancelFuture.whenComplete(sendError); | |
192 } | |
193 else { | |
194 sendError(); | |
195 } | |
196 } | |
197 else { | |
198 sendError(); | |
199 _checkState(wasInputPaused); | |
200 } | |
201 } | |
202 void _sendDone() { | |
203 assert (!_isCanceled); assert (!_isPaused); assert (!_inCallback); void sendDone
() { | |
204 if (!_waitsForCancel) return; _state |= (_STATE_CANCELED | _STATE_CLOSED | _STAT
E_IN_CALLBACK); | |
205 _zone.runGuarded(_onDone); | |
206 _state &= ~_STATE_IN_CALLBACK; | |
207 } | |
208 _cancel(); | |
209 _state |= _STATE_WAIT_FOR_CANCEL; | |
210 if (_cancelFuture is Future) { | |
211 _cancelFuture.whenComplete(sendDone); | |
212 } | |
213 else { | |
214 sendDone(); | |
215 } | |
216 } | |
217 void _guardCallback(callback) { | |
218 assert (!_inCallback); bool wasInputPaused = _isInputPaused; | |
219 _state |= _STATE_IN_CALLBACK; | |
220 callback(); | |
221 _state &= ~_STATE_IN_CALLBACK; | |
222 _checkState(wasInputPaused); | |
223 } | |
224 void _checkState(bool wasInputPaused) { | |
225 assert (!_inCallback); if (_hasPending && _pending.isEmpty) { | |
226 _state &= ~_STATE_HAS_PENDING; | |
227 if (_isInputPaused && _mayResumeInput) { | |
228 _state &= ~_STATE_INPUT_PAUSED; | |
229 } | |
230 } | |
231 while (true) { | |
232 if (_isCanceled) { | |
233 _pending = null; | |
234 return;} | |
235 bool isInputPaused = _isInputPaused; | |
236 if (wasInputPaused == isInputPaused) break; | |
237 _state ^= _STATE_IN_CALLBACK; | |
238 if (isInputPaused) { | |
239 _onPause(); | |
240 } | |
241 else { | |
242 _onResume(); | |
243 } | |
244 _state &= ~_STATE_IN_CALLBACK; | |
245 wasInputPaused = isInputPaused; | |
246 } | |
247 if (_hasPending && !_isPaused) { | |
248 _pending.schedule(this); | |
249 } | |
250 } | |
251 } | |
252 abstract class _StreamImpl<T> extends Stream<T> {StreamSubscription<T> listen(v
oid onData(T data), { | |
253 Function onError, void onDone(), bool cancelOnError} | |
254 ) { | |
255 cancelOnError = identical(true, cancelOnError); | |
256 StreamSubscription subscription = _createSubscription(onData, onError, onDone,
cancelOnError); | |
257 _onListen(subscription); | |
258 return DEVC$RT.cast(subscription, DEVC$RT.type((StreamSubscription<dynamic> _)
{ | |
259 } | |
260 ), DEVC$RT.type((StreamSubscription<T> _) { | |
261 } | |
262 ), "CompositeCast", """line 476, column 12 of dart:async/stream_impl.dart: """,
subscription is StreamSubscription<T>, false); | |
263 } | |
264 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) { | |
265 return new _BufferingStreamSubscription<T>(onData, onError, onDone, cancelOnErro
r); | |
266 } | |
267 void _onListen(StreamSubscription subscription) { | |
268 } | |
269 } | |
270 typedef _PendingEvents _EventGenerator(); | |
271 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {final _EventGenerator _pe
nding; | |
272 bool _isUsed = false; | |
273 _GeneratedStreamImpl(this._pending); | |
274 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) { | |
275 if (_isUsed) throw new StateError("Stream has already been listened to."); | |
276 _isUsed = true; | |
277 return ((__x55) => DEVC$RT.cast(__x55, DEVC$RT.type((_BufferingStreamSubscripti
on<dynamic> _) { | |
278 } | |
279 ), DEVC$RT.type((StreamSubscription<T> _) { | |
280 } | |
281 ), "CompositeCast", """line 515, column 12 of dart:async/stream_impl.dart: """,
__x55 is StreamSubscription<T>, false))(new _BufferingStreamSubscription(onData,
onError, onDone, cancelOnError).._setPendingEvents(_pending())); | |
282 } | |
283 } | |
284 class _IterablePendingEvents<T> extends _PendingEvents {Iterator<T> _iterator; | |
285 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | |
286 bool get isEmpty => _iterator == null; | |
287 void handleNext(_EventDispatch dispatch) { | |
288 if (_iterator == null) { | |
289 throw new StateError("No events pending."); | |
290 } | |
291 bool isDone; | |
292 try { | |
293 isDone = !_iterator.moveNext(); | |
294 } | |
295 catch (e, s) { | |
296 _iterator = null; | |
297 dispatch._sendError(e, s); | |
298 return;} | |
299 if (!isDone) { | |
300 dispatch._sendData(_iterator.current); | |
301 } | |
302 else { | |
303 _iterator = null; | |
304 dispatch._sendDone(); | |
305 } | |
306 } | |
307 void clear() { | |
308 if (isScheduled) cancelSchedule(); | |
309 _iterator = null; | |
310 } | |
311 } | |
312 typedef void _DataHandler<T>(T value); | |
313 typedef void _DoneHandler(); | |
314 void _nullDataHandler(var value) { | |
315 } | |
316 void _nullErrorHandler(error, [StackTrace stackTrace]) { | |
317 Zone.current.handleUncaughtError(error, stackTrace); | |
318 } | |
319 void _nullDoneHandler() { | |
320 } | |
321 abstract class _DelayedEvent<T> {_DelayedEvent next; | |
322 void perform(_EventDispatch<T> dispatch); | |
323 } | |
324 class _DelayedData<T> extends _DelayedEvent<T> {final T value; | |
325 _DelayedData(this.value); | |
326 void perform(_EventDispatch<T> dispatch) { | |
327 dispatch._sendData(value); | |
328 } | |
329 } | |
330 class _DelayedError extends _DelayedEvent {final error; | |
331 final StackTrace stackTrace; | |
332 _DelayedError(this.error, this.stackTrace); | |
333 void perform(_EventDispatch dispatch) { | |
334 dispatch._sendError(error, stackTrace); | |
335 } | |
336 } | |
337 class _DelayedDone implements _DelayedEvent {const _DelayedDone(); | |
338 void perform(_EventDispatch dispatch) { | |
339 dispatch._sendDone(); | |
340 } | |
341 _DelayedEvent get next => null; | |
342 void set next(_DelayedEvent _) { | |
343 throw new StateError("No events after a done."); | |
344 } | |
345 } | |
346 abstract class _PendingEvents {static const int _STATE_UNSCHEDULED = 0; | |
347 static const int _STATE_SCHEDULED = 1; | |
348 static const int _STATE_CANCELED = 3; | |
349 int _state = _STATE_UNSCHEDULED; | |
350 bool get isEmpty; | |
351 bool get isScheduled => _state == _STATE_SCHEDULED; | |
352 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | |
353 void schedule(_EventDispatch dispatch) { | |
354 if (isScheduled) return; assert (!isEmpty); if (_eventScheduled) { | |
355 assert (_state == _STATE_CANCELED); _state = _STATE_SCHEDULED; | |
356 return;} | |
357 scheduleMicrotask(() { | |
358 int oldState = _state; | |
359 _state = _STATE_UNSCHEDULED; | |
360 if (oldState == _STATE_CANCELED) return; handleNext(dispatch); | |
361 } | |
362 ); | |
363 _state = _STATE_SCHEDULED; | |
364 } | |
365 void cancelSchedule() { | |
366 if (isScheduled) _state = _STATE_CANCELED; | |
367 } | |
368 void handleNext(_EventDispatch dispatch); | |
369 void clear(); | |
370 } | |
371 class _StreamImplEvents extends _PendingEvents {_DelayedEvent firstPendingEvent
= null; | |
372 _DelayedEvent lastPendingEvent = null; | |
373 bool get isEmpty => lastPendingEvent == null; | |
374 void add(_DelayedEvent event) { | |
375 if (lastPendingEvent == null) { | |
376 firstPendingEvent = lastPendingEvent = event; | |
377 } | |
378 else { | |
379 lastPendingEvent = lastPendingEvent.next = event; | |
380 } | |
381 } | |
382 void handleNext(_EventDispatch dispatch) { | |
383 assert (!isScheduled); _DelayedEvent event = firstPendingEvent; | |
384 firstPendingEvent = event.next; | |
385 if (firstPendingEvent == null) { | |
386 lastPendingEvent = null; | |
387 } | |
388 event.perform(dispatch); | |
389 } | |
390 void clear() { | |
391 if (isScheduled) cancelSchedule(); | |
392 firstPendingEvent = lastPendingEvent = null; | |
393 } | |
394 } | |
395 class _BroadcastLinkedList {_BroadcastLinkedList _next; | |
396 _BroadcastLinkedList _previous; | |
397 void _unlink() { | |
398 _previous._next = _next; | |
399 _next._previous = _previous; | |
400 _next = _previous = this; | |
401 } | |
402 void _insertBefore(_BroadcastLinkedList newNext) { | |
403 _BroadcastLinkedList newPrevious = newNext._previous; | |
404 newPrevious._next = this; | |
405 newNext._previous = _previous; | |
406 _previous._next = newNext; | |
407 _previous = newPrevious; | |
408 } | |
409 } | |
410 typedef void _broadcastCallback(StreamSubscription subscription); | |
411 class _DoneStreamSubscription<T> implements StreamSubscription<T> {static const
int _DONE_SENT = 1; | |
412 static const int _SCHEDULED = 2; | |
413 static const int _PAUSED = 4; | |
414 final Zone _zone; | |
415 int _state = 0; | |
416 _DoneHandler _onDone; | |
417 _DoneStreamSubscription(this._onDone) : _zone = Zone.current { | |
418 _schedule(); | |
419 } | |
420 bool get _isSent => (_state & _DONE_SENT) != 0; | |
421 bool get _isScheduled => (_state & _SCHEDULED) != 0; | |
422 bool get isPaused => _state >= _PAUSED; | |
423 void _schedule() { | |
424 if (_isScheduled) return; _zone.scheduleMicrotask(_sendDone); | |
425 _state |= _SCHEDULED; | |
426 } | |
427 void onData(void handleData(T data)) { | |
428 } | |
429 void onError(Function handleError) { | |
430 } | |
431 void onDone(void handleDone()) { | |
432 _onDone = handleDone; | |
433 } | |
434 void pause([Future resumeSignal]) { | |
435 _state += _PAUSED; | |
436 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
437 } | |
438 void resume() { | |
439 if (isPaused) { | |
440 _state -= _PAUSED; | |
441 if (!isPaused && !_isSent) { | |
442 _schedule(); | |
443 } | |
444 } | |
445 } | |
446 Future cancel() => null; | |
447 Future asFuture([futureValue]) { | |
448 _Future result = new _Future(); | |
449 _onDone = () { | |
450 result._completeWithValue(null); | |
451 } | |
452 ; | |
453 return result; | |
454 } | |
455 void _sendDone() { | |
456 _state &= ~_SCHEDULED; | |
457 if (isPaused) return; _state |= _DONE_SENT; | |
458 if (_onDone != null) _zone.runGuarded(_onDone); | |
459 } | |
460 } | |
461 class _AsBroadcastStream<T> extends Stream<T> {final Stream<T> _source; | |
462 final _broadcastCallback _onListenHandler; | |
463 final _broadcastCallback _onCancelHandler; | |
464 final Zone _zone; | |
465 _AsBroadcastStreamController<T> _controller; | |
466 StreamSubscription<T> _subscription; | |
467 _AsBroadcastStream(this._source, void onListenHandler(StreamSubscription subscr
iption), void onCancelHandler(StreamSubscription subscription)) : _onListenHandl
er = ((__x56) => DEVC$RT.cast(__x56, ZoneUnaryCallback, _broadcastCallback, "Com
positeCast", """line 813, column 28 of dart:async/stream_impl.dart: """, __x56 i
s _broadcastCallback, false))(Zone.current.registerUnaryCallback(onListenHandler
)), _onCancelHandler = ((__x57) => DEVC$RT.cast(__x57, ZoneUnaryCallback, _broad
castCallback, "CompositeCast", """line 814, column 28 of dart:async/stream_impl.
dart: """, __x57 is _broadcastCallback, false))(Zone.current.registerUnaryCallba
ck(onCancelHandler)), _zone = Zone.current { | |
468 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | |
469 } | |
470 bool get isBroadcast => true; | |
471 StreamSubscription<T> listen(void onData(T data), { | |
472 Function onError, void onDone(), bool cancelOnError} | |
473 ) { | |
474 if (_controller == null || _controller.isClosed) { | |
475 return new _DoneStreamSubscription<T>(onDone); | |
476 } | |
477 if (_subscription == null) { | |
478 _subscription = _source.listen(_controller.add, onError: _controller.addError, o
nDone: _controller.close); | |
479 } | |
480 cancelOnError = identical(true, cancelOnError); | |
481 return _controller._subscribe(onData, onError, onDone, cancelOnError); | |
482 } | |
483 void _onCancel() { | |
484 bool shutdown = (_controller == null) || _controller.isClosed; | |
485 if (_onCancelHandler != null) { | |
486 _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); | |
487 } | |
488 if (shutdown) { | |
489 if (_subscription != null) { | |
490 _subscription.cancel(); | |
491 _subscription = null; | |
492 } | |
493 } | |
494 } | |
495 void _onListen() { | |
496 if (_onListenHandler != null) { | |
497 _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); | |
498 } | |
499 } | |
500 void _cancelSubscription() { | |
501 if (_subscription == null) return; StreamSubscription subscription = _subscripti
on; | |
502 _subscription = null; | |
503 _controller = null; | |
504 subscription.cancel(); | |
505 } | |
506 void _pauseSubscription(Future resumeSignal) { | |
507 if (_subscription == null) return; _subscription.pause(resumeSignal); | |
508 } | |
509 void _resumeSubscription() { | |
510 if (_subscription == null) return; _subscription.resume(); | |
511 } | |
512 bool get _isSubscriptionPaused { | |
513 if (_subscription == null) return false; | |
514 return _subscription.isPaused; | |
515 } | |
516 } | |
517 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {final
_AsBroadcastStream _stream; | |
518 _BroadcastSubscriptionWrapper(this._stream); | |
519 void onData(void handleData(T data)) { | |
520 throw new UnsupportedError("Cannot change handlers of asBroadcastStream source s
ubscription."); | |
521 } | |
522 void onError(Function handleError) { | |
523 throw new UnsupportedError("Cannot change handlers of asBroadcastStream source s
ubscription."); | |
524 } | |
525 void onDone(void handleDone()) { | |
526 throw new UnsupportedError("Cannot change handlers of asBroadcastStream source s
ubscription."); | |
527 } | |
528 void pause([Future resumeSignal]) { | |
529 _stream._pauseSubscription(resumeSignal); | |
530 } | |
531 void resume() { | |
532 _stream._resumeSubscription(); | |
533 } | |
534 Future cancel() { | |
535 _stream._cancelSubscription(); | |
536 return null; | |
537 } | |
538 bool get isPaused { | |
539 return _stream._isSubscriptionPaused; | |
540 } | |
541 Future asFuture([var futureValue]) { | |
542 throw new UnsupportedError("Cannot change handlers of asBroadcastStream source s
ubscription."); | |
543 } | |
544 } | |
545 class _StreamIteratorImpl<T> implements StreamIterator<T> {static const int _ST
ATE_FOUND = 0; | |
546 static const int _STATE_DONE = 1; | |
547 static const int _STATE_MOVING = 2; | |
548 static const int _STATE_EXTRA_DATA = 3; | |
549 static const int _STATE_EXTRA_ERROR = 4; | |
550 static const int _STATE_EXTRA_DONE = 5; | |
551 StreamSubscription _subscription; | |
552 T _current = null; | |
553 var _futureOrPrefetch = null; | |
554 int _state = _STATE_FOUND; | |
555 _StreamIteratorImpl(final Stream<T> stream) { | |
556 _subscription = stream.listen(_onData, onError: _onError, onDone: _onDone, cance
lOnError: true); | |
557 } | |
558 T get current => _current; | |
559 Future<bool> moveNext() { | |
560 if (_state == _STATE_DONE) { | |
561 return new _Future<bool>.immediate(false); | |
562 } | |
563 if (_state == _STATE_MOVING) { | |
564 throw new StateError("Already waiting for next."); | |
565 } | |
566 if (_state == _STATE_FOUND) { | |
567 _state = _STATE_MOVING; | |
568 _current = null; | |
569 _futureOrPrefetch = new _Future<bool>(); | |
570 return DEVC$RT.cast(_futureOrPrefetch, dynamic, DEVC$RT.type((Future<bool> _) { | |
571 } | |
572 ), "CompositeCast", """line 1000, column 14 of dart:async/stream_impl.dart: """,
_futureOrPrefetch is Future<bool>, false); | |
573 } | |
574 else { | |
575 assert (_state >= _STATE_EXTRA_DATA); switch (_state) {case _STATE_EXTRA_DATA: _
state = _STATE_FOUND; | |
576 _current = DEVC$RT.cast(_futureOrPrefetch, dynamic, T, "CompositeCast", """line
1006, column 22 of dart:async/stream_impl.dart: """, _futureOrPrefetch is T, fa
lse); | |
577 _futureOrPrefetch = null; | |
578 _subscription.resume(); | |
579 return new _Future<bool>.immediate(true); | |
580 case _STATE_EXTRA_ERROR: AsyncError prefetch = DEVC$RT.cast(_futureOrPrefetch,
dynamic, AsyncError, "DynamicCast", """line 1011, column 33 of dart:async/stream
_impl.dart: """, _futureOrPrefetch is AsyncError, true); | |
581 _clear(); | |
582 return new _Future<bool>.immediateError(prefetch.error, prefetch.stackTrace); | |
583 case _STATE_EXTRA_DONE: _clear(); | |
584 return new _Future<bool>.immediate(false); | |
585 } | |
586 } | |
587 } | |
588 void _clear() { | |
589 _subscription = null; | |
590 _futureOrPrefetch = null; | |
591 _current = null; | |
592 _state = _STATE_DONE; | |
593 } | |
594 Future cancel() { | |
595 StreamSubscription subscription = _subscription; | |
596 if (_state == _STATE_MOVING) { | |
597 _Future<bool> hasNext = DEVC$RT.cast(_futureOrPrefetch, dynamic, DEVC$RT.type((_
Future<bool> _) { | |
598 } | |
599 ), "CompositeCast", """line 1033, column 31 of dart:async/stream_impl.dart: """,
_futureOrPrefetch is _Future<bool>, false); | |
600 _clear(); | |
601 hasNext._complete(false); | |
602 } | |
603 else { | |
604 _clear(); | |
605 } | |
606 return subscription.cancel(); | |
607 } | |
608 void _onData(T data) { | |
609 if (_state == _STATE_MOVING) { | |
610 _current = data; | |
611 _Future<bool> hasNext = DEVC$RT.cast(_futureOrPrefetch, dynamic, DEVC$RT.type((
_Future<bool> _) { | |
612 } | |
613 ), "CompositeCast", """line 1045, column 31 of dart:async/stream_impl.dart: """,
_futureOrPrefetch is _Future<bool>, false); | |
614 _futureOrPrefetch = null; | |
615 _state = _STATE_FOUND; | |
616 hasNext._complete(true); | |
617 return;} | |
618 _subscription.pause(); | |
619 assert (_futureOrPrefetch == null); _futureOrPrefetch = data; | |
620 _state = _STATE_EXTRA_DATA; | |
621 } | |
622 void _onError(Object error, [StackTrace stackTrace]) { | |
623 if (_state == _STATE_MOVING) { | |
624 _Future<bool> hasNext = DEVC$RT.cast(_futureOrPrefetch, dynamic, DEVC$RT.type((_
Future<bool> _) { | |
625 } | |
626 ), "CompositeCast", """line 1059, column 31 of dart:async/stream_impl.dart: """,
_futureOrPrefetch is _Future<bool>, false); | |
627 _clear(); | |
628 hasNext._completeError(error, stackTrace); | |
629 return;} | |
630 _subscription.pause(); | |
631 assert (_futureOrPrefetch == null); _futureOrPrefetch = new AsyncError(error, s
tackTrace); | |
632 _state = _STATE_EXTRA_ERROR; | |
633 } | |
634 void _onDone() { | |
635 if (_state == _STATE_MOVING) { | |
636 _Future<bool> hasNext = DEVC$RT.cast(_futureOrPrefetch, dynamic, DEVC$RT.type((_
Future<bool> _) { | |
637 } | |
638 ), "CompositeCast", """line 1073, column 31 of dart:async/stream_impl.dart: """,
_futureOrPrefetch is _Future<bool>, false); | |
639 _clear(); | |
640 hasNext._complete(false); | |
641 return;} | |
642 _subscription.pause(); | |
643 _futureOrPrefetch = null; | |
644 _state = _STATE_EXTRA_DONE; | |
645 } | |
646 } | |
647 typedef void __CastType45<T>(T __u46); | |
648 typedef void __CastType47(dynamic __u48); | |
649 typedef dynamic __CastType50(dynamic __u51, dynamic __u52); | |
650 typedef dynamic __CastType53(dynamic __u54); | |
OLD | NEW |