Chromium Code Reviews| Index: pkg/scheduled_test/lib/src/utils.dart |
| diff --git a/pkg/scheduled_test/lib/src/utils.dart b/pkg/scheduled_test/lib/src/utils.dart |
| index b880a2d4832b0200279b5f0eace3c33585bdabfe..8a36ab15dc6f829c3e7654dff476953d8b47dae3 100644 |
| --- a/pkg/scheduled_test/lib/src/utils.dart |
| +++ b/pkg/scheduled_test/lib/src/utils.dart |
| @@ -6,11 +6,28 @@ library utils; |
| import 'dart:async'; |
| +/// A pair of values. |
| +class Pair<E, F> { |
| + E first; |
| + F last; |
| + |
| + Pair(this.first, this.last); |
| + |
| + String toString() => '($first, $last)'; |
| + |
| + bool operator==(other) { |
| + if (other is! Pair) return false; |
| + return other.first == first && other.last == last; |
| + } |
| + |
| + int get hashCode => first.hashCode ^ last.hashCode; |
| +} |
| + |
| /// Configures [future] so that its result (success or exception) is passed on |
| /// to [completer]. |
| void chainToCompleter(Future future, Completer completer) { |
| future.then((value) => completer.complete(value), |
| - onError: (e) => completer.completeError(e.error, e.stackTrace)); |
| + onError: (e) => completer.completeError(e)); |
| } |
| /// Prepends each line in [text] with [prefix]. |
| @@ -60,3 +77,86 @@ Stream futureStream(Future<Stream> future) { |
| }); |
| return controller.stream; |
| } |
| + |
| +// TODO(nweiz): remove this when issue 7964 is fixed. |
| +/// Returns a [Future] that will complete to the first element of [stream]. |
| +/// Unlike [Stream.first], this is safe to use with single-subscription streams. |
| +Future streamFirst(Stream stream) { |
| + // TODO(nweiz): remove this when issue 8512 is fixed. |
| + var cancelled = false; |
| + var completer = new Completer(); |
| + var subscription; |
| + subscription = stream.listen((value) { |
| + if (!cancelled) { |
| + cancelled = true; |
| + subscription.cancel(); |
| + completer.complete(value); |
| + } |
| + }, onError: (e) { |
| + if (!cancelled) { |
| + completer.completeError(e.error, e.stackTrace); |
| + } |
| + }, onDone: () { |
| + if (!cancelled) { |
| + completer.completeError(new StateError("No elements")); |
| + } |
| + }, unsubscribeOnError: true); |
| + return completer.future; |
| +} |
| + |
| +/// Returns a wrapped version of [stream] along with a [StreamSubscription] that |
| +/// can be used to control the wrapped stream. |
| +Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { |
| + var controller = stream.isBroadcast ? |
| + new StreamController.broadcast() : |
| + new StreamController(); |
| + var subscription = stream.listen(controller.add, |
| + onError: controller.signalError, |
| + onDone: controller.close); |
| + return new Pair<Stream, StreamSubscription>(controller.stream, subscription); |
| +} |
| + |
| +// TODO(nweiz): remove this when issue 7787 is fixed. |
| +/// Creates two single-subscription [Stream]s that each emit all values and |
| +/// errors from [stream]. This is useful if [stream] is single-subscription but |
| +/// multiple subscribers are necessary. |
| +Pair<Stream, Stream> tee(Stream stream) { |
| + var controller1 = new StreamController(); |
| + var controller2 = new StreamController(); |
| + stream.listen((value) { |
| + controller1.add(value); |
| + controller2.add(value); |
| + }, onError: (error) { |
| + controller1.signalError(error); |
| + controller2.signalError(error); |
| + }, onDone: () { |
| + controller1.close(); |
| + controller2.close(); |
| + }); |
| + return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
| +} |
| + |
| +/// Takes a simple data structure (composed of [Map]s, [List]s, scalar objects, |
| +/// and [Future]s) and recursively resolves all the [Future]s contained within. |
| +/// Completes with the fully resolved structure. |
| +Future awaitObject(object) { |
| + // Unroll nested futures. |
| + if (object is Future) return object.then(awaitObject); |
| + if (object is Collection) { |
|
Bob Nystrom
2013/03/04 23:52:00
Iterable?
nweiz
2013/03/05 02:16:09
Done.
|
| + return Future.wait(object.map(awaitObject).toList()); |
| + } |
| + if (object is! Map) return new Future.immediate(object); |
| + |
| + var pairs = <Future<Pair>>[]; |
| + object.forEach((key, value) { |
| + pairs.add(awaitObject(value) |
| + .then((resolved) => new Pair(key, resolved))); |
| + }); |
| + return Future.wait(pairs).then((resolvedPairs) { |
| + var map = {}; |
| + for (var pair in resolvedPairs) { |
| + map[pair.first] = pair.last; |
| + } |
| + return map; |
| + }); |
| +} |