Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart | 
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart | 
| index fa0edf2fc2857059260ee53c8e9ef9392a500415..04557d7f1441ec459be9238a80839292cb7b1717 100644 | 
| --- a/sdk/lib/async/stream_impl.dart | 
| +++ b/sdk/lib/async/stream_impl.dart | 
| @@ -12,7 +12,7 @@ const int _STREAM_OPEN = 0; | 
| /// The stream has received a request to complete, but hasn't done so yet. | 
| /// No further events can be added to the stream. | 
| const int _STREAM_CLOSED = 1; | 
| -/// The stream has completed and will no longer receive or send events. | 
| +// /// The stream has completed and will no longer receive or send events. | 
| 
 
Anders Johnsen
2013/04/15 16:34:17
Double uncomment.
 
floitsch
2013/04/15 18:54:10
Done.
 
 | 
| /// Also counts as closed. The stream must not be paused when it's completed. | 
| /// Always used in conjunction with [_STREAM_CLOSED]. | 
| const int _STREAM_COMPLETE = 2; | 
| @@ -55,6 +55,20 @@ const int _LISTENER_EVENT_ID_SHIFT = 2; | 
| /// state, shifted by this amount. | 
| const int _LISTENER_PAUSE_COUNT_SHIFT = 3; | 
| +/** Throws the given error in the next cycle. */ | 
| +_throwDelayed(var error, [Object stackTrace]) { | 
| + // We are going to reach the top-level here, but there might be a global | 
| + // exception handler. This means that we shouldn't print the stack trace. | 
| + // TODO(floitsch): Find better solution that doesn't print the stack trace | 
| + // if there is a global exception handler. | 
| + runAsync(() { | 
| + if (stackTrace != null) print(stackTrace); | 
| + var trace = getAttachedStackTrace(error); | 
| + if (trace != null && trace != stackTrace) print(trace); | 
| + throw error; | 
| + }); | 
| +} | 
| + | 
| // ------------------------------------------------------------------- | 
| // Common base class for single and multi-subscription streams. | 
| @@ -77,7 +91,7 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| // Stream interface. | 
| StreamSubscription<T> listen(void onData(T data), | 
| - { void onError(AsyncError error), | 
| + { void onError(error), | 
| void onDone(), | 
| bool unsubscribeOnError }) { | 
| if (_isComplete) { | 
| @@ -123,7 +137,7 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| * If a subscription has requested to be unsubscribed on errors, | 
| * it will be unsubscribed after receiving this event. | 
| */ | 
| - void _addError(AsyncError error) { | 
| + void _addError(error) { | 
| if (_isClosed) throw new StateError("Sending on closed stream"); | 
| if (!_mayFireState) { | 
| // Not the time to send events. | 
| @@ -374,7 +388,7 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| /** Create a subscription object. Called by [subcribe]. */ | 
| _StreamSubscriptionImpl<T> _createSubscription( | 
| void onData(T data), | 
| - void onError(AsyncError error), | 
| + void onError(error), | 
| void onDone(), | 
| bool unsubscribeOnError); | 
| @@ -493,10 +507,8 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| _forEachSubscriber((subscriber) { | 
| try { | 
| subscriber._sendData(value); | 
| - } on AsyncError catch (e) { | 
| - e.throwDelayed(); | 
| } catch (e, s) { | 
| - new AsyncError(e, s).throwDelayed(); | 
| + _throwDelayed(e, s); | 
| } | 
| }); | 
| } | 
| @@ -504,17 +516,15 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| /** | 
| * Sends an error event directly to each subscriber. | 
| */ | 
| - void _sendError(AsyncError error) { | 
| + void _sendError(error) { | 
| assert(!_isPaused); | 
| assert(!_isComplete); | 
| if (!_hasListener) return; | 
| _forEachSubscriber((subscriber) { | 
| try { | 
| subscriber._sendError(error); | 
| - } on AsyncError catch (e) { | 
| - e.throwDelayed(); | 
| } catch (e, s) { | 
| - new AsyncError.withCause(e, s, error).throwDelayed(); | 
| + _throwDelayed(e, s); | 
| } | 
| }); | 
| } | 
| @@ -533,10 +543,8 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| _cancel(subscriber); | 
| try { | 
| subscriber._sendDone(); | 
| - } on AsyncError catch (e) { | 
| - e.throwDelayed(); | 
| } catch (e, s) { | 
| - new AsyncError(e, s).throwDelayed(); | 
| + _throwDelayed(e, s); | 
| } | 
| }); | 
| assert(!_hasListener); | 
| @@ -592,7 +600,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { | 
| */ | 
| _StreamSubscriptionImpl<T> _createSubscription( | 
| void onData(T data), | 
| - void onError(AsyncError error), | 
| + void onError(error), | 
| void onDone(), | 
| bool unsubscribeOnError) { | 
| return new _StreamSubscriptionImpl<T>( | 
| @@ -712,7 +720,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> | 
| */ | 
| _StreamListener<T> _createSubscription( | 
| void onData(T data), | 
| - void onError(AsyncError error), | 
| + void onError(error), | 
| void onDone(), | 
| bool unsubscribeOnError) { | 
| return new _StreamSubscriptionImpl<T>( | 
| @@ -841,7 +849,7 @@ class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { | 
| throw new UnsupportedError("Cannot inject events into generated stream"); | 
| } | 
| - void _addError(AsyncError value) { | 
| + void _addError(value) { | 
| throw new UnsupportedError("Cannot inject events into generated stream"); | 
| } | 
| @@ -876,7 +884,7 @@ class _IterablePendingEvents<T> extends _PendingEvents { | 
| stream._sendDone(); | 
| } | 
| } catch (e, s) { | 
| - stream._sendError(new AsyncError(e, s)); | 
| + stream._sendError(_asyncError(e, s)); | 
| stream._sendDone(); | 
| _isDone = true; | 
| } | 
| @@ -918,7 +926,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 
| _onData = handleData; | 
| } | 
| - void onError(void handleError(AsyncError error)) { | 
| + void onError(void handleError(error)) { | 
| if (handleError == null) handleError = _nullErrorHandler; | 
| _onError = handleError; | 
| } | 
| @@ -932,7 +940,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 
| _onData(data); | 
| } | 
| - void _sendError(AsyncError error) { | 
| + void _sendError(error) { | 
| _onError(error); | 
| if (_unsubscribeOnError) _source._cancel(this); | 
| } | 
| @@ -961,7 +969,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 
| // Overwrite the onDone and onError handlers. | 
| onDone(() { result._setValue(futureValue); }); | 
| - onError((AsyncError error) { | 
| + onError((error) { | 
| cancel(); | 
| result._setError(error); | 
| }); | 
| @@ -974,7 +982,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 
| // Types of the different handlers on a stream. Types used to type fields. | 
| typedef void _DataHandler<T>(T value); | 
| -typedef void _ErrorHandler(AsyncError error); | 
| +typedef void _ErrorHandler(error); | 
| typedef void _DoneHandler(); | 
| @@ -982,8 +990,8 @@ typedef void _DoneHandler(); | 
| void _nullDataHandler(var value) {} | 
| /** Default error handler, reports the error to the global handler. */ | 
| -void _nullErrorHandler(AsyncError error) { | 
| - error.throwDelayed(); | 
| +void _nullErrorHandler(error) { | 
| + _throwDelayed(error); | 
| } | 
| /** Default done handler, does nothing. */ | 
| @@ -1000,7 +1008,7 @@ abstract class _DelayedEvent { | 
| /** A delayed data event. */ | 
| class _DelayedData<T> extends _DelayedEvent{ | 
| - T value; | 
| + final T value; | 
| _DelayedData(this.value); | 
| void perform(_StreamImpl<T> stream) { | 
| stream._sendData(value); | 
| @@ -1009,7 +1017,7 @@ class _DelayedData<T> extends _DelayedEvent{ | 
| /** A delayed error event. */ | 
| class _DelayedError extends _DelayedEvent { | 
| - AsyncError error; | 
| + final error; | 
| _DelayedError(this.error); | 
| void perform(_StreamImpl stream) { | 
| stream._sendError(error); | 
| @@ -1113,7 +1121,7 @@ abstract class _InternalLinkList extends _InternalLink { | 
| /** Abstract type for an internal interface for sending events. */ | 
| abstract class _EventOutputSink<T> { | 
| _sendData(T data); | 
| - _sendError(AsyncError error); | 
| + _sendError(error); | 
| _sendDone(); | 
| } | 
| @@ -1188,7 +1196,7 @@ abstract class _StreamListener<T> extends _InternalLink | 
| } | 
| _sendData(T data); | 
| - _sendError(AsyncError error); | 
| + _sendError(error); | 
| _sendDone(); | 
| } | 
| @@ -1276,7 +1284,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { | 
| void onData(void handleAction(T value)) {} | 
| - void onError(void handleError(AsyncError error)) {} | 
| + void onError(void handleError(error)) {} | 
| void onDone(void handleDone()) { | 
| _handler = handleDone; | 
| @@ -1318,7 +1326,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { | 
| // Overwrite the onDone and onError handlers. | 
| onDone(() { result._setValue(futureValue); }); | 
| - onError((AsyncError error) { | 
| + onError((error) { | 
| cancel(); | 
| result._setError(error); | 
| }); |