Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Unified Diff: utils/pub/utils.dart

Issue 12021008: Use the dart:async Stream API thoroughly in Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Code review changes Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « utils/pub/log.dart ('k') | utils/tests/pub/curl_client_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/pub/utils.dart
diff --git a/utils/pub/utils.dart b/utils/pub/utils.dart
index aac48fe0e305a7203f943955e5cd9619b3594c9e..d1cea825419a3f29e3014a3eb476eabc8a71733a 100644
--- a/utils/pub/utils.dart
+++ b/utils/pub/utils.dart
@@ -118,6 +118,83 @@ void chainToCompleter(Future future, Completer completer) {
onError: (e) => completer.completeError(e.error, e.stackTrace));
}
+// TODO(nweiz): remove this when issue 7964 is fixed.
+/// Returns a [Future] that will complete to the first element of [stream].
+/// Unlike [Stream.first], this is safe to use with single-subscription streams.
+Future streamFirst(Stream stream) {
+ var completer = new Completer();
+ var subscription;
+ subscription = stream.listen((value) {
+ subscription.cancel();
+ completer.complete(value);
+ },
+ onError: (e) => completer.completeError(e.error, e.stackTrace),
+ onDone: () => completer.completeError(new StateError("No elements")),
+ unsubscribeOnError: true);
+ return completer.future;
+}
+
+/// Returns a wrapped version of [stream] along with a [StreamSubscription] that
+/// can be used to control the wrapped stream.
+Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) {
+ var controller = stream.isSingleSubscription ?
+ new StreamController() :
+ new StreamController.multiSubscription();
+ var subscription = stream.listen(controller.add,
+ onError: controller.signalError,
+ onDone: controller.close);
+ return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
+}
+
+// TODO(nweiz): remove this when issue 7787 is fixed.
+/// Creates two single-subscription [Stream]s that each emit all values and
+/// errors from [stream]. This is useful if [stream] is single-subscription but
+/// multiple subscribers are necessary.
+Pair<Stream, Stream> tee(Stream stream) {
+ var controller1 = new StreamController();
+ var controller2 = new StreamController();
+ stream.listen((value) {
+ controller1.add(value);
+ controller2.add(value);
+ }, onError: (error) {
+ controller1.signalError(error);
+ controller2.signalError(error);
+ }, onDone: () {
+ controller1.close();
+ controller2.close();
+ });
+ return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
+}
+
+/// A regular expression matching a line termination character or character
+/// sequence.
+final RegExp _lineRegexp = new RegExp(r"\r\n|\r|\n");
+
+/// Converts a stream of arbitrarily chunked strings into a line-by-line stream.
+/// The lines don't include line termination characters. A single trailing
+/// newline is ignored.
+Stream<String> streamToLines(Stream<String> stream) {
+ var buffer = new StringBuffer();
+ return stream.transform(new StreamTransformer.from(
+ onData: (chunk, sink) {
+ var lines = chunk.split(_lineRegexp);
+ var leftover = lines.removeLast();
+ for (var line in lines) {
+ if (!buffer.isEmpty) {
+ buffer.add(line);
+ line = buffer.toString();
+ buffer.clear();
+ }
+
+ sink.add(line);
+ }
+ buffer.add(leftover);
+ }, onDone: (sink) {
+ if (!buffer.isEmpty) sink.add(buffer.toString());
+ sink.close();
+ }));
+}
+
// TODO(nweiz): unify the following functions with the utility functions in
// pkg/http.
« no previous file with comments | « utils/pub/log.dart ('k') | utils/tests/pub/curl_client_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698