Index: tests/lib/async/stream_state_helper.dart |
diff --git a/tests/lib/async/stream_state_helper.dart b/tests/lib/async/stream_state_helper.dart |
index dbc645a47c9029bf4d51fdee7023090b41bfc94c..c433e391cb53ba8fb6fd818cd2d06b69bd810da6 100644 |
--- a/tests/lib/async/stream_state_helper.dart |
+++ b/tests/lib/async/stream_state_helper.dart |
@@ -9,6 +9,7 @@ import "dart:async"; |
import "dart:collection"; |
class StreamProtocolTest { |
+ bool trace = false; |
StreamController _controller; |
Stream _controllerStream; |
StreamSubscription _subscription; |
@@ -20,9 +21,8 @@ class StreamProtocolTest { |
_controller = new StreamController( |
onListen: _onSubcription, |
onPause: _onPause, |
- onResume: _onPause, |
- onCancel: _onSubcription); |
- // TODO(lrn): Make it work with multiple subscribers too. |
+ onResume: _onResume, |
+ onCancel: _onCancel); |
if (broadcast) { |
_controllerStream = _controller.stream.asBroadcastStream(); |
} else { |
@@ -54,7 +54,7 @@ class StreamProtocolTest { |
_subscription.pause(resumeSignal); |
} |
- void resume([Future resumeSignal]) { |
+ void resume() { |
if (_subscription == null) throw new StateError("Not subscribed"); |
_subscription.resume(); |
} |
@@ -67,6 +67,7 @@ class StreamProtocolTest { |
// Handling of stream events. |
void _onData(var data) { |
+ if (trace) print("[Data : $data]"); |
_withNextExpectation((Event expect) { |
if (!expect.matchData(data)) { |
_fail("Expected: $expect\n" |
@@ -76,15 +77,17 @@ class StreamProtocolTest { |
} |
void _onError(error) { |
+ if (trace) print("[Error : $error]"); |
_withNextExpectation((Event expect) { |
if (!expect.matchError(error)) { |
_fail("Expected: $expect\n" |
- "Found : [Data: ${error}]"); |
+ "Found : [Error: ${error}]"); |
} |
}); |
} |
void _onDone() { |
+ if (trace) print("[Done]"); |
_subscription = null; |
_withNextExpectation((Event expect) { |
if (!expect.matchDone()) { |
@@ -95,20 +98,41 @@ class StreamProtocolTest { |
} |
void _onPause() { |
+ if (trace) print("[Pause]"); |
_withNextExpectation((Event expect) { |
- if (!expect.matchPauseChange(_controller)) { |
+ if (!expect.matchPause()) { |
_fail("Expected: $expect\n" |
- "Found : [Paused:${_controller.isPaused}]"); |
+ "Found : [Paused]"); |
+ } |
+ }); |
+ } |
+ |
+ void _onResume() { |
+ if (trace) print("[Resumed]"); |
+ _withNextExpectation((Event expect) { |
+ if (!expect.matchResume()) { |
+ _fail("Expected: $expect\n" |
+ "Found : [Resumed]"); |
} |
}); |
} |
void _onSubcription() { |
+ if (trace) print("[Subscribed]"); |
_withNextExpectation((Event expect) { |
- if (!expect.matchSubscriptionChange(_controller)) { |
+ if (!expect.matchSubscribe()) { |
_fail("Expected: $expect\n" |
- "Found: [Has listener:${_controller.hasListener}, " |
- "Paused:${_controller.isPaused}]"); |
+ "Found: [Subscribed]"); |
+ } |
+ }); |
+ } |
+ |
+ void _onCancel() { |
+ if (trace) print("[Cancelled]"); |
+ _withNextExpectation((Event expect) { |
+ if (!expect.matchCancel()) { |
+ _fail("Expected: $expect\n" |
+ "Found: [Cancelled]"); |
} |
}); |
} |
@@ -117,9 +141,10 @@ class StreamProtocolTest { |
if (_nextExpectationIndex == _expectations.length) { |
action(new MismatchEvent()); |
} else { |
- Event next = _expectations[_nextExpectationIndex++]; |
+ Event next = _expectations[_nextExpectationIndex]; |
action(next); |
} |
+ _nextExpectationIndex++; |
_checkDone(); |
} |
@@ -155,18 +180,31 @@ class StreamProtocolTest { |
} |
_expectations.add(new DoneEvent(action)); |
} |
- void expectPause(bool isPaused, [void action()]) { |
+ void expectPause([void action()]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ _expectations.add(new PauseCallbackEvent(action)); |
+ } |
+ void expectResume([void action()]) { |
+ if (_onComplete == null) { |
+ _fail("Adding expectation after completing"); |
+ } |
+ _expectations.add(new ResumeCallbackEvent(action)); |
+ } |
+ void expectSubscription([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
- _expectations.add(new PauseCallbackEvent(isPaused, action)); |
+ _expectations.add( |
+ new SubscriptionCallbackEvent(action)); |
} |
- void expectSubscription(bool hasListener, bool isPaused, [void action()]) { |
+ void expectCancel([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
_expectations.add( |
- new SubscriptionCallbackEvent(hasListener, isPaused, action)); |
+ new CancelCallbackEvent(action)); |
} |
void _fail(String message) { |
@@ -178,11 +216,6 @@ class StreamProtocolTest { |
} |
} |
-class EventCollector { |
- final Queue<Event> events = new Queue<Event>(); |
- |
-} |
- |
class Event { |
Function _action; |
Event(void this._action()); |
@@ -202,13 +235,23 @@ class Event { |
if (_action != null) _action(); |
return true; |
} |
- bool matchPauseChange(StreamController c) { |
- if (!_testPause(c)) return false; |
+ bool matchPause() { |
+ if (!_testPause()) return false; |
if (_action != null) _action(); |
return true; |
} |
- bool matchSubscriptionChange(StreamController c) { |
- if (!_testSubscribe(c)) return false; |
+ bool matchResume() { |
+ if (!_testResume()) return false; |
+ if (_action != null) _action(); |
+ return true; |
+ } |
+ bool matchSubscribe() { |
+ if (!_testSubscribe()) return false; |
+ if (_action != null) _action(); |
+ return true; |
+ } |
+ bool matchCancel() { |
+ if (!_testCancel()) return false; |
if (_action != null) _action(); |
return true; |
} |
@@ -216,8 +259,10 @@ class Event { |
bool _testData(_) => false; |
bool _testError(_) => false; |
bool _testDone() => false; |
- bool _testPause(_) => false; |
- bool _testSubscribe(_) => false; |
+ bool _testPause() => false; |
+ bool _testResume() => false; |
+ bool _testSubscribe() => false; |
+ bool _testCancel() => false; |
} |
class MismatchEvent extends Event { |
@@ -246,22 +291,27 @@ class DoneEvent extends Event { |
} |
class PauseCallbackEvent extends Event { |
- final bool isPaused; |
- PauseCallbackEvent(this.isPaused, void action()) |
- : super(action); |
- bool _testPause(StreamController c) => isPaused == c.isPaused; |
- String toString() => "[Paused:$isPaused]"; |
+ PauseCallbackEvent(void action()) : super(action); |
+ bool _testPause() => true; |
+ String toString() => "[Paused]"; |
+} |
+ |
+class ResumeCallbackEvent extends Event { |
+ ResumeCallbackEvent(void action()) : super(action); |
+ bool _testResume() => true; |
+ String toString() => "[Resumed]"; |
} |
class SubscriptionCallbackEvent extends Event { |
- final bool hasListener; |
- final bool isPaused; |
- SubscriptionCallbackEvent(this.hasListener, this.isPaused, void action()) |
- : super(action); |
- bool _testSubscribe(StreamController c) { |
- return hasListener == c.hasListener && isPaused == c.isPaused; |
- } |
- String toString() => "[Has listener:$hasListener, Paused:$isPaused]"; |
+ SubscriptionCallbackEvent(void action()) : super(action); |
+ bool _testSubscribe() => true; |
+ String toString() => "[Subscribed]"; |
+} |
+ |
+class CancelCallbackEvent extends Event { |
+ CancelCallbackEvent(void action()) : super(action); |
+ bool _testCancel() => true; |
+ String toString() => "[Cancelled]"; |
} |
@@ -280,12 +330,20 @@ class LogAnyEvent extends Event { |
_actual = "*[Done]"; |
return true; |
} |
- bool _testPause(StreamController c) { |
- _actual = "*[Paused:${c.isPaused}]"; |
+ bool _testPause() { |
+ _actual = "*[Paused]"; |
+ return true; |
+ } |
+ bool _testResume() { |
+ _actual = "*[Resumed]"; |
+ return true; |
+ } |
+ bool _testSubcribe() { |
+ _actual = "*[Subscribed]"; |
return true; |
} |
- bool _testSubcribe(StreamController c) { |
- _actual = "*[Has listener:${c.hasListener}, Paused:${c.isPaused}]"; |
+ bool _testCancel() { |
+ _actual = "*[Cancelled]"; |
return true; |
} |