| Index: pkg/http/lib/src/utils.dart
 | 
| diff --git a/pkg/http/lib/src/utils.dart b/pkg/http/lib/src/utils.dart
 | 
| index 0a82dfd1225b5ef17b632defc5557ddc1a68cac0..28e2bfbec3141a0f8a7e42fef898ff92b49d8738 100644
 | 
| --- a/pkg/http/lib/src/utils.dart
 | 
| +++ b/pkg/http/lib/src/utils.dart
 | 
| @@ -11,6 +11,8 @@ import 'dart:scalarlist';
 | 
|  import 'dart:uri';
 | 
|  import 'dart:utf';
 | 
|  
 | 
| +import 'byte_stream.dart';
 | 
| +
 | 
|  /// Converts a URL query string (or `application/x-www-form-urlencoded` body)
 | 
|  /// into a [Map] from parameter names to values.
 | 
|  ///
 | 
| @@ -131,49 +133,157 @@ Uint8List toUint8List(List<int> input) {
 | 
|    return output;
 | 
|  }
 | 
|  
 | 
| -/// Buffers all input from an InputStream and returns it as a future.
 | 
| -Future<List<int>> consumeInputStream(InputStream stream) {
 | 
| -  if (stream.closed) return new Future<List<int>>.immediate(<int>[]);
 | 
| +/// If [stream] is already a [ByteStream], returns it. Otherwise, wraps it in a
 | 
| +/// [ByteStream].
 | 
| +ByteStream toByteStream(Stream<List<int>> stream) {
 | 
| +  if (stream is ByteStream) return stream;
 | 
| +  return new ByteStream(stream);
 | 
| +}
 | 
|  
 | 
| -  var completer = new Completer<List<int>>();
 | 
| -  /// TODO(nweiz): use BufferList when issue 6409 is fixed
 | 
| -  var buffer = <int>[];
 | 
| -  stream.onClosed = () => completer.complete(buffer);
 | 
| -  stream.onData = () => buffer.addAll(stream.read());
 | 
| -  stream.onError = completer.completeError;
 | 
| -  return completer.future;
 | 
| +/// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished.
 | 
| +/// The return value, also a single-subscription [Stream] should be used in
 | 
| +/// place of [stream] after calling this method.
 | 
| +Stream onDone(Stream stream, void onDone()) {
 | 
| +  var pair = tee(stream);
 | 
| +  pair.first.listen((_) {}, onError: (_) {}, onDone: onDone);
 | 
| +  return pair.last;
 | 
|  }
 | 
|  
 | 
| -/// Takes all input from [source] and writes it to [sink], then closes [sink].
 | 
| -/// Returns a [Future] that completes when [source] is exhausted.
 | 
| -void pipeInputToInput(InputStream source, ListInputStream sink) {
 | 
| -  source.onClosed = sink.markEndOfStream;
 | 
| -  source.onData = () => sink.write(source.read());
 | 
| -  // TODO(nweiz): propagate source errors to the sink. See issue 3657.
 | 
| -  // TODO(nweiz): we need to use async here to avoid issue 4974.
 | 
| -  source.onError = (e) => async.then((_) {
 | 
| -    throw e;
 | 
| -  });
 | 
| +// TODO(nweiz): remove this once issue 7785 is fixed.
 | 
| +/// Wraps [stream] in a single-subscription [ByteStream] that emits the same
 | 
| +/// data.
 | 
| +ByteStream wrapInputStream(InputStream stream) {
 | 
| +  if (stream.closed) return emptyStream;
 | 
| +
 | 
| +  var controller = new StreamController.singleSubscription();
 | 
| +  stream.onClosed = controller.close;
 | 
| +  stream.onData = () => controller.add(stream.read());
 | 
| +  stream.onError = (e) => controller.signalError(new AsyncError(e));
 | 
| +  return new ByteStream(controller);
 | 
| +}
 | 
| +
 | 
| +// TODO(nweiz): remove this once issue 7785 is fixed.
 | 
| +/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
 | 
| +/// using [Stream.pipe].
 | 
| +StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
 | 
| +  new _OutputStreamConsumer(stream);
 | 
| +
 | 
| +/// A [StreamConsumer] that pipes data into an [OutputStream].
 | 
| +class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
 | 
| +  final OutputStream _outputStream;
 | 
| +
 | 
| +  _OutputStreamConsumer(this._outputStream)
 | 
| +    : super();
 | 
| +
 | 
| +  Future consume(Stream<List<int>> stream) {
 | 
| +    // TODO(nweiz): we have to manually keep track of whether or not the
 | 
| +    // completer has completed since the output stream could signal an error
 | 
| +    // after close() has been called but before it has shut down internally. See
 | 
| +    // the following TODO.
 | 
| +    var completed = false;
 | 
| +    var completer = new Completer();
 | 
| +    stream.listen((data) => _outputStream.write(data), onDone: () {
 | 
| +      _outputStream.close();
 | 
| +      // TODO(nweiz): wait until _outputStream.onClosed is called once issue
 | 
| +      // 7761 is fixed.
 | 
| +      if (!completed) completer.complete();
 | 
| +      completed = true;
 | 
| +    });
 | 
| +
 | 
| +    _outputStream.onError = (e) {
 | 
| +      if (!completed) completer.completeError(e);
 | 
| +      completed = true;
 | 
| +    };
 | 
| +
 | 
| +    return completer.future;
 | 
| +  }
 | 
|  }
 | 
|  
 | 
| -/// Takes all input from [source] and writes it to [sink], but does not close
 | 
| -/// [sink] when [source] is closed. Returns a [Future] that completes when
 | 
| -/// [source] is closed.
 | 
| -Future writeInputToInput(InputStream source, ListInputStream sink) {
 | 
| +// TODO(nweiz): remove this when issue 7786 is fixed.
 | 
| +/// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
 | 
| +/// [sink] is closed and the returned [Future] is completed.
 | 
| +Future store(Stream stream, StreamSink sink) {
 | 
|    var completer = new Completer();
 | 
| -  source.onClosed = () => completer.complete(null);
 | 
| -  source.onData = () => sink.write(source.read());
 | 
| -  // TODO(nweiz): propagate source errors to the sink. See issue 3657.
 | 
| +  stream.listen(sink.add,
 | 
| +      onError: sink.signalError,
 | 
| +      onDone: () {
 | 
| +        sink.close();
 | 
| +        completer.complete();
 | 
| +      });
 | 
|    return completer.future;
 | 
|  }
 | 
|  
 | 
| -/// Returns a [Future] that asynchronously completes to `null`.
 | 
| -Future get async {
 | 
| +/// Pipes all data and errors from [stream] into [sink]. Completes [Future] once
 | 
| +/// [stream] is done. Unlike [store], [sink] remains open after [stream] is
 | 
| +/// done.
 | 
| +Future writeStreamToSink(Stream stream, StreamSink sink) {
 | 
|    var completer = new Completer();
 | 
| -  new Timer(0, (_) => completer.complete(null));
 | 
| +  stream.listen(sink.add,
 | 
| +      onError: sink.signalError,
 | 
| +      onDone: () => completer.complete());
 | 
|    return completer.future;
 | 
|  }
 | 
|  
 | 
| +/// Returns a [Future] that asynchronously completes to `null`.
 | 
| +Future get async => new Future.immediate(null);
 | 
| +
 | 
| +/// Returns a closed [Stream] with no elements.
 | 
| +Stream get emptyStream => streamFromIterable([]);
 | 
| +
 | 
| +/// Creates a single-subscription stream that emits the items in [iter] and then
 | 
| +/// ends.
 | 
| +Stream streamFromIterable(Iterable iter) {
 | 
| +  var stream = new StreamController.singleSubscription();
 | 
| +  iter.forEach(stream.add);
 | 
| +  stream.close();
 | 
| +  return stream.stream;
 | 
| +}
 | 
| +
 | 
| +// TODO(nweiz): remove this when issue 7787 is fixed.
 | 
| +/// Creates two single-subscription [Stream]s that each emit all values and
 | 
| +/// errors from [stream]. This is useful if [stream] is single-subscription but
 | 
| +/// multiple subscribers are necessary.
 | 
| +Pair<Stream, Stream> tee(Stream stream) {
 | 
| +  var controller1 = new StreamController.singleSubscription();
 | 
| +  var controller2 = new StreamController.singleSubscription();
 | 
| +  stream.listen((value) {
 | 
| +    controller1.add(value);
 | 
| +    controller2.add(value);
 | 
| +  }, onError: (error) {
 | 
| +    controller1.signalError(error);
 | 
| +    controller2.signalError(error);
 | 
| +  }, onDone: () {
 | 
| +    controller1.close();
 | 
| +    controller2.close();
 | 
| +  });
 | 
| +  return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
 | 
| +}
 | 
| +
 | 
| +/// A pair of values.
 | 
| +class Pair<E, F> {
 | 
| +  E first;
 | 
| +  F last;
 | 
| +
 | 
| +  Pair(this.first, this.last);
 | 
| +
 | 
| +  String toString() => '($first, $last)';
 | 
| +
 | 
| +  bool operator==(other) {
 | 
| +    if (other is! Pair) return false;
 | 
| +    return other.first == first && other.last == last;
 | 
| +  }
 | 
| +
 | 
| +  int get hashCode => first.hashCode ^ last.hashCode;
 | 
| +}
 | 
| +
 | 
| +/// Configures [future] so that its result (success or exception) is passed on
 | 
| +/// to [completer].
 | 
| +void chainToCompleter(Future future, Completer completer) {
 | 
| +  future.then((v) => completer.complete(v)).catchError((e) {
 | 
| +    completer.completeError(e.error, e.stackTrace);
 | 
| +  });
 | 
| +}
 | 
| +
 | 
|  // TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/
 | 
|  // is in.
 | 
|  /// Runs [fn] for each element in [input] in order, moving to the next element
 | 
| 
 |