Index: tests/lib/async/stream_state_helper.dart |
diff --git a/tests/lib/async/stream_state_helper.dart b/tests/lib/async/stream_state_helper.dart |
index 1ab5a4cd9dffa92d4f8958efa4146d200b430206..074a8f125afb6776eef4c3f76cdc3eeb8107866a 100644 |
--- a/tests/lib/async/stream_state_helper.dart |
+++ b/tests/lib/async/stream_state_helper.dart |
@@ -46,9 +46,8 @@ class SubscriptionProtocolTest { |
class StreamProtocolTest { |
bool trace = false; |
- // If not a broadcast stream, the onComplete is called automatically by |
- // the first onCancel. |
- bool isBroadcast; |
+ final bool isBroadcast; |
+ final bool isAsBroadcast; |
StreamController _controller; |
Stream _controllerStream; |
// Most recent subscription created. Used as default for pause/resume. |
@@ -59,7 +58,7 @@ class StreamProtocolTest { |
Function _onComplete; |
StreamProtocolTest.broadcast({ bool sync: false }) |
- : isBroadcast = true { |
+ : isBroadcast = true, isAsBroadcast = false { |
_controller = new StreamController.broadcast( |
sync: sync, |
onListen: _onListen, |
@@ -70,19 +69,31 @@ class StreamProtocolTest { |
}); |
} |
- StreamProtocolTest({ bool broadcast: false, bool sync: false }) |
- : isBroadcast = false { |
+ StreamProtocolTest({ bool sync: false }) |
+ : isBroadcast = false, isAsBroadcast = false { |
_controller = new StreamController( |
sync: sync, |
onListen: _onListen, |
onPause: _onPause, |
onResume: _onResume, |
onCancel: _onCancel); |
- if (broadcast) { |
- _controllerStream = _controller.stream.asBroadcastStream(); |
- } else { |
- _controllerStream = _controller.stream; |
- } |
+ _controllerStream = _controller.stream; |
+ _onComplete = expectAsync0((){ |
+ _onComplete = null; // Being null marks the test as being complete. |
+ }); |
+ } |
+ |
+ StreamProtocolTest.asBroadcast({ bool sync: false }) |
+ : isBroadcast = false, isAsBroadcast = true { |
+ _controller = new StreamController( |
+ sync: sync, |
+ onListen: _onListen, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: _onCancel); |
+ _controllerStream = _controller.stream.asBroadcastStream( |
+ onListen: _onBroadcastListen, |
+ onCancel: _onBroadcastCancel); |
_onComplete = expectAsync0((){ |
_onComplete = null; // Being null marks the test as being complete. |
}); |
@@ -205,7 +216,26 @@ class StreamProtocolTest { |
"Found: [Cancelled]\n${expect._stackTrace}"); |
} |
}); |
- if (!isBroadcast) terminate(); |
+ } |
+ |
+ void _onBroadcastListen(StreamSubscription sub) { |
+ if (trace) print("[BroadcastListen]"); |
+ _withNextExpectation((Event expect) { |
+ if (!expect.matchBroadcastListen(sub)) { |
+ _fail("Expected: $expect\n" |
+ "Found: [BroadcastListen]\n${expect._stackTrace}"); |
+ } |
+ }); |
+ } |
+ |
+ void _onBroadcastCancel(StreamSubscription sub) { |
+ if (trace) print("[BroadcastCancel]"); |
+ _withNextExpectation((Event expect) { |
+ if (!expect.matchBroadcastCancel(sub)) { |
+ _fail("Expected: $expect\n" |
+ "Found: [BroadcastCancel]\n${expect._stackTrace}"); |
+ } |
+ }); |
} |
void _withNextExpectation(void action(Event expect)) { |
@@ -289,6 +319,38 @@ class StreamProtocolTest { |
new CancelCallbackEvent(action)); |
} |
+ void expectBroadcastListen([void action(StreamSubscription sub)]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ if (!isAsBroadcast) throw new StateError("Not an asBroadcast stream"); |
+ _expectations.add(new BroadcastListenCallbackEvent(action)); |
+ } |
+ |
+ void expectBroadcastCancel([void action(StreamSubscription sub)]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ if (!isAsBroadcast) throw new StateError("Not an asBroadcast stream"); |
+ _expectations.add(new BroadcastCancelCallbackEvent(action)); |
+ } |
+ |
+ void expectBroadcastListenOpt([void action(StreamSubscription sub)]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ if (!isAsBroadcast) return; |
+ _expectations.add(new BroadcastListenCallbackEvent(action)); |
+ } |
+ |
+ void expectBroadcastCancelOpt([void action(StreamSubscription sub)]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ if (!isAsBroadcast) return; |
+ _expectations.add(new BroadcastCancelCallbackEvent(action)); |
+ } |
+ |
void _fail(String message) { |
if (_nextExpectationIndex == 0) { |
throw "Unexpected event:\n$message\nNo earlier events matched."; |
@@ -314,6 +376,10 @@ class Event { |
: _action = (action == null) ? null : expectAsync0(action) { |
try { throw 0; } catch (_, s) { _stackTrace = s; } |
} |
+ Event.broadcast(void action(StreamSubscription sub)) |
+ : _action = (action == null) ? null : expectAsync1(action) { |
+ try { throw 0; } catch (_, s) { _stackTrace = s; } |
+ } |
bool matchData(int id, var data) { |
return false; |
@@ -351,6 +417,18 @@ class Event { |
return true; |
} |
+ bool matchBroadcastListen(StreamSubscription sub) { |
+ if (!_testBroadcastListen()) return false; |
+ if (_action != null) _action(sub); |
+ return true; |
+ } |
+ |
+ bool matchBroadcastCancel(StreamSubscription sub) { |
+ if (!_testBroadcastCancel()) return false; |
+ if (_action != null) _action(sub); |
+ return true; |
+ } |
+ |
bool _testData(_) => false; |
bool _testError(_) => false; |
bool _testDone() => false; |
@@ -358,6 +436,8 @@ class Event { |
bool _testResume() => false; |
bool _testSubscribe() => false; |
bool _testCancel() => false; |
+ bool _testBroadcastListen() => false; |
+ bool _testBroadcastCancel() => false; |
} |
class SubscriptionEvent extends Event { |
@@ -439,6 +519,25 @@ class CancelCallbackEvent extends Event { |
String toString() => "[Cancelled]"; |
} |
+class _BroadcastEventCallback extends Event { |
+ Function _action; |
+ _BroadcastEventCallback(); |
+} |
+ |
+class BroadcastCancelCallbackEvent extends Event { |
+ BroadcastCancelCallbackEvent(void action(StreamSubscription sub)) |
+ : super.broadcast(action); |
+ bool _testBroadcastCancel() => true; |
+ String toString() => "[BroadcastCancel]"; |
+} |
+ |
+class BroadcastListenCallbackEvent extends Event { |
+ BroadcastListenCallbackEvent(void action(StreamSubscription sub)) |
+ : super.broadcast(action); |
+ bool _testBroadcastListen() => true; |
+ String toString() => "[BroadcastListen]"; |
+} |
+ |
/** Event matcher that matches any other event. */ |
class LogAnyEvent extends Event { |
String _actual = "*Not matched yet*"; |
@@ -480,6 +579,16 @@ class LogAnyEvent extends Event { |
return true; |
} |
+ bool _testBroadcastListen() { |
+ _actual = "*[BroadcastListen]"; |
+ return true; |
+ } |
+ |
+ bool _testBroadcastCancel() { |
+ _actual = "*[BroadcastCancel]"; |
+ return true; |
+ } |
+ |
/** Returns a representation of the event it was tested against. */ |
String toString() => _actual; |
} |