| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index b126747d848bcaa84ebe0315132ecc37ff042330..5d75136a6d15d0221e4a5944fb97ed3360fb6d81 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -16,19 +16,21 @@ _asyncError(Object error, Object stackTrace) {
|
| }
|
|
|
| /** Runs user code and takes actions depending on success or failure. */
|
| -_runUserCode(userCode(), onSuccess(value), onError(error)) {
|
| +_runUserCode(userCode(),
|
| + onSuccess(value),
|
| + onError(error, StackTrace stackTrace)) {
|
| try {
|
| onSuccess(userCode());
|
| } catch (e, s) {
|
| - onError(_asyncError(e, s));
|
| + onError(_asyncError(e, s), s);
|
| }
|
| }
|
|
|
| /** Helper function to make an onError argument to [_runUserCode]. */
|
| _cancelAndError(StreamSubscription subscription, _Future future) =>
|
| - (error) {
|
| + (error, StackTrace stackTrace) {
|
| subscription.cancel();
|
| - future._completeError(error);
|
| + future._completeError(error, stackTrace);
|
| };
|
|
|
|
|
| @@ -49,7 +51,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| bool get isBroadcast => _source.isBroadcast;
|
|
|
| StreamSubscription<T> listen(void onData(T value),
|
| - { void onError(error),
|
| + { Function onError,
|
| void onDone(),
|
| bool cancelOnError }) {
|
| if (onData == null) onData = _nullDataHandler;
|
| @@ -60,7 +62,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| }
|
|
|
| StreamSubscription<T> _createSubscription(void onData(T value),
|
| - void onError(error),
|
| + Function onError,
|
| void onDone(),
|
| bool cancelOnError) {
|
| return new _ForwardingStreamSubscription<S, T>(
|
| @@ -74,8 +76,8 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| sink._add(outputData);
|
| }
|
|
|
| - void _handleError(error, _EventSink<T> sink) {
|
| - sink._addError(error);
|
| + void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
|
| + sink._addError(error, stackTrace);
|
| }
|
|
|
| void _handleDone(_EventSink<T> sink) {
|
| @@ -94,7 +96,7 @@ class _ForwardingStreamSubscription<S, T>
|
|
|
| _ForwardingStreamSubscription(this._stream,
|
| void onData(T data),
|
| - void onError(error),
|
| + Function onError,
|
| void onDone(),
|
| bool cancelOnError)
|
| : super(onData, onError, onDone, cancelOnError) {
|
| @@ -113,9 +115,9 @@ class _ForwardingStreamSubscription<S, T>
|
| super._add(data);
|
| }
|
|
|
| - void _addError(Object error) {
|
| + void _addError(Object error, StackTrace stackTrace) {
|
| if (_isClosed) return;
|
| - super._addError(error);
|
| + super._addError(error, stackTrace);
|
| }
|
|
|
| // StreamSubscription callbacks.
|
| @@ -144,8 +146,8 @@ class _ForwardingStreamSubscription<S, T>
|
| _stream._handleData(data, this);
|
| }
|
|
|
| - void _handleError(error) {
|
| - _stream._handleError(error, this);
|
| + void _handleError(error, StackTrace stackTrace) {
|
| + _stream._handleError(error, stackTrace, this);
|
| }
|
|
|
| void _handleDone() {
|
| @@ -170,7 +172,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| return;
|
| }
|
| if (satisfies) {
|
| @@ -196,7 +198,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| try {
|
| outputEvent = _transform(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| return;
|
| }
|
| sink._add(outputEvent);
|
| @@ -220,13 +222,12 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| } catch (e, s) {
|
| // If either _expand or iterating the generated iterator throws,
|
| // we abort the iteration.
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| }
|
| }
|
| }
|
|
|
|
|
| -typedef void _ErrorTransformation(error);
|
| typedef bool _ErrorTest(error);
|
|
|
| /**
|
| @@ -234,33 +235,41 @@ typedef bool _ErrorTest(error);
|
| * before passing them on.
|
| */
|
| class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| - final _ErrorTransformation _transform;
|
| + final Function _transform;
|
| final _ErrorTest _test;
|
|
|
| _HandleErrorStream(Stream<T> source,
|
| - void transform(event),
|
| - bool test(error))
|
| - : this._transform = transform, this._test = test, super(source);
|
| + Function onError,
|
| + bool test(error))
|
| + : this._transform = onError, this._test = test, super(source);
|
|
|
| - void _handleError(Object error, _EventSink<T> sink) {
|
| + void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
|
| bool matches = true;
|
| if (_test != null) {
|
| try {
|
| matches = _test(error);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| return;
|
| }
|
| }
|
| if (matches) {
|
| try {
|
| - _transform(error);
|
| + if (_transform is ZoneBinaryCallback) {
|
| + _transform(error, stackTrace);
|
| + } else {
|
| + _transform(error);
|
| + }
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + if (identical(e, error)) {
|
| + sink._addError(error, stackTrace);
|
| + } else {
|
| + sink._addError(_asyncError(e, s), s);
|
| + }
|
| return;
|
| }
|
| } else {
|
| - sink._addError(error);
|
| + sink._addError(error, stackTrace);
|
| }
|
| }
|
| }
|
| @@ -301,7 +310,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| // The test didn't say true. Didn't say false either, but we stop anyway.
|
| sink._close();
|
| return;
|
| @@ -349,7 +358,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| // A failure to return a boolean is considered "not matching".
|
| _hasFailed = true;
|
| return;
|
| @@ -385,7 +394,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| isEqual = _equals(_previous, inputEvent);
|
| }
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s), s);
|
| return null;
|
| }
|
| if (!isEqual) {
|
| @@ -399,7 +408,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| // Stream transformations and event transformations.
|
|
|
| typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
|
| -typedef void _TransformErrorHandler<T>(data, EventSink<T> sink);
|
| +typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink);
|
| typedef void _TransformDoneHandler<T>(EventSink<T> sink);
|
|
|
| /** Default data handler forwards all data. */
|
|
|