| Index: sdk/lib/async/stream_impl.dart | 
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart | 
| index 29cfbd1c3ce5cc1055f0f5240734352e7be9b7d3..164d3b242802874e9d1d4e2cbffaeaf6438a3d68 100644 | 
| --- a/sdk/lib/async/stream_impl.dart | 
| +++ b/sdk/lib/async/stream_impl.dart | 
| @@ -49,9 +49,8 @@ abstract class _EventDispatch<T> { | 
| * but if it happens anyway, the subscription will enqueue the events just as | 
| * when new events arrive while still firing an old event. | 
| */ | 
| -class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
| -                                                 _EventSink<T>, | 
| -                                                 _EventDispatch<T> { | 
| +class _BufferingStreamSubscription<T> | 
| +    implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> { | 
| /** The `cancelOnError` flag from the `listen` call. */ | 
| static const int _STATE_CANCEL_ON_ERROR = 1; | 
| /** | 
| @@ -107,10 +106,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
| */ | 
| _PendingEvents<T> _pending; | 
|  | 
| -  _BufferingStreamSubscription(void onData(T data), | 
| -                               Function onError, | 
| -                               void onDone(), | 
| -                               bool cancelOnError) | 
| +  _BufferingStreamSubscription( | 
| +      void onData(T data), Function onError, void onDone(), bool cancelOnError) | 
| : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 
| this.onData(onData); | 
| this.onError(onError); | 
| @@ -197,7 +194,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
| _Future<E> result = new _Future<E>(); | 
|  | 
| // Overwrite the onDone and onError handlers. | 
| -    _onDone = () { result._complete(futureValue); }; | 
| +    _onDone = () { | 
| +      result._complete(futureValue); | 
| +    }; | 
| _onError = (error, stackTrace) { | 
| Future cancelFuture = cancel(); | 
| if (!identical(cancelFuture, Future._nullFuture)) { | 
| @@ -264,7 +263,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
| void _addError(Object error, StackTrace stackTrace) { | 
| if (_isCanceled) return; | 
| if (_canFire) { | 
| -      _sendError(error, stackTrace);  // Reports cancel after sending. | 
| +      _sendError(error, stackTrace); // Reports cancel after sending. | 
| } else { | 
| _addPending(new _DelayedError(error, stackTrace)); | 
| } | 
| @@ -346,11 +345,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
| _state |= _STATE_IN_CALLBACK; | 
| if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | 
| ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | 
| -            as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 
| +            as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 
| _zone.runBinaryGuarded(errorCallback, error, stackTrace); | 
| } else { | 
| _zone.runUnaryGuarded<dynamic, dynamic>( | 
| -            _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | 
| +            _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | 
| } | 
| _state &= ~_STATE_IN_CALLBACK; | 
| } | 
| @@ -461,9 +460,7 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
| // Stream interface. | 
|  | 
| StreamSubscription<T> listen(void onData(T data), | 
| -                               { Function onError, | 
| -                                 void onDone(), | 
| -                                 bool cancelOnError }) { | 
| +      {Function onError, void onDone(), bool cancelOnError}) { | 
| cancelOnError = identical(true, cancelOnError); | 
| StreamSubscription<T> subscription = | 
| _createSubscription(onData, onError, onDone, cancelOnError); | 
| @@ -473,13 +470,10 @@ abstract class _StreamImpl<T> extends Stream<T> { | 
|  | 
| // ------------------------------------------------------------------- | 
| /** Create a subscription object. Called by [subcribe]. */ | 
| -  StreamSubscription<T> _createSubscription( | 
| -      void onData(T data), | 
| -      Function onError, | 
| -      void onDone(), | 
| -      bool cancelOnError) { | 
| -    return new _BufferingStreamSubscription<T>(onData, onError, onDone, | 
| -                                               cancelOnError); | 
| +  StreamSubscription<T> _createSubscription(void onData(T data), | 
| +      Function onError, void onDone(), bool cancelOnError) { | 
| +    return new _BufferingStreamSubscription<T>( | 
| +        onData, onError, onDone, cancelOnError); | 
| } | 
|  | 
| /** Hook called when the subscription has been created. */ | 
| @@ -500,11 +494,8 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 
| */ | 
| _GeneratedStreamImpl(this._pending); | 
|  | 
| -  StreamSubscription<T> _createSubscription( | 
| -      void onData(T data), | 
| -      Function onError, | 
| -      void onDone(), | 
| -      bool cancelOnError) { | 
| +  StreamSubscription<T> _createSubscription(void onData(T data), | 
| +      Function onError, void onDone(), bool cancelOnError) { | 
| if (_isUsed) throw new StateError("Stream has already been listened to."); | 
| _isUsed = true; | 
| return new _BufferingStreamSubscription<T>( | 
| @@ -512,7 +503,6 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 
| } | 
| } | 
|  | 
| - | 
| /** Pending events object that gets its events from an [Iterable]. */ | 
| class _IterablePendingEvents<T> extends _PendingEvents<T> { | 
| // The iterator providing data for data events. | 
| @@ -554,14 +544,12 @@ class _IterablePendingEvents<T> extends _PendingEvents<T> { | 
| } | 
| } | 
|  | 
| - | 
| // Internal helpers. | 
|  | 
| // Types of the different handlers on a stream. Types used to type fields. | 
| typedef void _DataHandler<T>(T value); | 
| typedef void _DoneHandler(); | 
|  | 
| - | 
| /** Default data handler, does nothing. */ | 
| void _nullDataHandler(var value) {} | 
|  | 
| @@ -573,7 +561,6 @@ void _nullErrorHandler(error, [StackTrace stackTrace]) { | 
| /** Default done handler, does nothing. */ | 
| void _nullDoneHandler() {} | 
|  | 
| - | 
| /** A delayed event on a buffering stream subscription. */ | 
| abstract class _DelayedEvent<T> { | 
| /** Added as a linked list on the [StreamController]. */ | 
| @@ -678,11 +665,11 @@ abstract class _PendingEvents<T> { | 
| void clear(); | 
| } | 
|  | 
| - | 
| /** Class holding pending events for a [_StreamImpl]. */ | 
| class _StreamImplEvents<T> extends _PendingEvents<T> { | 
| /// Single linked list of [_DelayedEvent] objects. | 
| _DelayedEvent firstPendingEvent = null; | 
| + | 
| /// Last element in the list of pending events. New events are added after it. | 
| _DelayedEvent lastPendingEvent = null; | 
|  | 
| @@ -742,7 +729,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 
|  | 
| void onData(void handleData(T data)) {} | 
| void onError(Function handleError) {} | 
| -  void onDone(void handleDone()) { _onDone = handleDone; } | 
| +  void onDone(void handleDone()) { | 
| +    _onDone = handleDone; | 
| +  } | 
|  | 
| void pause([Future resumeSignal]) { | 
| _state += _PAUSED; | 
| @@ -762,7 +751,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 
|  | 
| Future<E> asFuture<E>([E futureValue]) { | 
| _Future<E> result = new _Future<E>(); | 
| -    _onDone = () { result._completeWithValue(null); }; | 
| +    _onDone = () { | 
| +      result._completeWithValue(null); | 
| +    }; | 
| return result; | 
| } | 
|  | 
| @@ -783,15 +774,18 @@ class _AsBroadcastStream<T> extends Stream<T> { | 
| _AsBroadcastStreamController<T> _controller; | 
| StreamSubscription<T> _subscription; | 
|  | 
| -  _AsBroadcastStream(this._source, | 
| -                     void onListenHandler(StreamSubscription<T> subscription), | 
| -                     void onCancelHandler(StreamSubscription<T> subscription)) | 
| +  _AsBroadcastStream( | 
| +      this._source, | 
| +      void onListenHandler(StreamSubscription<T> subscription), | 
| +      void onCancelHandler(StreamSubscription<T> subscription)) | 
| // TODO(floitsch): the return type should be void and should be | 
| // inferred. | 
| -      : _onListenHandler = Zone.current.registerUnaryCallback | 
| -            <dynamic, StreamSubscription<T>>(onListenHandler), | 
| -        _onCancelHandler = Zone.current.registerUnaryCallback | 
| -            <dynamic, StreamSubscription<T>>(onCancelHandler), | 
| +      : _onListenHandler = Zone.current | 
| +            .registerUnaryCallback<dynamic, StreamSubscription<T>>( | 
| +                onListenHandler), | 
| +        _onCancelHandler = Zone.current | 
| +            .registerUnaryCallback<dynamic, StreamSubscription<T>>( | 
| +                onCancelHandler), | 
| _zone = Zone.current { | 
| _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 
| } | 
| @@ -799,9 +793,7 @@ class _AsBroadcastStream<T> extends Stream<T> { | 
| bool get isBroadcast => true; | 
|  | 
| StreamSubscription<T> listen(void onData(T data), | 
| -                               { Function onError, | 
| -                                 void onDone(), | 
| -                                 bool cancelOnError}) { | 
| +      {Function onError, void onDone(), bool cancelOnError}) { | 
| if (_controller == null || _controller.isClosed) { | 
| // Return a dummy subscription backed by nothing, since | 
| // it will only ever send one done event. | 
| @@ -809,8 +801,7 @@ class _AsBroadcastStream<T> extends Stream<T> { | 
| } | 
| if (_subscription == null) { | 
| _subscription = _source.listen(_controller.add, | 
| -                                     onError: _controller.addError, | 
| -                                     onDone: _controller.close); | 
| +          onError: _controller.addError, onDone: _controller.close); | 
| } | 
| cancelOnError = identical(true, cancelOnError); | 
| return _controller._subscribe(onData, onError, onDone, cancelOnError); | 
| @@ -843,7 +834,7 @@ class _AsBroadcastStream<T> extends Stream<T> { | 
| // Called by [_controller] when it has no subscribers left. | 
| StreamSubscription subscription = _subscription; | 
| _subscription = null; | 
| -    _controller = null;  // Marks the stream as no longer listenable. | 
| +    _controller = null; // Marks the stream as no longer listenable. | 
| subscription.cancel(); | 
| } | 
|  | 
| @@ -909,7 +900,6 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | 
| } | 
| } | 
|  | 
| - | 
| /** | 
| * Simple implementation of [StreamIterator]. | 
| * | 
| @@ -969,7 +959,7 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
|  | 
| T get current { | 
| if (_subscription != null && _isPaused) { | 
| -      return _stateData as Object /*=T*/; | 
| +      return _stateData as Object/*=T*/; | 
| } | 
| return null; | 
| } | 
| @@ -997,9 +987,9 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
| assert(_subscription == null); | 
| var stateData = _stateData; | 
| if (stateData != null) { | 
| -      Stream<T> stream = stateData as Object /*=Stream<T>*/; | 
| -      _subscription = stream.listen( | 
| -          _onData, onError: _onError, onDone: _onDone, cancelOnError: true); | 
| +      Stream<T> stream = stateData as Object/*=Stream<T>*/; | 
| +      _subscription = stream.listen(_onData, | 
| +          onError: _onError, onDone: _onDone, cancelOnError: true); | 
| var future = new _Future<bool>(); | 
| _stateData = future; | 
| return future; | 
| @@ -1014,7 +1004,7 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
| if (subscription != null) { | 
| _subscription = null; | 
| if (!_isPaused) { | 
| -        _Future<bool> future = stateData as Object /*=_Future<bool>*/; | 
| +        _Future<bool> future = stateData as Object/*=_Future<bool>*/; | 
| future._asyncComplete(false); | 
| } | 
| return subscription.cancel(); | 
| @@ -1024,7 +1014,7 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
|  | 
| void _onData(T data) { | 
| assert(_subscription != null && !_isPaused); | 
| -    _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 
| +    _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; | 
| _stateData = data; | 
| _isPaused = true; | 
| moveNextFuture._complete(true); | 
| @@ -1033,7 +1023,7 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
|  | 
| void _onError(Object error, [StackTrace stackTrace]) { | 
| assert(_subscription != null && !_isPaused); | 
| -    _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 
| +    _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; | 
| _subscription = null; | 
| _stateData = null; | 
| moveNextFuture._completeError(error, stackTrace); | 
| @@ -1041,7 +1031,7 @@ class _StreamIterator<T> implements StreamIterator<T> { | 
|  | 
| void _onDone() { | 
| assert(_subscription != null && !_isPaused); | 
| -    _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 
| +    _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; | 
| _subscription = null; | 
| _stateData = null; | 
| moveNextFuture._complete(false); | 
| @@ -1053,9 +1043,7 @@ class _EmptyStream<T> extends Stream<T> { | 
| const _EmptyStream() : super._internal(); | 
| bool get isBroadcast => true; | 
| StreamSubscription<T> listen(void onData(T data), | 
| -                               {Function onError, | 
| -                                void onDone(), | 
| -                                bool cancelOnError}) { | 
| +      {Function onError, void onDone(), bool cancelOnError}) { | 
| return new _DoneStreamSubscription<T>(onDone); | 
| } | 
| } | 
|  |