Index: utils/pub/utils.dart |
diff --git a/utils/pub/utils.dart b/utils/pub/utils.dart |
index 4311c0946e59e13fb89b344e2b7158f95e7bde33..7e91b43801528ccd11ba72d2695f1b420b1efd0b 100644 |
--- a/utils/pub/utils.dart |
+++ b/utils/pub/utils.dart |
@@ -118,6 +118,83 @@ void chainToCompleter(Future future, Completer completer) { |
onError: (e) => completer.completeError(e.error, e.stackTrace)); |
} |
+// TODO(nweiz): remove this when issue 7964 is fixed. |
+/// Returns a [Future] that will complete to the first element of [stream]. |
+/// Unlike [Stream.first], this is safe to use with single-subscription streams. |
+Future streamFirst(Stream stream) { |
+ var completer = new Completer(); |
+ var subscription; |
+ subscription = stream.listen((value) { |
+ subscription.cancel(); |
+ completer.complete(value); |
+ }, |
+ onError: (e) => completer.completeError(e.error, e.stackTrace), |
+ onDone: () => completer.completeError(new StateError("No elements")), |
+ unsubscribeOnError: true); |
+ return completer.future; |
+} |
+ |
+/// Returns a wrapped version of [stream] along with a [StreamSubscription] that |
+/// can be used to control the wrapped stream. |
+Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { |
+ var controller = stream.isSingleSubscription ? |
+ new StreamController() : |
+ new StreamController.multiSubscription(); |
+ var subscription = stream.listen(controller.add, |
+ onError: controller.signalError, |
+ onDone: controller.close); |
+ return new Pair<Stream, StreamSubscription>(controller.stream, subscription); |
+} |
+ |
+// TODO(nweiz): remove this when issue 7787 is fixed. |
+/// Creates two single-subscription [Stream]s that each emit all values and |
+/// errors from [stream]. This is useful if [stream] is single-subscription but |
+/// multiple subscribers are necessary. |
+Pair<Stream, Stream> tee(Stream stream) { |
+ var controller1 = new StreamController(); |
+ var controller2 = new StreamController(); |
+ stream.listen((value) { |
+ controller1.add(value); |
+ controller2.add(value); |
+ }, onError: (error) { |
+ controller1.signalError(error); |
+ controller2.signalError(error); |
+ }, onDone: () { |
+ controller1.close(); |
+ controller2.close(); |
+ }); |
+ return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
+} |
+ |
+/// A regular expression matching a line termination character or character |
+/// sequence. |
+final RegExp _lineRegexp = new RegExp(r"\r\n|\r|\n"); |
+ |
+/// Converts a stream of arbitrarily chunked strings into a line-by-line stream. |
+/// The lines don't include line termination characters. A single trailing |
+/// newline is ignored. |
+Stream<String> streamToLines(Stream<String> stream) { |
+ var buffer = new StringBuffer(); |
+ return stream.transform(new StreamTransformer.from( |
+ onData: (chunk, sink) { |
+ var lines = chunk.split(_lineRegexp); |
+ var leftover = lines.removeLast(); |
+ for (var line in lines) { |
+ if (!buffer.isEmpty) { |
+ buffer.add(line); |
+ line = buffer.toString(); |
+ buffer.clear(); |
+ } |
+ |
+ sink.add(line); |
+ } |
+ buffer.add(leftover); |
+ }, onDone: (sink) { |
+ if (!buffer.isEmpty) sink.add(buffer.toString()); |
+ sink.close(); |
+ })); |
+} |
+ |
/// Like [Iterable.where], but allows [test] to return [Future]s and uses the |
/// results of those [Future]s as the test. |
Future<Iterable> futureWhere(Iterable iter, test(value)) { |