Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 69aaf37d652024e7a97eca9c80ac361d12320162..54596a6b0e69d9daf43e9dfde31d535762a4ba0e 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -7,7 +7,7 @@ part of dart.async; |
/** Abstract and private interface for a place to put events. */ |
abstract class _EventSink<T> { |
void _add(T data); |
- void _addError(Object error); |
+ void _addError(Object error, StackTrace stackTrace); |
void _close(); |
} |
@@ -20,7 +20,7 @@ abstract class _EventSink<T> { |
*/ |
abstract class _EventDispatch<T> { |
void _sendData(T data); |
- void _sendError(Object error); |
+ void _sendError(Object error, StackTrace stackTrace); |
void _sendDone(); |
} |
@@ -78,7 +78,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
/* Event handlers provided in constructor. */ |
_DataHandler<T> _onData; |
- _ErrorHandler _onError; |
+ Function _onError; |
_DoneHandler _onDone; |
final Zone _zone = Zone.current; |
@@ -93,11 +93,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_PendingEvents _pending; |
_BufferingStreamSubscription(void onData(T data), |
- void onError(error), |
+ Function onError, |
void onDone(), |
bool cancelOnError) |
: _onData = Zone.current.registerUnaryCallback(onData), |
- _onError = Zone.current.registerUnaryCallback(onError), |
+ _onError = _registerErrorCallback(onError), |
_onDone = Zone.current.registerCallback(onDone), |
_state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
assert(_onData != null); |
@@ -105,6 +105,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(_onDone != null); |
} |
+ static _registerErrorCallback(Function errorCallback) { |
+ if (errorCallback is ZoneBinaryCallback) { |
+ return Zone.current.registerBinaryCallback(errorCallback); |
+ } else { |
+ return Zone.current.registerUnaryCallback(errorCallback); |
+ } |
+ } |
+ |
/** |
* Sets the subscription's pending events object. |
* |
@@ -138,17 +146,17 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
void onData(void handleData(T event)) { |
if (handleData == null) handleData = _nullDataHandler; |
- _onData = handleData; |
+ _onData = Zone.current.registerUnaryCallback(handleData); |
} |
- void onError(void handleError(error)) { |
+ void onError(Function handleError) { |
if (handleError == null) handleError = _nullErrorHandler; |
- _onError = handleError; |
+ _onError = _registerErrorCallback(handleError); |
} |
void onDone(void handleDone()) { |
if (handleDone == null) handleDone = _nullDoneHandler; |
- _onDone = handleDone; |
+ _onDone = Zone.current.registerCallback(handleDone); |
} |
void pause([Future resumeSignal]) { |
@@ -196,9 +204,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
// Overwrite the onDone and onError handlers. |
_onDone = () { result._complete(futureValue); }; |
- _onError = (error) { |
+ _onError = (error, stackTrace) { |
cancel(); |
- result._completeError(error); |
+ result._completeError(error, stackTrace); |
}; |
return result; |
@@ -259,12 +267,12 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
} |
- void _addError(Object error) { |
+ void _addError(Object error, StackTrace stackTrace) { |
if (_isCanceled) return; |
if (_canFire) { |
- _sendError(error); // Reports cancel after sending. |
+ _sendError(error, stackTrace); // Reports cancel after sending. |
} else { |
- _addPending(new _DelayedError(error)); |
+ _addPending(new _DelayedError(error, stackTrace)); |
} |
} |
@@ -328,7 +336,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_checkState(wasInputPaused); |
} |
- void _sendError(var error) { |
+ void _sendError(var error, StackTrace stackTrace) { |
assert(!_isCanceled); |
assert(!_isPaused); |
assert(!_inCallback); |
@@ -336,7 +344,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_state |= _STATE_IN_CALLBACK; |
if (!_zone.inSameErrorZone(Zone.current)) { |
// Errors are not allowed to traverse zone boundaries. |
- Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); |
+ Zone.current.handleUncaughtError(error, stackTrace); |
+ } else if (_onError is ZoneBinaryCallback) { |
+ _zone.runBinaryGuarded(_onError, error, stackTrace); |
} else { |
_zone.runUnaryGuarded(_onError, error); |
} |
@@ -424,7 +434,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
// Stream interface. |
StreamSubscription<T> listen(void onData(T data), |
- { void onError(error), |
+ { Function onError, |
void onDone(), |
bool cancelOnError }) { |
if (onData == null) onData = _nullDataHandler; |
@@ -466,7 +476,7 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
_GeneratedStreamImpl(this._pending); |
StreamSubscription _createSubscription(void onData(T data), |
- void onError(Object error), |
+ Function onError, |
void onDone(), |
bool cancelOnError) { |
_BufferingStreamSubscription<T> subscription = |
@@ -502,7 +512,7 @@ class _IterablePendingEvents<T> extends _PendingEvents { |
isDone = !_iterator.moveNext(); |
} catch (e, s) { |
_iterator = null; |
- dispatch._sendError(_asyncError(e, s)); |
+ dispatch._sendError(_asyncError(e, s), s); |
return; |
} |
if (!isDone) { |
@@ -524,7 +534,6 @@ class _IterablePendingEvents<T> extends _PendingEvents { |
// Types of the different handlers on a stream. Types used to type fields. |
typedef void _DataHandler<T>(T value); |
-typedef void _ErrorHandler(error); |
typedef void _DoneHandler(); |
@@ -532,8 +541,8 @@ typedef void _DoneHandler(); |
void _nullDataHandler(var value) {} |
/** Default error handler, reports the error to the current zone's handler. */ |
-void _nullErrorHandler(error) { |
- Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); |
+void _nullErrorHandler(error, [StackTrace stackTrace]) { |
+ Zone.current.handleUncaughtError(error, stackTrace); |
} |
/** Default done handler, does nothing. */ |
@@ -560,9 +569,11 @@ class _DelayedData<T> extends _DelayedEvent { |
/** A delayed error event. */ |
class _DelayedError extends _DelayedEvent { |
final error; |
- _DelayedError(this.error); |
+ final StackTrace stackTrace; |
+ |
+ _DelayedError(this.error, this.stackTrace); |
void perform(_EventDispatch dispatch) { |
- dispatch._sendError(error); |
+ dispatch._sendError(error, stackTrace); |
} |
} |
@@ -704,7 +715,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
int _pauseCounter = 0; |
void onData(void handleData(T data)) {} |
- void onError(void handleError(Object data)) {} |
+ void onError(Function handleError) {} |
void onDone(void handleDone()) {} |
void pause([Future resumeSignal]) { |
@@ -741,7 +752,7 @@ class _AsBroadcastStream<T> extends Stream<T> { |
bool get isBroadcast => true; |
StreamSubscription<T> listen(void onData(T data), |
- { void onError(Object error), |
+ { Function onError, |
void onDone(), |
bool cancelOnError}) { |
if (_controller == null) { |
@@ -976,12 +987,12 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
_state = _STATE_EXTRA_DATA; |
} |
- void _onError(Object error) { |
+ void _onError(Object error, [StackTrace stackTrace]) { |
if (_state == _STATE_MOVING) { |
_Future<bool> hasNext = _futureOrPrefetch; |
// We have cancelOnError: true, so the subscription is canceled. |
_clear(); |
- hasNext._completeError(error); |
+ hasNext._completeError(error, stackTrace); |
return; |
} |
_subscription.pause(); |