Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(128)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> {
9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace);
11 void _close();
12 }
13
14 /**
15 * Abstract and private interface for a place to send events.
16 *
17 * Used by event buffering to finally dispatch the pending event, where
18 * [_EventSink] is where the event first enters the stream subscription,
19 * and may yet be buffered.
20 */
21 abstract class _EventDispatch<T> {
22 void _sendData(T data);
23 void _sendError(Object error, StackTrace stackTrace);
24 void _sendDone();
25 }
26
27 /**
28 * Default implementation of stream subscription of buffering events.
29 *
30 * The only public methods are those of [StreamSubscription], so instances of
31 * [_BufferingStreamSubscription] can be returned directly as a
32 * [StreamSubscription] without exposing internal functionality.
33 *
34 * The [StreamController] is a public facing version of [Stream] and this class,
35 * with some methods made public.
36 *
37 * The user interface of [_BufferingStreamSubscription] are the following
38 * methods:
39 *
40 * * [_add]: Add a data event to the stream.
41 * * [_addError]: Add an error event to the stream.
42 * * [_close]: Request to close the stream.
43 * * [_onCancel]: Called when the subscription will provide no more events,
44 * either due to being actively canceled, or after sending a done event.
45 * * [_onPause]: Called when the subscription wants the event source to pause.
46 * * [_onResume]: Called when allowing new events after a pause.
47 *
48 * The user should not add new events when the subscription requests a paused,
49 * but if it happens anyway, the subscription will enqueue the events just as
50 * when new events arrive while still firing an old event.
51 */
52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
53 _EventSink<T>,
54 _EventDispatch<T> {
55 /** The `cancelOnError` flag from the `listen` call. */
56 static const int _STATE_CANCEL_ON_ERROR = 1;
57 /**
58 * Whether the "done" event has been received.
59 * No further events are accepted after this.
60 */
61 static const int _STATE_CLOSED = 2;
62 /**
63 * Set if the input has been asked not to send events.
64 *
65 * This is not the same as being paused, since the input will remain paused
66 * after a call to [resume] if there are pending events.
67 */
68 static const int _STATE_INPUT_PAUSED = 4;
69 /**
70 * Whether the subscription has been canceled.
71 *
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event
73 * when `cancelOnError` is true.
74 */
75 static const int _STATE_CANCELED = 8;
76 /**
77 * Set when either:
78 *
79 * * an error is sent, and [cancelOnError] is true, or
80 * * a done event is sent.
81 *
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
83 * state is unset, and no furher events must be delivered.
84 */
85 static const int _STATE_WAIT_FOR_CANCEL = 16;
86 static const int _STATE_IN_CALLBACK = 32;
87 static const int _STATE_HAS_PENDING = 64;
88 static const int _STATE_PAUSE_COUNT = 128;
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
90
91 /* Event handlers provided in constructor. */
92 _DataHandler<T> _onData;
93 Function _onError;
94 _DoneHandler _onDone;
95 final Zone _zone = Zone.current;
96
97 /** Bit vector based on state-constants above. */
98 int _state;
99
100 // TODO(floitsch): reuse another field
101 /** The future [_onCancel] may return. */
102 Future _cancelFuture;
103
104 /**
105 * Queue of pending events.
106 *
107 * Is created when necessary, or set in constructor for preconfigured events.
108 */
109 _PendingEvents<T> _pending;
110
111 _BufferingStreamSubscription(void onData(T data),
112 Function onError,
113 void onDone(),
114 bool cancelOnError)
115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
116 this.onData(onData);
117 this.onError(onError);
118 this.onDone(onDone);
119 }
120
121 /**
122 * Sets the subscription's pending events object.
123 *
124 * This can only be done once. The pending events object is used for the
125 * rest of the subscription's life cycle.
126 */
127 void _setPendingEvents(_PendingEvents<T> pendingEvents) {
128 assert(_pending == null);
129 if (pendingEvents == null) return;
130 _pending = pendingEvents;
131 if (!pendingEvents.isEmpty) {
132 _state |= _STATE_HAS_PENDING;
133 _pending.schedule(this);
134 }
135 }
136
137 // StreamSubscription interface.
138
139 void onData(void handleData(T event)) {
140 if (handleData == null) handleData = _nullDataHandler;
141 // TODO(floitsch): the return type should be 'void', and the type
142 // should be inferred.
143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData);
144 }
145
146 void onError(Function handleError) {
147 if (handleError == null) handleError = _nullErrorHandler;
148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone);
149 }
150
151 void onDone(void handleDone()) {
152 if (handleDone == null) handleDone = _nullDoneHandler;
153 _onDone = _zone.registerCallback(handleDone);
154 }
155
156 void pause([Future resumeSignal]) {
157 if (_isCanceled) return;
158 bool wasPaused = _isPaused;
159 bool wasInputPaused = _isInputPaused;
160 // Increment pause count and mark input paused (if it isn't already).
161 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
162 if (resumeSignal != null) resumeSignal.whenComplete(resume);
163 if (!wasPaused && _pending != null) _pending.cancelSchedule();
164 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
165 }
166
167 void resume() {
168 if (_isCanceled) return;
169 if (_isPaused) {
170 _decrementPauseCount();
171 if (!_isPaused) {
172 if (_hasPending && !_pending.isEmpty) {
173 // Input is still paused.
174 _pending.schedule(this);
175 } else {
176 assert(_mayResumeInput);
177 _state &= ~_STATE_INPUT_PAUSED;
178 if (!_inCallback) _guardCallback(_onResume);
179 }
180 }
181 }
182 }
183
184 Future cancel() {
185 // The user doesn't want to receive any further events. If there is an
186 // error or done event pending (waiting for the cancel to be done) discard
187 // that event.
188 _state &= ~_STATE_WAIT_FOR_CANCEL;
189 if (_isCanceled) return _cancelFuture;
190 _cancel();
191 return _cancelFuture;
192 }
193
194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
195 _Future/*<E>*/ result = new _Future/*<E>*/();
196
197 // Overwrite the onDone and onError handlers.
198 _onDone = () { result._complete(futureValue); };
199 _onError = (error, stackTrace) {
200 cancel();
201 result._completeError(error, stackTrace);
202 };
203
204 return result;
205 }
206
207 // State management.
208
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
212 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
213 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
214 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
215 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
216 bool get _canFire => _state < _STATE_IN_CALLBACK;
217 bool get _mayResumeInput =>
218 !_isPaused && (_pending == null || _pending.isEmpty);
219 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
220
221 bool get isPaused => _isPaused;
222
223 void _cancel() {
224 _state |= _STATE_CANCELED;
225 if (_hasPending) {
226 _pending.cancelSchedule();
227 }
228 if (!_inCallback) _pending = null;
229 _cancelFuture = _onCancel();
230 }
231
232 /**
233 * Increment the pause count.
234 *
235 * Also marks input as paused.
236 */
237 void _incrementPauseCount() {
238 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
239 }
240
241 /**
242 * Decrements the pause count.
243 *
244 * Does not automatically unpause the input (call [_onResume]) when
245 * the pause count reaches zero. This is handled elsewhere, and only
246 * if there are no pending events buffered.
247 */
248 void _decrementPauseCount() {
249 assert(_isPaused);
250 _state -= _STATE_PAUSE_COUNT;
251 }
252
253 // _EventSink interface.
254
255 void _add(T data) {
256 assert(!_isClosed);
257 if (_isCanceled) return;
258 if (_canFire) {
259 _sendData(data);
260 } else {
261 _addPending(new _DelayedData<dynamic /*=T*/>(data));
262 }
263 }
264
265 void _addError(Object error, StackTrace stackTrace) {
266 if (_isCanceled) return;
267 if (_canFire) {
268 _sendError(error, stackTrace); // Reports cancel after sending.
269 } else {
270 _addPending(new _DelayedError(error, stackTrace));
271 }
272 }
273
274 void _close() {
275 assert(!_isClosed);
276 if (_isCanceled) return;
277 _state |= _STATE_CLOSED;
278 if (_canFire) {
279 _sendDone();
280 } else {
281 _addPending(const _DelayedDone());
282 }
283 }
284
285 // Hooks called when the input is paused, unpaused or canceled.
286 // These must not throw. If overwritten to call user code, include suitable
287 // try/catch wrapping and send any errors to
288 // [_Zone.current.handleUncaughtError].
289 void _onPause() {
290 assert(_isInputPaused);
291 }
292
293 void _onResume() {
294 assert(!_isInputPaused);
295 }
296
297 Future _onCancel() {
298 assert(_isCanceled);
299 return null;
300 }
301
302 // Handle pending events.
303
304 /**
305 * Add a pending event.
306 *
307 * If the subscription is not paused, this also schedules a firing
308 * of pending events later (if necessary).
309 */
310 void _addPending(_DelayedEvent event) {
311 _StreamImplEvents<T> pending = _pending;
312 if (_pending == null) {
313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>();
314 }
315 pending.add(event);
316 if (!_hasPending) {
317 _state |= _STATE_HAS_PENDING;
318 if (!_isPaused) {
319 _pending.schedule(this);
320 }
321 }
322 }
323
324 /* _EventDispatch interface. */
325
326 void _sendData(T data) {
327 assert(!_isCanceled);
328 assert(!_isPaused);
329 assert(!_inCallback);
330 bool wasInputPaused = _isInputPaused;
331 _state |= _STATE_IN_CALLBACK;
332 _zone.runUnaryGuarded(_onData, data);
333 _state &= ~_STATE_IN_CALLBACK;
334 _checkState(wasInputPaused);
335 }
336
337 void _sendError(var error, StackTrace stackTrace) {
338 assert(!_isCanceled);
339 assert(!_isPaused);
340 assert(!_inCallback);
341 bool wasInputPaused = _isInputPaused;
342
343 void sendError() {
344 // If the subscription has been canceled while waiting for the cancel
345 // future to finish we must not report the error.
346 if (_isCanceled && !_waitsForCancel) return;
347 _state |= _STATE_IN_CALLBACK;
348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
351 _zone.runBinaryGuarded(errorCallback, error, stackTrace);
352 } else {
353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
355 }
356 _state &= ~_STATE_IN_CALLBACK;
357 }
358
359 if (_cancelOnError) {
360 _state |= _STATE_WAIT_FOR_CANCEL;
361 _cancel();
362 if (_cancelFuture is Future) {
363 _cancelFuture.whenComplete(sendError);
364 } else {
365 sendError();
366 }
367 } else {
368 sendError();
369 // Only check state if not cancelOnError.
370 _checkState(wasInputPaused);
371 }
372 }
373
374 void _sendDone() {
375 assert(!_isCanceled);
376 assert(!_isPaused);
377 assert(!_inCallback);
378
379 void sendDone() {
380 // If the subscription has been canceled while waiting for the cancel
381 // future to finish we must not report the done event.
382 if (!_waitsForCancel) return;
383 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
384 _zone.runGuarded(_onDone);
385 _state &= ~_STATE_IN_CALLBACK;
386 }
387
388 _cancel();
389 _state |= _STATE_WAIT_FOR_CANCEL;
390 if (_cancelFuture is Future) {
391 _cancelFuture.whenComplete(sendDone);
392 } else {
393 sendDone();
394 }
395 }
396
397 /**
398 * Call a hook function.
399 *
400 * The call is properly wrapped in code to avoid other callbacks
401 * during the call, and it checks for state changes after the call
402 * that should cause further callbacks.
403 */
404 void _guardCallback(void callback()) {
405 assert(!_inCallback);
406 bool wasInputPaused = _isInputPaused;
407 _state |= _STATE_IN_CALLBACK;
408 callback();
409 _state &= ~_STATE_IN_CALLBACK;
410 _checkState(wasInputPaused);
411 }
412
413 /**
414 * Check if the input needs to be informed of state changes.
415 *
416 * State changes are pausing, resuming and canceling.
417 *
418 * After canceling, no further callbacks will happen.
419 *
420 * The cancel callback is called after a user cancel, or after
421 * the final done event is sent.
422 */
423 void _checkState(bool wasInputPaused) {
424 assert(!_inCallback);
425 if (_hasPending && _pending.isEmpty) {
426 _state &= ~_STATE_HAS_PENDING;
427 if (_isInputPaused && _mayResumeInput) {
428 _state &= ~_STATE_INPUT_PAUSED;
429 }
430 }
431 // If the state changes during a callback, we immediately
432 // make a new state-change callback. Loop until the state didn't change.
433 while (true) {
434 if (_isCanceled) {
435 _pending = null;
436 return;
437 }
438 bool isInputPaused = _isInputPaused;
439 if (wasInputPaused == isInputPaused) break;
440 _state ^= _STATE_IN_CALLBACK;
441 if (isInputPaused) {
442 _onPause();
443 } else {
444 _onResume();
445 }
446 _state &= ~_STATE_IN_CALLBACK;
447 wasInputPaused = isInputPaused;
448 }
449 if (_hasPending && !_isPaused) {
450 _pending.schedule(this);
451 }
452 }
453 }
454
455 // -------------------------------------------------------------------
456 // Common base class for single and multi-subscription streams.
457 // -------------------------------------------------------------------
458 abstract class _StreamImpl<T> extends Stream<T> {
459 // ------------------------------------------------------------------
460 // Stream interface.
461
462 StreamSubscription<T> listen(void onData(T data),
463 { Function onError,
464 void onDone(),
465 bool cancelOnError }) {
466 cancelOnError = identical(true, cancelOnError);
467 StreamSubscription<T> subscription =
468 _createSubscription(onData, onError, onDone, cancelOnError);
469 _onListen(subscription);
470 return subscription;
471 }
472
473 // -------------------------------------------------------------------
474 /** Create a subscription object. Called by [subcribe]. */
475 StreamSubscription<T> _createSubscription(
476 void onData(T data),
477 Function onError,
478 void onDone(),
479 bool cancelOnError) {
480 return new _BufferingStreamSubscription<T>(onData, onError, onDone,
481 cancelOnError);
482 }
483
484 /** Hook called when the subscription has been created. */
485 void _onListen(StreamSubscription subscription) {}
486 }
487
488 typedef _PendingEvents<T> _EventGenerator<T>();
489
490 /** Stream that generates its own events. */
491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
492 final _EventGenerator<T> _pending;
493 bool _isUsed = false;
494 /**
495 * Initializes the stream to have only the events provided by a
496 * [_PendingEvents].
497 *
498 * A new [_PendingEvents] must be generated for each listen.
499 */
500 _GeneratedStreamImpl(this._pending);
501
502 StreamSubscription<T> _createSubscription(
503 void onData(T data),
504 Function onError,
505 void onDone(),
506 bool cancelOnError) {
507 if (_isUsed) throw new StateError("Stream has already been listened to.");
508 _isUsed = true;
509 return new _BufferingStreamSubscription<T>(
510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
511 }
512 }
513
514
515 /** Pending events object that gets its events from an [Iterable]. */
516 class _IterablePendingEvents<T> extends _PendingEvents<T> {
517 // The iterator providing data for data events.
518 // Set to null when iteration has completed.
519 Iterator<T> _iterator;
520
521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
522
523 bool get isEmpty => _iterator == null;
524
525 void handleNext(_EventDispatch<T> dispatch) {
526 if (_iterator == null) {
527 throw new StateError("No events pending.");
528 }
529 // Send one event per call to moveNext.
530 // If moveNext returns true, send the current element as data.
531 // If moveNext returns false, send a done event and clear the _iterator.
532 // If moveNext throws an error, send an error and clear the _iterator.
533 // After an error, no further events will be sent.
534 bool isDone;
535 try {
536 isDone = !_iterator.moveNext();
537 } catch (e, s) {
538 _iterator = null;
539 dispatch._sendError(e, s);
540 return;
541 }
542 if (!isDone) {
543 dispatch._sendData(_iterator.current);
544 } else {
545 _iterator = null;
546 dispatch._sendDone();
547 }
548 }
549
550 void clear() {
551 if (isScheduled) cancelSchedule();
552 _iterator = null;
553 }
554 }
555
556
557 // Internal helpers.
558
559 // Types of the different handlers on a stream. Types used to type fields.
560 typedef void _DataHandler<T>(T value);
561 typedef void _DoneHandler();
562
563
564 /** Default data handler, does nothing. */
565 void _nullDataHandler(var value) {}
566
567 /** Default error handler, reports the error to the current zone's handler. */
568 void _nullErrorHandler(error, [StackTrace stackTrace]) {
569 Zone.current.handleUncaughtError(error, stackTrace);
570 }
571
572 /** Default done handler, does nothing. */
573 void _nullDoneHandler() {}
574
575
576 /** A delayed event on a buffering stream subscription. */
577 abstract class _DelayedEvent<T> {
578 /** Added as a linked list on the [StreamController]. */
579 _DelayedEvent next;
580 /** Execute the delayed event on the [StreamController]. */
581 void perform(_EventDispatch<T> dispatch);
582 }
583
584 /** A delayed data event. */
585 class _DelayedData<T> extends _DelayedEvent<T> {
586 final T value;
587 _DelayedData(this.value);
588 void perform(_EventDispatch<T> dispatch) {
589 dispatch._sendData(value);
590 }
591 }
592
593 /** A delayed error event. */
594 class _DelayedError extends _DelayedEvent {
595 final error;
596 final StackTrace stackTrace;
597
598 _DelayedError(this.error, this.stackTrace);
599 void perform(_EventDispatch dispatch) {
600 dispatch._sendError(error, stackTrace);
601 }
602 }
603
604 /** A delayed done event. */
605 class _DelayedDone implements _DelayedEvent {
606 const _DelayedDone();
607 void perform(_EventDispatch dispatch) {
608 dispatch._sendDone();
609 }
610
611 _DelayedEvent get next => null;
612
613 void set next(_DelayedEvent _) {
614 throw new StateError("No events after a done.");
615 }
616 }
617
618 /** Superclass for provider of pending events. */
619 abstract class _PendingEvents<T> {
620 // No async event has been scheduled.
621 static const int _STATE_UNSCHEDULED = 0;
622 // An async event has been scheduled to run a function.
623 static const int _STATE_SCHEDULED = 1;
624 // An async event has been scheduled, but it will do nothing when it runs.
625 // Async events can't be preempted.
626 static const int _STATE_CANCELED = 3;
627
628 /**
629 * State of being scheduled.
630 *
631 * Set to [_STATE_SCHEDULED] when pending events are scheduled for
632 * async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
633 * scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
634 * which will make the async code do nothing except resetting [_state].
635 *
636 * If events are scheduled while the state is [_STATE_CANCELED], it is
637 * merely switched back to [_STATE_SCHEDULED], but no new call to
638 * [scheduleMicrotask] is performed.
639 */
640 int _state = _STATE_UNSCHEDULED;
641
642 bool get isEmpty;
643
644 bool get isScheduled => _state == _STATE_SCHEDULED;
645 bool get _eventScheduled => _state >= _STATE_SCHEDULED;
646
647 /**
648 * Schedule an event to run later.
649 *
650 * If called more than once, it should be called with the same dispatch as
651 * argument each time. It may reuse an earlier argument in some cases.
652 */
653 void schedule(_EventDispatch<T> dispatch) {
654 if (isScheduled) return;
655 assert(!isEmpty);
656 if (_eventScheduled) {
657 assert(_state == _STATE_CANCELED);
658 _state = _STATE_SCHEDULED;
659 return;
660 }
661 scheduleMicrotask(() {
662 int oldState = _state;
663 _state = _STATE_UNSCHEDULED;
664 if (oldState == _STATE_CANCELED) return;
665 handleNext(dispatch);
666 });
667 _state = _STATE_SCHEDULED;
668 }
669
670 void cancelSchedule() {
671 if (isScheduled) _state = _STATE_CANCELED;
672 }
673
674 void handleNext(_EventDispatch<T> dispatch);
675
676 /** Throw away any pending events and cancel scheduled events. */
677 void clear();
678 }
679
680
681 /** Class holding pending events for a [_StreamImpl]. */
682 class _StreamImplEvents<T> extends _PendingEvents<T> {
683 /// Single linked list of [_DelayedEvent] objects.
684 _DelayedEvent firstPendingEvent = null;
685 /// Last element in the list of pending events. New events are added after it.
686 _DelayedEvent lastPendingEvent = null;
687
688 bool get isEmpty => lastPendingEvent == null;
689
690 void add(_DelayedEvent event) {
691 if (lastPendingEvent == null) {
692 firstPendingEvent = lastPendingEvent = event;
693 } else {
694 lastPendingEvent = lastPendingEvent.next = event;
695 }
696 }
697
698 void handleNext(_EventDispatch<T> dispatch) {
699 assert(!isScheduled);
700 _DelayedEvent event = firstPendingEvent;
701 firstPendingEvent = event.next;
702 if (firstPendingEvent == null) {
703 lastPendingEvent = null;
704 }
705 event.perform(dispatch);
706 }
707
708 void clear() {
709 if (isScheduled) cancelSchedule();
710 firstPendingEvent = lastPendingEvent = null;
711 }
712 }
713
714 class _BroadcastLinkedList {
715 _BroadcastLinkedList _next;
716 _BroadcastLinkedList _previous;
717
718 void _unlink() {
719 _previous._next = _next;
720 _next._previous = _previous;
721 _next = _previous = this;
722 }
723
724 void _insertBefore(_BroadcastLinkedList newNext) {
725 _BroadcastLinkedList newPrevious = newNext._previous;
726 newPrevious._next = this;
727 newNext._previous = _previous;
728 _previous._next = newNext;
729 _previous = newPrevious;
730 }
731 }
732
733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
734
735 /**
736 * Done subscription that will send one done event as soon as possible.
737 */
738 class _DoneStreamSubscription<T> implements StreamSubscription<T> {
739 static const int _DONE_SENT = 1;
740 static const int _SCHEDULED = 2;
741 static const int _PAUSED = 4;
742
743 final Zone _zone;
744 int _state = 0;
745 _DoneHandler _onDone;
746
747 _DoneStreamSubscription(this._onDone) : _zone = Zone.current {
748 _schedule();
749 }
750
751 bool get _isSent => (_state & _DONE_SENT) != 0;
752 bool get _isScheduled => (_state & _SCHEDULED) != 0;
753 bool get isPaused => _state >= _PAUSED;
754
755 void _schedule() {
756 if (_isScheduled) return;
757 _zone.scheduleMicrotask(_sendDone);
758 _state |= _SCHEDULED;
759 }
760
761 void onData(void handleData(T data)) {}
762 void onError(Function handleError) {}
763 void onDone(void handleDone()) { _onDone = handleDone; }
764
765 void pause([Future resumeSignal]) {
766 _state += _PAUSED;
767 if (resumeSignal != null) resumeSignal.whenComplete(resume);
768 }
769
770 void resume() {
771 if (isPaused) {
772 _state -= _PAUSED;
773 if (!isPaused && !_isSent) {
774 _schedule();
775 }
776 }
777 }
778
779 Future cancel() => null;
780
781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
782 _Future/*<E>*/ result = new _Future/*<E>*/();
783 _onDone = () { result._completeWithValue(null); };
784 return result;
785 }
786
787 void _sendDone() {
788 _state &= ~_SCHEDULED;
789 if (isPaused) return;
790 _state |= _DONE_SENT;
791 if (_onDone != null) _zone.runGuarded(_onDone);
792 }
793 }
794
795 class _AsBroadcastStream<T> extends Stream<T> {
796 final Stream<T> _source;
797 final _BroadcastCallback<T> _onListenHandler;
798 final _BroadcastCallback<T> _onCancelHandler;
799 final Zone _zone;
800
801 _AsBroadcastStreamController<T> _controller;
802 StreamSubscription<T> _subscription;
803
804 _AsBroadcastStream(this._source,
805 void onListenHandler(StreamSubscription<T> subscription),
806 void onCancelHandler(StreamSubscription<T> subscription))
807 // TODO(floitsch): the return type should be void and should be
808 // inferred.
809 : _onListenHandler = Zone.current.registerUnaryCallback
810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler),
811 _onCancelHandler = Zone.current.registerUnaryCallback
812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler),
813 _zone = Zone.current {
814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
815 }
816
817 bool get isBroadcast => true;
818
819 StreamSubscription<T> listen(void onData(T data),
820 { Function onError,
821 void onDone(),
822 bool cancelOnError}) {
823 if (_controller == null || _controller.isClosed) {
824 // Return a dummy subscription backed by nothing, since
825 // it will only ever send one done event.
826 return new _DoneStreamSubscription<T>(onDone);
827 }
828 if (_subscription == null) {
829 _subscription = _source.listen(_controller.add,
830 onError: _controller.addError,
831 onDone: _controller.close);
832 }
833 cancelOnError = identical(true, cancelOnError);
834 return _controller._subscribe(onData, onError, onDone, cancelOnError);
835 }
836
837 void _onCancel() {
838 bool shutdown = (_controller == null) || _controller.isClosed;
839 if (_onCancelHandler != null) {
840 _zone.runUnary(
841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
842 }
843 if (shutdown) {
844 if (_subscription != null) {
845 _subscription.cancel();
846 _subscription = null;
847 }
848 }
849 }
850
851 void _onListen() {
852 if (_onListenHandler != null) {
853 _zone.runUnary(
854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
855 }
856 }
857
858 // Methods called from _BroadcastSubscriptionWrapper.
859 void _cancelSubscription() {
860 if (_subscription == null) return;
861 // Called by [_controller] when it has no subscribers left.
862 StreamSubscription subscription = _subscription;
863 _subscription = null;
864 _controller = null; // Marks the stream as no longer listenable.
865 subscription.cancel();
866 }
867
868 void _pauseSubscription(Future resumeSignal) {
869 if (_subscription == null) return;
870 _subscription.pause(resumeSignal);
871 }
872
873 void _resumeSubscription() {
874 if (_subscription == null) return;
875 _subscription.resume();
876 }
877
878 bool get _isSubscriptionPaused {
879 if (_subscription == null) return false;
880 return _subscription.isPaused;
881 }
882 }
883
884 /**
885 * Wrapper for subscription that disallows changing handlers.
886 */
887 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
888 final _AsBroadcastStream _stream;
889
890 _BroadcastSubscriptionWrapper(this._stream);
891
892 void onData(void handleData(T data)) {
893 throw new UnsupportedError(
894 "Cannot change handlers of asBroadcastStream source subscription.");
895 }
896
897 void onError(Function handleError) {
898 throw new UnsupportedError(
899 "Cannot change handlers of asBroadcastStream source subscription.");
900 }
901
902 void onDone(void handleDone()) {
903 throw new UnsupportedError(
904 "Cannot change handlers of asBroadcastStream source subscription.");
905 }
906
907 void pause([Future resumeSignal]) {
908 _stream._pauseSubscription(resumeSignal);
909 }
910
911 void resume() {
912 _stream._resumeSubscription();
913 }
914
915 Future cancel() {
916 _stream._cancelSubscription();
917 return null;
918 }
919
920 bool get isPaused {
921 return _stream._isSubscriptionPaused;
922 }
923
924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
925 throw new UnsupportedError(
926 "Cannot change handlers of asBroadcastStream source subscription.");
927 }
928 }
929
930
931 /**
932 * Simple implementation of [StreamIterator].
933 */
934 class _StreamIteratorImpl<T> implements StreamIterator<T> {
935 // Internal state of the stream iterator.
936 // At any time, it is in one of these states.
937 // The interpretation of the [_futureOrPrefecth] field depends on the state.
938 // In _STATE_MOVING, the _data field holds the most recently returned
939 // future.
940 // When in one of the _STATE_EXTRA_* states, the it may hold the
941 // next data/error object, and the subscription is paused.
942
943 /// The simple state where [_data] holds the data to return, and [moveNext]
944 /// is allowed. The subscription is actively listening.
945 static const int _STATE_FOUND = 0;
946 /// State set after [moveNext] has returned false or an error,
947 /// or after calling [cancel]. The subscription is always canceled.
948 static const int _STATE_DONE = 1;
949 /// State set after calling [moveNext], but before its returned future has
950 /// completed. Calling [moveNext] again is not allowed in this state.
951 /// The subscription is actively listening.
952 static const int _STATE_MOVING = 2;
953 /// States set when another event occurs while in _STATE_FOUND.
954 /// This extra overflow event is cached until the next call to [moveNext],
955 /// which will complete as if it received the event normally.
956 /// The subscription is paused in these states, so we only ever get one
957 /// event too many.
958 static const int _STATE_EXTRA_DATA = 3;
959 static const int _STATE_EXTRA_ERROR = 4;
960 static const int _STATE_EXTRA_DONE = 5;
961
962 /// Subscription being listened to.
963 StreamSubscription _subscription;
964
965 /// The current element represented by the most recent call to moveNext.
966 ///
967 /// Is null between the time moveNext is called and its future completes.
968 T _current = null;
969
970 /// The future returned by the most recent call to [moveNext].
971 ///
972 /// Also used to store the next value/error in case the stream provides an
973 /// event before [moveNext] is called again. In that case, the stream will
974 /// be paused to prevent further events.
975 var/*Future<bool> or T*/ _futureOrPrefetch = null;
976
977 /// The current state.
978 int _state = _STATE_FOUND;
979
980 _StreamIteratorImpl(final Stream<T> stream) {
981 _subscription = stream.listen(_onData,
982 onError: _onError,
983 onDone: _onDone,
984 cancelOnError: true);
985 }
986
987 T get current => _current;
988
989 Future<bool> moveNext() {
990 if (_state == _STATE_DONE) {
991 return new _Future<bool>.immediate(false);
992 }
993 if (_state == _STATE_MOVING) {
994 throw new StateError("Already waiting for next.");
995 }
996 if (_state == _STATE_FOUND) {
997 _state = _STATE_MOVING;
998 _current = null;
999 var result = new _Future<bool>();
1000 _futureOrPrefetch = result;
1001 return result;
1002 } else {
1003 assert(_state >= _STATE_EXTRA_DATA);
1004 switch (_state) {
1005 case _STATE_EXTRA_DATA:
1006 _state = _STATE_FOUND;
1007 _current = _futureOrPrefetch as Object /*=T*/;
1008 _futureOrPrefetch = null;
1009 _subscription.resume();
1010 return new _Future<bool>.immediate(true);
1011 case _STATE_EXTRA_ERROR:
1012 AsyncError prefetch = _futureOrPrefetch;
1013 _clear();
1014 return new _Future<bool>.immediateError(prefetch.error,
1015 prefetch.stackTrace);
1016 case _STATE_EXTRA_DONE:
1017 _clear();
1018 return new _Future<bool>.immediate(false);
1019 }
1020 }
1021 }
1022
1023 /** Clears up the internal state when the iterator ends. */
1024 void _clear() {
1025 _subscription = null;
1026 _futureOrPrefetch = null;
1027 _current = null;
1028 _state = _STATE_DONE;
1029 }
1030
1031 Future cancel() {
1032 StreamSubscription subscription = _subscription;
1033 if (subscription == null) return null;
1034 if (_state == _STATE_MOVING) {
1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1036 _clear();
1037 hasNext._complete(false);
1038 } else {
1039 _clear();
1040 }
1041 return subscription.cancel();
1042 }
1043
1044 void _onData(T data) {
1045 if (_state == _STATE_MOVING) {
1046 _current = data;
1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1048 _futureOrPrefetch = null;
1049 _state = _STATE_FOUND;
1050 hasNext._complete(true);
1051 return;
1052 }
1053 _subscription.pause();
1054 assert(_futureOrPrefetch == null);
1055 _futureOrPrefetch = data;
1056 _state = _STATE_EXTRA_DATA;
1057 }
1058
1059 void _onError(Object error, [StackTrace stackTrace]) {
1060 if (_state == _STATE_MOVING) {
1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1062 // We have cancelOnError: true, so the subscription is canceled.
1063 _clear();
1064 hasNext._completeError(error, stackTrace);
1065 return;
1066 }
1067 _subscription.pause();
1068 assert(_futureOrPrefetch == null);
1069 _futureOrPrefetch = new AsyncError(error, stackTrace);
1070 _state = _STATE_EXTRA_ERROR;
1071 }
1072
1073 void _onDone() {
1074 if (_state == _STATE_MOVING) {
1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1076 _clear();
1077 hasNext._complete(false);
1078 return;
1079 }
1080 _subscription.pause();
1081 _futureOrPrefetch = null;
1082 _state = _STATE_EXTRA_DONE;
1083 }
1084 }
1085
1086 /** An empty broadcast stream, sending a done event as soon as possible. */
1087 class _EmptyStream<T> extends Stream<T> {
1088 const _EmptyStream() : super._internal();
1089 bool get isBroadcast => true;
1090 StreamSubscription<T> listen(void onData(T data),
1091 {Function onError,
1092 void onDone(),
1093 bool cancelOnError}) {
1094 return new _DoneStreamSubscription<T>(onDone);
1095 }
1096 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698