Index: pkg/http/lib/src/utils.dart |
diff --git a/pkg/http/lib/src/utils.dart b/pkg/http/lib/src/utils.dart |
index 0a82dfd1225b5ef17b632defc5557ddc1a68cac0..28e2bfbec3141a0f8a7e42fef898ff92b49d8738 100644 |
--- a/pkg/http/lib/src/utils.dart |
+++ b/pkg/http/lib/src/utils.dart |
@@ -11,6 +11,8 @@ import 'dart:scalarlist'; |
import 'dart:uri'; |
import 'dart:utf'; |
+import 'byte_stream.dart'; |
+ |
/// Converts a URL query string (or `application/x-www-form-urlencoded` body) |
/// into a [Map] from parameter names to values. |
/// |
@@ -131,49 +133,157 @@ Uint8List toUint8List(List<int> input) { |
return output; |
} |
-/// Buffers all input from an InputStream and returns it as a future. |
-Future<List<int>> consumeInputStream(InputStream stream) { |
- if (stream.closed) return new Future<List<int>>.immediate(<int>[]); |
+/// If [stream] is already a [ByteStream], returns it. Otherwise, wraps it in a |
+/// [ByteStream]. |
+ByteStream toByteStream(Stream<List<int>> stream) { |
+ if (stream is ByteStream) return stream; |
+ return new ByteStream(stream); |
+} |
- var completer = new Completer<List<int>>(); |
- /// TODO(nweiz): use BufferList when issue 6409 is fixed |
- var buffer = <int>[]; |
- stream.onClosed = () => completer.complete(buffer); |
- stream.onData = () => buffer.addAll(stream.read()); |
- stream.onError = completer.completeError; |
- return completer.future; |
+/// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished. |
+/// The return value, also a single-subscription [Stream] should be used in |
+/// place of [stream] after calling this method. |
+Stream onDone(Stream stream, void onDone()) { |
+ var pair = tee(stream); |
+ pair.first.listen((_) {}, onError: (_) {}, onDone: onDone); |
+ return pair.last; |
} |
-/// Takes all input from [source] and writes it to [sink], then closes [sink]. |
-/// Returns a [Future] that completes when [source] is exhausted. |
-void pipeInputToInput(InputStream source, ListInputStream sink) { |
- source.onClosed = sink.markEndOfStream; |
- source.onData = () => sink.write(source.read()); |
- // TODO(nweiz): propagate source errors to the sink. See issue 3657. |
- // TODO(nweiz): we need to use async here to avoid issue 4974. |
- source.onError = (e) => async.then((_) { |
- throw e; |
- }); |
+// TODO(nweiz): remove this once issue 7785 is fixed. |
+/// Wraps [stream] in a single-subscription [ByteStream] that emits the same |
+/// data. |
+ByteStream wrapInputStream(InputStream stream) { |
+ if (stream.closed) return emptyStream; |
+ |
+ var controller = new StreamController.singleSubscription(); |
+ stream.onClosed = controller.close; |
+ stream.onData = () => controller.add(stream.read()); |
+ stream.onError = (e) => controller.signalError(new AsyncError(e)); |
+ return new ByteStream(controller); |
+} |
+ |
+// TODO(nweiz): remove this once issue 7785 is fixed. |
+/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
+/// using [Stream.pipe]. |
+StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
+ new _OutputStreamConsumer(stream); |
+ |
+/// A [StreamConsumer] that pipes data into an [OutputStream]. |
+class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
+ final OutputStream _outputStream; |
+ |
+ _OutputStreamConsumer(this._outputStream) |
+ : super(); |
+ |
+ Future consume(Stream<List<int>> stream) { |
+ // TODO(nweiz): we have to manually keep track of whether or not the |
+ // completer has completed since the output stream could signal an error |
+ // after close() has been called but before it has shut down internally. See |
+ // the following TODO. |
+ var completed = false; |
+ var completer = new Completer(); |
+ stream.listen((data) => _outputStream.write(data), onDone: () { |
+ _outputStream.close(); |
+ // TODO(nweiz): wait until _outputStream.onClosed is called once issue |
+ // 7761 is fixed. |
+ if (!completed) completer.complete(); |
+ completed = true; |
+ }); |
+ |
+ _outputStream.onError = (e) { |
+ if (!completed) completer.completeError(e); |
+ completed = true; |
+ }; |
+ |
+ return completer.future; |
+ } |
} |
-/// Takes all input from [source] and writes it to [sink], but does not close |
-/// [sink] when [source] is closed. Returns a [Future] that completes when |
-/// [source] is closed. |
-Future writeInputToInput(InputStream source, ListInputStream sink) { |
+// TODO(nweiz): remove this when issue 7786 is fixed. |
+/// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
+/// [sink] is closed and the returned [Future] is completed. |
+Future store(Stream stream, StreamSink sink) { |
var completer = new Completer(); |
- source.onClosed = () => completer.complete(null); |
- source.onData = () => sink.write(source.read()); |
- // TODO(nweiz): propagate source errors to the sink. See issue 3657. |
+ stream.listen(sink.add, |
+ onError: sink.signalError, |
+ onDone: () { |
+ sink.close(); |
+ completer.complete(); |
+ }); |
return completer.future; |
} |
-/// Returns a [Future] that asynchronously completes to `null`. |
-Future get async { |
+/// Pipes all data and errors from [stream] into [sink]. Completes [Future] once |
+/// [stream] is done. Unlike [store], [sink] remains open after [stream] is |
+/// done. |
+Future writeStreamToSink(Stream stream, StreamSink sink) { |
var completer = new Completer(); |
- new Timer(0, (_) => completer.complete(null)); |
+ stream.listen(sink.add, |
+ onError: sink.signalError, |
+ onDone: () => completer.complete()); |
return completer.future; |
} |
+/// Returns a [Future] that asynchronously completes to `null`. |
+Future get async => new Future.immediate(null); |
+ |
+/// Returns a closed [Stream] with no elements. |
+Stream get emptyStream => streamFromIterable([]); |
+ |
+/// Creates a single-subscription stream that emits the items in [iter] and then |
+/// ends. |
+Stream streamFromIterable(Iterable iter) { |
+ var stream = new StreamController.singleSubscription(); |
+ iter.forEach(stream.add); |
+ stream.close(); |
+ return stream.stream; |
+} |
+ |
+// TODO(nweiz): remove this when issue 7787 is fixed. |
+/// Creates two single-subscription [Stream]s that each emit all values and |
+/// errors from [stream]. This is useful if [stream] is single-subscription but |
+/// multiple subscribers are necessary. |
+Pair<Stream, Stream> tee(Stream stream) { |
+ var controller1 = new StreamController.singleSubscription(); |
+ var controller2 = new StreamController.singleSubscription(); |
+ stream.listen((value) { |
+ controller1.add(value); |
+ controller2.add(value); |
+ }, onError: (error) { |
+ controller1.signalError(error); |
+ controller2.signalError(error); |
+ }, onDone: () { |
+ controller1.close(); |
+ controller2.close(); |
+ }); |
+ return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
+} |
+ |
+/// A pair of values. |
+class Pair<E, F> { |
+ E first; |
+ F last; |
+ |
+ Pair(this.first, this.last); |
+ |
+ String toString() => '($first, $last)'; |
+ |
+ bool operator==(other) { |
+ if (other is! Pair) return false; |
+ return other.first == first && other.last == last; |
+ } |
+ |
+ int get hashCode => first.hashCode ^ last.hashCode; |
+} |
+ |
+/// Configures [future] so that its result (success or exception) is passed on |
+/// to [completer]. |
+void chainToCompleter(Future future, Completer completer) { |
+ future.then((v) => completer.complete(v)).catchError((e) { |
+ completer.completeError(e.error, e.stackTrace); |
+ }); |
+} |
+ |
// TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/ |
// is in. |
/// Runs [fn] for each element in [input] in order, moving to the next element |