| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index e827adf2e8b6557c45c28b001c4480cdd2370783..8fa1848d14ac0261b2943dfc41c4ba314d457162 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -55,6 +55,20 @@ const int _LISTENER_EVENT_ID_SHIFT = 2;
|
| /// state, shifted by this amount.
|
| const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
|
|
|
| +/** Throws the given error in the next cycle. */
|
| +_throwDelayed(var error, [Object stackTrace]) {
|
| + // We are going to reach the top-level here, but there might be a global
|
| + // exception handler. This means that we shouldn't print the stack trace.
|
| + // TODO(floitsch): Find better solution that doesn't print the stack trace
|
| + // if there is a global exception handler.
|
| + runAsync(() {
|
| + if (stackTrace != null) print(stackTrace);
|
| + var trace = getAttachedStackTrace(error);
|
| + if (trace != null && trace != stackTrace) print(trace);
|
| + throw error;
|
| + });
|
| +}
|
| +
|
|
|
| // -------------------------------------------------------------------
|
| // Common base class for single and multi-subscription streams.
|
| @@ -77,7 +91,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| // Stream interface.
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| - { void onError(AsyncError error),
|
| + { void onError(error),
|
| void onDone(),
|
| bool cancelOnError }) {
|
| if (_isComplete) {
|
| @@ -123,7 +137,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| * If a subscription has requested to be unsubscribed on errors,
|
| * it will be unsubscribed after receiving this event.
|
| */
|
| - void _addError(AsyncError error) {
|
| + void _addError(error) {
|
| if (_isClosed) throw new StateError("Sending on closed stream");
|
| if (!_mayFireState) {
|
| // Not the time to send events.
|
| @@ -374,7 +388,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| /** Create a subscription object. Called by [subcribe]. */
|
| _StreamSubscriptionImpl<T> _createSubscription(
|
| void onData(T data),
|
| - void onError(AsyncError error),
|
| + void onError(error),
|
| void onDone(),
|
| bool cancelOnError);
|
|
|
| @@ -493,10 +507,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| _forEachSubscriber((subscriber) {
|
| try {
|
| subscriber._sendData(value);
|
| - } on AsyncError catch (e) {
|
| - e.throwDelayed();
|
| } catch (e, s) {
|
| - new AsyncError(e, s).throwDelayed();
|
| + _throwDelayed(e, s);
|
| }
|
| });
|
| }
|
| @@ -504,17 +516,15 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| /**
|
| * Sends an error event directly to each subscriber.
|
| */
|
| - void _sendError(AsyncError error) {
|
| + void _sendError(error) {
|
| assert(!_isPaused);
|
| assert(!_isComplete);
|
| if (!_hasListener) return;
|
| _forEachSubscriber((subscriber) {
|
| try {
|
| subscriber._sendError(error);
|
| - } on AsyncError catch (e) {
|
| - e.throwDelayed();
|
| } catch (e, s) {
|
| - new AsyncError.withCause(e, s, error).throwDelayed();
|
| + _throwDelayed(e, s);
|
| }
|
| });
|
| }
|
| @@ -533,10 +543,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| _cancel(subscriber);
|
| try {
|
| subscriber._sendDone();
|
| - } on AsyncError catch (e) {
|
| - e.throwDelayed();
|
| } catch (e, s) {
|
| - new AsyncError(e, s).throwDelayed();
|
| + _throwDelayed(e, s);
|
| }
|
| });
|
| assert(!_hasListener);
|
| @@ -592,7 +600,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| */
|
| _StreamSubscriptionImpl<T> _createSubscription(
|
| void onData(T data),
|
| - void onError(AsyncError error),
|
| + void onError(error),
|
| void onDone(),
|
| bool cancelOnError) {
|
| return new _StreamSubscriptionImpl<T>(
|
| @@ -712,7 +720,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| */
|
| _StreamListener<T> _createSubscription(
|
| void onData(T data),
|
| - void onError(AsyncError error),
|
| + void onError(error),
|
| void onDone(),
|
| bool cancelOnError) {
|
| return new _StreamSubscriptionImpl<T>(
|
| @@ -841,7 +849,7 @@ class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
|
| throw new UnsupportedError("Cannot inject events into generated stream");
|
| }
|
|
|
| - void _addError(AsyncError value) {
|
| + void _addError(value) {
|
| throw new UnsupportedError("Cannot inject events into generated stream");
|
| }
|
|
|
| @@ -876,7 +884,7 @@ class _IterablePendingEvents<T> extends _PendingEvents {
|
| stream._sendDone();
|
| }
|
| } catch (e, s) {
|
| - stream._sendError(new AsyncError(e, s));
|
| + stream._sendError(_asyncError(e, s));
|
| stream._sendDone();
|
| _isDone = true;
|
| }
|
| @@ -918,7 +926,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
| _onData = handleData;
|
| }
|
|
|
| - void onError(void handleError(AsyncError error)) {
|
| + void onError(void handleError(error)) {
|
| if (handleError == null) handleError = _nullErrorHandler;
|
| _onError = handleError;
|
| }
|
| @@ -932,7 +940,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
| _onData(data);
|
| }
|
|
|
| - void _sendError(AsyncError error) {
|
| + void _sendError(error) {
|
| _onError(error);
|
| if (_cancelOnError) _source._cancel(this);
|
| }
|
| @@ -961,7 +969,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
|
|
| // Overwrite the onDone and onError handlers.
|
| onDone(() { result._setValue(futureValue); });
|
| - onError((AsyncError error) {
|
| + onError((error) {
|
| cancel();
|
| result._setError(error);
|
| });
|
| @@ -974,7 +982,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
|
|
| // Types of the different handlers on a stream. Types used to type fields.
|
| typedef void _DataHandler<T>(T value);
|
| -typedef void _ErrorHandler(AsyncError error);
|
| +typedef void _ErrorHandler(error);
|
| typedef void _DoneHandler();
|
|
|
|
|
| @@ -982,8 +990,8 @@ typedef void _DoneHandler();
|
| void _nullDataHandler(var value) {}
|
|
|
| /** Default error handler, reports the error to the global handler. */
|
| -void _nullErrorHandler(AsyncError error) {
|
| - error.throwDelayed();
|
| +void _nullErrorHandler(error) {
|
| + _throwDelayed(error);
|
| }
|
|
|
| /** Default done handler, does nothing. */
|
| @@ -1000,7 +1008,7 @@ abstract class _DelayedEvent {
|
|
|
| /** A delayed data event. */
|
| class _DelayedData<T> extends _DelayedEvent{
|
| - T value;
|
| + final T value;
|
| _DelayedData(this.value);
|
| void perform(_StreamImpl<T> stream) {
|
| stream._sendData(value);
|
| @@ -1009,7 +1017,7 @@ class _DelayedData<T> extends _DelayedEvent{
|
|
|
| /** A delayed error event. */
|
| class _DelayedError extends _DelayedEvent {
|
| - AsyncError error;
|
| + final error;
|
| _DelayedError(this.error);
|
| void perform(_StreamImpl stream) {
|
| stream._sendError(error);
|
| @@ -1113,7 +1121,7 @@ abstract class _InternalLinkList extends _InternalLink {
|
| /** Abstract type for an internal interface for sending events. */
|
| abstract class _EventOutputSink<T> {
|
| _sendData(T data);
|
| - _sendError(AsyncError error);
|
| + _sendError(error);
|
| _sendDone();
|
| }
|
|
|
| @@ -1188,7 +1196,7 @@ abstract class _StreamListener<T> extends _InternalLink
|
| }
|
|
|
| _sendData(T data);
|
| - _sendError(AsyncError error);
|
| + _sendError(error);
|
| _sendDone();
|
| }
|
|
|
| @@ -1276,7 +1284,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> {
|
|
|
| void onData(void handleAction(T value)) {}
|
|
|
| - void onError(void handleError(AsyncError error)) {}
|
| + void onError(void handleError(error)) {}
|
|
|
| void onDone(void handleDone()) {
|
| _handler = handleDone;
|
| @@ -1318,7 +1326,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> {
|
|
|
| // Overwrite the onDone and onError handlers.
|
| onDone(() { result._setValue(futureValue); });
|
| - onError((AsyncError error) {
|
| + onError((error) {
|
| cancel();
|
| result._setError(error);
|
| });
|
|
|