| Index: pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart
|
| diff --git a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart b/pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart
|
| deleted file mode 100644
|
| index 7099973bebfb77a974bb4c53f6bb6ae8b00b2e05..0000000000000000000000000000000000000000
|
| --- a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart
|
| +++ /dev/null
|
| @@ -1,1096 +0,0 @@
|
| -// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE file.
|
| -
|
| -part of dart.async;
|
| -
|
| -/** Abstract and private interface for a place to put events. */
|
| -abstract class _EventSink<T> {
|
| - void _add(T data);
|
| - void _addError(Object error, StackTrace stackTrace);
|
| - void _close();
|
| -}
|
| -
|
| -/**
|
| - * Abstract and private interface for a place to send events.
|
| - *
|
| - * Used by event buffering to finally dispatch the pending event, where
|
| - * [_EventSink] is where the event first enters the stream subscription,
|
| - * and may yet be buffered.
|
| - */
|
| -abstract class _EventDispatch<T> {
|
| - void _sendData(T data);
|
| - void _sendError(Object error, StackTrace stackTrace);
|
| - void _sendDone();
|
| -}
|
| -
|
| -/**
|
| - * Default implementation of stream subscription of buffering events.
|
| - *
|
| - * The only public methods are those of [StreamSubscription], so instances of
|
| - * [_BufferingStreamSubscription] can be returned directly as a
|
| - * [StreamSubscription] without exposing internal functionality.
|
| - *
|
| - * The [StreamController] is a public facing version of [Stream] and this class,
|
| - * with some methods made public.
|
| - *
|
| - * The user interface of [_BufferingStreamSubscription] are the following
|
| - * methods:
|
| - *
|
| - * * [_add]: Add a data event to the stream.
|
| - * * [_addError]: Add an error event to the stream.
|
| - * * [_close]: Request to close the stream.
|
| - * * [_onCancel]: Called when the subscription will provide no more events,
|
| - * either due to being actively canceled, or after sending a done event.
|
| - * * [_onPause]: Called when the subscription wants the event source to pause.
|
| - * * [_onResume]: Called when allowing new events after a pause.
|
| - *
|
| - * The user should not add new events when the subscription requests a paused,
|
| - * but if it happens anyway, the subscription will enqueue the events just as
|
| - * when new events arrive while still firing an old event.
|
| - */
|
| -class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| - _EventSink<T>,
|
| - _EventDispatch<T> {
|
| - /** The `cancelOnError` flag from the `listen` call. */
|
| - static const int _STATE_CANCEL_ON_ERROR = 1;
|
| - /**
|
| - * Whether the "done" event has been received.
|
| - * No further events are accepted after this.
|
| - */
|
| - static const int _STATE_CLOSED = 2;
|
| - /**
|
| - * Set if the input has been asked not to send events.
|
| - *
|
| - * This is not the same as being paused, since the input will remain paused
|
| - * after a call to [resume] if there are pending events.
|
| - */
|
| - static const int _STATE_INPUT_PAUSED = 4;
|
| - /**
|
| - * Whether the subscription has been canceled.
|
| - *
|
| - * Set by calling [cancel], or by handling a "done" event, or an "error" event
|
| - * when `cancelOnError` is true.
|
| - */
|
| - static const int _STATE_CANCELED = 8;
|
| - /**
|
| - * Set when either:
|
| - *
|
| - * * an error is sent, and [cancelOnError] is true, or
|
| - * * a done event is sent.
|
| - *
|
| - * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
|
| - * state is unset, and no furher events must be delivered.
|
| - */
|
| - static const int _STATE_WAIT_FOR_CANCEL = 16;
|
| - static const int _STATE_IN_CALLBACK = 32;
|
| - static const int _STATE_HAS_PENDING = 64;
|
| - static const int _STATE_PAUSE_COUNT = 128;
|
| - static const int _STATE_PAUSE_COUNT_SHIFT = 7;
|
| -
|
| - /* Event handlers provided in constructor. */
|
| - _DataHandler<T> _onData;
|
| - Function _onError;
|
| - _DoneHandler _onDone;
|
| - final Zone _zone = Zone.current;
|
| -
|
| - /** Bit vector based on state-constants above. */
|
| - int _state;
|
| -
|
| - // TODO(floitsch): reuse another field
|
| - /** The future [_onCancel] may return. */
|
| - Future _cancelFuture;
|
| -
|
| - /**
|
| - * Queue of pending events.
|
| - *
|
| - * Is created when necessary, or set in constructor for preconfigured events.
|
| - */
|
| - _PendingEvents<T> _pending;
|
| -
|
| - _BufferingStreamSubscription(void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError)
|
| - : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
|
| - this.onData(onData);
|
| - this.onError(onError);
|
| - this.onDone(onDone);
|
| - }
|
| -
|
| - /**
|
| - * Sets the subscription's pending events object.
|
| - *
|
| - * This can only be done once. The pending events object is used for the
|
| - * rest of the subscription's life cycle.
|
| - */
|
| - void _setPendingEvents(_PendingEvents<T> pendingEvents) {
|
| - assert(_pending == null);
|
| - if (pendingEvents == null) return;
|
| - _pending = pendingEvents;
|
| - if (!pendingEvents.isEmpty) {
|
| - _state |= _STATE_HAS_PENDING;
|
| - _pending.schedule(this);
|
| - }
|
| - }
|
| -
|
| - // StreamSubscription interface.
|
| -
|
| - void onData(void handleData(T event)) {
|
| - if (handleData == null) handleData = _nullDataHandler;
|
| - // TODO(floitsch): the return type should be 'void', and the type
|
| - // should be inferred.
|
| - _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData);
|
| - }
|
| -
|
| - void onError(Function handleError) {
|
| - if (handleError == null) handleError = _nullErrorHandler;
|
| - _onError = _registerErrorHandler/*<T>*/(handleError, _zone);
|
| - }
|
| -
|
| - void onDone(void handleDone()) {
|
| - if (handleDone == null) handleDone = _nullDoneHandler;
|
| - _onDone = _zone.registerCallback(handleDone);
|
| - }
|
| -
|
| - void pause([Future resumeSignal]) {
|
| - if (_isCanceled) return;
|
| - bool wasPaused = _isPaused;
|
| - bool wasInputPaused = _isInputPaused;
|
| - // Increment pause count and mark input paused (if it isn't already).
|
| - _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
|
| - if (resumeSignal != null) resumeSignal.whenComplete(resume);
|
| - if (!wasPaused && _pending != null) _pending.cancelSchedule();
|
| - if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
|
| - }
|
| -
|
| - void resume() {
|
| - if (_isCanceled) return;
|
| - if (_isPaused) {
|
| - _decrementPauseCount();
|
| - if (!_isPaused) {
|
| - if (_hasPending && !_pending.isEmpty) {
|
| - // Input is still paused.
|
| - _pending.schedule(this);
|
| - } else {
|
| - assert(_mayResumeInput);
|
| - _state &= ~_STATE_INPUT_PAUSED;
|
| - if (!_inCallback) _guardCallback(_onResume);
|
| - }
|
| - }
|
| - }
|
| - }
|
| -
|
| - Future cancel() {
|
| - // The user doesn't want to receive any further events. If there is an
|
| - // error or done event pending (waiting for the cancel to be done) discard
|
| - // that event.
|
| - _state &= ~_STATE_WAIT_FOR_CANCEL;
|
| - if (_isCanceled) return _cancelFuture;
|
| - _cancel();
|
| - return _cancelFuture;
|
| - }
|
| -
|
| - Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| - _Future/*<E>*/ result = new _Future/*<E>*/();
|
| -
|
| - // Overwrite the onDone and onError handlers.
|
| - _onDone = () { result._complete(futureValue); };
|
| - _onError = (error, stackTrace) {
|
| - cancel();
|
| - result._completeError(error, stackTrace);
|
| - };
|
| -
|
| - return result;
|
| - }
|
| -
|
| - // State management.
|
| -
|
| - bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
|
| - bool get _isClosed => (_state & _STATE_CLOSED) != 0;
|
| - bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
|
| - bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
|
| - bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
|
| - bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
|
| - bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
|
| - bool get _canFire => _state < _STATE_IN_CALLBACK;
|
| - bool get _mayResumeInput =>
|
| - !_isPaused && (_pending == null || _pending.isEmpty);
|
| - bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
|
| -
|
| - bool get isPaused => _isPaused;
|
| -
|
| - void _cancel() {
|
| - _state |= _STATE_CANCELED;
|
| - if (_hasPending) {
|
| - _pending.cancelSchedule();
|
| - }
|
| - if (!_inCallback) _pending = null;
|
| - _cancelFuture = _onCancel();
|
| - }
|
| -
|
| - /**
|
| - * Increment the pause count.
|
| - *
|
| - * Also marks input as paused.
|
| - */
|
| - void _incrementPauseCount() {
|
| - _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
|
| - }
|
| -
|
| - /**
|
| - * Decrements the pause count.
|
| - *
|
| - * Does not automatically unpause the input (call [_onResume]) when
|
| - * the pause count reaches zero. This is handled elsewhere, and only
|
| - * if there are no pending events buffered.
|
| - */
|
| - void _decrementPauseCount() {
|
| - assert(_isPaused);
|
| - _state -= _STATE_PAUSE_COUNT;
|
| - }
|
| -
|
| - // _EventSink interface.
|
| -
|
| - void _add(T data) {
|
| - assert(!_isClosed);
|
| - if (_isCanceled) return;
|
| - if (_canFire) {
|
| - _sendData(data);
|
| - } else {
|
| - _addPending(new _DelayedData<dynamic /*=T*/>(data));
|
| - }
|
| - }
|
| -
|
| - void _addError(Object error, StackTrace stackTrace) {
|
| - if (_isCanceled) return;
|
| - if (_canFire) {
|
| - _sendError(error, stackTrace); // Reports cancel after sending.
|
| - } else {
|
| - _addPending(new _DelayedError(error, stackTrace));
|
| - }
|
| - }
|
| -
|
| - void _close() {
|
| - assert(!_isClosed);
|
| - if (_isCanceled) return;
|
| - _state |= _STATE_CLOSED;
|
| - if (_canFire) {
|
| - _sendDone();
|
| - } else {
|
| - _addPending(const _DelayedDone());
|
| - }
|
| - }
|
| -
|
| - // Hooks called when the input is paused, unpaused or canceled.
|
| - // These must not throw. If overwritten to call user code, include suitable
|
| - // try/catch wrapping and send any errors to
|
| - // [_Zone.current.handleUncaughtError].
|
| - void _onPause() {
|
| - assert(_isInputPaused);
|
| - }
|
| -
|
| - void _onResume() {
|
| - assert(!_isInputPaused);
|
| - }
|
| -
|
| - Future _onCancel() {
|
| - assert(_isCanceled);
|
| - return null;
|
| - }
|
| -
|
| - // Handle pending events.
|
| -
|
| - /**
|
| - * Add a pending event.
|
| - *
|
| - * If the subscription is not paused, this also schedules a firing
|
| - * of pending events later (if necessary).
|
| - */
|
| - void _addPending(_DelayedEvent event) {
|
| - _StreamImplEvents<T> pending = _pending;
|
| - if (_pending == null) {
|
| - pending = _pending = new _StreamImplEvents<dynamic /*=T*/>();
|
| - }
|
| - pending.add(event);
|
| - if (!_hasPending) {
|
| - _state |= _STATE_HAS_PENDING;
|
| - if (!_isPaused) {
|
| - _pending.schedule(this);
|
| - }
|
| - }
|
| - }
|
| -
|
| - /* _EventDispatch interface. */
|
| -
|
| - void _sendData(T data) {
|
| - assert(!_isCanceled);
|
| - assert(!_isPaused);
|
| - assert(!_inCallback);
|
| - bool wasInputPaused = _isInputPaused;
|
| - _state |= _STATE_IN_CALLBACK;
|
| - _zone.runUnaryGuarded(_onData, data);
|
| - _state &= ~_STATE_IN_CALLBACK;
|
| - _checkState(wasInputPaused);
|
| - }
|
| -
|
| - void _sendError(var error, StackTrace stackTrace) {
|
| - assert(!_isCanceled);
|
| - assert(!_isPaused);
|
| - assert(!_inCallback);
|
| - bool wasInputPaused = _isInputPaused;
|
| -
|
| - void sendError() {
|
| - // If the subscription has been canceled while waiting for the cancel
|
| - // future to finish we must not report the error.
|
| - if (_isCanceled && !_waitsForCancel) return;
|
| - _state |= _STATE_IN_CALLBACK;
|
| - if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
|
| - ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
|
| - as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
|
| - _zone.runBinaryGuarded(errorCallback, error, stackTrace);
|
| - } else {
|
| - _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
|
| - _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
|
| - }
|
| - _state &= ~_STATE_IN_CALLBACK;
|
| - }
|
| -
|
| - if (_cancelOnError) {
|
| - _state |= _STATE_WAIT_FOR_CANCEL;
|
| - _cancel();
|
| - if (_cancelFuture is Future) {
|
| - _cancelFuture.whenComplete(sendError);
|
| - } else {
|
| - sendError();
|
| - }
|
| - } else {
|
| - sendError();
|
| - // Only check state if not cancelOnError.
|
| - _checkState(wasInputPaused);
|
| - }
|
| - }
|
| -
|
| - void _sendDone() {
|
| - assert(!_isCanceled);
|
| - assert(!_isPaused);
|
| - assert(!_inCallback);
|
| -
|
| - void sendDone() {
|
| - // If the subscription has been canceled while waiting for the cancel
|
| - // future to finish we must not report the done event.
|
| - if (!_waitsForCancel) return;
|
| - _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
|
| - _zone.runGuarded(_onDone);
|
| - _state &= ~_STATE_IN_CALLBACK;
|
| - }
|
| -
|
| - _cancel();
|
| - _state |= _STATE_WAIT_FOR_CANCEL;
|
| - if (_cancelFuture is Future) {
|
| - _cancelFuture.whenComplete(sendDone);
|
| - } else {
|
| - sendDone();
|
| - }
|
| - }
|
| -
|
| - /**
|
| - * Call a hook function.
|
| - *
|
| - * The call is properly wrapped in code to avoid other callbacks
|
| - * during the call, and it checks for state changes after the call
|
| - * that should cause further callbacks.
|
| - */
|
| - void _guardCallback(void callback()) {
|
| - assert(!_inCallback);
|
| - bool wasInputPaused = _isInputPaused;
|
| - _state |= _STATE_IN_CALLBACK;
|
| - callback();
|
| - _state &= ~_STATE_IN_CALLBACK;
|
| - _checkState(wasInputPaused);
|
| - }
|
| -
|
| - /**
|
| - * Check if the input needs to be informed of state changes.
|
| - *
|
| - * State changes are pausing, resuming and canceling.
|
| - *
|
| - * After canceling, no further callbacks will happen.
|
| - *
|
| - * The cancel callback is called after a user cancel, or after
|
| - * the final done event is sent.
|
| - */
|
| - void _checkState(bool wasInputPaused) {
|
| - assert(!_inCallback);
|
| - if (_hasPending && _pending.isEmpty) {
|
| - _state &= ~_STATE_HAS_PENDING;
|
| - if (_isInputPaused && _mayResumeInput) {
|
| - _state &= ~_STATE_INPUT_PAUSED;
|
| - }
|
| - }
|
| - // If the state changes during a callback, we immediately
|
| - // make a new state-change callback. Loop until the state didn't change.
|
| - while (true) {
|
| - if (_isCanceled) {
|
| - _pending = null;
|
| - return;
|
| - }
|
| - bool isInputPaused = _isInputPaused;
|
| - if (wasInputPaused == isInputPaused) break;
|
| - _state ^= _STATE_IN_CALLBACK;
|
| - if (isInputPaused) {
|
| - _onPause();
|
| - } else {
|
| - _onResume();
|
| - }
|
| - _state &= ~_STATE_IN_CALLBACK;
|
| - wasInputPaused = isInputPaused;
|
| - }
|
| - if (_hasPending && !_isPaused) {
|
| - _pending.schedule(this);
|
| - }
|
| - }
|
| -}
|
| -
|
| -// -------------------------------------------------------------------
|
| -// Common base class for single and multi-subscription streams.
|
| -// -------------------------------------------------------------------
|
| -abstract class _StreamImpl<T> extends Stream<T> {
|
| - // ------------------------------------------------------------------
|
| - // Stream interface.
|
| -
|
| - StreamSubscription<T> listen(void onData(T data),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| - cancelOnError = identical(true, cancelOnError);
|
| - StreamSubscription<T> subscription =
|
| - _createSubscription(onData, onError, onDone, cancelOnError);
|
| - _onListen(subscription);
|
| - return subscription;
|
| - }
|
| -
|
| - // -------------------------------------------------------------------
|
| - /** Create a subscription object. Called by [subcribe]. */
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _BufferingStreamSubscription<T>(onData, onError, onDone,
|
| - cancelOnError);
|
| - }
|
| -
|
| - /** Hook called when the subscription has been created. */
|
| - void _onListen(StreamSubscription subscription) {}
|
| -}
|
| -
|
| -typedef _PendingEvents<T> _EventGenerator<T>();
|
| -
|
| -/** Stream that generates its own events. */
|
| -class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| - final _EventGenerator<T> _pending;
|
| - bool _isUsed = false;
|
| - /**
|
| - * Initializes the stream to have only the events provided by a
|
| - * [_PendingEvents].
|
| - *
|
| - * A new [_PendingEvents] must be generated for each listen.
|
| - */
|
| - _GeneratedStreamImpl(this._pending);
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - if (_isUsed) throw new StateError("Stream has already been listened to.");
|
| - _isUsed = true;
|
| - return new _BufferingStreamSubscription<T>(
|
| - onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
|
| - }
|
| -}
|
| -
|
| -
|
| -/** Pending events object that gets its events from an [Iterable]. */
|
| -class _IterablePendingEvents<T> extends _PendingEvents<T> {
|
| - // The iterator providing data for data events.
|
| - // Set to null when iteration has completed.
|
| - Iterator<T> _iterator;
|
| -
|
| - _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
|
| -
|
| - bool get isEmpty => _iterator == null;
|
| -
|
| - void handleNext(_EventDispatch<T> dispatch) {
|
| - if (_iterator == null) {
|
| - throw new StateError("No events pending.");
|
| - }
|
| - // Send one event per call to moveNext.
|
| - // If moveNext returns true, send the current element as data.
|
| - // If moveNext returns false, send a done event and clear the _iterator.
|
| - // If moveNext throws an error, send an error and clear the _iterator.
|
| - // After an error, no further events will be sent.
|
| - bool isDone;
|
| - try {
|
| - isDone = !_iterator.moveNext();
|
| - } catch (e, s) {
|
| - _iterator = null;
|
| - dispatch._sendError(e, s);
|
| - return;
|
| - }
|
| - if (!isDone) {
|
| - dispatch._sendData(_iterator.current);
|
| - } else {
|
| - _iterator = null;
|
| - dispatch._sendDone();
|
| - }
|
| - }
|
| -
|
| - void clear() {
|
| - if (isScheduled) cancelSchedule();
|
| - _iterator = null;
|
| - }
|
| -}
|
| -
|
| -
|
| -// Internal helpers.
|
| -
|
| -// Types of the different handlers on a stream. Types used to type fields.
|
| -typedef void _DataHandler<T>(T value);
|
| -typedef void _DoneHandler();
|
| -
|
| -
|
| -/** Default data handler, does nothing. */
|
| -void _nullDataHandler(var value) {}
|
| -
|
| -/** Default error handler, reports the error to the current zone's handler. */
|
| -void _nullErrorHandler(error, [StackTrace stackTrace]) {
|
| - Zone.current.handleUncaughtError(error, stackTrace);
|
| -}
|
| -
|
| -/** Default done handler, does nothing. */
|
| -void _nullDoneHandler() {}
|
| -
|
| -
|
| -/** A delayed event on a buffering stream subscription. */
|
| -abstract class _DelayedEvent<T> {
|
| - /** Added as a linked list on the [StreamController]. */
|
| - _DelayedEvent next;
|
| - /** Execute the delayed event on the [StreamController]. */
|
| - void perform(_EventDispatch<T> dispatch);
|
| -}
|
| -
|
| -/** A delayed data event. */
|
| -class _DelayedData<T> extends _DelayedEvent<T> {
|
| - final T value;
|
| - _DelayedData(this.value);
|
| - void perform(_EventDispatch<T> dispatch) {
|
| - dispatch._sendData(value);
|
| - }
|
| -}
|
| -
|
| -/** A delayed error event. */
|
| -class _DelayedError extends _DelayedEvent {
|
| - final error;
|
| - final StackTrace stackTrace;
|
| -
|
| - _DelayedError(this.error, this.stackTrace);
|
| - void perform(_EventDispatch dispatch) {
|
| - dispatch._sendError(error, stackTrace);
|
| - }
|
| -}
|
| -
|
| -/** A delayed done event. */
|
| -class _DelayedDone implements _DelayedEvent {
|
| - const _DelayedDone();
|
| - void perform(_EventDispatch dispatch) {
|
| - dispatch._sendDone();
|
| - }
|
| -
|
| - _DelayedEvent get next => null;
|
| -
|
| - void set next(_DelayedEvent _) {
|
| - throw new StateError("No events after a done.");
|
| - }
|
| -}
|
| -
|
| -/** Superclass for provider of pending events. */
|
| -abstract class _PendingEvents<T> {
|
| - // No async event has been scheduled.
|
| - static const int _STATE_UNSCHEDULED = 0;
|
| - // An async event has been scheduled to run a function.
|
| - static const int _STATE_SCHEDULED = 1;
|
| - // An async event has been scheduled, but it will do nothing when it runs.
|
| - // Async events can't be preempted.
|
| - static const int _STATE_CANCELED = 3;
|
| -
|
| - /**
|
| - * State of being scheduled.
|
| - *
|
| - * Set to [_STATE_SCHEDULED] when pending events are scheduled for
|
| - * async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
|
| - * scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
|
| - * which will make the async code do nothing except resetting [_state].
|
| - *
|
| - * If events are scheduled while the state is [_STATE_CANCELED], it is
|
| - * merely switched back to [_STATE_SCHEDULED], but no new call to
|
| - * [scheduleMicrotask] is performed.
|
| - */
|
| - int _state = _STATE_UNSCHEDULED;
|
| -
|
| - bool get isEmpty;
|
| -
|
| - bool get isScheduled => _state == _STATE_SCHEDULED;
|
| - bool get _eventScheduled => _state >= _STATE_SCHEDULED;
|
| -
|
| - /**
|
| - * Schedule an event to run later.
|
| - *
|
| - * If called more than once, it should be called with the same dispatch as
|
| - * argument each time. It may reuse an earlier argument in some cases.
|
| - */
|
| - void schedule(_EventDispatch<T> dispatch) {
|
| - if (isScheduled) return;
|
| - assert(!isEmpty);
|
| - if (_eventScheduled) {
|
| - assert(_state == _STATE_CANCELED);
|
| - _state = _STATE_SCHEDULED;
|
| - return;
|
| - }
|
| - scheduleMicrotask(() {
|
| - int oldState = _state;
|
| - _state = _STATE_UNSCHEDULED;
|
| - if (oldState == _STATE_CANCELED) return;
|
| - handleNext(dispatch);
|
| - });
|
| - _state = _STATE_SCHEDULED;
|
| - }
|
| -
|
| - void cancelSchedule() {
|
| - if (isScheduled) _state = _STATE_CANCELED;
|
| - }
|
| -
|
| - void handleNext(_EventDispatch<T> dispatch);
|
| -
|
| - /** Throw away any pending events and cancel scheduled events. */
|
| - void clear();
|
| -}
|
| -
|
| -
|
| -/** Class holding pending events for a [_StreamImpl]. */
|
| -class _StreamImplEvents<T> extends _PendingEvents<T> {
|
| - /// Single linked list of [_DelayedEvent] objects.
|
| - _DelayedEvent firstPendingEvent = null;
|
| - /// Last element in the list of pending events. New events are added after it.
|
| - _DelayedEvent lastPendingEvent = null;
|
| -
|
| - bool get isEmpty => lastPendingEvent == null;
|
| -
|
| - void add(_DelayedEvent event) {
|
| - if (lastPendingEvent == null) {
|
| - firstPendingEvent = lastPendingEvent = event;
|
| - } else {
|
| - lastPendingEvent = lastPendingEvent.next = event;
|
| - }
|
| - }
|
| -
|
| - void handleNext(_EventDispatch<T> dispatch) {
|
| - assert(!isScheduled);
|
| - _DelayedEvent event = firstPendingEvent;
|
| - firstPendingEvent = event.next;
|
| - if (firstPendingEvent == null) {
|
| - lastPendingEvent = null;
|
| - }
|
| - event.perform(dispatch);
|
| - }
|
| -
|
| - void clear() {
|
| - if (isScheduled) cancelSchedule();
|
| - firstPendingEvent = lastPendingEvent = null;
|
| - }
|
| -}
|
| -
|
| -class _BroadcastLinkedList {
|
| - _BroadcastLinkedList _next;
|
| - _BroadcastLinkedList _previous;
|
| -
|
| - void _unlink() {
|
| - _previous._next = _next;
|
| - _next._previous = _previous;
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - void _insertBefore(_BroadcastLinkedList newNext) {
|
| - _BroadcastLinkedList newPrevious = newNext._previous;
|
| - newPrevious._next = this;
|
| - newNext._previous = _previous;
|
| - _previous._next = newNext;
|
| - _previous = newPrevious;
|
| - }
|
| -}
|
| -
|
| -typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
|
| -
|
| -/**
|
| - * Done subscription that will send one done event as soon as possible.
|
| - */
|
| -class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
| - static const int _DONE_SENT = 1;
|
| - static const int _SCHEDULED = 2;
|
| - static const int _PAUSED = 4;
|
| -
|
| - final Zone _zone;
|
| - int _state = 0;
|
| - _DoneHandler _onDone;
|
| -
|
| - _DoneStreamSubscription(this._onDone) : _zone = Zone.current {
|
| - _schedule();
|
| - }
|
| -
|
| - bool get _isSent => (_state & _DONE_SENT) != 0;
|
| - bool get _isScheduled => (_state & _SCHEDULED) != 0;
|
| - bool get isPaused => _state >= _PAUSED;
|
| -
|
| - void _schedule() {
|
| - if (_isScheduled) return;
|
| - _zone.scheduleMicrotask(_sendDone);
|
| - _state |= _SCHEDULED;
|
| - }
|
| -
|
| - void onData(void handleData(T data)) {}
|
| - void onError(Function handleError) {}
|
| - void onDone(void handleDone()) { _onDone = handleDone; }
|
| -
|
| - void pause([Future resumeSignal]) {
|
| - _state += _PAUSED;
|
| - if (resumeSignal != null) resumeSignal.whenComplete(resume);
|
| - }
|
| -
|
| - void resume() {
|
| - if (isPaused) {
|
| - _state -= _PAUSED;
|
| - if (!isPaused && !_isSent) {
|
| - _schedule();
|
| - }
|
| - }
|
| - }
|
| -
|
| - Future cancel() => null;
|
| -
|
| - Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| - _Future/*<E>*/ result = new _Future/*<E>*/();
|
| - _onDone = () { result._completeWithValue(null); };
|
| - return result;
|
| - }
|
| -
|
| - void _sendDone() {
|
| - _state &= ~_SCHEDULED;
|
| - if (isPaused) return;
|
| - _state |= _DONE_SENT;
|
| - if (_onDone != null) _zone.runGuarded(_onDone);
|
| - }
|
| -}
|
| -
|
| -class _AsBroadcastStream<T> extends Stream<T> {
|
| - final Stream<T> _source;
|
| - final _BroadcastCallback<T> _onListenHandler;
|
| - final _BroadcastCallback<T> _onCancelHandler;
|
| - final Zone _zone;
|
| -
|
| - _AsBroadcastStreamController<T> _controller;
|
| - StreamSubscription<T> _subscription;
|
| -
|
| - _AsBroadcastStream(this._source,
|
| - void onListenHandler(StreamSubscription<T> subscription),
|
| - void onCancelHandler(StreamSubscription<T> subscription))
|
| - // TODO(floitsch): the return type should be void and should be
|
| - // inferred.
|
| - : _onListenHandler = Zone.current.registerUnaryCallback
|
| - /*<dynamic, StreamSubscription<T>>*/(onListenHandler),
|
| - _onCancelHandler = Zone.current.registerUnaryCallback
|
| - /*<dynamic, StreamSubscription<T>>*/(onCancelHandler),
|
| - _zone = Zone.current {
|
| - _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
|
| - }
|
| -
|
| - bool get isBroadcast => true;
|
| -
|
| - StreamSubscription<T> listen(void onData(T data),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| - if (_controller == null || _controller.isClosed) {
|
| - // Return a dummy subscription backed by nothing, since
|
| - // it will only ever send one done event.
|
| - return new _DoneStreamSubscription<T>(onDone);
|
| - }
|
| - if (_subscription == null) {
|
| - _subscription = _source.listen(_controller.add,
|
| - onError: _controller.addError,
|
| - onDone: _controller.close);
|
| - }
|
| - cancelOnError = identical(true, cancelOnError);
|
| - return _controller._subscribe(onData, onError, onDone, cancelOnError);
|
| - }
|
| -
|
| - void _onCancel() {
|
| - bool shutdown = (_controller == null) || _controller.isClosed;
|
| - if (_onCancelHandler != null) {
|
| - _zone.runUnary(
|
| - _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
| - }
|
| - if (shutdown) {
|
| - if (_subscription != null) {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| - }
|
| - }
|
| - }
|
| -
|
| - void _onListen() {
|
| - if (_onListenHandler != null) {
|
| - _zone.runUnary(
|
| - _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
| - }
|
| - }
|
| -
|
| - // Methods called from _BroadcastSubscriptionWrapper.
|
| - void _cancelSubscription() {
|
| - if (_subscription == null) return;
|
| - // Called by [_controller] when it has no subscribers left.
|
| - StreamSubscription subscription = _subscription;
|
| - _subscription = null;
|
| - _controller = null; // Marks the stream as no longer listenable.
|
| - subscription.cancel();
|
| - }
|
| -
|
| - void _pauseSubscription(Future resumeSignal) {
|
| - if (_subscription == null) return;
|
| - _subscription.pause(resumeSignal);
|
| - }
|
| -
|
| - void _resumeSubscription() {
|
| - if (_subscription == null) return;
|
| - _subscription.resume();
|
| - }
|
| -
|
| - bool get _isSubscriptionPaused {
|
| - if (_subscription == null) return false;
|
| - return _subscription.isPaused;
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * Wrapper for subscription that disallows changing handlers.
|
| - */
|
| -class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
| - final _AsBroadcastStream _stream;
|
| -
|
| - _BroadcastSubscriptionWrapper(this._stream);
|
| -
|
| - void onData(void handleData(T data)) {
|
| - throw new UnsupportedError(
|
| - "Cannot change handlers of asBroadcastStream source subscription.");
|
| - }
|
| -
|
| - void onError(Function handleError) {
|
| - throw new UnsupportedError(
|
| - "Cannot change handlers of asBroadcastStream source subscription.");
|
| - }
|
| -
|
| - void onDone(void handleDone()) {
|
| - throw new UnsupportedError(
|
| - "Cannot change handlers of asBroadcastStream source subscription.");
|
| - }
|
| -
|
| - void pause([Future resumeSignal]) {
|
| - _stream._pauseSubscription(resumeSignal);
|
| - }
|
| -
|
| - void resume() {
|
| - _stream._resumeSubscription();
|
| - }
|
| -
|
| - Future cancel() {
|
| - _stream._cancelSubscription();
|
| - return null;
|
| - }
|
| -
|
| - bool get isPaused {
|
| - return _stream._isSubscriptionPaused;
|
| - }
|
| -
|
| - Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| - throw new UnsupportedError(
|
| - "Cannot change handlers of asBroadcastStream source subscription.");
|
| - }
|
| -}
|
| -
|
| -
|
| -/**
|
| - * Simple implementation of [StreamIterator].
|
| - */
|
| -class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| - // Internal state of the stream iterator.
|
| - // At any time, it is in one of these states.
|
| - // The interpretation of the [_futureOrPrefecth] field depends on the state.
|
| - // In _STATE_MOVING, the _data field holds the most recently returned
|
| - // future.
|
| - // When in one of the _STATE_EXTRA_* states, the it may hold the
|
| - // next data/error object, and the subscription is paused.
|
| -
|
| - /// The simple state where [_data] holds the data to return, and [moveNext]
|
| - /// is allowed. The subscription is actively listening.
|
| - static const int _STATE_FOUND = 0;
|
| - /// State set after [moveNext] has returned false or an error,
|
| - /// or after calling [cancel]. The subscription is always canceled.
|
| - static const int _STATE_DONE = 1;
|
| - /// State set after calling [moveNext], but before its returned future has
|
| - /// completed. Calling [moveNext] again is not allowed in this state.
|
| - /// The subscription is actively listening.
|
| - static const int _STATE_MOVING = 2;
|
| - /// States set when another event occurs while in _STATE_FOUND.
|
| - /// This extra overflow event is cached until the next call to [moveNext],
|
| - /// which will complete as if it received the event normally.
|
| - /// The subscription is paused in these states, so we only ever get one
|
| - /// event too many.
|
| - static const int _STATE_EXTRA_DATA = 3;
|
| - static const int _STATE_EXTRA_ERROR = 4;
|
| - static const int _STATE_EXTRA_DONE = 5;
|
| -
|
| - /// Subscription being listened to.
|
| - StreamSubscription _subscription;
|
| -
|
| - /// The current element represented by the most recent call to moveNext.
|
| - ///
|
| - /// Is null between the time moveNext is called and its future completes.
|
| - T _current = null;
|
| -
|
| - /// The future returned by the most recent call to [moveNext].
|
| - ///
|
| - /// Also used to store the next value/error in case the stream provides an
|
| - /// event before [moveNext] is called again. In that case, the stream will
|
| - /// be paused to prevent further events.
|
| - var/*Future<bool> or T*/ _futureOrPrefetch = null;
|
| -
|
| - /// The current state.
|
| - int _state = _STATE_FOUND;
|
| -
|
| - _StreamIteratorImpl(final Stream<T> stream) {
|
| - _subscription = stream.listen(_onData,
|
| - onError: _onError,
|
| - onDone: _onDone,
|
| - cancelOnError: true);
|
| - }
|
| -
|
| - T get current => _current;
|
| -
|
| - Future<bool> moveNext() {
|
| - if (_state == _STATE_DONE) {
|
| - return new _Future<bool>.immediate(false);
|
| - }
|
| - if (_state == _STATE_MOVING) {
|
| - throw new StateError("Already waiting for next.");
|
| - }
|
| - if (_state == _STATE_FOUND) {
|
| - _state = _STATE_MOVING;
|
| - _current = null;
|
| - var result = new _Future<bool>();
|
| - _futureOrPrefetch = result;
|
| - return result;
|
| - } else {
|
| - assert(_state >= _STATE_EXTRA_DATA);
|
| - switch (_state) {
|
| - case _STATE_EXTRA_DATA:
|
| - _state = _STATE_FOUND;
|
| - _current = _futureOrPrefetch as Object /*=T*/;
|
| - _futureOrPrefetch = null;
|
| - _subscription.resume();
|
| - return new _Future<bool>.immediate(true);
|
| - case _STATE_EXTRA_ERROR:
|
| - AsyncError prefetch = _futureOrPrefetch;
|
| - _clear();
|
| - return new _Future<bool>.immediateError(prefetch.error,
|
| - prefetch.stackTrace);
|
| - case _STATE_EXTRA_DONE:
|
| - _clear();
|
| - return new _Future<bool>.immediate(false);
|
| - }
|
| - }
|
| - }
|
| -
|
| - /** Clears up the internal state when the iterator ends. */
|
| - void _clear() {
|
| - _subscription = null;
|
| - _futureOrPrefetch = null;
|
| - _current = null;
|
| - _state = _STATE_DONE;
|
| - }
|
| -
|
| - Future cancel() {
|
| - StreamSubscription subscription = _subscription;
|
| - if (subscription == null) return null;
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _clear();
|
| - hasNext._complete(false);
|
| - } else {
|
| - _clear();
|
| - }
|
| - return subscription.cancel();
|
| - }
|
| -
|
| - void _onData(T data) {
|
| - if (_state == _STATE_MOVING) {
|
| - _current = data;
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _futureOrPrefetch = null;
|
| - _state = _STATE_FOUND;
|
| - hasNext._complete(true);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - assert(_futureOrPrefetch == null);
|
| - _futureOrPrefetch = data;
|
| - _state = _STATE_EXTRA_DATA;
|
| - }
|
| -
|
| - void _onError(Object error, [StackTrace stackTrace]) {
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - // We have cancelOnError: true, so the subscription is canceled.
|
| - _clear();
|
| - hasNext._completeError(error, stackTrace);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - assert(_futureOrPrefetch == null);
|
| - _futureOrPrefetch = new AsyncError(error, stackTrace);
|
| - _state = _STATE_EXTRA_ERROR;
|
| - }
|
| -
|
| - void _onDone() {
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _clear();
|
| - hasNext._complete(false);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - _futureOrPrefetch = null;
|
| - _state = _STATE_EXTRA_DONE;
|
| - }
|
| -}
|
| -
|
| -/** An empty broadcast stream, sending a done event as soon as possible. */
|
| -class _EmptyStream<T> extends Stream<T> {
|
| - const _EmptyStream() : super._internal();
|
| - bool get isBroadcast => true;
|
| - StreamSubscription<T> listen(void onData(T data),
|
| - {Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| - return new _DoneStreamSubscription<T>(onDone);
|
| - }
|
| -}
|
|
|