Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(270)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 69aaf37d652024e7a97eca9c80ac361d12320162..80a3d3941e02c9baeaac7d4548bd362ae40c51f1 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -7,7 +7,7 @@ part of dart.async;
/** Abstract and private interface for a place to put events. */
abstract class _EventSink<T> {
void _add(T data);
- void _addError(Object error);
+ void _addError(Object error, StackTrace stackTrace);
void _close();
}
@@ -20,7 +20,7 @@ abstract class _EventSink<T> {
*/
abstract class _EventDispatch<T> {
void _sendData(T data);
- void _sendError(Object error);
+ void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
@@ -78,7 +78,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
/* Event handlers provided in constructor. */
_DataHandler<T> _onData;
- _ErrorHandler _onError;
+ Function _onError;
_DoneHandler _onDone;
final Zone _zone = Zone.current;
@@ -93,11 +93,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_PendingEvents _pending;
_BufferingStreamSubscription(void onData(T data),
- void onError(error),
+ Function onError,
void onDone(),
bool cancelOnError)
: _onData = Zone.current.registerUnaryCallback(onData),
- _onError = Zone.current.registerUnaryCallback(onError),
+ _onError = _registerErrorHandler(onError, Zone.current),
_onDone = Zone.current.registerCallback(onDone),
_state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
assert(_onData != null);
@@ -138,17 +138,17 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
void onData(void handleData(T event)) {
if (handleData == null) handleData = _nullDataHandler;
- _onData = handleData;
+ _onData = Zone.current.registerUnaryCallback(handleData);
}
- void onError(void handleError(error)) {
+ void onError(Function handleError) {
if (handleError == null) handleError = _nullErrorHandler;
- _onError = handleError;
+ _onError = _registerErrorHandler(handleError, Zone.current);
}
void onDone(void handleDone()) {
if (handleDone == null) handleDone = _nullDoneHandler;
- _onDone = handleDone;
+ _onDone = Zone.current.registerCallback(handleDone);
}
void pause([Future resumeSignal]) {
@@ -196,9 +196,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
- _onError = (error) {
+ _onError = (error, stackTrace) {
cancel();
- result._completeError(error);
+ result._completeError(error, stackTrace);
};
return result;
@@ -259,12 +259,12 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
}
- void _addError(Object error) {
+ void _addError(Object error, StackTrace stackTrace) {
if (_isCanceled) return;
if (_canFire) {
- _sendError(error); // Reports cancel after sending.
+ _sendError(error, stackTrace); // Reports cancel after sending.
} else {
- _addPending(new _DelayedError(error));
+ _addPending(new _DelayedError(error, stackTrace));
}
}
@@ -328,7 +328,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_checkState(wasInputPaused);
}
- void _sendError(var error) {
+ void _sendError(var error, StackTrace stackTrace) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
@@ -336,7 +336,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_state |= _STATE_IN_CALLBACK;
if (!_zone.inSameErrorZone(Zone.current)) {
// Errors are not allowed to traverse zone boundaries.
- Zone.current.handleUncaughtError(error, getAttachedStackTrace(error));
+ Zone.current.handleUncaughtError(error, stackTrace);
+ } else if (_onError is ZoneBinaryCallback) {
+ _zone.runBinaryGuarded(_onError, error, stackTrace);
} else {
_zone.runUnaryGuarded(_onError, error);
}
@@ -424,7 +426,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
// Stream interface.
StreamSubscription<T> listen(void onData(T data),
- { void onError(error),
+ { Function onError,
void onDone(),
bool cancelOnError }) {
if (onData == null) onData = _nullDataHandler;
@@ -466,7 +468,7 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
_GeneratedStreamImpl(this._pending);
StreamSubscription _createSubscription(void onData(T data),
- void onError(Object error),
+ Function onError,
void onDone(),
bool cancelOnError) {
_BufferingStreamSubscription<T> subscription =
@@ -502,7 +504,7 @@ class _IterablePendingEvents<T> extends _PendingEvents {
isDone = !_iterator.moveNext();
} catch (e, s) {
_iterator = null;
- dispatch._sendError(_asyncError(e, s));
+ dispatch._sendError(_asyncError(e, s), s);
return;
}
if (!isDone) {
@@ -524,7 +526,6 @@ class _IterablePendingEvents<T> extends _PendingEvents {
// Types of the different handlers on a stream. Types used to type fields.
typedef void _DataHandler<T>(T value);
-typedef void _ErrorHandler(error);
typedef void _DoneHandler();
@@ -532,8 +533,8 @@ typedef void _DoneHandler();
void _nullDataHandler(var value) {}
/** Default error handler, reports the error to the current zone's handler. */
-void _nullErrorHandler(error) {
- Zone.current.handleUncaughtError(error, getAttachedStackTrace(error));
+void _nullErrorHandler(error, [StackTrace stackTrace]) {
+ Zone.current.handleUncaughtError(error, stackTrace);
}
/** Default done handler, does nothing. */
@@ -560,9 +561,11 @@ class _DelayedData<T> extends _DelayedEvent {
/** A delayed error event. */
class _DelayedError extends _DelayedEvent {
final error;
- _DelayedError(this.error);
+ final StackTrace stackTrace;
+
+ _DelayedError(this.error, this.stackTrace);
void perform(_EventDispatch dispatch) {
- dispatch._sendError(error);
+ dispatch._sendError(error, stackTrace);
}
}
@@ -704,7 +707,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> {
int _pauseCounter = 0;
void onData(void handleData(T data)) {}
- void onError(void handleError(Object data)) {}
+ void onError(Function handleError) {}
void onDone(void handleDone()) {}
void pause([Future resumeSignal]) {
@@ -741,7 +744,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
bool get isBroadcast => true;
StreamSubscription<T> listen(void onData(T data),
- { void onError(Object error),
+ { Function onError,
void onDone(),
bool cancelOnError}) {
if (_controller == null) {
@@ -976,12 +979,12 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
_state = _STATE_EXTRA_DATA;
}
- void _onError(Object error) {
+ void _onError(Object error, [StackTrace stackTrace]) {
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch;
// We have cancelOnError: true, so the subscription is canceled.
_clear();
- hasNext._completeError(error);
+ hasNext._completeError(error, stackTrace);
return;
}
_subscription.pause();

Powered by Google App Engine
This is Rietveld 408576698