OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.async; | |
6 | |
7 /** Abstract and private interface for a place to put events. */ | |
8 abstract class _EventSink<T> { | |
9 void _add(T data); | |
10 void _addError(Object error, StackTrace stackTrace); | |
11 void _close(); | |
12 } | |
13 | |
14 /** | |
15 * Abstract and private interface for a place to send events. | |
16 * | |
17 * Used by event buffering to finally dispatch the pending event, where | |
18 * [_EventSink] is where the event first enters the stream subscription, | |
19 * and may yet be buffered. | |
20 */ | |
21 abstract class _EventDispatch<T> { | |
22 void _sendData(T data); | |
23 void _sendError(Object error, StackTrace stackTrace); | |
24 void _sendDone(); | |
25 } | |
26 | |
27 /** | |
28 * Default implementation of stream subscription of buffering events. | |
29 * | |
30 * The only public methods are those of [StreamSubscription], so instances of | |
31 * [_BufferingStreamSubscription] can be returned directly as a | |
32 * [StreamSubscription] without exposing internal functionality. | |
33 * | |
34 * The [StreamController] is a public facing version of [Stream] and this class, | |
35 * with some methods made public. | |
36 * | |
37 * The user interface of [_BufferingStreamSubscription] are the following | |
38 * methods: | |
39 * | |
40 * * [_add]: Add a data event to the stream. | |
41 * * [_addError]: Add an error event to the stream. | |
42 * * [_close]: Request to close the stream. | |
43 * * [_onCancel]: Called when the subscription will provide no more events, | |
44 * either due to being actively canceled, or after sending a done event. | |
45 * * [_onPause]: Called when the subscription wants the event source to pause. | |
46 * * [_onResume]: Called when allowing new events after a pause. | |
47 * | |
48 * The user should not add new events when the subscription requests a paused, | |
49 * but if it happens anyway, the subscription will enqueue the events just as | |
50 * when new events arrive while still firing an old event. | |
51 */ | |
52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | |
53 _EventSink<T>, | |
54 _EventDispatch<T> { | |
55 /** The `cancelOnError` flag from the `listen` call. */ | |
56 static const int _STATE_CANCEL_ON_ERROR = 1; | |
57 /** | |
58 * Whether the "done" event has been received. | |
59 * No further events are accepted after this. | |
60 */ | |
61 static const int _STATE_CLOSED = 2; | |
62 /** | |
63 * Set if the input has been asked not to send events. | |
64 * | |
65 * This is not the same as being paused, since the input will remain paused | |
66 * after a call to [resume] if there are pending events. | |
67 */ | |
68 static const int _STATE_INPUT_PAUSED = 4; | |
69 /** | |
70 * Whether the subscription has been canceled. | |
71 * | |
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event | |
73 * when `cancelOnError` is true. | |
74 */ | |
75 static const int _STATE_CANCELED = 8; | |
76 /** | |
77 * Set when either: | |
78 * | |
79 * * an error is sent, and [cancelOnError] is true, or | |
80 * * a done event is sent. | |
81 * | |
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | |
83 * state is unset, and no furher events must be delivered. | |
84 */ | |
85 static const int _STATE_WAIT_FOR_CANCEL = 16; | |
86 static const int _STATE_IN_CALLBACK = 32; | |
87 static const int _STATE_HAS_PENDING = 64; | |
88 static const int _STATE_PAUSE_COUNT = 128; | |
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
90 | |
91 /* Event handlers provided in constructor. */ | |
92 _DataHandler<T> _onData; | |
93 Function _onError; | |
94 _DoneHandler _onDone; | |
95 final Zone _zone = Zone.current; | |
96 | |
97 /** Bit vector based on state-constants above. */ | |
98 int _state; | |
99 | |
100 // TODO(floitsch): reuse another field | |
101 /** The future [_onCancel] may return. */ | |
102 Future _cancelFuture; | |
103 | |
104 /** | |
105 * Queue of pending events. | |
106 * | |
107 * Is created when necessary, or set in constructor for preconfigured events. | |
108 */ | |
109 _PendingEvents<T> _pending; | |
110 | |
111 _BufferingStreamSubscription(void onData(T data), | |
112 Function onError, | |
113 void onDone(), | |
114 bool cancelOnError) | |
115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | |
116 this.onData(onData); | |
117 this.onError(onError); | |
118 this.onDone(onDone); | |
119 } | |
120 | |
121 /** | |
122 * Sets the subscription's pending events object. | |
123 * | |
124 * This can only be done once. The pending events object is used for the | |
125 * rest of the subscription's life cycle. | |
126 */ | |
127 void _setPendingEvents(_PendingEvents<T> pendingEvents) { | |
128 assert(_pending == null); | |
129 if (pendingEvents == null) return; | |
130 _pending = pendingEvents; | |
131 if (!pendingEvents.isEmpty) { | |
132 _state |= _STATE_HAS_PENDING; | |
133 _pending.schedule(this); | |
134 } | |
135 } | |
136 | |
137 // StreamSubscription interface. | |
138 | |
139 void onData(void handleData(T event)) { | |
140 if (handleData == null) handleData = _nullDataHandler; | |
141 // TODO(floitsch): the return type should be 'void', and the type | |
142 // should be inferred. | |
143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); | |
144 } | |
145 | |
146 void onError(Function handleError) { | |
147 if (handleError == null) handleError = _nullErrorHandler; | |
148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone); | |
149 } | |
150 | |
151 void onDone(void handleDone()) { | |
152 if (handleDone == null) handleDone = _nullDoneHandler; | |
153 _onDone = _zone.registerCallback(handleDone); | |
154 } | |
155 | |
156 void pause([Future resumeSignal]) { | |
157 if (_isCanceled) return; | |
158 bool wasPaused = _isPaused; | |
159 bool wasInputPaused = _isInputPaused; | |
160 // Increment pause count and mark input paused (if it isn't already). | |
161 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
162 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
163 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | |
164 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | |
165 } | |
166 | |
167 void resume() { | |
168 if (_isCanceled) return; | |
169 if (_isPaused) { | |
170 _decrementPauseCount(); | |
171 if (!_isPaused) { | |
172 if (_hasPending && !_pending.isEmpty) { | |
173 // Input is still paused. | |
174 _pending.schedule(this); | |
175 } else { | |
176 assert(_mayResumeInput); | |
177 _state &= ~_STATE_INPUT_PAUSED; | |
178 if (!_inCallback) _guardCallback(_onResume); | |
179 } | |
180 } | |
181 } | |
182 } | |
183 | |
184 Future cancel() { | |
185 // The user doesn't want to receive any further events. If there is an | |
186 // error or done event pending (waiting for the cancel to be done) discard | |
187 // that event. | |
188 _state &= ~_STATE_WAIT_FOR_CANCEL; | |
189 if (_isCanceled) return _cancelFuture; | |
190 _cancel(); | |
191 return _cancelFuture; | |
192 } | |
193 | |
194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
195 _Future/*<E>*/ result = new _Future/*<E>*/(); | |
196 | |
197 // Overwrite the onDone and onError handlers. | |
198 _onDone = () { result._complete(futureValue); }; | |
199 _onError = (error, stackTrace) { | |
200 cancel(); | |
201 result._completeError(error, stackTrace); | |
202 }; | |
203 | |
204 return result; | |
205 } | |
206 | |
207 // State management. | |
208 | |
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | |
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | |
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
212 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; | |
213 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | |
214 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | |
215 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | |
216 bool get _canFire => _state < _STATE_IN_CALLBACK; | |
217 bool get _mayResumeInput => | |
218 !_isPaused && (_pending == null || _pending.isEmpty); | |
219 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | |
220 | |
221 bool get isPaused => _isPaused; | |
222 | |
223 void _cancel() { | |
224 _state |= _STATE_CANCELED; | |
225 if (_hasPending) { | |
226 _pending.cancelSchedule(); | |
227 } | |
228 if (!_inCallback) _pending = null; | |
229 _cancelFuture = _onCancel(); | |
230 } | |
231 | |
232 /** | |
233 * Increment the pause count. | |
234 * | |
235 * Also marks input as paused. | |
236 */ | |
237 void _incrementPauseCount() { | |
238 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
239 } | |
240 | |
241 /** | |
242 * Decrements the pause count. | |
243 * | |
244 * Does not automatically unpause the input (call [_onResume]) when | |
245 * the pause count reaches zero. This is handled elsewhere, and only | |
246 * if there are no pending events buffered. | |
247 */ | |
248 void _decrementPauseCount() { | |
249 assert(_isPaused); | |
250 _state -= _STATE_PAUSE_COUNT; | |
251 } | |
252 | |
253 // _EventSink interface. | |
254 | |
255 void _add(T data) { | |
256 assert(!_isClosed); | |
257 if (_isCanceled) return; | |
258 if (_canFire) { | |
259 _sendData(data); | |
260 } else { | |
261 _addPending(new _DelayedData<dynamic /*=T*/>(data)); | |
262 } | |
263 } | |
264 | |
265 void _addError(Object error, StackTrace stackTrace) { | |
266 if (_isCanceled) return; | |
267 if (_canFire) { | |
268 _sendError(error, stackTrace); // Reports cancel after sending. | |
269 } else { | |
270 _addPending(new _DelayedError(error, stackTrace)); | |
271 } | |
272 } | |
273 | |
274 void _close() { | |
275 assert(!_isClosed); | |
276 if (_isCanceled) return; | |
277 _state |= _STATE_CLOSED; | |
278 if (_canFire) { | |
279 _sendDone(); | |
280 } else { | |
281 _addPending(const _DelayedDone()); | |
282 } | |
283 } | |
284 | |
285 // Hooks called when the input is paused, unpaused or canceled. | |
286 // These must not throw. If overwritten to call user code, include suitable | |
287 // try/catch wrapping and send any errors to | |
288 // [_Zone.current.handleUncaughtError]. | |
289 void _onPause() { | |
290 assert(_isInputPaused); | |
291 } | |
292 | |
293 void _onResume() { | |
294 assert(!_isInputPaused); | |
295 } | |
296 | |
297 Future _onCancel() { | |
298 assert(_isCanceled); | |
299 return null; | |
300 } | |
301 | |
302 // Handle pending events. | |
303 | |
304 /** | |
305 * Add a pending event. | |
306 * | |
307 * If the subscription is not paused, this also schedules a firing | |
308 * of pending events later (if necessary). | |
309 */ | |
310 void _addPending(_DelayedEvent event) { | |
311 _StreamImplEvents<T> pending = _pending; | |
312 if (_pending == null) { | |
313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); | |
314 } | |
315 pending.add(event); | |
316 if (!_hasPending) { | |
317 _state |= _STATE_HAS_PENDING; | |
318 if (!_isPaused) { | |
319 _pending.schedule(this); | |
320 } | |
321 } | |
322 } | |
323 | |
324 /* _EventDispatch interface. */ | |
325 | |
326 void _sendData(T data) { | |
327 assert(!_isCanceled); | |
328 assert(!_isPaused); | |
329 assert(!_inCallback); | |
330 bool wasInputPaused = _isInputPaused; | |
331 _state |= _STATE_IN_CALLBACK; | |
332 _zone.runUnaryGuarded(_onData, data); | |
333 _state &= ~_STATE_IN_CALLBACK; | |
334 _checkState(wasInputPaused); | |
335 } | |
336 | |
337 void _sendError(var error, StackTrace stackTrace) { | |
338 assert(!_isCanceled); | |
339 assert(!_isPaused); | |
340 assert(!_inCallback); | |
341 bool wasInputPaused = _isInputPaused; | |
342 | |
343 void sendError() { | |
344 // If the subscription has been canceled while waiting for the cancel | |
345 // future to finish we must not report the error. | |
346 if (_isCanceled && !_waitsForCancel) return; | |
347 _state |= _STATE_IN_CALLBACK; | |
348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | |
349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | |
350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | |
351 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | |
352 } else { | |
353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( | |
354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | |
355 } | |
356 _state &= ~_STATE_IN_CALLBACK; | |
357 } | |
358 | |
359 if (_cancelOnError) { | |
360 _state |= _STATE_WAIT_FOR_CANCEL; | |
361 _cancel(); | |
362 if (_cancelFuture is Future) { | |
363 _cancelFuture.whenComplete(sendError); | |
364 } else { | |
365 sendError(); | |
366 } | |
367 } else { | |
368 sendError(); | |
369 // Only check state if not cancelOnError. | |
370 _checkState(wasInputPaused); | |
371 } | |
372 } | |
373 | |
374 void _sendDone() { | |
375 assert(!_isCanceled); | |
376 assert(!_isPaused); | |
377 assert(!_inCallback); | |
378 | |
379 void sendDone() { | |
380 // If the subscription has been canceled while waiting for the cancel | |
381 // future to finish we must not report the done event. | |
382 if (!_waitsForCancel) return; | |
383 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | |
384 _zone.runGuarded(_onDone); | |
385 _state &= ~_STATE_IN_CALLBACK; | |
386 } | |
387 | |
388 _cancel(); | |
389 _state |= _STATE_WAIT_FOR_CANCEL; | |
390 if (_cancelFuture is Future) { | |
391 _cancelFuture.whenComplete(sendDone); | |
392 } else { | |
393 sendDone(); | |
394 } | |
395 } | |
396 | |
397 /** | |
398 * Call a hook function. | |
399 * | |
400 * The call is properly wrapped in code to avoid other callbacks | |
401 * during the call, and it checks for state changes after the call | |
402 * that should cause further callbacks. | |
403 */ | |
404 void _guardCallback(void callback()) { | |
405 assert(!_inCallback); | |
406 bool wasInputPaused = _isInputPaused; | |
407 _state |= _STATE_IN_CALLBACK; | |
408 callback(); | |
409 _state &= ~_STATE_IN_CALLBACK; | |
410 _checkState(wasInputPaused); | |
411 } | |
412 | |
413 /** | |
414 * Check if the input needs to be informed of state changes. | |
415 * | |
416 * State changes are pausing, resuming and canceling. | |
417 * | |
418 * After canceling, no further callbacks will happen. | |
419 * | |
420 * The cancel callback is called after a user cancel, or after | |
421 * the final done event is sent. | |
422 */ | |
423 void _checkState(bool wasInputPaused) { | |
424 assert(!_inCallback); | |
425 if (_hasPending && _pending.isEmpty) { | |
426 _state &= ~_STATE_HAS_PENDING; | |
427 if (_isInputPaused && _mayResumeInput) { | |
428 _state &= ~_STATE_INPUT_PAUSED; | |
429 } | |
430 } | |
431 // If the state changes during a callback, we immediately | |
432 // make a new state-change callback. Loop until the state didn't change. | |
433 while (true) { | |
434 if (_isCanceled) { | |
435 _pending = null; | |
436 return; | |
437 } | |
438 bool isInputPaused = _isInputPaused; | |
439 if (wasInputPaused == isInputPaused) break; | |
440 _state ^= _STATE_IN_CALLBACK; | |
441 if (isInputPaused) { | |
442 _onPause(); | |
443 } else { | |
444 _onResume(); | |
445 } | |
446 _state &= ~_STATE_IN_CALLBACK; | |
447 wasInputPaused = isInputPaused; | |
448 } | |
449 if (_hasPending && !_isPaused) { | |
450 _pending.schedule(this); | |
451 } | |
452 } | |
453 } | |
454 | |
455 // ------------------------------------------------------------------- | |
456 // Common base class for single and multi-subscription streams. | |
457 // ------------------------------------------------------------------- | |
458 abstract class _StreamImpl<T> extends Stream<T> { | |
459 // ------------------------------------------------------------------ | |
460 // Stream interface. | |
461 | |
462 StreamSubscription<T> listen(void onData(T data), | |
463 { Function onError, | |
464 void onDone(), | |
465 bool cancelOnError }) { | |
466 cancelOnError = identical(true, cancelOnError); | |
467 StreamSubscription<T> subscription = | |
468 _createSubscription(onData, onError, onDone, cancelOnError); | |
469 _onListen(subscription); | |
470 return subscription; | |
471 } | |
472 | |
473 // ------------------------------------------------------------------- | |
474 /** Create a subscription object. Called by [subcribe]. */ | |
475 StreamSubscription<T> _createSubscription( | |
476 void onData(T data), | |
477 Function onError, | |
478 void onDone(), | |
479 bool cancelOnError) { | |
480 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | |
481 cancelOnError); | |
482 } | |
483 | |
484 /** Hook called when the subscription has been created. */ | |
485 void _onListen(StreamSubscription subscription) {} | |
486 } | |
487 | |
488 typedef _PendingEvents<T> _EventGenerator<T>(); | |
489 | |
490 /** Stream that generates its own events. */ | |
491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | |
492 final _EventGenerator<T> _pending; | |
493 bool _isUsed = false; | |
494 /** | |
495 * Initializes the stream to have only the events provided by a | |
496 * [_PendingEvents]. | |
497 * | |
498 * A new [_PendingEvents] must be generated for each listen. | |
499 */ | |
500 _GeneratedStreamImpl(this._pending); | |
501 | |
502 StreamSubscription<T> _createSubscription( | |
503 void onData(T data), | |
504 Function onError, | |
505 void onDone(), | |
506 bool cancelOnError) { | |
507 if (_isUsed) throw new StateError("Stream has already been listened to."); | |
508 _isUsed = true; | |
509 return new _BufferingStreamSubscription<T>( | |
510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | |
511 } | |
512 } | |
513 | |
514 | |
515 /** Pending events object that gets its events from an [Iterable]. */ | |
516 class _IterablePendingEvents<T> extends _PendingEvents<T> { | |
517 // The iterator providing data for data events. | |
518 // Set to null when iteration has completed. | |
519 Iterator<T> _iterator; | |
520 | |
521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | |
522 | |
523 bool get isEmpty => _iterator == null; | |
524 | |
525 void handleNext(_EventDispatch<T> dispatch) { | |
526 if (_iterator == null) { | |
527 throw new StateError("No events pending."); | |
528 } | |
529 // Send one event per call to moveNext. | |
530 // If moveNext returns true, send the current element as data. | |
531 // If moveNext returns false, send a done event and clear the _iterator. | |
532 // If moveNext throws an error, send an error and clear the _iterator. | |
533 // After an error, no further events will be sent. | |
534 bool isDone; | |
535 try { | |
536 isDone = !_iterator.moveNext(); | |
537 } catch (e, s) { | |
538 _iterator = null; | |
539 dispatch._sendError(e, s); | |
540 return; | |
541 } | |
542 if (!isDone) { | |
543 dispatch._sendData(_iterator.current); | |
544 } else { | |
545 _iterator = null; | |
546 dispatch._sendDone(); | |
547 } | |
548 } | |
549 | |
550 void clear() { | |
551 if (isScheduled) cancelSchedule(); | |
552 _iterator = null; | |
553 } | |
554 } | |
555 | |
556 | |
557 // Internal helpers. | |
558 | |
559 // Types of the different handlers on a stream. Types used to type fields. | |
560 typedef void _DataHandler<T>(T value); | |
561 typedef void _DoneHandler(); | |
562 | |
563 | |
564 /** Default data handler, does nothing. */ | |
565 void _nullDataHandler(var value) {} | |
566 | |
567 /** Default error handler, reports the error to the current zone's handler. */ | |
568 void _nullErrorHandler(error, [StackTrace stackTrace]) { | |
569 Zone.current.handleUncaughtError(error, stackTrace); | |
570 } | |
571 | |
572 /** Default done handler, does nothing. */ | |
573 void _nullDoneHandler() {} | |
574 | |
575 | |
576 /** A delayed event on a buffering stream subscription. */ | |
577 abstract class _DelayedEvent<T> { | |
578 /** Added as a linked list on the [StreamController]. */ | |
579 _DelayedEvent next; | |
580 /** Execute the delayed event on the [StreamController]. */ | |
581 void perform(_EventDispatch<T> dispatch); | |
582 } | |
583 | |
584 /** A delayed data event. */ | |
585 class _DelayedData<T> extends _DelayedEvent<T> { | |
586 final T value; | |
587 _DelayedData(this.value); | |
588 void perform(_EventDispatch<T> dispatch) { | |
589 dispatch._sendData(value); | |
590 } | |
591 } | |
592 | |
593 /** A delayed error event. */ | |
594 class _DelayedError extends _DelayedEvent { | |
595 final error; | |
596 final StackTrace stackTrace; | |
597 | |
598 _DelayedError(this.error, this.stackTrace); | |
599 void perform(_EventDispatch dispatch) { | |
600 dispatch._sendError(error, stackTrace); | |
601 } | |
602 } | |
603 | |
604 /** A delayed done event. */ | |
605 class _DelayedDone implements _DelayedEvent { | |
606 const _DelayedDone(); | |
607 void perform(_EventDispatch dispatch) { | |
608 dispatch._sendDone(); | |
609 } | |
610 | |
611 _DelayedEvent get next => null; | |
612 | |
613 void set next(_DelayedEvent _) { | |
614 throw new StateError("No events after a done."); | |
615 } | |
616 } | |
617 | |
618 /** Superclass for provider of pending events. */ | |
619 abstract class _PendingEvents<T> { | |
620 // No async event has been scheduled. | |
621 static const int _STATE_UNSCHEDULED = 0; | |
622 // An async event has been scheduled to run a function. | |
623 static const int _STATE_SCHEDULED = 1; | |
624 // An async event has been scheduled, but it will do nothing when it runs. | |
625 // Async events can't be preempted. | |
626 static const int _STATE_CANCELED = 3; | |
627 | |
628 /** | |
629 * State of being scheduled. | |
630 * | |
631 * Set to [_STATE_SCHEDULED] when pending events are scheduled for | |
632 * async dispatch. Since we can't cancel a [scheduleMicrotask] call, if | |
633 * scheduling is "canceled", the _state is simply set to [_STATE_CANCELED] | |
634 * which will make the async code do nothing except resetting [_state]. | |
635 * | |
636 * If events are scheduled while the state is [_STATE_CANCELED], it is | |
637 * merely switched back to [_STATE_SCHEDULED], but no new call to | |
638 * [scheduleMicrotask] is performed. | |
639 */ | |
640 int _state = _STATE_UNSCHEDULED; | |
641 | |
642 bool get isEmpty; | |
643 | |
644 bool get isScheduled => _state == _STATE_SCHEDULED; | |
645 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | |
646 | |
647 /** | |
648 * Schedule an event to run later. | |
649 * | |
650 * If called more than once, it should be called with the same dispatch as | |
651 * argument each time. It may reuse an earlier argument in some cases. | |
652 */ | |
653 void schedule(_EventDispatch<T> dispatch) { | |
654 if (isScheduled) return; | |
655 assert(!isEmpty); | |
656 if (_eventScheduled) { | |
657 assert(_state == _STATE_CANCELED); | |
658 _state = _STATE_SCHEDULED; | |
659 return; | |
660 } | |
661 scheduleMicrotask(() { | |
662 int oldState = _state; | |
663 _state = _STATE_UNSCHEDULED; | |
664 if (oldState == _STATE_CANCELED) return; | |
665 handleNext(dispatch); | |
666 }); | |
667 _state = _STATE_SCHEDULED; | |
668 } | |
669 | |
670 void cancelSchedule() { | |
671 if (isScheduled) _state = _STATE_CANCELED; | |
672 } | |
673 | |
674 void handleNext(_EventDispatch<T> dispatch); | |
675 | |
676 /** Throw away any pending events and cancel scheduled events. */ | |
677 void clear(); | |
678 } | |
679 | |
680 | |
681 /** Class holding pending events for a [_StreamImpl]. */ | |
682 class _StreamImplEvents<T> extends _PendingEvents<T> { | |
683 /// Single linked list of [_DelayedEvent] objects. | |
684 _DelayedEvent firstPendingEvent = null; | |
685 /// Last element in the list of pending events. New events are added after it. | |
686 _DelayedEvent lastPendingEvent = null; | |
687 | |
688 bool get isEmpty => lastPendingEvent == null; | |
689 | |
690 void add(_DelayedEvent event) { | |
691 if (lastPendingEvent == null) { | |
692 firstPendingEvent = lastPendingEvent = event; | |
693 } else { | |
694 lastPendingEvent = lastPendingEvent.next = event; | |
695 } | |
696 } | |
697 | |
698 void handleNext(_EventDispatch<T> dispatch) { | |
699 assert(!isScheduled); | |
700 _DelayedEvent event = firstPendingEvent; | |
701 firstPendingEvent = event.next; | |
702 if (firstPendingEvent == null) { | |
703 lastPendingEvent = null; | |
704 } | |
705 event.perform(dispatch); | |
706 } | |
707 | |
708 void clear() { | |
709 if (isScheduled) cancelSchedule(); | |
710 firstPendingEvent = lastPendingEvent = null; | |
711 } | |
712 } | |
713 | |
714 class _BroadcastLinkedList { | |
715 _BroadcastLinkedList _next; | |
716 _BroadcastLinkedList _previous; | |
717 | |
718 void _unlink() { | |
719 _previous._next = _next; | |
720 _next._previous = _previous; | |
721 _next = _previous = this; | |
722 } | |
723 | |
724 void _insertBefore(_BroadcastLinkedList newNext) { | |
725 _BroadcastLinkedList newPrevious = newNext._previous; | |
726 newPrevious._next = this; | |
727 newNext._previous = _previous; | |
728 _previous._next = newNext; | |
729 _previous = newPrevious; | |
730 } | |
731 } | |
732 | |
733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | |
734 | |
735 /** | |
736 * Done subscription that will send one done event as soon as possible. | |
737 */ | |
738 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | |
739 static const int _DONE_SENT = 1; | |
740 static const int _SCHEDULED = 2; | |
741 static const int _PAUSED = 4; | |
742 | |
743 final Zone _zone; | |
744 int _state = 0; | |
745 _DoneHandler _onDone; | |
746 | |
747 _DoneStreamSubscription(this._onDone) : _zone = Zone.current { | |
748 _schedule(); | |
749 } | |
750 | |
751 bool get _isSent => (_state & _DONE_SENT) != 0; | |
752 bool get _isScheduled => (_state & _SCHEDULED) != 0; | |
753 bool get isPaused => _state >= _PAUSED; | |
754 | |
755 void _schedule() { | |
756 if (_isScheduled) return; | |
757 _zone.scheduleMicrotask(_sendDone); | |
758 _state |= _SCHEDULED; | |
759 } | |
760 | |
761 void onData(void handleData(T data)) {} | |
762 void onError(Function handleError) {} | |
763 void onDone(void handleDone()) { _onDone = handleDone; } | |
764 | |
765 void pause([Future resumeSignal]) { | |
766 _state += _PAUSED; | |
767 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
768 } | |
769 | |
770 void resume() { | |
771 if (isPaused) { | |
772 _state -= _PAUSED; | |
773 if (!isPaused && !_isSent) { | |
774 _schedule(); | |
775 } | |
776 } | |
777 } | |
778 | |
779 Future cancel() => null; | |
780 | |
781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
782 _Future/*<E>*/ result = new _Future/*<E>*/(); | |
783 _onDone = () { result._completeWithValue(null); }; | |
784 return result; | |
785 } | |
786 | |
787 void _sendDone() { | |
788 _state &= ~_SCHEDULED; | |
789 if (isPaused) return; | |
790 _state |= _DONE_SENT; | |
791 if (_onDone != null) _zone.runGuarded(_onDone); | |
792 } | |
793 } | |
794 | |
795 class _AsBroadcastStream<T> extends Stream<T> { | |
796 final Stream<T> _source; | |
797 final _BroadcastCallback<T> _onListenHandler; | |
798 final _BroadcastCallback<T> _onCancelHandler; | |
799 final Zone _zone; | |
800 | |
801 _AsBroadcastStreamController<T> _controller; | |
802 StreamSubscription<T> _subscription; | |
803 | |
804 _AsBroadcastStream(this._source, | |
805 void onListenHandler(StreamSubscription<T> subscription), | |
806 void onCancelHandler(StreamSubscription<T> subscription)) | |
807 // TODO(floitsch): the return type should be void and should be | |
808 // inferred. | |
809 : _onListenHandler = Zone.current.registerUnaryCallback | |
810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler), | |
811 _onCancelHandler = Zone.current.registerUnaryCallback | |
812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), | |
813 _zone = Zone.current { | |
814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | |
815 } | |
816 | |
817 bool get isBroadcast => true; | |
818 | |
819 StreamSubscription<T> listen(void onData(T data), | |
820 { Function onError, | |
821 void onDone(), | |
822 bool cancelOnError}) { | |
823 if (_controller == null || _controller.isClosed) { | |
824 // Return a dummy subscription backed by nothing, since | |
825 // it will only ever send one done event. | |
826 return new _DoneStreamSubscription<T>(onDone); | |
827 } | |
828 if (_subscription == null) { | |
829 _subscription = _source.listen(_controller.add, | |
830 onError: _controller.addError, | |
831 onDone: _controller.close); | |
832 } | |
833 cancelOnError = identical(true, cancelOnError); | |
834 return _controller._subscribe(onData, onError, onDone, cancelOnError); | |
835 } | |
836 | |
837 void _onCancel() { | |
838 bool shutdown = (_controller == null) || _controller.isClosed; | |
839 if (_onCancelHandler != null) { | |
840 _zone.runUnary( | |
841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); | |
842 } | |
843 if (shutdown) { | |
844 if (_subscription != null) { | |
845 _subscription.cancel(); | |
846 _subscription = null; | |
847 } | |
848 } | |
849 } | |
850 | |
851 void _onListen() { | |
852 if (_onListenHandler != null) { | |
853 _zone.runUnary( | |
854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); | |
855 } | |
856 } | |
857 | |
858 // Methods called from _BroadcastSubscriptionWrapper. | |
859 void _cancelSubscription() { | |
860 if (_subscription == null) return; | |
861 // Called by [_controller] when it has no subscribers left. | |
862 StreamSubscription subscription = _subscription; | |
863 _subscription = null; | |
864 _controller = null; // Marks the stream as no longer listenable. | |
865 subscription.cancel(); | |
866 } | |
867 | |
868 void _pauseSubscription(Future resumeSignal) { | |
869 if (_subscription == null) return; | |
870 _subscription.pause(resumeSignal); | |
871 } | |
872 | |
873 void _resumeSubscription() { | |
874 if (_subscription == null) return; | |
875 _subscription.resume(); | |
876 } | |
877 | |
878 bool get _isSubscriptionPaused { | |
879 if (_subscription == null) return false; | |
880 return _subscription.isPaused; | |
881 } | |
882 } | |
883 | |
884 /** | |
885 * Wrapper for subscription that disallows changing handlers. | |
886 */ | |
887 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | |
888 final _AsBroadcastStream _stream; | |
889 | |
890 _BroadcastSubscriptionWrapper(this._stream); | |
891 | |
892 void onData(void handleData(T data)) { | |
893 throw new UnsupportedError( | |
894 "Cannot change handlers of asBroadcastStream source subscription."); | |
895 } | |
896 | |
897 void onError(Function handleError) { | |
898 throw new UnsupportedError( | |
899 "Cannot change handlers of asBroadcastStream source subscription."); | |
900 } | |
901 | |
902 void onDone(void handleDone()) { | |
903 throw new UnsupportedError( | |
904 "Cannot change handlers of asBroadcastStream source subscription."); | |
905 } | |
906 | |
907 void pause([Future resumeSignal]) { | |
908 _stream._pauseSubscription(resumeSignal); | |
909 } | |
910 | |
911 void resume() { | |
912 _stream._resumeSubscription(); | |
913 } | |
914 | |
915 Future cancel() { | |
916 _stream._cancelSubscription(); | |
917 return null; | |
918 } | |
919 | |
920 bool get isPaused { | |
921 return _stream._isSubscriptionPaused; | |
922 } | |
923 | |
924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
925 throw new UnsupportedError( | |
926 "Cannot change handlers of asBroadcastStream source subscription."); | |
927 } | |
928 } | |
929 | |
930 | |
931 /** | |
932 * Simple implementation of [StreamIterator]. | |
933 */ | |
934 class _StreamIteratorImpl<T> implements StreamIterator<T> { | |
935 // Internal state of the stream iterator. | |
936 // At any time, it is in one of these states. | |
937 // The interpretation of the [_futureOrPrefecth] field depends on the state. | |
938 // In _STATE_MOVING, the _data field holds the most recently returned | |
939 // future. | |
940 // When in one of the _STATE_EXTRA_* states, the it may hold the | |
941 // next data/error object, and the subscription is paused. | |
942 | |
943 /// The simple state where [_data] holds the data to return, and [moveNext] | |
944 /// is allowed. The subscription is actively listening. | |
945 static const int _STATE_FOUND = 0; | |
946 /// State set after [moveNext] has returned false or an error, | |
947 /// or after calling [cancel]. The subscription is always canceled. | |
948 static const int _STATE_DONE = 1; | |
949 /// State set after calling [moveNext], but before its returned future has | |
950 /// completed. Calling [moveNext] again is not allowed in this state. | |
951 /// The subscription is actively listening. | |
952 static const int _STATE_MOVING = 2; | |
953 /// States set when another event occurs while in _STATE_FOUND. | |
954 /// This extra overflow event is cached until the next call to [moveNext], | |
955 /// which will complete as if it received the event normally. | |
956 /// The subscription is paused in these states, so we only ever get one | |
957 /// event too many. | |
958 static const int _STATE_EXTRA_DATA = 3; | |
959 static const int _STATE_EXTRA_ERROR = 4; | |
960 static const int _STATE_EXTRA_DONE = 5; | |
961 | |
962 /// Subscription being listened to. | |
963 StreamSubscription _subscription; | |
964 | |
965 /// The current element represented by the most recent call to moveNext. | |
966 /// | |
967 /// Is null between the time moveNext is called and its future completes. | |
968 T _current = null; | |
969 | |
970 /// The future returned by the most recent call to [moveNext]. | |
971 /// | |
972 /// Also used to store the next value/error in case the stream provides an | |
973 /// event before [moveNext] is called again. In that case, the stream will | |
974 /// be paused to prevent further events. | |
975 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
976 | |
977 /// The current state. | |
978 int _state = _STATE_FOUND; | |
979 | |
980 _StreamIteratorImpl(final Stream<T> stream) { | |
981 _subscription = stream.listen(_onData, | |
982 onError: _onError, | |
983 onDone: _onDone, | |
984 cancelOnError: true); | |
985 } | |
986 | |
987 T get current => _current; | |
988 | |
989 Future<bool> moveNext() { | |
990 if (_state == _STATE_DONE) { | |
991 return new _Future<bool>.immediate(false); | |
992 } | |
993 if (_state == _STATE_MOVING) { | |
994 throw new StateError("Already waiting for next."); | |
995 } | |
996 if (_state == _STATE_FOUND) { | |
997 _state = _STATE_MOVING; | |
998 _current = null; | |
999 var result = new _Future<bool>(); | |
1000 _futureOrPrefetch = result; | |
1001 return result; | |
1002 } else { | |
1003 assert(_state >= _STATE_EXTRA_DATA); | |
1004 switch (_state) { | |
1005 case _STATE_EXTRA_DATA: | |
1006 _state = _STATE_FOUND; | |
1007 _current = _futureOrPrefetch as Object /*=T*/; | |
1008 _futureOrPrefetch = null; | |
1009 _subscription.resume(); | |
1010 return new _Future<bool>.immediate(true); | |
1011 case _STATE_EXTRA_ERROR: | |
1012 AsyncError prefetch = _futureOrPrefetch; | |
1013 _clear(); | |
1014 return new _Future<bool>.immediateError(prefetch.error, | |
1015 prefetch.stackTrace); | |
1016 case _STATE_EXTRA_DONE: | |
1017 _clear(); | |
1018 return new _Future<bool>.immediate(false); | |
1019 } | |
1020 } | |
1021 } | |
1022 | |
1023 /** Clears up the internal state when the iterator ends. */ | |
1024 void _clear() { | |
1025 _subscription = null; | |
1026 _futureOrPrefetch = null; | |
1027 _current = null; | |
1028 _state = _STATE_DONE; | |
1029 } | |
1030 | |
1031 Future cancel() { | |
1032 StreamSubscription subscription = _subscription; | |
1033 if (subscription == null) return null; | |
1034 if (_state == _STATE_MOVING) { | |
1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
1036 _clear(); | |
1037 hasNext._complete(false); | |
1038 } else { | |
1039 _clear(); | |
1040 } | |
1041 return subscription.cancel(); | |
1042 } | |
1043 | |
1044 void _onData(T data) { | |
1045 if (_state == _STATE_MOVING) { | |
1046 _current = data; | |
1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
1048 _futureOrPrefetch = null; | |
1049 _state = _STATE_FOUND; | |
1050 hasNext._complete(true); | |
1051 return; | |
1052 } | |
1053 _subscription.pause(); | |
1054 assert(_futureOrPrefetch == null); | |
1055 _futureOrPrefetch = data; | |
1056 _state = _STATE_EXTRA_DATA; | |
1057 } | |
1058 | |
1059 void _onError(Object error, [StackTrace stackTrace]) { | |
1060 if (_state == _STATE_MOVING) { | |
1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
1062 // We have cancelOnError: true, so the subscription is canceled. | |
1063 _clear(); | |
1064 hasNext._completeError(error, stackTrace); | |
1065 return; | |
1066 } | |
1067 _subscription.pause(); | |
1068 assert(_futureOrPrefetch == null); | |
1069 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
1070 _state = _STATE_EXTRA_ERROR; | |
1071 } | |
1072 | |
1073 void _onDone() { | |
1074 if (_state == _STATE_MOVING) { | |
1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
1076 _clear(); | |
1077 hasNext._complete(false); | |
1078 return; | |
1079 } | |
1080 _subscription.pause(); | |
1081 _futureOrPrefetch = null; | |
1082 _state = _STATE_EXTRA_DONE; | |
1083 } | |
1084 } | |
1085 | |
1086 /** An empty broadcast stream, sending a done event as soon as possible. */ | |
1087 class _EmptyStream<T> extends Stream<T> { | |
1088 const _EmptyStream() : super._internal(); | |
1089 bool get isBroadcast => true; | |
1090 StreamSubscription<T> listen(void onData(T data), | |
1091 {Function onError, | |
1092 void onDone(), | |
1093 bool cancelOnError}) { | |
1094 return new _DoneStreamSubscription<T>(onDone); | |
1095 } | |
1096 } | |
OLD | NEW |