Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index b126747d848bcaa84ebe0315132ecc37ff042330..5d75136a6d15d0221e4a5944fb97ed3360fb6d81 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -16,19 +16,21 @@ _asyncError(Object error, Object stackTrace) { |
} |
/** Runs user code and takes actions depending on success or failure. */ |
-_runUserCode(userCode(), onSuccess(value), onError(error)) { |
+_runUserCode(userCode(), |
+ onSuccess(value), |
+ onError(error, StackTrace stackTrace)) { |
try { |
onSuccess(userCode()); |
} catch (e, s) { |
- onError(_asyncError(e, s)); |
+ onError(_asyncError(e, s), s); |
} |
} |
/** Helper function to make an onError argument to [_runUserCode]. */ |
_cancelAndError(StreamSubscription subscription, _Future future) => |
- (error) { |
+ (error, StackTrace stackTrace) { |
subscription.cancel(); |
- future._completeError(error); |
+ future._completeError(error, stackTrace); |
}; |
@@ -49,7 +51,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
bool get isBroadcast => _source.isBroadcast; |
StreamSubscription<T> listen(void onData(T value), |
- { void onError(error), |
+ { Function onError, |
void onDone(), |
bool cancelOnError }) { |
if (onData == null) onData = _nullDataHandler; |
@@ -60,7 +62,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
} |
StreamSubscription<T> _createSubscription(void onData(T value), |
- void onError(error), |
+ Function onError, |
void onDone(), |
bool cancelOnError) { |
return new _ForwardingStreamSubscription<S, T>( |
@@ -74,8 +76,8 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
sink._add(outputData); |
} |
- void _handleError(error, _EventSink<T> sink) { |
- sink._addError(error); |
+ void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
+ sink._addError(error, stackTrace); |
} |
void _handleDone(_EventSink<T> sink) { |
@@ -94,7 +96,7 @@ class _ForwardingStreamSubscription<S, T> |
_ForwardingStreamSubscription(this._stream, |
void onData(T data), |
- void onError(error), |
+ Function onError, |
void onDone(), |
bool cancelOnError) |
: super(onData, onError, onDone, cancelOnError) { |
@@ -113,9 +115,9 @@ class _ForwardingStreamSubscription<S, T> |
super._add(data); |
} |
- void _addError(Object error) { |
+ void _addError(Object error, StackTrace stackTrace) { |
if (_isClosed) return; |
- super._addError(error); |
+ super._addError(error, stackTrace); |
} |
// StreamSubscription callbacks. |
@@ -144,8 +146,8 @@ class _ForwardingStreamSubscription<S, T> |
_stream._handleData(data, this); |
} |
- void _handleError(error) { |
- _stream._handleError(error, this); |
+ void _handleError(error, StackTrace stackTrace) { |
+ _stream._handleError(error, stackTrace, this); |
} |
void _handleDone() { |
@@ -170,7 +172,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
return; |
} |
if (satisfies) { |
@@ -196,7 +198,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
try { |
outputEvent = _transform(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
return; |
} |
sink._add(outputEvent); |
@@ -220,13 +222,12 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
} catch (e, s) { |
// If either _expand or iterating the generated iterator throws, |
// we abort the iteration. |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
} |
} |
} |
-typedef void _ErrorTransformation(error); |
typedef bool _ErrorTest(error); |
/** |
@@ -234,33 +235,41 @@ typedef bool _ErrorTest(error); |
* before passing them on. |
*/ |
class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
- final _ErrorTransformation _transform; |
+ final Function _transform; |
final _ErrorTest _test; |
_HandleErrorStream(Stream<T> source, |
- void transform(event), |
- bool test(error)) |
- : this._transform = transform, this._test = test, super(source); |
+ Function onError, |
+ bool test(error)) |
+ : this._transform = onError, this._test = test, super(source); |
- void _handleError(Object error, _EventSink<T> sink) { |
+ void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
bool matches = true; |
if (_test != null) { |
try { |
matches = _test(error); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
return; |
} |
} |
if (matches) { |
try { |
- _transform(error); |
+ if (_transform is ZoneBinaryCallback) { |
+ _transform(error, stackTrace); |
+ } else { |
+ _transform(error); |
+ } |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ if (identical(e, error)) { |
+ sink._addError(error, stackTrace); |
+ } else { |
+ sink._addError(_asyncError(e, s), s); |
+ } |
return; |
} |
} else { |
- sink._addError(error); |
+ sink._addError(error, stackTrace); |
} |
} |
} |
@@ -301,7 +310,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
// The test didn't say true. Didn't say false either, but we stop anyway. |
sink._close(); |
return; |
@@ -349,7 +358,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
// A failure to return a boolean is considered "not matching". |
_hasFailed = true; |
return; |
@@ -385,7 +394,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
isEqual = _equals(_previous, inputEvent); |
} |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s), s); |
return null; |
} |
if (!isEqual) { |
@@ -399,7 +408,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
// Stream transformations and event transformations. |
typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
-typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
+typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink); |
typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
/** Default data handler forwards all data. */ |