| Index: pkg/scheduled_test/lib/src/utils.dart
|
| diff --git a/pkg/scheduled_test/lib/src/utils.dart b/pkg/scheduled_test/lib/src/utils.dart
|
| index b880a2d4832b0200279b5f0eace3c33585bdabfe..f24cc82831b5c826b2e8a43cfad07a5a57775435 100644
|
| --- a/pkg/scheduled_test/lib/src/utils.dart
|
| +++ b/pkg/scheduled_test/lib/src/utils.dart
|
| @@ -6,11 +6,28 @@ library utils;
|
|
|
| import 'dart:async';
|
|
|
| +/// A pair of values.
|
| +class Pair<E, F> {
|
| + E first;
|
| + F last;
|
| +
|
| + Pair(this.first, this.last);
|
| +
|
| + String toString() => '($first, $last)';
|
| +
|
| + bool operator==(other) {
|
| + if (other is! Pair) return false;
|
| + return other.first == first && other.last == last;
|
| + }
|
| +
|
| + int get hashCode => first.hashCode ^ last.hashCode;
|
| +}
|
| +
|
| /// Configures [future] so that its result (success or exception) is passed on
|
| /// to [completer].
|
| void chainToCompleter(Future future, Completer completer) {
|
| future.then((value) => completer.complete(value),
|
| - onError: (e) => completer.completeError(e.error, e.stackTrace));
|
| + onError: (e) => completer.completeError(e));
|
| }
|
|
|
| /// Prepends each line in [text] with [prefix].
|
| @@ -60,3 +77,86 @@ Stream futureStream(Future<Stream> future) {
|
| });
|
| return controller.stream;
|
| }
|
| +
|
| +// TODO(nweiz): remove this when issue 7964 is fixed.
|
| +/// Returns a [Future] that will complete to the first element of [stream].
|
| +/// Unlike [Stream.first], this is safe to use with single-subscription streams.
|
| +Future streamFirst(Stream stream) {
|
| + // TODO(nweiz): remove this when issue 8512 is fixed.
|
| + var cancelled = false;
|
| + var completer = new Completer();
|
| + var subscription;
|
| + subscription = stream.listen((value) {
|
| + if (!cancelled) {
|
| + cancelled = true;
|
| + subscription.cancel();
|
| + completer.complete(value);
|
| + }
|
| + }, onError: (e) {
|
| + if (!cancelled) {
|
| + completer.completeError(e.error, e.stackTrace);
|
| + }
|
| + }, onDone: () {
|
| + if (!cancelled) {
|
| + completer.completeError(new StateError("No elements"));
|
| + }
|
| + }, unsubscribeOnError: true);
|
| + return completer.future;
|
| +}
|
| +
|
| +/// Returns a wrapped version of [stream] along with a [StreamSubscription] that
|
| +/// can be used to control the wrapped stream.
|
| +Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) {
|
| + var controller = stream.isBroadcast ?
|
| + new StreamController.broadcast() :
|
| + new StreamController();
|
| + var subscription = stream.listen(controller.add,
|
| + onError: controller.signalError,
|
| + onDone: controller.close);
|
| + return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
|
| +}
|
| +
|
| +// TODO(nweiz): remove this when issue 7787 is fixed.
|
| +/// Creates two single-subscription [Stream]s that each emit all values and
|
| +/// errors from [stream]. This is useful if [stream] is single-subscription but
|
| +/// multiple subscribers are necessary.
|
| +Pair<Stream, Stream> tee(Stream stream) {
|
| + var controller1 = new StreamController();
|
| + var controller2 = new StreamController();
|
| + stream.listen((value) {
|
| + controller1.add(value);
|
| + controller2.add(value);
|
| + }, onError: (error) {
|
| + controller1.signalError(error);
|
| + controller2.signalError(error);
|
| + }, onDone: () {
|
| + controller1.close();
|
| + controller2.close();
|
| + });
|
| + return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
|
| +}
|
| +
|
| +/// Takes a simple data structure (composed of [Map]s, [Iterable]s, scalar
|
| +/// objects, and [Future]s) and recursively resolves all the [Future]s contained
|
| +/// within. Completes with the fully resolved structure.
|
| +Future awaitObject(object) {
|
| + // Unroll nested futures.
|
| + if (object is Future) return object.then(awaitObject);
|
| + if (object is Iterable) {
|
| + return Future.wait(object.map(awaitObject).toList());
|
| + }
|
| + if (object is! Map) return new Future.immediate(object);
|
| +
|
| + var pairs = <Future<Pair>>[];
|
| + object.forEach((key, value) {
|
| + pairs.add(awaitObject(value)
|
| + .then((resolved) => new Pair(key, resolved)));
|
| + });
|
| + return Future.wait(pairs).then((resolvedPairs) {
|
| + var map = {};
|
| + for (var pair in resolvedPairs) {
|
| + map[pair.first] = pair.last;
|
| + }
|
| + return map;
|
| + });
|
| +}
|
|
|