| Index: utils/pub/utils.dart
|
| diff --git a/utils/pub/utils.dart b/utils/pub/utils.dart
|
| index 4311c0946e59e13fb89b344e2b7158f95e7bde33..7e91b43801528ccd11ba72d2695f1b420b1efd0b 100644
|
| --- a/utils/pub/utils.dart
|
| +++ b/utils/pub/utils.dart
|
| @@ -118,6 +118,83 @@ void chainToCompleter(Future future, Completer completer) {
|
| onError: (e) => completer.completeError(e.error, e.stackTrace));
|
| }
|
|
|
| +// TODO(nweiz): remove this when issue 7964 is fixed.
|
| +/// Returns a [Future] that will complete to the first element of [stream].
|
| +/// Unlike [Stream.first], this is safe to use with single-subscription streams.
|
| +Future streamFirst(Stream stream) {
|
| + var completer = new Completer();
|
| + var subscription;
|
| + subscription = stream.listen((value) {
|
| + subscription.cancel();
|
| + completer.complete(value);
|
| + },
|
| + onError: (e) => completer.completeError(e.error, e.stackTrace),
|
| + onDone: () => completer.completeError(new StateError("No elements")),
|
| + unsubscribeOnError: true);
|
| + return completer.future;
|
| +}
|
| +
|
| +/// Returns a wrapped version of [stream] along with a [StreamSubscription] that
|
| +/// can be used to control the wrapped stream.
|
| +Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) {
|
| + var controller = stream.isSingleSubscription ?
|
| + new StreamController() :
|
| + new StreamController.multiSubscription();
|
| + var subscription = stream.listen(controller.add,
|
| + onError: controller.signalError,
|
| + onDone: controller.close);
|
| + return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
|
| +}
|
| +
|
| +// TODO(nweiz): remove this when issue 7787 is fixed.
|
| +/// Creates two single-subscription [Stream]s that each emit all values and
|
| +/// errors from [stream]. This is useful if [stream] is single-subscription but
|
| +/// multiple subscribers are necessary.
|
| +Pair<Stream, Stream> tee(Stream stream) {
|
| + var controller1 = new StreamController();
|
| + var controller2 = new StreamController();
|
| + stream.listen((value) {
|
| + controller1.add(value);
|
| + controller2.add(value);
|
| + }, onError: (error) {
|
| + controller1.signalError(error);
|
| + controller2.signalError(error);
|
| + }, onDone: () {
|
| + controller1.close();
|
| + controller2.close();
|
| + });
|
| + return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
|
| +}
|
| +
|
| +/// A regular expression matching a line termination character or character
|
| +/// sequence.
|
| +final RegExp _lineRegexp = new RegExp(r"\r\n|\r|\n");
|
| +
|
| +/// Converts a stream of arbitrarily chunked strings into a line-by-line stream.
|
| +/// The lines don't include line termination characters. A single trailing
|
| +/// newline is ignored.
|
| +Stream<String> streamToLines(Stream<String> stream) {
|
| + var buffer = new StringBuffer();
|
| + return stream.transform(new StreamTransformer.from(
|
| + onData: (chunk, sink) {
|
| + var lines = chunk.split(_lineRegexp);
|
| + var leftover = lines.removeLast();
|
| + for (var line in lines) {
|
| + if (!buffer.isEmpty) {
|
| + buffer.add(line);
|
| + line = buffer.toString();
|
| + buffer.clear();
|
| + }
|
| +
|
| + sink.add(line);
|
| + }
|
| + buffer.add(leftover);
|
| + }, onDone: (sink) {
|
| + if (!buffer.isEmpty) sink.add(buffer.toString());
|
| + sink.close();
|
| + }));
|
| +}
|
| +
|
| /// Like [Iterable.where], but allows [test] to return [Future]s and uses the
|
| /// results of those [Future]s as the test.
|
| Future<Iterable> futureWhere(Iterable iter, test(value)) {
|
|
|