Index: pkg/scheduled_test/lib/src/utils.dart |
diff --git a/pkg/scheduled_test/lib/src/utils.dart b/pkg/scheduled_test/lib/src/utils.dart |
index b880a2d4832b0200279b5f0eace3c33585bdabfe..8a36ab15dc6f829c3e7654dff476953d8b47dae3 100644 |
--- a/pkg/scheduled_test/lib/src/utils.dart |
+++ b/pkg/scheduled_test/lib/src/utils.dart |
@@ -6,11 +6,28 @@ library utils; |
import 'dart:async'; |
+/// A pair of values. |
+class Pair<E, F> { |
+ E first; |
+ F last; |
+ |
+ Pair(this.first, this.last); |
+ |
+ String toString() => '($first, $last)'; |
+ |
+ bool operator==(other) { |
+ if (other is! Pair) return false; |
+ return other.first == first && other.last == last; |
+ } |
+ |
+ int get hashCode => first.hashCode ^ last.hashCode; |
+} |
+ |
/// Configures [future] so that its result (success or exception) is passed on |
/// to [completer]. |
void chainToCompleter(Future future, Completer completer) { |
future.then((value) => completer.complete(value), |
- onError: (e) => completer.completeError(e.error, e.stackTrace)); |
+ onError: (e) => completer.completeError(e)); |
} |
/// Prepends each line in [text] with [prefix]. |
@@ -60,3 +77,86 @@ Stream futureStream(Future<Stream> future) { |
}); |
return controller.stream; |
} |
+ |
+// TODO(nweiz): remove this when issue 7964 is fixed. |
+/// Returns a [Future] that will complete to the first element of [stream]. |
+/// Unlike [Stream.first], this is safe to use with single-subscription streams. |
+Future streamFirst(Stream stream) { |
+ // TODO(nweiz): remove this when issue 8512 is fixed. |
+ var cancelled = false; |
+ var completer = new Completer(); |
+ var subscription; |
+ subscription = stream.listen((value) { |
+ if (!cancelled) { |
+ cancelled = true; |
+ subscription.cancel(); |
+ completer.complete(value); |
+ } |
+ }, onError: (e) { |
+ if (!cancelled) { |
+ completer.completeError(e.error, e.stackTrace); |
+ } |
+ }, onDone: () { |
+ if (!cancelled) { |
+ completer.completeError(new StateError("No elements")); |
+ } |
+ }, unsubscribeOnError: true); |
+ return completer.future; |
+} |
+ |
+/// Returns a wrapped version of [stream] along with a [StreamSubscription] that |
+/// can be used to control the wrapped stream. |
+Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { |
+ var controller = stream.isBroadcast ? |
+ new StreamController.broadcast() : |
+ new StreamController(); |
+ var subscription = stream.listen(controller.add, |
+ onError: controller.signalError, |
+ onDone: controller.close); |
+ return new Pair<Stream, StreamSubscription>(controller.stream, subscription); |
+} |
+ |
+// TODO(nweiz): remove this when issue 7787 is fixed. |
+/// Creates two single-subscription [Stream]s that each emit all values and |
+/// errors from [stream]. This is useful if [stream] is single-subscription but |
+/// multiple subscribers are necessary. |
+Pair<Stream, Stream> tee(Stream stream) { |
+ var controller1 = new StreamController(); |
+ var controller2 = new StreamController(); |
+ stream.listen((value) { |
+ controller1.add(value); |
+ controller2.add(value); |
+ }, onError: (error) { |
+ controller1.signalError(error); |
+ controller2.signalError(error); |
+ }, onDone: () { |
+ controller1.close(); |
+ controller2.close(); |
+ }); |
+ return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
+} |
+ |
+/// Takes a simple data structure (composed of [Map]s, [List]s, scalar objects, |
+/// and [Future]s) and recursively resolves all the [Future]s contained within. |
+/// Completes with the fully resolved structure. |
+Future awaitObject(object) { |
+ // Unroll nested futures. |
+ if (object is Future) return object.then(awaitObject); |
+ if (object is Collection) { |
Bob Nystrom
2013/03/04 23:52:00
Iterable?
nweiz
2013/03/05 02:16:09
Done.
|
+ return Future.wait(object.map(awaitObject).toList()); |
+ } |
+ if (object is! Map) return new Future.immediate(object); |
+ |
+ var pairs = <Future<Pair>>[]; |
+ object.forEach((key, value) { |
+ pairs.add(awaitObject(value) |
+ .then((resolved) => new Pair(key, resolved))); |
+ }); |
+ return Future.wait(pairs).then((resolvedPairs) { |
+ var map = {}; |
+ for (var pair in resolvedPairs) { |
+ map[pair.first] = pair.last; |
+ } |
+ return map; |
+ }); |
+} |