| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 69aaf37d652024e7a97eca9c80ac361d12320162..80a3d3941e02c9baeaac7d4548bd362ae40c51f1 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -7,7 +7,7 @@ part of dart.async;
|
| /** Abstract and private interface for a place to put events. */
|
| abstract class _EventSink<T> {
|
| void _add(T data);
|
| - void _addError(Object error);
|
| + void _addError(Object error, StackTrace stackTrace);
|
| void _close();
|
| }
|
|
|
| @@ -20,7 +20,7 @@ abstract class _EventSink<T> {
|
| */
|
| abstract class _EventDispatch<T> {
|
| void _sendData(T data);
|
| - void _sendError(Object error);
|
| + void _sendError(Object error, StackTrace stackTrace);
|
| void _sendDone();
|
| }
|
|
|
| @@ -78,7 +78,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|
|
| /* Event handlers provided in constructor. */
|
| _DataHandler<T> _onData;
|
| - _ErrorHandler _onError;
|
| + Function _onError;
|
| _DoneHandler _onDone;
|
| final Zone _zone = Zone.current;
|
|
|
| @@ -93,11 +93,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _PendingEvents _pending;
|
|
|
| _BufferingStreamSubscription(void onData(T data),
|
| - void onError(error),
|
| + Function onError,
|
| void onDone(),
|
| bool cancelOnError)
|
| : _onData = Zone.current.registerUnaryCallback(onData),
|
| - _onError = Zone.current.registerUnaryCallback(onError),
|
| + _onError = _registerErrorHandler(onError, Zone.current),
|
| _onDone = Zone.current.registerCallback(onDone),
|
| _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
|
| assert(_onData != null);
|
| @@ -138,17 +138,17 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|
|
| void onData(void handleData(T event)) {
|
| if (handleData == null) handleData = _nullDataHandler;
|
| - _onData = handleData;
|
| + _onData = Zone.current.registerUnaryCallback(handleData);
|
| }
|
|
|
| - void onError(void handleError(error)) {
|
| + void onError(Function handleError) {
|
| if (handleError == null) handleError = _nullErrorHandler;
|
| - _onError = handleError;
|
| + _onError = _registerErrorHandler(handleError, Zone.current);
|
| }
|
|
|
| void onDone(void handleDone()) {
|
| if (handleDone == null) handleDone = _nullDoneHandler;
|
| - _onDone = handleDone;
|
| + _onDone = Zone.current.registerCallback(handleDone);
|
| }
|
|
|
| void pause([Future resumeSignal]) {
|
| @@ -196,9 +196,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|
|
| // Overwrite the onDone and onError handlers.
|
| _onDone = () { result._complete(futureValue); };
|
| - _onError = (error) {
|
| + _onError = (error, stackTrace) {
|
| cancel();
|
| - result._completeError(error);
|
| + result._completeError(error, stackTrace);
|
| };
|
|
|
| return result;
|
| @@ -259,12 +259,12 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| }
|
| }
|
|
|
| - void _addError(Object error) {
|
| + void _addError(Object error, StackTrace stackTrace) {
|
| if (_isCanceled) return;
|
| if (_canFire) {
|
| - _sendError(error); // Reports cancel after sending.
|
| + _sendError(error, stackTrace); // Reports cancel after sending.
|
| } else {
|
| - _addPending(new _DelayedError(error));
|
| + _addPending(new _DelayedError(error, stackTrace));
|
| }
|
| }
|
|
|
| @@ -328,7 +328,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _checkState(wasInputPaused);
|
| }
|
|
|
| - void _sendError(var error) {
|
| + void _sendError(var error, StackTrace stackTrace) {
|
| assert(!_isCanceled);
|
| assert(!_isPaused);
|
| assert(!_inCallback);
|
| @@ -336,7 +336,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _state |= _STATE_IN_CALLBACK;
|
| if (!_zone.inSameErrorZone(Zone.current)) {
|
| // Errors are not allowed to traverse zone boundaries.
|
| - Zone.current.handleUncaughtError(error, getAttachedStackTrace(error));
|
| + Zone.current.handleUncaughtError(error, stackTrace);
|
| + } else if (_onError is ZoneBinaryCallback) {
|
| + _zone.runBinaryGuarded(_onError, error, stackTrace);
|
| } else {
|
| _zone.runUnaryGuarded(_onError, error);
|
| }
|
| @@ -424,7 +426,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| // Stream interface.
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| - { void onError(error),
|
| + { Function onError,
|
| void onDone(),
|
| bool cancelOnError }) {
|
| if (onData == null) onData = _nullDataHandler;
|
| @@ -466,7 +468,7 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| _GeneratedStreamImpl(this._pending);
|
|
|
| StreamSubscription _createSubscription(void onData(T data),
|
| - void onError(Object error),
|
| + Function onError,
|
| void onDone(),
|
| bool cancelOnError) {
|
| _BufferingStreamSubscription<T> subscription =
|
| @@ -502,7 +504,7 @@ class _IterablePendingEvents<T> extends _PendingEvents {
|
| isDone = !_iterator.moveNext();
|
| } catch (e, s) {
|
| _iterator = null;
|
| - dispatch._sendError(_asyncError(e, s));
|
| + dispatch._sendError(_asyncError(e, s), s);
|
| return;
|
| }
|
| if (!isDone) {
|
| @@ -524,7 +526,6 @@ class _IterablePendingEvents<T> extends _PendingEvents {
|
|
|
| // Types of the different handlers on a stream. Types used to type fields.
|
| typedef void _DataHandler<T>(T value);
|
| -typedef void _ErrorHandler(error);
|
| typedef void _DoneHandler();
|
|
|
|
|
| @@ -532,8 +533,8 @@ typedef void _DoneHandler();
|
| void _nullDataHandler(var value) {}
|
|
|
| /** Default error handler, reports the error to the current zone's handler. */
|
| -void _nullErrorHandler(error) {
|
| - Zone.current.handleUncaughtError(error, getAttachedStackTrace(error));
|
| +void _nullErrorHandler(error, [StackTrace stackTrace]) {
|
| + Zone.current.handleUncaughtError(error, stackTrace);
|
| }
|
|
|
| /** Default done handler, does nothing. */
|
| @@ -560,9 +561,11 @@ class _DelayedData<T> extends _DelayedEvent {
|
| /** A delayed error event. */
|
| class _DelayedError extends _DelayedEvent {
|
| final error;
|
| - _DelayedError(this.error);
|
| + final StackTrace stackTrace;
|
| +
|
| + _DelayedError(this.error, this.stackTrace);
|
| void perform(_EventDispatch dispatch) {
|
| - dispatch._sendError(error);
|
| + dispatch._sendError(error, stackTrace);
|
| }
|
| }
|
|
|
| @@ -704,7 +707,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> {
|
| int _pauseCounter = 0;
|
|
|
| void onData(void handleData(T data)) {}
|
| - void onError(void handleError(Object data)) {}
|
| + void onError(Function handleError) {}
|
| void onDone(void handleDone()) {}
|
|
|
| void pause([Future resumeSignal]) {
|
| @@ -741,7 +744,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| bool get isBroadcast => true;
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| - { void onError(Object error),
|
| + { Function onError,
|
| void onDone(),
|
| bool cancelOnError}) {
|
| if (_controller == null) {
|
| @@ -976,12 +979,12 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| _state = _STATE_EXTRA_DATA;
|
| }
|
|
|
| - void _onError(Object error) {
|
| + void _onError(Object error, [StackTrace stackTrace]) {
|
| if (_state == _STATE_MOVING) {
|
| _Future<bool> hasNext = _futureOrPrefetch;
|
| // We have cancelOnError: true, so the subscription is canceled.
|
| _clear();
|
| - hasNext._completeError(error);
|
| + hasNext._completeError(error, stackTrace);
|
| return;
|
| }
|
| _subscription.pause();
|
|
|