Chromium Code Reviews| Index: pkg/http/lib/src/utils.dart |
| diff --git a/pkg/http/lib/src/utils.dart b/pkg/http/lib/src/utils.dart |
| index 0a82dfd1225b5ef17b632defc5557ddc1a68cac0..3833b9f4a4dd06ca873c63ebd83b8be1853b8605 100644 |
| --- a/pkg/http/lib/src/utils.dart |
| +++ b/pkg/http/lib/src/utils.dart |
| @@ -11,6 +11,8 @@ import 'dart:scalarlist'; |
| import 'dart:uri'; |
| import 'dart:utf'; |
| +import 'byte_stream.dart'; |
| + |
| /// Converts a URL query string (or `application/x-www-form-urlencoded` body) |
| /// into a [Map] from parameter names to values. |
| /// |
| @@ -131,49 +133,155 @@ Uint8List toUint8List(List<int> input) { |
| return output; |
| } |
| -/// Buffers all input from an InputStream and returns it as a future. |
| -Future<List<int>> consumeInputStream(InputStream stream) { |
| - if (stream.closed) return new Future<List<int>>.immediate(<int>[]); |
| +/// If [stream] is already a [ByteStream], returns it. Otherwise, wraps it in a |
| +/// [ByteStream]. |
|
Bob Nystrom
2013/01/08 23:50:49
How about making this a (potentially factory) cons
nweiz
2013/01/09 00:52:11
It feels too similar to the default ByteStream con
|
| +ByteStream toByteStream(Stream<List<int>> stream) { |
| + if (stream is ByteStream) return stream; |
| + return new ByteStream(stream); |
| +} |
| - var completer = new Completer<List<int>>(); |
| - /// TODO(nweiz): use BufferList when issue 6409 is fixed |
| - var buffer = <int>[]; |
| - stream.onClosed = () => completer.complete(buffer); |
| - stream.onData = () => buffer.addAll(stream.read()); |
| - stream.onError = completer.completeError; |
| - return completer.future; |
| +/// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished. |
| +/// The return value, also a single-subscription [Stream] should be used in |
| +/// place of [stream] after calling this method. |
| +Stream onDone(Stream stream, void onDone()) { |
| + var pair = tee(stream); |
| + pair.first.listen((_) {}, onError: (_) {}, onDone: onDone); |
| + return pair.last; |
| } |
| -/// Takes all input from [source] and writes it to [sink], then closes [sink]. |
| -/// Returns a [Future] that completes when [source] is exhausted. |
| -void pipeInputToInput(InputStream source, ListInputStream sink) { |
| - source.onClosed = sink.markEndOfStream; |
| - source.onData = () => sink.write(source.read()); |
| - // TODO(nweiz): propagate source errors to the sink. See issue 3657. |
| - // TODO(nweiz): we need to use async here to avoid issue 4974. |
| - source.onError = (e) => async.then((_) { |
| - throw e; |
| - }); |
| +/// Wraps [stream] in a single-subscription [ByteStream] that emits the same |
| +/// data. |
|
Bob Nystrom
2013/01/08 23:50:49
Is this just for compatibility with the existing d
nweiz
2013/01/09 00:52:11
Done.
|
| +ByteStream wrapInputStream(InputStream stream) { |
| + if (stream.closed) return emptyStream; |
| + |
| + var controller = new StreamController.singleSubscription(); |
| + stream.onClosed = controller.close; |
| + stream.onData = () => controller.add(stream.read()); |
| + stream.onError = (e) => controller.signalError(new AsyncError(e)); |
|
Bob Nystrom
2013/01/08 23:50:49
Having to manually create the AsyncError here is l
nweiz
2013/01/09 00:52:11
I'm not sure what a better API would be.
|
| + return new ByteStream(controller); |
| } |
| -/// Takes all input from [source] and writes it to [sink], but does not close |
| -/// [sink] when [source] is closed. Returns a [Future] that completes when |
| -/// [source] is closed. |
| -Future writeInputToInput(InputStream source, ListInputStream sink) { |
| +/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
| +/// using [Stream.pipe]. |
| +StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
| + new _OutputStreamConsumer(stream); |
| + |
| +/// A [StreamConsumer] that pipes data into an [OutputStream]. |
| +class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
| + final OutputStream _outputStream; |
| + |
| + _OutputStreamConsumer(this._outputStream) |
| + : super(); |
| + |
| + Future consume(Stream<List<int>> stream) { |
| + // TODO(nweiz): we have to manually keep track of whether or not the |
| + // completer has completed since the output stream could signal an error |
| + // after close() has been called but before it has shut down internally. See |
| + // the following TODO. |
| + var completed = false; |
| + var completer = new Completer(); |
| + stream.listen((data) => _outputStream.write(data), onDone: () { |
| + _outputStream.close(); |
| + // TODO(nweiz): wait until _outputStream.onClosed is called once issue |
| + // 7761 is fixed. |
| + if (!completed) completer.complete(null); |
| + completed = true; |
| + }); |
| + |
| + _outputStream.onError = (e) { |
| + if (!completed) completer.completeError(e); |
| + completed = true; |
| + }; |
| + |
| + return completer.future; |
| + } |
| +} |
| + |
| +// TODO(nweiz): remove this when it's added to the Stream API. |
|
Bob Nystrom
2013/01/08 23:50:49
Is there a bug# for this?
nweiz
2013/01/09 00:52:11
Filed one.
|
| + |
| +/// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
| +/// [sink] is closed and the returned [Future] is completed. |
| +Future store(Stream stream, StreamSink sink) { |
| var completer = new Completer(); |
| - source.onClosed = () => completer.complete(null); |
| - source.onData = () => sink.write(source.read()); |
| - // TODO(nweiz): propagate source errors to the sink. See issue 3657. |
| + stream.listen(sink.add, |
| + onError: sink.signalError, |
| + onDone: () { |
| + sink.close(); |
| + completer.complete(null); |
|
Bob Nystrom
2013/01/08 23:50:49
The arg is optional here now. Just do:
completer.
nweiz
2013/01/09 00:52:11
Done.
|
| + }); |
| return completer.future; |
| } |
| -/// Returns a [Future] that asynchronously completes to `null`. |
| -Future get async { |
| +/// Pipes all data and errors from [stream] into [sink]. Completes [Future] once |
| +/// [stream] is done. Unlike [store], [sink] remains open after [stream] is |
| +/// done. |
| +Future writeStreamToSink(Stream stream, StreamSink sink) { |
| var completer = new Completer(); |
| - new Timer(0, (_) => completer.complete(null)); |
| + stream.listen(sink.add, |
| + onError: sink.signalError, |
| + onDone: () => completer.complete(null)); |
|
Bob Nystrom
2013/01/08 23:50:49
complete()
nweiz
2013/01/09 00:52:11
Done.
|
| return completer.future; |
| } |
| +/// Returns a [Future] that asynchronously completes to `null`. |
| +Future get async => new Future.immediate(null); |
| + |
| +/// Returns a closed [Stream] with no elements. |
| +Stream get emptyStream => streamFromIterable([]); |
| + |
| +/// Creates a single-subscription stream that emits the items in [iter] and then |
| +/// ends. |
| +Stream streamFromIterable(Iterable iter) { |
| + var stream = new StreamController.singleSubscription(); |
| + iter.forEach(stream.add); |
| + stream.close(); |
| + return stream.stream; |
| +} |
| + |
| +/// Creates two single-subscription [Stream]s that each emit all values and |
| +/// errors from [stream]. This is useful if [stream] is single-subscription but |
| +/// multiple subscribers are necessary. |
| +Pair<Stream, Stream> tee(Stream stream) { |
|
Bob Nystrom
2013/01/08 23:50:49
If single-subscriber streams are going to stick ar
nweiz
2013/01/09 00:52:11
Done.
|
| + var controller1 = new StreamController.singleSubscription(); |
| + var controller2 = new StreamController.singleSubscription(); |
| + stream.listen((value) { |
| + controller1.add(value); |
| + controller2.add(value); |
| + }, onError: (error) { |
| + controller1.signalError(error); |
| + controller2.signalError(error); |
| + }, onDone: () { |
| + controller1.close(); |
| + controller2.close(); |
| + }); |
| + return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
| +} |
| + |
| +/// A pair of values. |
| +class Pair<E, F> { |
| + E first; |
| + F last; |
| + |
| + Pair(this.first, this.last); |
| + |
| + String toString() => '($first, $last)'; |
| + |
| + bool operator==(other) { |
| + if (other is! Pair) return false; |
| + return other.first == first && other.last == last; |
| + } |
| + |
| + int get hashCode => first.hashCode ^ last.hashCode; |
| +} |
| + |
| +/// Configures [future] so that its result (success or exception) is passed on |
| +/// to [completer]. |
| +void chainToCompleter(Future future, Completer completer) { |
| + future.then((v) => completer.complete(v)).catchError((e) { |
| + completer.completeError(e.error, e.stackTrace); |
| + }); |
|
Bob Nystrom
2013/01/08 23:50:49
Use onError:
future.then((v) => completer.complet
nweiz
2013/01/09 00:52:11
Why? The documentation says (and I agree) that cat
Bob Nystrom
2013/01/09 03:00:43
<shrug>
I figured one method call with two args w
nweiz
2013/01/09 03:07:40
I vote chaining.
|
| +} |
| + |
| // TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/ |
| // is in. |
| /// Runs [fn] for each element in [input] in order, moving to the next element |