Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index e827adf2e8b6557c45c28b001c4480cdd2370783..8fa1848d14ac0261b2943dfc41c4ba314d457162 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -55,6 +55,20 @@ const int _LISTENER_EVENT_ID_SHIFT = 2; |
/// state, shifted by this amount. |
const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
+/** Throws the given error in the next cycle. */ |
+_throwDelayed(var error, [Object stackTrace]) { |
+ // We are going to reach the top-level here, but there might be a global |
+ // exception handler. This means that we shouldn't print the stack trace. |
+ // TODO(floitsch): Find better solution that doesn't print the stack trace |
+ // if there is a global exception handler. |
+ runAsync(() { |
+ if (stackTrace != null) print(stackTrace); |
+ var trace = getAttachedStackTrace(error); |
+ if (trace != null && trace != stackTrace) print(trace); |
+ throw error; |
+ }); |
+} |
+ |
// ------------------------------------------------------------------- |
// Common base class for single and multi-subscription streams. |
@@ -77,7 +91,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
// Stream interface. |
StreamSubscription<T> listen(void onData(T data), |
- { void onError(AsyncError error), |
+ { void onError(error), |
void onDone(), |
bool cancelOnError }) { |
if (_isComplete) { |
@@ -123,7 +137,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
* If a subscription has requested to be unsubscribed on errors, |
* it will be unsubscribed after receiving this event. |
*/ |
- void _addError(AsyncError error) { |
+ void _addError(error) { |
if (_isClosed) throw new StateError("Sending on closed stream"); |
if (!_mayFireState) { |
// Not the time to send events. |
@@ -374,7 +388,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
/** Create a subscription object. Called by [subcribe]. */ |
_StreamSubscriptionImpl<T> _createSubscription( |
void onData(T data), |
- void onError(AsyncError error), |
+ void onError(error), |
void onDone(), |
bool cancelOnError); |
@@ -493,10 +507,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
_forEachSubscriber((subscriber) { |
try { |
subscriber._sendData(value); |
- } on AsyncError catch (e) { |
- e.throwDelayed(); |
} catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
+ _throwDelayed(e, s); |
} |
}); |
} |
@@ -504,17 +516,15 @@ abstract class _StreamImpl<T> extends Stream<T> { |
/** |
* Sends an error event directly to each subscriber. |
*/ |
- void _sendError(AsyncError error) { |
+ void _sendError(error) { |
assert(!_isPaused); |
assert(!_isComplete); |
if (!_hasListener) return; |
_forEachSubscriber((subscriber) { |
try { |
subscriber._sendError(error); |
- } on AsyncError catch (e) { |
- e.throwDelayed(); |
} catch (e, s) { |
- new AsyncError.withCause(e, s, error).throwDelayed(); |
+ _throwDelayed(e, s); |
} |
}); |
} |
@@ -533,10 +543,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
_cancel(subscriber); |
try { |
subscriber._sendDone(); |
- } on AsyncError catch (e) { |
- e.throwDelayed(); |
} catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
+ _throwDelayed(e, s); |
} |
}); |
assert(!_hasListener); |
@@ -592,7 +600,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
*/ |
_StreamSubscriptionImpl<T> _createSubscription( |
void onData(T data), |
- void onError(AsyncError error), |
+ void onError(error), |
void onDone(), |
bool cancelOnError) { |
return new _StreamSubscriptionImpl<T>( |
@@ -712,7 +720,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
*/ |
_StreamListener<T> _createSubscription( |
void onData(T data), |
- void onError(AsyncError error), |
+ void onError(error), |
void onDone(), |
bool cancelOnError) { |
return new _StreamSubscriptionImpl<T>( |
@@ -841,7 +849,7 @@ class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
throw new UnsupportedError("Cannot inject events into generated stream"); |
} |
- void _addError(AsyncError value) { |
+ void _addError(value) { |
throw new UnsupportedError("Cannot inject events into generated stream"); |
} |
@@ -876,7 +884,7 @@ class _IterablePendingEvents<T> extends _PendingEvents { |
stream._sendDone(); |
} |
} catch (e, s) { |
- stream._sendError(new AsyncError(e, s)); |
+ stream._sendError(_asyncError(e, s)); |
stream._sendDone(); |
_isDone = true; |
} |
@@ -918,7 +926,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
_onData = handleData; |
} |
- void onError(void handleError(AsyncError error)) { |
+ void onError(void handleError(error)) { |
if (handleError == null) handleError = _nullErrorHandler; |
_onError = handleError; |
} |
@@ -932,7 +940,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
_onData(data); |
} |
- void _sendError(AsyncError error) { |
+ void _sendError(error) { |
_onError(error); |
if (_cancelOnError) _source._cancel(this); |
} |
@@ -961,7 +969,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
// Overwrite the onDone and onError handlers. |
onDone(() { result._setValue(futureValue); }); |
- onError((AsyncError error) { |
+ onError((error) { |
cancel(); |
result._setError(error); |
}); |
@@ -974,7 +982,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
// Types of the different handlers on a stream. Types used to type fields. |
typedef void _DataHandler<T>(T value); |
-typedef void _ErrorHandler(AsyncError error); |
+typedef void _ErrorHandler(error); |
typedef void _DoneHandler(); |
@@ -982,8 +990,8 @@ typedef void _DoneHandler(); |
void _nullDataHandler(var value) {} |
/** Default error handler, reports the error to the global handler. */ |
-void _nullErrorHandler(AsyncError error) { |
- error.throwDelayed(); |
+void _nullErrorHandler(error) { |
+ _throwDelayed(error); |
} |
/** Default done handler, does nothing. */ |
@@ -1000,7 +1008,7 @@ abstract class _DelayedEvent { |
/** A delayed data event. */ |
class _DelayedData<T> extends _DelayedEvent{ |
- T value; |
+ final T value; |
_DelayedData(this.value); |
void perform(_StreamImpl<T> stream) { |
stream._sendData(value); |
@@ -1009,7 +1017,7 @@ class _DelayedData<T> extends _DelayedEvent{ |
/** A delayed error event. */ |
class _DelayedError extends _DelayedEvent { |
- AsyncError error; |
+ final error; |
_DelayedError(this.error); |
void perform(_StreamImpl stream) { |
stream._sendError(error); |
@@ -1113,7 +1121,7 @@ abstract class _InternalLinkList extends _InternalLink { |
/** Abstract type for an internal interface for sending events. */ |
abstract class _EventOutputSink<T> { |
_sendData(T data); |
- _sendError(AsyncError error); |
+ _sendError(error); |
_sendDone(); |
} |
@@ -1188,7 +1196,7 @@ abstract class _StreamListener<T> extends _InternalLink |
} |
_sendData(T data); |
- _sendError(AsyncError error); |
+ _sendError(error); |
_sendDone(); |
} |
@@ -1276,7 +1284,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
void onData(void handleAction(T value)) {} |
- void onError(void handleError(AsyncError error)) {} |
+ void onError(void handleError(error)) {} |
void onDone(void handleDone()) { |
_handler = handleDone; |
@@ -1318,7 +1326,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
// Overwrite the onDone and onError handlers. |
onDone(() { result._setValue(futureValue); }); |
- onError((AsyncError error) { |
+ onError((error) { |
cancel(); |
result._setError(error); |
}); |