Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Unified Diff: pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
diff --git a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart b/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
deleted file mode 100644
index 1125620aacb3b7b445c3745e8a079c725a980f37..0000000000000000000000000000000000000000
--- a/pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart
+++ /dev/null
@@ -1,495 +0,0 @@
-// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-part of dart.async;
-
-/** Runs user code and takes actions depending on success or failure. */
-_runUserCode(userCode(),
- onSuccess(value),
- onError(error, StackTrace stackTrace)) {
- try {
- onSuccess(userCode());
- } catch (e, s) {
- AsyncError replacement = Zone.current.errorCallback(e, s);
- if (replacement == null) {
- onError(e, s);
- } else {
- var error = _nonNullError(replacement.error);
- var stackTrace = replacement.stackTrace;
- onError(error, stackTrace);
- }
- }
-}
-
-/** Helper function to cancel a subscription and wait for the potential future,
- before completing with an error. */
-void _cancelAndError(StreamSubscription subscription,
- _Future future,
- error,
- StackTrace stackTrace) {
- var cancelFuture = subscription.cancel();
- if (cancelFuture is Future) {
- cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
- } else {
- future._completeError(error, stackTrace);
- }
-}
-
-void _cancelAndErrorWithReplacement(StreamSubscription subscription,
- _Future future,
- error, StackTrace stackTrace) {
- AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement != null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- _cancelAndError(subscription, future, error, stackTrace);
-}
-
-typedef void _ErrorCallback(error, StackTrace stackTrace);
-
-/** Helper function to make an onError argument to [_runUserCode]. */
-_ErrorCallback _cancelAndErrorClosure(
- StreamSubscription subscription, _Future future) {
- return (error, StackTrace stackTrace) {
- _cancelAndError(subscription, future, error, stackTrace);
- };
-}
-
-/** Helper function to cancel a subscription and wait for the potential future,
- before completing with a value. */
-void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
- var cancelFuture = subscription.cancel();
- if (cancelFuture is Future) {
- cancelFuture.whenComplete(() => future._complete(value));
- } else {
- future._complete(value);
- }
-}
-
-
-/**
- * A [Stream] that forwards subscriptions to another stream.
- *
- * This stream implements [Stream], but forwards all subscriptions
- * to an underlying stream, and wraps the returned subscription to
- * modify the events on the way.
- *
- * This class is intended for internal use only.
- */
-abstract class _ForwardingStream<S, T> extends Stream<T> {
- final Stream<S> _source;
-
- _ForwardingStream(this._source);
-
- bool get isBroadcast => _source.isBroadcast;
-
- StreamSubscription<T> listen(void onData(T value),
- { Function onError,
- void onDone(),
- bool cancelOnError }) {
- cancelOnError = identical(true, cancelOnError);
- return _createSubscription(onData, onError, onDone, cancelOnError);
- }
-
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
- return new _ForwardingStreamSubscription<S, T>(
- this, onData, onError, onDone, cancelOnError);
- }
-
- // Override the following methods in subclasses to change the behavior.
-
- void _handleData(S data, _EventSink<T> sink) {
- sink._add(data as Object /*=T*/);
- }
-
- void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
- sink._addError(error, stackTrace);
- }
-
- void _handleDone(_EventSink<T> sink) {
- sink._close();
- }
-}
-
-/**
- * Abstract superclass for subscriptions that forward to other subscriptions.
- */
-class _ForwardingStreamSubscription<S, T>
- extends _BufferingStreamSubscription<T> {
- final _ForwardingStream<S, T> _stream;
-
- StreamSubscription<S> _subscription;
-
- _ForwardingStreamSubscription(this._stream, void onData(T data),
- Function onError, void onDone(),
- bool cancelOnError)
- : super(onData, onError, onDone, cancelOnError) {
- _subscription = _stream._source.listen(_handleData,
- onError: _handleError,
- onDone: _handleDone);
- }
-
- // _StreamSink interface.
- // Transformers sending more than one event have no way to know if the stream
- // is canceled or closed after the first, so we just ignore remaining events.
-
- void _add(T data) {
- if (_isClosed) return;
- super._add(data);
- }
-
- void _addError(Object error, StackTrace stackTrace) {
- if (_isClosed) return;
- super._addError(error, stackTrace);
- }
-
- // StreamSubscription callbacks.
-
- void _onPause() {
- if (_subscription == null) return;
- _subscription.pause();
- }
-
- void _onResume() {
- if (_subscription == null) return;
- _subscription.resume();
- }
-
- Future _onCancel() {
- if (_subscription != null) {
- StreamSubscription subscription = _subscription;
- _subscription = null;
- return subscription.cancel();
- }
- return null;
- }
-
- // Methods used as listener on source subscription.
-
- void _handleData(S data) {
- _stream._handleData(data, this);
- }
-
- void _handleError(error, StackTrace stackTrace) {
- _stream._handleError(error, stackTrace, this);
- }
-
- void _handleDone() {
- _stream._handleDone(this);
- }
-}
-
-// -------------------------------------------------------------------
-// Stream transformers used by the default Stream implementation.
-// -------------------------------------------------------------------
-
-typedef bool _Predicate<T>(T value);
-
-void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
- AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement != null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- sink._addError(error, stackTrace);
-}
-
-
-class _WhereStream<T> extends _ForwardingStream<T, T> {
- final _Predicate<T> _test;
-
- _WhereStream(Stream<T> source, bool test(T value))
- : _test = test, super(source);
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- bool satisfies;
- try {
- satisfies = _test(inputEvent);
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- return;
- }
- if (satisfies) {
- sink._add(inputEvent);
- }
- }
-}
-
-
-typedef T _Transformation<S, T>(S value);
-
-/**
- * A stream pipe that converts data events before passing them on.
- */
-class _MapStream<S, T> extends _ForwardingStream<S, T> {
- final _Transformation<S, T> _transform;
-
- _MapStream(Stream<S> source, T transform(S event))
- : this._transform = transform, super(source);
-
- void _handleData(S inputEvent, _EventSink<T> sink) {
- T outputEvent;
- try {
- outputEvent = _transform(inputEvent);
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- return;
- }
- sink._add(outputEvent);
- }
-}
-
-/**
- * A stream pipe that converts data events before passing them on.
- */
-class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
- final _Transformation<S, Iterable<T>> _expand;
-
- _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
- : this._expand = expand, super(source);
-
- void _handleData(S inputEvent, _EventSink<T> sink) {
- try {
- for (T value in _expand(inputEvent)) {
- sink._add(value);
- }
- } catch (e, s) {
- // If either _expand or iterating the generated iterator throws,
- // we abort the iteration.
- _addErrorWithReplacement(sink, e, s);
- }
- }
-}
-
-
-typedef bool _ErrorTest(error);
-
-/**
- * A stream pipe that converts or disposes error events
- * before passing them on.
- */
-class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
- final Function _transform;
- final _ErrorTest _test;
-
- _HandleErrorStream(Stream<T> source,
- Function onError,
- bool test(error))
- : this._transform = onError, this._test = test, super(source);
-
- void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
- bool matches = true;
- if (_test != null) {
- try {
- matches = _test(error);
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- return;
- }
- }
- if (matches) {
- try {
- _invokeErrorHandler(_transform, error, stackTrace);
- } catch (e, s) {
- if (identical(e, error)) {
- sink._addError(error, stackTrace);
- } else {
- _addErrorWithReplacement(sink, e, s);
- }
- return;
- }
- } else {
- sink._addError(error, stackTrace);
- }
- }
-}
-
-
-class _TakeStream<T> extends _ForwardingStream<T, T> {
- final int _count;
-
- _TakeStream(Stream<T> source, int count)
- : this._count = count, super(source) {
- // This test is done early to avoid handling an async error
- // in the _handleData method.
- if (count is! int) throw new ArgumentError(count);
- }
-
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
- return new _StateStreamSubscription<T>(
- this, onData, onError, onDone, cancelOnError, _count);
- }
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- _StateStreamSubscription<T> subscription = sink;
- int count = subscription._count;
- if (count > 0) {
- sink._add(inputEvent);
- count -= 1;
- subscription._count = count;
- if (count == 0) {
- // Closing also unsubscribes all subscribers, which unsubscribes
- // this from source.
- sink._close();
- }
- }
- }
-}
-
-/**
- * A [_ForwardingStreamSubscription] with one extra state field.
- *
- * Use by several different classes, some storing an integer, others a bool.
- */
-class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
- // Raw state field. Typed access provided by getters and setters below.
- var _sharedState;
-
- _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
- Function onError, void onDone(),
- bool cancelOnError, this._sharedState)
- : super(stream, onData, onError, onDone, cancelOnError);
-
- bool get _flag => _sharedState;
- void set _flag(bool flag) { _sharedState = flag; }
- int get _count => _sharedState;
- void set _count(int count) { _sharedState = count; }
-}
-
-
-class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
- final _Predicate<T> _test;
-
- _TakeWhileStream(Stream<T> source, bool test(T value))
- : this._test = test, super(source);
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- bool satisfies;
- try {
- satisfies = _test(inputEvent);
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- // The test didn't say true. Didn't say false either, but we stop anyway.
- sink._close();
- return;
- }
- if (satisfies) {
- sink._add(inputEvent);
- } else {
- sink._close();
- }
- }
-}
-
-class _SkipStream<T> extends _ForwardingStream<T, T> {
- final int _count;
-
- _SkipStream(Stream<T> source, int count)
- : this._count = count, super(source) {
- // This test is done early to avoid handling an async error
- // in the _handleData method.
- if (count is! int || count < 0) throw new ArgumentError(count);
- }
-
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
- return new _StateStreamSubscription<T>(
- this, onData, onError, onDone, cancelOnError, _count);
- }
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- _StateStreamSubscription<T> subscription = sink;
- int count = subscription._count;
- if (count > 0) {
- subscription._count = count - 1;
- return;
- }
- sink._add(inputEvent);
- }
-}
-
-class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
- final _Predicate<T> _test;
-
- _SkipWhileStream(Stream<T> source, bool test(T value))
- : this._test = test, super(source);
-
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
- return new _StateStreamSubscription<T>(
- this, onData, onError, onDone, cancelOnError, false);
- }
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- _StateStreamSubscription<T> subscription = sink;
- bool hasFailed = subscription._flag;
- if (hasFailed) {
- sink._add(inputEvent);
- return;
- }
- bool satisfies;
- try {
- satisfies = _test(inputEvent);
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- // A failure to return a boolean is considered "not matching".
- subscription._flag = true;
- return;
- }
- if (!satisfies) {
- subscription._flag = true;
- sink._add(inputEvent);
- }
- }
-}
-
-typedef bool _Equality<T>(T a, T b);
-
-class _DistinctStream<T> extends _ForwardingStream<T, T> {
- static var _SENTINEL = new Object();
-
- _Equality<T> _equals;
- var _previous = _SENTINEL;
-
- _DistinctStream(Stream<T> source, bool equals(T a, T b))
- : _equals = equals, super(source);
-
- void _handleData(T inputEvent, _EventSink<T> sink) {
- if (identical(_previous, _SENTINEL)) {
- _previous = inputEvent;
- return sink._add(inputEvent);
- } else {
- bool isEqual;
- try {
- if (_equals == null) {
- isEqual = (_previous == inputEvent);
- } else {
- isEqual = _equals(_previous as Object /*=T*/, inputEvent);
- }
- } catch (e, s) {
- _addErrorWithReplacement(sink, e, s);
- return null;
- }
- if (!isEqual) {
- sink._add(inputEvent);
- _previous = inputEvent;
- }
- }
- }
-}

Powered by Google App Engine
This is Rietveld 408576698