| Index: sdk/lib/async/stream_pipe.dart
 | 
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
 | 
| index aaa34b059362638f8f9b04fad2f50f6eaccc9eea..69210ecabd1f2259571bfe6db85f32fc8942bd19 100644
 | 
| --- a/sdk/lib/async/stream_pipe.dart
 | 
| +++ b/sdk/lib/async/stream_pipe.dart
 | 
| @@ -74,16 +74,16 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
 | 
|  
 | 
|    // Override the following methods in subclasses to change the behavior.
 | 
|  
 | 
| -  void _handleData(S data, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(S data, _EventOutputSink<T> sink) {
 | 
|      var outputData = data;
 | 
|      sink._sendData(outputData);
 | 
|    }
 | 
|  
 | 
| -  void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
 | 
| +  void _handleError(AsyncError error, _EventOutputSink<T> sink) {
 | 
|      sink._sendError(error);
 | 
|    }
 | 
|  
 | 
| -  void _handleDone(_StreamOutputSink<T> sink) {
 | 
| +  void _handleDone(_EventOutputSink<T> sink) {
 | 
|      sink._sendDone();
 | 
|    }
 | 
|  }
 | 
| @@ -136,7 +136,7 @@ abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
 | 
|   * Abstract superclass for subscriptions that forward to other subscriptions.
 | 
|   */
 | 
|  class _ForwardingStreamSubscription<S, T>
 | 
| -    extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> {
 | 
| +    extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
 | 
|    final _ForwardingStream<S, T> _stream;
 | 
|    final bool _unsubscribeOnError;
 | 
|  
 | 
| @@ -179,7 +179,7 @@ class _ForwardingStreamSubscription<S, T>
 | 
|      _subscription = null;
 | 
|    }
 | 
|  
 | 
| -  // _StreamOutputSink interface. Sends data to this subscription.
 | 
| +  // _EventOutputSink interface. Sends data to this subscription.
 | 
|  
 | 
|    void _sendData(T data) {
 | 
|      _onData(data);
 | 
| @@ -233,7 +233,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
 | 
|    _WhereStream(Stream<T> source, bool test(T value))
 | 
|        : _test = test, super(source);
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      bool satisfies;
 | 
|      try {
 | 
|        satisfies = _test(inputEvent);
 | 
| @@ -259,7 +259,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
 | 
|    _MapStream(Stream<S> source, T transform(S event))
 | 
|        : this._transform = transform, super(source);
 | 
|  
 | 
| -  void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(S inputEvent, _EventOutputSink<T> sink) {
 | 
|      T outputEvent;
 | 
|      try {
 | 
|        outputEvent = _transform(inputEvent);
 | 
| @@ -280,7 +280,7 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
 | 
|    _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
 | 
|        : this._expand = expand, super(source);
 | 
|  
 | 
| -  void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(S inputEvent, _EventOutputSink<T> sink) {
 | 
|      try {
 | 
|        for (T value in _expand(inputEvent)) {
 | 
|          sink._sendData(value);
 | 
| @@ -310,7 +310,7 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
 | 
|                      bool test(error))
 | 
|        : this._transform = transform, this._test = test, super(source);
 | 
|  
 | 
| -  void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
 | 
| +  void _handleError(AsyncError error, _EventOutputSink<T> sink) {
 | 
|      bool matches = true;
 | 
|      if (_test != null) {
 | 
|        try {
 | 
| @@ -344,7 +344,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
 | 
|      if (count is! int) throw new ArgumentError(count);
 | 
|    }
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      if (_remaining > 0) {
 | 
|        sink._sendData(inputEvent);
 | 
|        _remaining -= 1;
 | 
| @@ -364,7 +364,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
 | 
|    _TakeWhileStream(Stream<T> source, bool test(T value))
 | 
|        : this._test = test, super(source);
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      bool satisfies;
 | 
|      try {
 | 
|        satisfies = _test(inputEvent);
 | 
| @@ -392,7 +392,7 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
 | 
|      if (count is! int || count < 0) throw new ArgumentError(count);
 | 
|    }
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      if (_remaining > 0) {
 | 
|        _remaining--;
 | 
|        return;
 | 
| @@ -408,7 +408,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
 | 
|    _SkipWhileStream(Stream<T> source, bool test(T value))
 | 
|        : this._test = test, super(source);
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      if (_hasFailed) {
 | 
|        sink._sendData(inputEvent);
 | 
|      }
 | 
| @@ -439,7 +439,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
 | 
|    _DistinctStream(Stream<T> source, bool equals(T a, T b))
 | 
|        : _equals = equals, super(source);
 | 
|  
 | 
| -  void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
 | 
| +  void _handleData(T inputEvent, _EventOutputSink<T> sink) {
 | 
|      if (identical(_previous, _SENTINEL)) {
 | 
|        _previous = inputEvent;
 | 
|        return sink._sendData(inputEvent);
 | 
| @@ -465,35 +465,26 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
 | 
|  
 | 
|  // Stream transformations and event transformations.
 | 
|  
 | 
| -typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
 | 
| -typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
 | 
| -typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
 | 
| +typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
 | 
| +typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink);
 | 
| +typedef void _TransformDoneHandler<T>(EventSink<T> sink);
 | 
|  
 | 
|  /** Default data handler forwards all data. */
 | 
| -void _defaultHandleData(var data, StreamSink sink) {
 | 
| +void _defaultHandleData(var data, EventSink sink) {
 | 
|    sink.add(data);
 | 
|  }
 | 
|  
 | 
|  /** Default error handler forwards all errors. */
 | 
| -void _defaultHandleError(AsyncError error, StreamSink sink) {
 | 
| -  sink.signalError(error);
 | 
| +void _defaultHandleError(AsyncError error, EventSink sink) {
 | 
| +  sink.addError(error);
 | 
|  }
 | 
|  
 | 
|  /** Default done handler forwards done. */
 | 
| -void _defaultHandleDone(StreamSink sink) {
 | 
| +void _defaultHandleDone(EventSink sink) {
 | 
|    sink.close();
 | 
|  }
 | 
|  
 | 
|  
 | 
| -/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
 | 
| -class _StreamImplSink<T> implements StreamSink<T> {
 | 
| -  _StreamImpl<T> _target;
 | 
| -  _StreamImplSink(this._target);
 | 
| -  void add(T data) { _target._add(data); }
 | 
| -  void signalError(AsyncError error) { _target._signalError(error); }
 | 
| -  void close() { _target._close(); }
 | 
| -}
 | 
| -
 | 
|  /**
 | 
|   * A [StreamTransformer] that modifies stream events.
 | 
|   *
 | 
| @@ -511,9 +502,9 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
 | 
|    final _TransformErrorHandler<T> _handleError;
 | 
|    final _TransformDoneHandler<T> _handleDone;
 | 
|  
 | 
| -  _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink),
 | 
| -                         void handleError(AsyncError data, StreamSink<T> sink),
 | 
| -                         void handleDone(StreamSink<T> sink))
 | 
| +  _StreamTransformerImpl(void handleData(S data, EventSink<T> sink),
 | 
| +                         void handleError(AsyncError data, EventSink<T> sink),
 | 
| +                         void handleDone(EventSink<T> sink))
 | 
|        : this._handleData  = (handleData == null  ? _defaultHandleData
 | 
|                                                   : handleData),
 | 
|          this._handleError = (handleError == null ? _defaultHandleError
 | 
| @@ -521,15 +512,15 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
 | 
|          this._handleDone  = (handleDone == null  ? _defaultHandleDone
 | 
|                                                   : handleDone);
 | 
|  
 | 
| -  void handleData(S data, StreamSink<T> sink) {
 | 
| +  void handleData(S data, EventSink<T> sink) {
 | 
|      _handleData(data, sink);
 | 
|    }
 | 
|  
 | 
| -  void handleError(AsyncError error, StreamSink<T> sink) {
 | 
| +  void handleError(AsyncError error, EventSink<T> sink) {
 | 
|      _handleError(error, sink);
 | 
|    }
 | 
|  
 | 
| -  void handleDone(StreamSink<T> sink) {
 | 
| +  void handleDone(EventSink<T> sink) {
 | 
|      _handleDone(sink);
 | 
|    }
 | 
|  }
 | 
| 
 |