| Index: pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
|
| diff --git a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart b/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
|
| deleted file mode 100644
|
| index 1125620aacb3b7b445c3745e8a079c725a980f37..0000000000000000000000000000000000000000
|
| --- a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
|
| +++ /dev/null
|
| @@ -1,495 +0,0 @@
|
| -// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE file.
|
| -
|
| -part of dart.async;
|
| -
|
| -/** Runs user code and takes actions depending on success or failure. */
|
| -_runUserCode(userCode(),
|
| - onSuccess(value),
|
| - onError(error, StackTrace stackTrace)) {
|
| - try {
|
| - onSuccess(userCode());
|
| - } catch (e, s) {
|
| - AsyncError replacement = Zone.current.errorCallback(e, s);
|
| - if (replacement == null) {
|
| - onError(e, s);
|
| - } else {
|
| - var error = _nonNullError(replacement.error);
|
| - var stackTrace = replacement.stackTrace;
|
| - onError(error, stackTrace);
|
| - }
|
| - }
|
| -}
|
| -
|
| -/** Helper function to cancel a subscription and wait for the potential future,
|
| - before completing with an error. */
|
| -void _cancelAndError(StreamSubscription subscription,
|
| - _Future future,
|
| - error,
|
| - StackTrace stackTrace) {
|
| - var cancelFuture = subscription.cancel();
|
| - if (cancelFuture is Future) {
|
| - cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
|
| - } else {
|
| - future._completeError(error, stackTrace);
|
| - }
|
| -}
|
| -
|
| -void _cancelAndErrorWithReplacement(StreamSubscription subscription,
|
| - _Future future,
|
| - error, StackTrace stackTrace) {
|
| - AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
|
| - if (replacement != null) {
|
| - error = _nonNullError(replacement.error);
|
| - stackTrace = replacement.stackTrace;
|
| - }
|
| - _cancelAndError(subscription, future, error, stackTrace);
|
| -}
|
| -
|
| -typedef void _ErrorCallback(error, StackTrace stackTrace);
|
| -
|
| -/** Helper function to make an onError argument to [_runUserCode]. */
|
| -_ErrorCallback _cancelAndErrorClosure(
|
| - StreamSubscription subscription, _Future future) {
|
| - return (error, StackTrace stackTrace) {
|
| - _cancelAndError(subscription, future, error, stackTrace);
|
| - };
|
| -}
|
| -
|
| -/** Helper function to cancel a subscription and wait for the potential future,
|
| - before completing with a value. */
|
| -void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
|
| - var cancelFuture = subscription.cancel();
|
| - if (cancelFuture is Future) {
|
| - cancelFuture.whenComplete(() => future._complete(value));
|
| - } else {
|
| - future._complete(value);
|
| - }
|
| -}
|
| -
|
| -
|
| -/**
|
| - * A [Stream] that forwards subscriptions to another stream.
|
| - *
|
| - * This stream implements [Stream], but forwards all subscriptions
|
| - * to an underlying stream, and wraps the returned subscription to
|
| - * modify the events on the way.
|
| - *
|
| - * This class is intended for internal use only.
|
| - */
|
| -abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| - final Stream<S> _source;
|
| -
|
| - _ForwardingStream(this._source);
|
| -
|
| - bool get isBroadcast => _source.isBroadcast;
|
| -
|
| - StreamSubscription<T> listen(void onData(T value),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| - cancelOnError = identical(true, cancelOnError);
|
| - return _createSubscription(onData, onError, onDone, cancelOnError);
|
| - }
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _ForwardingStreamSubscription<S, T>(
|
| - this, onData, onError, onDone, cancelOnError);
|
| - }
|
| -
|
| - // Override the following methods in subclasses to change the behavior.
|
| -
|
| - void _handleData(S data, _EventSink<T> sink) {
|
| - sink._add(data as Object /*=T*/);
|
| - }
|
| -
|
| - void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
|
| - sink._addError(error, stackTrace);
|
| - }
|
| -
|
| - void _handleDone(_EventSink<T> sink) {
|
| - sink._close();
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * Abstract superclass for subscriptions that forward to other subscriptions.
|
| - */
|
| -class _ForwardingStreamSubscription<S, T>
|
| - extends _BufferingStreamSubscription<T> {
|
| - final _ForwardingStream<S, T> _stream;
|
| -
|
| - StreamSubscription<S> _subscription;
|
| -
|
| - _ForwardingStreamSubscription(this._stream, void onData(T data),
|
| - Function onError, void onDone(),
|
| - bool cancelOnError)
|
| - : super(onData, onError, onDone, cancelOnError) {
|
| - _subscription = _stream._source.listen(_handleData,
|
| - onError: _handleError,
|
| - onDone: _handleDone);
|
| - }
|
| -
|
| - // _StreamSink interface.
|
| - // Transformers sending more than one event have no way to know if the stream
|
| - // is canceled or closed after the first, so we just ignore remaining events.
|
| -
|
| - void _add(T data) {
|
| - if (_isClosed) return;
|
| - super._add(data);
|
| - }
|
| -
|
| - void _addError(Object error, StackTrace stackTrace) {
|
| - if (_isClosed) return;
|
| - super._addError(error, stackTrace);
|
| - }
|
| -
|
| - // StreamSubscription callbacks.
|
| -
|
| - void _onPause() {
|
| - if (_subscription == null) return;
|
| - _subscription.pause();
|
| - }
|
| -
|
| - void _onResume() {
|
| - if (_subscription == null) return;
|
| - _subscription.resume();
|
| - }
|
| -
|
| - Future _onCancel() {
|
| - if (_subscription != null) {
|
| - StreamSubscription subscription = _subscription;
|
| - _subscription = null;
|
| - return subscription.cancel();
|
| - }
|
| - return null;
|
| - }
|
| -
|
| - // Methods used as listener on source subscription.
|
| -
|
| - void _handleData(S data) {
|
| - _stream._handleData(data, this);
|
| - }
|
| -
|
| - void _handleError(error, StackTrace stackTrace) {
|
| - _stream._handleError(error, stackTrace, this);
|
| - }
|
| -
|
| - void _handleDone() {
|
| - _stream._handleDone(this);
|
| - }
|
| -}
|
| -
|
| -// -------------------------------------------------------------------
|
| -// Stream transformers used by the default Stream implementation.
|
| -// -------------------------------------------------------------------
|
| -
|
| -typedef bool _Predicate<T>(T value);
|
| -
|
| -void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
|
| - AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
|
| - if (replacement != null) {
|
| - error = _nonNullError(replacement.error);
|
| - stackTrace = replacement.stackTrace;
|
| - }
|
| - sink._addError(error, stackTrace);
|
| -}
|
| -
|
| -
|
| -class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| - final _Predicate<T> _test;
|
| -
|
| - _WhereStream(Stream<T> source, bool test(T value))
|
| - : _test = test, super(source);
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - bool satisfies;
|
| - try {
|
| - satisfies = _test(inputEvent);
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - return;
|
| - }
|
| - if (satisfies) {
|
| - sink._add(inputEvent);
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -typedef T _Transformation<S, T>(S value);
|
| -
|
| -/**
|
| - * A stream pipe that converts data events before passing them on.
|
| - */
|
| -class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| - final _Transformation<S, T> _transform;
|
| -
|
| - _MapStream(Stream<S> source, T transform(S event))
|
| - : this._transform = transform, super(source);
|
| -
|
| - void _handleData(S inputEvent, _EventSink<T> sink) {
|
| - T outputEvent;
|
| - try {
|
| - outputEvent = _transform(inputEvent);
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - return;
|
| - }
|
| - sink._add(outputEvent);
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * A stream pipe that converts data events before passing them on.
|
| - */
|
| -class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| - final _Transformation<S, Iterable<T>> _expand;
|
| -
|
| - _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
|
| - : this._expand = expand, super(source);
|
| -
|
| - void _handleData(S inputEvent, _EventSink<T> sink) {
|
| - try {
|
| - for (T value in _expand(inputEvent)) {
|
| - sink._add(value);
|
| - }
|
| - } catch (e, s) {
|
| - // If either _expand or iterating the generated iterator throws,
|
| - // we abort the iteration.
|
| - _addErrorWithReplacement(sink, e, s);
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -typedef bool _ErrorTest(error);
|
| -
|
| -/**
|
| - * A stream pipe that converts or disposes error events
|
| - * before passing them on.
|
| - */
|
| -class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| - final Function _transform;
|
| - final _ErrorTest _test;
|
| -
|
| - _HandleErrorStream(Stream<T> source,
|
| - Function onError,
|
| - bool test(error))
|
| - : this._transform = onError, this._test = test, super(source);
|
| -
|
| - void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
|
| - bool matches = true;
|
| - if (_test != null) {
|
| - try {
|
| - matches = _test(error);
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - return;
|
| - }
|
| - }
|
| - if (matches) {
|
| - try {
|
| - _invokeErrorHandler(_transform, error, stackTrace);
|
| - } catch (e, s) {
|
| - if (identical(e, error)) {
|
| - sink._addError(error, stackTrace);
|
| - } else {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - }
|
| - return;
|
| - }
|
| - } else {
|
| - sink._addError(error, stackTrace);
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| - final int _count;
|
| -
|
| - _TakeStream(Stream<T> source, int count)
|
| - : this._count = count, super(source) {
|
| - // This test is done early to avoid handling an async error
|
| - // in the _handleData method.
|
| - if (count is! int) throw new ArgumentError(count);
|
| - }
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _StateStreamSubscription<T>(
|
| - this, onData, onError, onDone, cancelOnError, _count);
|
| - }
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - _StateStreamSubscription<T> subscription = sink;
|
| - int count = subscription._count;
|
| - if (count > 0) {
|
| - sink._add(inputEvent);
|
| - count -= 1;
|
| - subscription._count = count;
|
| - if (count == 0) {
|
| - // Closing also unsubscribes all subscribers, which unsubscribes
|
| - // this from source.
|
| - sink._close();
|
| - }
|
| - }
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * A [_ForwardingStreamSubscription] with one extra state field.
|
| - *
|
| - * Use by several different classes, some storing an integer, others a bool.
|
| - */
|
| -class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
|
| - // Raw state field. Typed access provided by getters and setters below.
|
| - var _sharedState;
|
| -
|
| - _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
|
| - Function onError, void onDone(),
|
| - bool cancelOnError, this._sharedState)
|
| - : super(stream, onData, onError, onDone, cancelOnError);
|
| -
|
| - bool get _flag => _sharedState;
|
| - void set _flag(bool flag) { _sharedState = flag; }
|
| - int get _count => _sharedState;
|
| - void set _count(int count) { _sharedState = count; }
|
| -}
|
| -
|
| -
|
| -class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| - final _Predicate<T> _test;
|
| -
|
| - _TakeWhileStream(Stream<T> source, bool test(T value))
|
| - : this._test = test, super(source);
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - bool satisfies;
|
| - try {
|
| - satisfies = _test(inputEvent);
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - // The test didn't say true. Didn't say false either, but we stop anyway.
|
| - sink._close();
|
| - return;
|
| - }
|
| - if (satisfies) {
|
| - sink._add(inputEvent);
|
| - } else {
|
| - sink._close();
|
| - }
|
| - }
|
| -}
|
| -
|
| -class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| - final int _count;
|
| -
|
| - _SkipStream(Stream<T> source, int count)
|
| - : this._count = count, super(source) {
|
| - // This test is done early to avoid handling an async error
|
| - // in the _handleData method.
|
| - if (count is! int || count < 0) throw new ArgumentError(count);
|
| - }
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _StateStreamSubscription<T>(
|
| - this, onData, onError, onDone, cancelOnError, _count);
|
| - }
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - _StateStreamSubscription<T> subscription = sink;
|
| - int count = subscription._count;
|
| - if (count > 0) {
|
| - subscription._count = count - 1;
|
| - return;
|
| - }
|
| - sink._add(inputEvent);
|
| - }
|
| -}
|
| -
|
| -class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| - final _Predicate<T> _test;
|
| -
|
| - _SkipWhileStream(Stream<T> source, bool test(T value))
|
| - : this._test = test, super(source);
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _StateStreamSubscription<T>(
|
| - this, onData, onError, onDone, cancelOnError, false);
|
| - }
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - _StateStreamSubscription<T> subscription = sink;
|
| - bool hasFailed = subscription._flag;
|
| - if (hasFailed) {
|
| - sink._add(inputEvent);
|
| - return;
|
| - }
|
| - bool satisfies;
|
| - try {
|
| - satisfies = _test(inputEvent);
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - // A failure to return a boolean is considered "not matching".
|
| - subscription._flag = true;
|
| - return;
|
| - }
|
| - if (!satisfies) {
|
| - subscription._flag = true;
|
| - sink._add(inputEvent);
|
| - }
|
| - }
|
| -}
|
| -
|
| -typedef bool _Equality<T>(T a, T b);
|
| -
|
| -class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| - static var _SENTINEL = new Object();
|
| -
|
| - _Equality<T> _equals;
|
| - var _previous = _SENTINEL;
|
| -
|
| - _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| - : _equals = equals, super(source);
|
| -
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - if (identical(_previous, _SENTINEL)) {
|
| - _previous = inputEvent;
|
| - return sink._add(inputEvent);
|
| - } else {
|
| - bool isEqual;
|
| - try {
|
| - if (_equals == null) {
|
| - isEqual = (_previous == inputEvent);
|
| - } else {
|
| - isEqual = _equals(_previous as Object /*=T*/, inputEvent);
|
| - }
|
| - } catch (e, s) {
|
| - _addErrorWithReplacement(sink, e, s);
|
| - return null;
|
| - }
|
| - if (!isEqual) {
|
| - sink._add(inputEvent);
|
| - _previous = inputEvent;
|
| - }
|
| - }
|
| - }
|
| -}
|
|
|