Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index b126747d848bcaa84ebe0315132ecc37ff042330..3494de541e476f479cbdeec0d65e17f7d762fb93 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -16,19 +16,21 @@ _asyncError(Object error, Object stackTrace) {
}
/** Runs user code and takes actions depending on success or failure. */
-_runUserCode(userCode(), onSuccess(value), onError(error)) {
+_runUserCode(userCode(),
+ onSuccess(value),
+ onError(error, StackTrace stackTrace)) {
try {
onSuccess(userCode());
} catch (e, s) {
- onError(_asyncError(e, s));
+ onError(_asyncError(e, s), s);
}
}
/** Helper function to make an onError argument to [_runUserCode]. */
_cancelAndError(StreamSubscription subscription, _Future future) =>
- (error) {
+ (error, StackTrace stackTrace) {
subscription.cancel();
- future._completeError(error);
+ future._completeError(error, stackTrace);
};
@@ -49,7 +51,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
bool get isBroadcast => _source.isBroadcast;
StreamSubscription<T> listen(void onData(T value),
- { void onError(error),
+ { Function onError,
void onDone(),
bool cancelOnError }) {
if (onData == null) onData = _nullDataHandler;
@@ -60,7 +62,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
}
StreamSubscription<T> _createSubscription(void onData(T value),
- void onError(error),
+ Function onError,
void onDone(),
bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
@@ -74,8 +76,8 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
sink._add(outputData);
}
- void _handleError(error, _EventSink<T> sink) {
- sink._addError(error);
+ void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
+ sink._addError(error, stackTrace);
}
void _handleDone(_EventSink<T> sink) {
@@ -94,7 +96,7 @@ class _ForwardingStreamSubscription<S, T>
_ForwardingStreamSubscription(this._stream,
void onData(T data),
- void onError(error),
+ Function onError,
void onDone(),
bool cancelOnError)
: super(onData, onError, onDone, cancelOnError) {
@@ -113,9 +115,9 @@ class _ForwardingStreamSubscription<S, T>
super._add(data);
}
- void _addError(Object error) {
+ void _addError(Object error, StackTrace stackTrace) {
if (_isClosed) return;
- super._addError(error);
+ super._addError(error, stackTrace);
}
// StreamSubscription callbacks.
@@ -144,8 +146,8 @@ class _ForwardingStreamSubscription<S, T>
_stream._handleData(data, this);
}
- void _handleError(error) {
- _stream._handleError(error, this);
+ void _handleError(error, StackTrace stackTrace) {
+ _stream._handleError(error, stackTrace, this);
}
void _handleDone() {
@@ -170,7 +172,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
return;
}
if (satisfies) {
@@ -196,7 +198,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
try {
outputEvent = _transform(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
return;
}
sink._add(outputEvent);
@@ -220,13 +222,12 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
} catch (e, s) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
}
}
}
-typedef void _ErrorTransformation(error);
typedef bool _ErrorTest(error);
/**
@@ -234,33 +235,37 @@ typedef bool _ErrorTest(error);
* before passing them on.
*/
class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
- final _ErrorTransformation _transform;
+ final Function _transform;
final _ErrorTest _test;
_HandleErrorStream(Stream<T> source,
- void transform(event),
- bool test(error))
- : this._transform = transform, this._test = test, super(source);
+ Function onError,
+ bool test(error))
+ : this._transform = onError, this._test = test, super(source);
- void _handleError(Object error, _EventSink<T> sink) {
+ void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
matches = _test(error);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
return;
}
}
if (matches) {
try {
- _transform(error);
+ _invokeErrorHandler(_transform, error, stackTrace);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ if (identical(e, error)) {
+ sink._addError(error, stackTrace);
+ } else {
+ sink._addError(_asyncError(e, s), s);
+ }
return;
}
} else {
- sink._addError(error);
+ sink._addError(error, stackTrace);
}
}
}
@@ -301,7 +306,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
// The test didn't say true. Didn't say false either, but we stop anyway.
sink._close();
return;
@@ -349,7 +354,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
// A failure to return a boolean is considered "not matching".
_hasFailed = true;
return;
@@ -385,7 +390,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
isEqual = _equals(_previous, inputEvent);
}
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s), s);
return null;
}
if (!isEqual) {
@@ -399,7 +404,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
// Stream transformations and event transformations.
typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
-typedef void _TransformErrorHandler<T>(data, EventSink<T> sink);
+typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink);
typedef void _TransformDoneHandler<T>(EventSink<T> sink);
/** Default data handler forwards all data. */

Powered by Google App Engine
This is Rietveld 408576698