Index: tests/lib/async/stream_state_helper.dart |
diff --git a/tests/lib/async/stream_state_helper.dart b/tests/lib/async/stream_state_helper.dart |
index c433e391cb53ba8fb6fd818cd2d06b69bd810da6..1ab5a4cd9dffa92d4f8958efa4146d200b430206 100644 |
--- a/tests/lib/async/stream_state_helper.dart |
+++ b/tests/lib/async/stream_state_helper.dart |
@@ -8,28 +8,83 @@ import "../../../pkg/unittest/lib/unittest.dart"; |
import "dart:async"; |
import "dart:collection"; |
+class SubscriptionProtocolTest { |
+ final StreamProtocolTest _streamTest; |
+ final int id; |
+ StreamSubscription _subscription; |
+ |
+ SubscriptionProtocolTest(this.id, this._subscription, this._streamTest); |
+ |
+ void pause([Future resumeSignal]) { |
+ if (_subscription == null) throw new StateError("Not subscribed"); |
+ _subscription.pause(resumeSignal); |
+ } |
+ |
+ void resume() { |
+ if (_subscription == null) throw new StateError("Not subscribed"); |
+ _subscription.resume(); |
+ } |
+ |
+ void cancel() { |
+ if (_subscription == null) throw new StateError("Not subscribed"); |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
+ |
+ void expectData(var data, [void action()]) { |
+ _streamTest._expectData(this, data, action); |
+ } |
+ |
+ void expectError(var error, [void action()]) { |
+ _streamTest._expectError(this, error, action); |
+ } |
+ |
+ void expectDone([void action()]) { |
+ _streamTest._expectDone(this, action); |
+ } |
+} |
+ |
class StreamProtocolTest { |
bool trace = false; |
+ // If not a broadcast stream, the onComplete is called automatically by |
+ // the first onCancel. |
+ bool isBroadcast; |
StreamController _controller; |
Stream _controllerStream; |
- StreamSubscription _subscription; |
+ // Most recent subscription created. Used as default for pause/resume. |
+ SubscriptionProtocolTest _latestSubscription; |
List<Event> _expectations = new List<Event>(); |
int _nextExpectationIndex = 0; |
+ int _subscriptionIdCounter = 0; |
Function _onComplete; |
- StreamProtocolTest([bool broadcast = false]) { |
+ StreamProtocolTest.broadcast({ bool sync: false }) |
+ : isBroadcast = true { |
+ _controller = new StreamController.broadcast( |
+ sync: sync, |
+ onListen: _onListen, |
+ onCancel: _onCancel); |
+ _controllerStream = _controller.stream; |
+ _onComplete = expectAsync0((){ |
+ _onComplete = null; // Being null marks the test as being complete. |
+ }); |
+ } |
+ |
+ StreamProtocolTest({ bool broadcast: false, bool sync: false }) |
+ : isBroadcast = false { |
_controller = new StreamController( |
- onListen: _onSubcription, |
- onPause: _onPause, |
- onResume: _onResume, |
- onCancel: _onCancel); |
+ sync: sync, |
+ onListen: _onListen, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: _onCancel); |
if (broadcast) { |
_controllerStream = _controller.stream.asBroadcastStream(); |
} else { |
_controllerStream = _controller.stream; |
} |
_onComplete = expectAsync0((){ |
- _onComplete = null; // Being null marks the test to be complete. |
+ _onComplete = null; // Being null marks the test as being complete. |
}); |
} |
@@ -38,61 +93,76 @@ class StreamProtocolTest { |
void error(var error) { _controller.addError(error); } |
void close() { _controller.close(); } |
- void subscribe({bool cancelOnError : false}) { |
- // TODO(lrn): Handle more subscriptions (e.g., a subscription-id |
- // per subscription, and an id on event _expectations). |
- if (_subscription != null) throw new StateError("Already subscribed"); |
- _subscription = _controllerStream.listen(_onData, |
- onError: _onError, |
- onDone: _onDone, |
- cancelOnError: |
- cancelOnError); |
+ SubscriptionProtocolTest listen({bool cancelOnError : false}) { |
+ int subscriptionId = _subscriptionIdCounter++; |
+ |
+ StreamSubscription subscription = _controllerStream.listen( |
+ (var data) { _onData(subscriptionId, data); }, |
+ onError: (Object error) { _onError(subscriptionId, error); }, |
+ onDone: () { _onDone(subscriptionId); }, |
+ cancelOnError: cancelOnError); |
+ _latestSubscription = |
+ new SubscriptionProtocolTest(subscriptionId, subscription, this); |
+ if (trace) { |
+ print("[Listen #$subscriptionId(#${_latestSubscription.hashCode})]"); |
+ } |
+ return _latestSubscription; |
} |
+ // Actions on the most recently created subscription. |
void pause([Future resumeSignal]) { |
- if (_subscription == null) throw new StateError("Not subscribed"); |
- _subscription.pause(resumeSignal); |
+ _latestSubscription.pause(resumeSignal); |
} |
void resume() { |
- if (_subscription == null) throw new StateError("Not subscribed"); |
- _subscription.resume(); |
+ _latestSubscription.resume(); |
} |
void cancel() { |
- if (_subscription == null) throw new StateError("Not subscribed"); |
- _subscription.cancel(); |
- _subscription = null; |
+ _latestSubscription.cancel(); |
+ _latestSubscription = null; |
+ } |
+ |
+ // End the test now. There must be no open expectations, and no furter |
+ // expectations will be allowed. |
+ // Called automatically by an onCancel event on a non-broadcast stream. |
+ void terminate() { |
+ if (_nextExpectationIndex != _expectations.length) { |
+ _withNextExpectation((Event expect) { |
+ _fail("Expected: $expect\n" |
+ "Found : Early termination.\n${expect._stackTrace}"); |
+ }); |
+ } |
+ _onComplete(); |
} |
// Handling of stream events. |
- void _onData(var data) { |
- if (trace) print("[Data : $data]"); |
+ void _onData(int id, var data) { |
+ if (trace) print("[Data#$id : $data]"); |
_withNextExpectation((Event expect) { |
- if (!expect.matchData(data)) { |
+ if (!expect.matchData(id, data)) { |
_fail("Expected: $expect\n" |
- "Found : [Data: $data]"); |
+ "Found : [Data#$id: $data]\n${expect._stackTrace}"); |
} |
}); |
} |
- void _onError(error) { |
- if (trace) print("[Error : $error]"); |
+ void _onError(int id, Object error) { |
+ if (trace) print("[Error#$id : $error]"); |
_withNextExpectation((Event expect) { |
- if (!expect.matchError(error)) { |
+ if (!expect.matchError(id, error)) { |
_fail("Expected: $expect\n" |
- "Found : [Error: ${error}]"); |
+ "Found : [Error#$id: ${error}]\n${expect._stackTrace}"); |
} |
}); |
} |
- void _onDone() { |
- if (trace) print("[Done]"); |
- _subscription = null; |
+ void _onDone(int id) { |
+ if (trace) print("[Done#$id]"); |
_withNextExpectation((Event expect) { |
- if (!expect.matchDone()) { |
+ if (!expect.matchDone(id)) { |
_fail("Expected: $expect\n" |
- "Found : [Done]"); |
+ "Found : [Done#$id]\n${expect._stackTrace}"); |
} |
}); |
} |
@@ -102,7 +172,7 @@ class StreamProtocolTest { |
_withNextExpectation((Event expect) { |
if (!expect.matchPause()) { |
_fail("Expected: $expect\n" |
- "Found : [Paused]"); |
+ "Found : [Paused]\n${expect._stackTrace}"); |
} |
}); |
} |
@@ -112,17 +182,17 @@ class StreamProtocolTest { |
_withNextExpectation((Event expect) { |
if (!expect.matchResume()) { |
_fail("Expected: $expect\n" |
- "Found : [Resumed]"); |
+ "Found : [Resumed]\n${expect._stackTrace}"); |
} |
}); |
} |
- void _onSubcription() { |
+ void _onListen() { |
if (trace) print("[Subscribed]"); |
_withNextExpectation((Event expect) { |
if (!expect.matchSubscribe()) { |
_fail("Expected: $expect\n" |
- "Found: [Subscribed]"); |
+ "Found: [Subscribed]\n${expect._stackTrace}"); |
} |
}); |
} |
@@ -132,29 +202,22 @@ class StreamProtocolTest { |
_withNextExpectation((Event expect) { |
if (!expect.matchCancel()) { |
_fail("Expected: $expect\n" |
- "Found: [Cancelled]"); |
+ "Found: [Cancelled]\n${expect._stackTrace}"); |
} |
}); |
+ if (!isBroadcast) terminate(); |
} |
void _withNextExpectation(void action(Event expect)) { |
if (_nextExpectationIndex == _expectations.length) { |
+ _nextExpectationIndex++; |
action(new MismatchEvent()); |
} else { |
- Event next = _expectations[_nextExpectationIndex]; |
+ Event next = _expectations[_nextExpectationIndex++]; |
action(next); |
} |
- _nextExpectationIndex++; |
- _checkDone(); |
} |
- void _checkDone() { |
- if (_nextExpectationIndex == _expectations.length) { |
- _onComplete(); |
- } |
- } |
- |
- |
// Adds _expectations. |
void expectAny([void action()]) { |
if (_onComplete == null) { |
@@ -162,43 +225,62 @@ class StreamProtocolTest { |
} |
_expectations.add(new LogAnyEvent(action)); |
} |
+ |
void expectData(var data, [void action()]) { |
+ _expectData(null, data, action); |
+ } |
+ |
+ void _expectData(SubscriptionProtocolTest sub, var data, void action()) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
- _expectations.add(new DataEvent(data, action)); |
+ _expectations.add(new DataEvent(sub, data, action)); |
} |
+ |
void expectError(var error, [void action()]) { |
+ _expectError(null, error, action); |
+ } |
+ |
+ void _expectError(SubscriptionProtocolTest sub, var error, void action()) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
- _expectations.add(new ErrorEvent(error, action)); |
+ _expectations.add(new ErrorEvent(sub, error, action)); |
} |
+ |
void expectDone([void action()]) { |
+ _expectDone(null, action); |
+ } |
+ |
+ void _expectDone(SubscriptionProtocolTest sub, [void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
- _expectations.add(new DoneEvent(action)); |
+ _expectations.add(new DoneEvent(sub, action)); |
} |
+ |
void expectPause([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
_expectations.add(new PauseCallbackEvent(action)); |
} |
+ |
void expectResume([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
_expectations.add(new ResumeCallbackEvent(action)); |
} |
- void expectSubscription([void action()]) { |
+ |
+ void expectListen([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
} |
_expectations.add( |
new SubscriptionCallbackEvent(action)); |
} |
+ |
void expectCancel([void action()]) { |
if (_onComplete == null) { |
_fail("Adding expectation after completing"); |
@@ -211,45 +293,58 @@ class StreamProtocolTest { |
if (_nextExpectationIndex == 0) { |
throw "Unexpected event:\n$message\nNo earlier events matched."; |
} |
- throw "Unexpected event:\n$message\nMatched so far:\n" |
- " ${_expectations.take(_nextExpectationIndex).join("\n ")}"; |
+ StringBuffer buf = new StringBuffer(); |
+ for (int i = 0; i < _expectations.length; i++) { |
+ if (i == _nextExpectationIndex - 1) { |
+ buf.write("->"); |
+ } else { |
+ buf.write(" "); |
+ } |
+ buf.write(_expectations[i]); |
+ buf.write("\n"); |
+ } |
+ throw "Unexpected event:\n$message\nAll expectations:\n$buf"; |
} |
} |
class Event { |
Function _action; |
- Event(void this._action()); |
+ StackTrace _stackTrace; |
+ Event(void action()) |
+ : _action = (action == null) ? null : expectAsync0(action) { |
+ try { throw 0; } catch (_, s) { _stackTrace = s; } |
+ } |
- bool matchData(var data) { |
- if (!_testData(data)) return false; |
- if (_action != null) _action(); |
- return true; |
+ bool matchData(int id, var data) { |
+ return false; |
} |
- bool matchError(e) { |
- if (!_testError(e)) return false; |
- if (_action != null) _action(); |
- return true; |
+ |
+ bool matchError(int id, e) { |
+ return false; |
} |
- bool matchDone() { |
- if (!_testDone()) return false; |
- if (_action != null) _action(); |
- return true; |
+ |
+ bool matchDone(int id) { |
+ return false; |
} |
+ |
bool matchPause() { |
if (!_testPause()) return false; |
if (_action != null) _action(); |
return true; |
} |
+ |
bool matchResume() { |
if (!_testResume()) return false; |
if (_action != null) _action(); |
return true; |
} |
+ |
bool matchSubscribe() { |
if (!_testSubscribe()) return false; |
if (_action != null) _action(); |
return true; |
} |
+ |
bool matchCancel() { |
if (!_testCancel()) return false; |
if (_action != null) _action(); |
@@ -265,29 +360,59 @@ class Event { |
bool _testCancel() => false; |
} |
+class SubscriptionEvent extends Event { |
+ SubscriptionProtocolTest subscription; |
+ SubscriptionEvent(this.subscription, void action()) : super(action); |
+ |
+ bool matchData(int id, var data) { |
+ if (subscription != null && subscription.id != id) return false; |
+ if (!_testData(data)) return false; |
+ if (_action != null) _action(); |
+ return true; |
+ } |
+ |
+ bool matchError(int id, e) { |
+ if (subscription != null && subscription.id != id) return false; |
+ if (!_testError(e)) return false; |
+ if (_action != null) _action(); |
+ return true; |
+ } |
+ |
+ bool matchDone(int id) { |
+ if (subscription != null && subscription.id != id) return false; |
+ if (!_testDone()) return false; |
+ if (_action != null) _action(); |
+ return true; |
+ } |
+ |
+ String get _id => (subscription == null) ? "" : "#${subscription.id}"; |
+} |
+ |
class MismatchEvent extends Event { |
MismatchEvent() : super(null); |
toString() => "[No event expected]"; |
} |
-class DataEvent extends Event { |
+class DataEvent extends SubscriptionEvent { |
final data; |
- DataEvent(this.data, void action()) : super(action); |
+ DataEvent(SubscriptionProtocolTest sub, this.data, void action()) |
+ : super(sub, action); |
bool _testData(var data) => this.data == data; |
- String toString() => "[Data: $data]"; |
+ String toString() => "[Data$_id: $data]"; |
} |
-class ErrorEvent extends Event { |
+class ErrorEvent extends SubscriptionEvent { |
final error; |
- ErrorEvent(this.error, void action()) : super(action); |
+ ErrorEvent(SubscriptionProtocolTest sub, this.error, void action()) |
+ : super(sub, action); |
bool _testError(error) => this.error == error; |
- String toString() => "[Error: $error]"; |
+ String toString() => "[Error$_id: $error]"; |
} |
-class DoneEvent extends Event { |
- DoneEvent(void action()) : super(action); |
+class DoneEvent extends SubscriptionEvent { |
+ DoneEvent(SubscriptionProtocolTest sub, void action()) : super(sub, action); |
bool _testDone() => true; |
- String toString() => "[Done]"; |
+ String toString() => "[Done$_id]"; |
} |
class PauseCallbackEvent extends Event { |
@@ -314,38 +439,47 @@ class CancelCallbackEvent extends Event { |
String toString() => "[Cancelled]"; |
} |
- |
+/** Event matcher that matches any other event. */ |
class LogAnyEvent extends Event { |
String _actual = "*Not matched yet*"; |
+ |
LogAnyEvent(void action()) : super(action); |
+ |
bool _testData(var data) { |
_actual = "*[Data $data]"; |
return true; |
} |
+ |
bool _testError(error) { |
_actual = "*[Error ${error}]"; |
return true; |
} |
+ |
bool _testDone() { |
_actual = "*[Done]"; |
return true; |
} |
+ |
bool _testPause() { |
_actual = "*[Paused]"; |
return true; |
} |
+ |
bool _testResume() { |
_actual = "*[Resumed]"; |
return true; |
} |
+ |
bool _testSubcribe() { |
_actual = "*[Subscribed]"; |
return true; |
} |
+ |
bool _testCancel() { |
_actual = "*[Cancelled]"; |
return true; |
} |
+ /** Returns a representation of the event it was tested against. */ |
String toString() => _actual; |
} |