Index: test/utils.dart |
diff --git a/test/utils.dart b/test/utils.dart |
index 0dc47fa1521120b567a89c10adefc3aa3a173928..445b9fcffc7b184c9c427e5f90a047abc147f5a2 100644 |
--- a/test/utils.dart |
+++ b/test/utils.dart |
@@ -6,6 +6,8 @@ |
library async.test.util; |
import "dart:async"; |
+ |
+import "package:async/async.dart"; |
import "package:test/test.dart"; |
/// A zero-millisecond timer should wait until after all microtasks. |
@@ -40,3 +42,46 @@ class CompleterStreamSink<T> implements StreamSink<T> { |
Future addStream(Stream<T> stream) async {} |
Future close() => completer.future; |
} |
+ |
+/// A [StreamSink] that collects all events added to it as results. |
+/// |
+/// This is used for testing code that interacts with sinks. |
+class TestSink<T> implements StreamSink<T> { |
+ /// The results corresponding to events that have been added to the sink. |
+ final results = <Result<T>>[]; |
+ |
+ /// Whether [close] has been called. |
+ bool get isClosed => _isClosed; |
+ var _isClosed = false; |
+ |
+ Future get done => _doneCompleter.future; |
+ final _doneCompleter = new Completer(); |
+ |
+ final Function _onDone; |
+ |
+ /// Creates a new sink. |
+ /// |
+ /// If [onDone] is passed, it's called when the user calls [close]. Its result |
+ /// is piped to the [done] future. |
+ TestSink({onDone()}) : _onDone = onDone ?? (() {}); |
+ |
+ void add(T event) { |
+ results.add(new Result<T>.value(event)); |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) { |
+ results.add(new Result<T>.error(error, stackTrace)); |
+ } |
+ |
+ Future addStream(Stream<T> stream) { |
+ var completer = new Completer.sync(); |
+ stream.listen(add, onError: addError, onDone: completer.complete); |
+ return completer.future; |
+ } |
+ |
+ Future close() { |
+ _isClosed = true; |
+ _doneCompleter.complete(new Future.microtask(_onDone)); |
+ return done; |
+ } |
+} |