Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(60)

Unified Diff: pkg/http/lib/src/utils.dart

Issue 11825010: Update pkg/http to use the new async APIs. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Code review changes Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/http/lib/src/streamed_response.dart ('k') | pkg/http/test/client_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/http/lib/src/utils.dart
diff --git a/pkg/http/lib/src/utils.dart b/pkg/http/lib/src/utils.dart
index 0a82dfd1225b5ef17b632defc5557ddc1a68cac0..28e2bfbec3141a0f8a7e42fef898ff92b49d8738 100644
--- a/pkg/http/lib/src/utils.dart
+++ b/pkg/http/lib/src/utils.dart
@@ -11,6 +11,8 @@ import 'dart:scalarlist';
import 'dart:uri';
import 'dart:utf';
+import 'byte_stream.dart';
+
/// Converts a URL query string (or `application/x-www-form-urlencoded` body)
/// into a [Map] from parameter names to values.
///
@@ -131,49 +133,157 @@ Uint8List toUint8List(List<int> input) {
return output;
}
-/// Buffers all input from an InputStream and returns it as a future.
-Future<List<int>> consumeInputStream(InputStream stream) {
- if (stream.closed) return new Future<List<int>>.immediate(<int>[]);
+/// If [stream] is already a [ByteStream], returns it. Otherwise, wraps it in a
+/// [ByteStream].
+ByteStream toByteStream(Stream<List<int>> stream) {
+ if (stream is ByteStream) return stream;
+ return new ByteStream(stream);
+}
- var completer = new Completer<List<int>>();
- /// TODO(nweiz): use BufferList when issue 6409 is fixed
- var buffer = <int>[];
- stream.onClosed = () => completer.complete(buffer);
- stream.onData = () => buffer.addAll(stream.read());
- stream.onError = completer.completeError;
- return completer.future;
+/// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished.
+/// The return value, also a single-subscription [Stream] should be used in
+/// place of [stream] after calling this method.
+Stream onDone(Stream stream, void onDone()) {
+ var pair = tee(stream);
+ pair.first.listen((_) {}, onError: (_) {}, onDone: onDone);
+ return pair.last;
}
-/// Takes all input from [source] and writes it to [sink], then closes [sink].
-/// Returns a [Future] that completes when [source] is exhausted.
-void pipeInputToInput(InputStream source, ListInputStream sink) {
- source.onClosed = sink.markEndOfStream;
- source.onData = () => sink.write(source.read());
- // TODO(nweiz): propagate source errors to the sink. See issue 3657.
- // TODO(nweiz): we need to use async here to avoid issue 4974.
- source.onError = (e) => async.then((_) {
- throw e;
- });
+// TODO(nweiz): remove this once issue 7785 is fixed.
+/// Wraps [stream] in a single-subscription [ByteStream] that emits the same
+/// data.
+ByteStream wrapInputStream(InputStream stream) {
+ if (stream.closed) return emptyStream;
+
+ var controller = new StreamController.singleSubscription();
+ stream.onClosed = controller.close;
+ stream.onData = () => controller.add(stream.read());
+ stream.onError = (e) => controller.signalError(new AsyncError(e));
+ return new ByteStream(controller);
+}
+
+// TODO(nweiz): remove this once issue 7785 is fixed.
+/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
+/// using [Stream.pipe].
+StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
+ new _OutputStreamConsumer(stream);
+
+/// A [StreamConsumer] that pipes data into an [OutputStream].
+class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
+ final OutputStream _outputStream;
+
+ _OutputStreamConsumer(this._outputStream)
+ : super();
+
+ Future consume(Stream<List<int>> stream) {
+ // TODO(nweiz): we have to manually keep track of whether or not the
+ // completer has completed since the output stream could signal an error
+ // after close() has been called but before it has shut down internally. See
+ // the following TODO.
+ var completed = false;
+ var completer = new Completer();
+ stream.listen((data) => _outputStream.write(data), onDone: () {
+ _outputStream.close();
+ // TODO(nweiz): wait until _outputStream.onClosed is called once issue
+ // 7761 is fixed.
+ if (!completed) completer.complete();
+ completed = true;
+ });
+
+ _outputStream.onError = (e) {
+ if (!completed) completer.completeError(e);
+ completed = true;
+ };
+
+ return completer.future;
+ }
}
-/// Takes all input from [source] and writes it to [sink], but does not close
-/// [sink] when [source] is closed. Returns a [Future] that completes when
-/// [source] is closed.
-Future writeInputToInput(InputStream source, ListInputStream sink) {
+// TODO(nweiz): remove this when issue 7786 is fixed.
+/// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
+/// [sink] is closed and the returned [Future] is completed.
+Future store(Stream stream, StreamSink sink) {
var completer = new Completer();
- source.onClosed = () => completer.complete(null);
- source.onData = () => sink.write(source.read());
- // TODO(nweiz): propagate source errors to the sink. See issue 3657.
+ stream.listen(sink.add,
+ onError: sink.signalError,
+ onDone: () {
+ sink.close();
+ completer.complete();
+ });
return completer.future;
}
-/// Returns a [Future] that asynchronously completes to `null`.
-Future get async {
+/// Pipes all data and errors from [stream] into [sink]. Completes [Future] once
+/// [stream] is done. Unlike [store], [sink] remains open after [stream] is
+/// done.
+Future writeStreamToSink(Stream stream, StreamSink sink) {
var completer = new Completer();
- new Timer(0, (_) => completer.complete(null));
+ stream.listen(sink.add,
+ onError: sink.signalError,
+ onDone: () => completer.complete());
return completer.future;
}
+/// Returns a [Future] that asynchronously completes to `null`.
+Future get async => new Future.immediate(null);
+
+/// Returns a closed [Stream] with no elements.
+Stream get emptyStream => streamFromIterable([]);
+
+/// Creates a single-subscription stream that emits the items in [iter] and then
+/// ends.
+Stream streamFromIterable(Iterable iter) {
+ var stream = new StreamController.singleSubscription();
+ iter.forEach(stream.add);
+ stream.close();
+ return stream.stream;
+}
+
+// TODO(nweiz): remove this when issue 7787 is fixed.
+/// Creates two single-subscription [Stream]s that each emit all values and
+/// errors from [stream]. This is useful if [stream] is single-subscription but
+/// multiple subscribers are necessary.
+Pair<Stream, Stream> tee(Stream stream) {
+ var controller1 = new StreamController.singleSubscription();
+ var controller2 = new StreamController.singleSubscription();
+ stream.listen((value) {
+ controller1.add(value);
+ controller2.add(value);
+ }, onError: (error) {
+ controller1.signalError(error);
+ controller2.signalError(error);
+ }, onDone: () {
+ controller1.close();
+ controller2.close();
+ });
+ return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
+}
+
+/// A pair of values.
+class Pair<E, F> {
+ E first;
+ F last;
+
+ Pair(this.first, this.last);
+
+ String toString() => '($first, $last)';
+
+ bool operator==(other) {
+ if (other is! Pair) return false;
+ return other.first == first && other.last == last;
+ }
+
+ int get hashCode => first.hashCode ^ last.hashCode;
+}
+
+/// Configures [future] so that its result (success or exception) is passed on
+/// to [completer].
+void chainToCompleter(Future future, Completer completer) {
+ future.then((v) => completer.complete(v)).catchError((e) {
+ completer.completeError(e.error, e.stackTrace);
+ });
+}
+
// TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/
// is in.
/// Runs [fn] for each element in [input] in order, moving to the next element
« no previous file with comments | « pkg/http/lib/src/streamed_response.dart ('k') | pkg/http/test/client_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698