Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index fa0edf2fc2857059260ee53c8e9ef9392a500415..04557d7f1441ec459be9238a80839292cb7b1717 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -12,7 +12,7 @@ const int _STREAM_OPEN = 0; |
| /// The stream has received a request to complete, but hasn't done so yet. |
| /// No further events can be added to the stream. |
| const int _STREAM_CLOSED = 1; |
| -/// The stream has completed and will no longer receive or send events. |
| +// /// The stream has completed and will no longer receive or send events. |
|
Anders Johnsen
2013/04/15 16:34:17
Double uncomment.
floitsch
2013/04/15 18:54:10
Done.
|
| /// Also counts as closed. The stream must not be paused when it's completed. |
| /// Always used in conjunction with [_STREAM_CLOSED]. |
| const int _STREAM_COMPLETE = 2; |
| @@ -55,6 +55,20 @@ const int _LISTENER_EVENT_ID_SHIFT = 2; |
| /// state, shifted by this amount. |
| const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
| +/** Throws the given error in the next cycle. */ |
| +_throwDelayed(var error, [Object stackTrace]) { |
| + // We are going to reach the top-level here, but there might be a global |
| + // exception handler. This means that we shouldn't print the stack trace. |
| + // TODO(floitsch): Find better solution that doesn't print the stack trace |
| + // if there is a global exception handler. |
| + runAsync(() { |
| + if (stackTrace != null) print(stackTrace); |
| + var trace = getAttachedStackTrace(error); |
| + if (trace != null && trace != stackTrace) print(trace); |
| + throw error; |
| + }); |
| +} |
| + |
| // ------------------------------------------------------------------- |
| // Common base class for single and multi-subscription streams. |
| @@ -77,7 +91,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| // Stream interface. |
| StreamSubscription<T> listen(void onData(T data), |
| - { void onError(AsyncError error), |
| + { void onError(error), |
| void onDone(), |
| bool unsubscribeOnError }) { |
| if (_isComplete) { |
| @@ -123,7 +137,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| * If a subscription has requested to be unsubscribed on errors, |
| * it will be unsubscribed after receiving this event. |
| */ |
| - void _addError(AsyncError error) { |
| + void _addError(error) { |
| if (_isClosed) throw new StateError("Sending on closed stream"); |
| if (!_mayFireState) { |
| // Not the time to send events. |
| @@ -374,7 +388,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| /** Create a subscription object. Called by [subcribe]. */ |
| _StreamSubscriptionImpl<T> _createSubscription( |
| void onData(T data), |
| - void onError(AsyncError error), |
| + void onError(error), |
| void onDone(), |
| bool unsubscribeOnError); |
| @@ -493,10 +507,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| _forEachSubscriber((subscriber) { |
| try { |
| subscriber._sendData(value); |
| - } on AsyncError catch (e) { |
| - e.throwDelayed(); |
| } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| + _throwDelayed(e, s); |
| } |
| }); |
| } |
| @@ -504,17 +516,15 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| /** |
| * Sends an error event directly to each subscriber. |
| */ |
| - void _sendError(AsyncError error) { |
| + void _sendError(error) { |
| assert(!_isPaused); |
| assert(!_isComplete); |
| if (!_hasListener) return; |
| _forEachSubscriber((subscriber) { |
| try { |
| subscriber._sendError(error); |
| - } on AsyncError catch (e) { |
| - e.throwDelayed(); |
| } catch (e, s) { |
| - new AsyncError.withCause(e, s, error).throwDelayed(); |
| + _throwDelayed(e, s); |
| } |
| }); |
| } |
| @@ -533,10 +543,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| _cancel(subscriber); |
| try { |
| subscriber._sendDone(); |
| - } on AsyncError catch (e) { |
| - e.throwDelayed(); |
| } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| + _throwDelayed(e, s); |
| } |
| }); |
| assert(!_hasListener); |
| @@ -592,7 +600,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| */ |
| _StreamSubscriptionImpl<T> _createSubscription( |
| void onData(T data), |
| - void onError(AsyncError error), |
| + void onError(error), |
| void onDone(), |
| bool unsubscribeOnError) { |
| return new _StreamSubscriptionImpl<T>( |
| @@ -712,7 +720,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
| */ |
| _StreamListener<T> _createSubscription( |
| void onData(T data), |
| - void onError(AsyncError error), |
| + void onError(error), |
| void onDone(), |
| bool unsubscribeOnError) { |
| return new _StreamSubscriptionImpl<T>( |
| @@ -841,7 +849,7 @@ class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
| throw new UnsupportedError("Cannot inject events into generated stream"); |
| } |
| - void _addError(AsyncError value) { |
| + void _addError(value) { |
| throw new UnsupportedError("Cannot inject events into generated stream"); |
| } |
| @@ -876,7 +884,7 @@ class _IterablePendingEvents<T> extends _PendingEvents { |
| stream._sendDone(); |
| } |
| } catch (e, s) { |
| - stream._sendError(new AsyncError(e, s)); |
| + stream._sendError(_asyncError(e, s)); |
| stream._sendDone(); |
| _isDone = true; |
| } |
| @@ -918,7 +926,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| _onData = handleData; |
| } |
| - void onError(void handleError(AsyncError error)) { |
| + void onError(void handleError(error)) { |
| if (handleError == null) handleError = _nullErrorHandler; |
| _onError = handleError; |
| } |
| @@ -932,7 +940,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| _onData(data); |
| } |
| - void _sendError(AsyncError error) { |
| + void _sendError(error) { |
| _onError(error); |
| if (_unsubscribeOnError) _source._cancel(this); |
| } |
| @@ -961,7 +969,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| // Overwrite the onDone and onError handlers. |
| onDone(() { result._setValue(futureValue); }); |
| - onError((AsyncError error) { |
| + onError((error) { |
| cancel(); |
| result._setError(error); |
| }); |
| @@ -974,7 +982,7 @@ class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| // Types of the different handlers on a stream. Types used to type fields. |
| typedef void _DataHandler<T>(T value); |
| -typedef void _ErrorHandler(AsyncError error); |
| +typedef void _ErrorHandler(error); |
| typedef void _DoneHandler(); |
| @@ -982,8 +990,8 @@ typedef void _DoneHandler(); |
| void _nullDataHandler(var value) {} |
| /** Default error handler, reports the error to the global handler. */ |
| -void _nullErrorHandler(AsyncError error) { |
| - error.throwDelayed(); |
| +void _nullErrorHandler(error) { |
| + _throwDelayed(error); |
| } |
| /** Default done handler, does nothing. */ |
| @@ -1000,7 +1008,7 @@ abstract class _DelayedEvent { |
| /** A delayed data event. */ |
| class _DelayedData<T> extends _DelayedEvent{ |
| - T value; |
| + final T value; |
| _DelayedData(this.value); |
| void perform(_StreamImpl<T> stream) { |
| stream._sendData(value); |
| @@ -1009,7 +1017,7 @@ class _DelayedData<T> extends _DelayedEvent{ |
| /** A delayed error event. */ |
| class _DelayedError extends _DelayedEvent { |
| - AsyncError error; |
| + final error; |
| _DelayedError(this.error); |
| void perform(_StreamImpl stream) { |
| stream._sendError(error); |
| @@ -1113,7 +1121,7 @@ abstract class _InternalLinkList extends _InternalLink { |
| /** Abstract type for an internal interface for sending events. */ |
| abstract class _EventOutputSink<T> { |
| _sendData(T data); |
| - _sendError(AsyncError error); |
| + _sendError(error); |
| _sendDone(); |
| } |
| @@ -1188,7 +1196,7 @@ abstract class _StreamListener<T> extends _InternalLink |
| } |
| _sendData(T data); |
| - _sendError(AsyncError error); |
| + _sendError(error); |
| _sendDone(); |
| } |
| @@ -1276,7 +1284,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
| void onData(void handleAction(T value)) {} |
| - void onError(void handleError(AsyncError error)) {} |
| + void onError(void handleError(error)) {} |
| void onDone(void handleDone()) { |
| _handler = handleDone; |
| @@ -1318,7 +1326,7 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
| // Overwrite the onDone and onError handlers. |
| onDone(() { result._setValue(futureValue); }); |
| - onError((AsyncError error) { |
| + onError((error) { |
| cancel(); |
| result._setError(error); |
| }); |