Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(178)

Unified Diff: tests/lib/async/stream_state_helper.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tests/lib/async/stream_state_helper.dart
diff --git a/tests/lib/async/stream_state_helper.dart b/tests/lib/async/stream_state_helper.dart
index c433e391cb53ba8fb6fd818cd2d06b69bd810da6..1ab5a4cd9dffa92d4f8958efa4146d200b430206 100644
--- a/tests/lib/async/stream_state_helper.dart
+++ b/tests/lib/async/stream_state_helper.dart
@@ -8,28 +8,83 @@ import "../../../pkg/unittest/lib/unittest.dart";
import "dart:async";
import "dart:collection";
+class SubscriptionProtocolTest {
+ final StreamProtocolTest _streamTest;
+ final int id;
+ StreamSubscription _subscription;
+
+ SubscriptionProtocolTest(this.id, this._subscription, this._streamTest);
+
+ void pause([Future resumeSignal]) {
+ if (_subscription == null) throw new StateError("Not subscribed");
+ _subscription.pause(resumeSignal);
+ }
+
+ void resume() {
+ if (_subscription == null) throw new StateError("Not subscribed");
+ _subscription.resume();
+ }
+
+ void cancel() {
+ if (_subscription == null) throw new StateError("Not subscribed");
+ _subscription.cancel();
+ _subscription = null;
+ }
+
+ void expectData(var data, [void action()]) {
+ _streamTest._expectData(this, data, action);
+ }
+
+ void expectError(var error, [void action()]) {
+ _streamTest._expectError(this, error, action);
+ }
+
+ void expectDone([void action()]) {
+ _streamTest._expectDone(this, action);
+ }
+}
+
class StreamProtocolTest {
bool trace = false;
+ // If not a broadcast stream, the onComplete is called automatically by
+ // the first onCancel.
+ bool isBroadcast;
StreamController _controller;
Stream _controllerStream;
- StreamSubscription _subscription;
+ // Most recent subscription created. Used as default for pause/resume.
+ SubscriptionProtocolTest _latestSubscription;
List<Event> _expectations = new List<Event>();
int _nextExpectationIndex = 0;
+ int _subscriptionIdCounter = 0;
Function _onComplete;
- StreamProtocolTest([bool broadcast = false]) {
+ StreamProtocolTest.broadcast({ bool sync: false })
+ : isBroadcast = true {
+ _controller = new StreamController.broadcast(
+ sync: sync,
+ onListen: _onListen,
+ onCancel: _onCancel);
+ _controllerStream = _controller.stream;
+ _onComplete = expectAsync0((){
+ _onComplete = null; // Being null marks the test as being complete.
+ });
+ }
+
+ StreamProtocolTest({ bool broadcast: false, bool sync: false })
+ : isBroadcast = false {
_controller = new StreamController(
- onListen: _onSubcription,
- onPause: _onPause,
- onResume: _onResume,
- onCancel: _onCancel);
+ sync: sync,
+ onListen: _onListen,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onCancel);
if (broadcast) {
_controllerStream = _controller.stream.asBroadcastStream();
} else {
_controllerStream = _controller.stream;
}
_onComplete = expectAsync0((){
- _onComplete = null; // Being null marks the test to be complete.
+ _onComplete = null; // Being null marks the test as being complete.
});
}
@@ -38,61 +93,76 @@ class StreamProtocolTest {
void error(var error) { _controller.addError(error); }
void close() { _controller.close(); }
- void subscribe({bool cancelOnError : false}) {
- // TODO(lrn): Handle more subscriptions (e.g., a subscription-id
- // per subscription, and an id on event _expectations).
- if (_subscription != null) throw new StateError("Already subscribed");
- _subscription = _controllerStream.listen(_onData,
- onError: _onError,
- onDone: _onDone,
- cancelOnError:
- cancelOnError);
+ SubscriptionProtocolTest listen({bool cancelOnError : false}) {
+ int subscriptionId = _subscriptionIdCounter++;
+
+ StreamSubscription subscription = _controllerStream.listen(
+ (var data) { _onData(subscriptionId, data); },
+ onError: (Object error) { _onError(subscriptionId, error); },
+ onDone: () { _onDone(subscriptionId); },
+ cancelOnError: cancelOnError);
+ _latestSubscription =
+ new SubscriptionProtocolTest(subscriptionId, subscription, this);
+ if (trace) {
+ print("[Listen #$subscriptionId(#${_latestSubscription.hashCode})]");
+ }
+ return _latestSubscription;
}
+ // Actions on the most recently created subscription.
void pause([Future resumeSignal]) {
- if (_subscription == null) throw new StateError("Not subscribed");
- _subscription.pause(resumeSignal);
+ _latestSubscription.pause(resumeSignal);
}
void resume() {
- if (_subscription == null) throw new StateError("Not subscribed");
- _subscription.resume();
+ _latestSubscription.resume();
}
void cancel() {
- if (_subscription == null) throw new StateError("Not subscribed");
- _subscription.cancel();
- _subscription = null;
+ _latestSubscription.cancel();
+ _latestSubscription = null;
+ }
+
+ // End the test now. There must be no open expectations, and no furter
+ // expectations will be allowed.
+ // Called automatically by an onCancel event on a non-broadcast stream.
+ void terminate() {
+ if (_nextExpectationIndex != _expectations.length) {
+ _withNextExpectation((Event expect) {
+ _fail("Expected: $expect\n"
+ "Found : Early termination.\n${expect._stackTrace}");
+ });
+ }
+ _onComplete();
}
// Handling of stream events.
- void _onData(var data) {
- if (trace) print("[Data : $data]");
+ void _onData(int id, var data) {
+ if (trace) print("[Data#$id : $data]");
_withNextExpectation((Event expect) {
- if (!expect.matchData(data)) {
+ if (!expect.matchData(id, data)) {
_fail("Expected: $expect\n"
- "Found : [Data: $data]");
+ "Found : [Data#$id: $data]\n${expect._stackTrace}");
}
});
}
- void _onError(error) {
- if (trace) print("[Error : $error]");
+ void _onError(int id, Object error) {
+ if (trace) print("[Error#$id : $error]");
_withNextExpectation((Event expect) {
- if (!expect.matchError(error)) {
+ if (!expect.matchError(id, error)) {
_fail("Expected: $expect\n"
- "Found : [Error: ${error}]");
+ "Found : [Error#$id: ${error}]\n${expect._stackTrace}");
}
});
}
- void _onDone() {
- if (trace) print("[Done]");
- _subscription = null;
+ void _onDone(int id) {
+ if (trace) print("[Done#$id]");
_withNextExpectation((Event expect) {
- if (!expect.matchDone()) {
+ if (!expect.matchDone(id)) {
_fail("Expected: $expect\n"
- "Found : [Done]");
+ "Found : [Done#$id]\n${expect._stackTrace}");
}
});
}
@@ -102,7 +172,7 @@ class StreamProtocolTest {
_withNextExpectation((Event expect) {
if (!expect.matchPause()) {
_fail("Expected: $expect\n"
- "Found : [Paused]");
+ "Found : [Paused]\n${expect._stackTrace}");
}
});
}
@@ -112,17 +182,17 @@ class StreamProtocolTest {
_withNextExpectation((Event expect) {
if (!expect.matchResume()) {
_fail("Expected: $expect\n"
- "Found : [Resumed]");
+ "Found : [Resumed]\n${expect._stackTrace}");
}
});
}
- void _onSubcription() {
+ void _onListen() {
if (trace) print("[Subscribed]");
_withNextExpectation((Event expect) {
if (!expect.matchSubscribe()) {
_fail("Expected: $expect\n"
- "Found: [Subscribed]");
+ "Found: [Subscribed]\n${expect._stackTrace}");
}
});
}
@@ -132,29 +202,22 @@ class StreamProtocolTest {
_withNextExpectation((Event expect) {
if (!expect.matchCancel()) {
_fail("Expected: $expect\n"
- "Found: [Cancelled]");
+ "Found: [Cancelled]\n${expect._stackTrace}");
}
});
+ if (!isBroadcast) terminate();
}
void _withNextExpectation(void action(Event expect)) {
if (_nextExpectationIndex == _expectations.length) {
+ _nextExpectationIndex++;
action(new MismatchEvent());
} else {
- Event next = _expectations[_nextExpectationIndex];
+ Event next = _expectations[_nextExpectationIndex++];
action(next);
}
- _nextExpectationIndex++;
- _checkDone();
}
- void _checkDone() {
- if (_nextExpectationIndex == _expectations.length) {
- _onComplete();
- }
- }
-
-
// Adds _expectations.
void expectAny([void action()]) {
if (_onComplete == null) {
@@ -162,43 +225,62 @@ class StreamProtocolTest {
}
_expectations.add(new LogAnyEvent(action));
}
+
void expectData(var data, [void action()]) {
+ _expectData(null, data, action);
+ }
+
+ void _expectData(SubscriptionProtocolTest sub, var data, void action()) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
- _expectations.add(new DataEvent(data, action));
+ _expectations.add(new DataEvent(sub, data, action));
}
+
void expectError(var error, [void action()]) {
+ _expectError(null, error, action);
+ }
+
+ void _expectError(SubscriptionProtocolTest sub, var error, void action()) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
- _expectations.add(new ErrorEvent(error, action));
+ _expectations.add(new ErrorEvent(sub, error, action));
}
+
void expectDone([void action()]) {
+ _expectDone(null, action);
+ }
+
+ void _expectDone(SubscriptionProtocolTest sub, [void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
- _expectations.add(new DoneEvent(action));
+ _expectations.add(new DoneEvent(sub, action));
}
+
void expectPause([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
_expectations.add(new PauseCallbackEvent(action));
}
+
void expectResume([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
_expectations.add(new ResumeCallbackEvent(action));
}
- void expectSubscription([void action()]) {
+
+ void expectListen([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
_expectations.add(
new SubscriptionCallbackEvent(action));
}
+
void expectCancel([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
@@ -211,45 +293,58 @@ class StreamProtocolTest {
if (_nextExpectationIndex == 0) {
throw "Unexpected event:\n$message\nNo earlier events matched.";
}
- throw "Unexpected event:\n$message\nMatched so far:\n"
- " ${_expectations.take(_nextExpectationIndex).join("\n ")}";
+ StringBuffer buf = new StringBuffer();
+ for (int i = 0; i < _expectations.length; i++) {
+ if (i == _nextExpectationIndex - 1) {
+ buf.write("->");
+ } else {
+ buf.write(" ");
+ }
+ buf.write(_expectations[i]);
+ buf.write("\n");
+ }
+ throw "Unexpected event:\n$message\nAll expectations:\n$buf";
}
}
class Event {
Function _action;
- Event(void this._action());
+ StackTrace _stackTrace;
+ Event(void action())
+ : _action = (action == null) ? null : expectAsync0(action) {
+ try { throw 0; } catch (_, s) { _stackTrace = s; }
+ }
- bool matchData(var data) {
- if (!_testData(data)) return false;
- if (_action != null) _action();
- return true;
+ bool matchData(int id, var data) {
+ return false;
}
- bool matchError(e) {
- if (!_testError(e)) return false;
- if (_action != null) _action();
- return true;
+
+ bool matchError(int id, e) {
+ return false;
}
- bool matchDone() {
- if (!_testDone()) return false;
- if (_action != null) _action();
- return true;
+
+ bool matchDone(int id) {
+ return false;
}
+
bool matchPause() {
if (!_testPause()) return false;
if (_action != null) _action();
return true;
}
+
bool matchResume() {
if (!_testResume()) return false;
if (_action != null) _action();
return true;
}
+
bool matchSubscribe() {
if (!_testSubscribe()) return false;
if (_action != null) _action();
return true;
}
+
bool matchCancel() {
if (!_testCancel()) return false;
if (_action != null) _action();
@@ -265,29 +360,59 @@ class Event {
bool _testCancel() => false;
}
+class SubscriptionEvent extends Event {
+ SubscriptionProtocolTest subscription;
+ SubscriptionEvent(this.subscription, void action()) : super(action);
+
+ bool matchData(int id, var data) {
+ if (subscription != null && subscription.id != id) return false;
+ if (!_testData(data)) return false;
+ if (_action != null) _action();
+ return true;
+ }
+
+ bool matchError(int id, e) {
+ if (subscription != null && subscription.id != id) return false;
+ if (!_testError(e)) return false;
+ if (_action != null) _action();
+ return true;
+ }
+
+ bool matchDone(int id) {
+ if (subscription != null && subscription.id != id) return false;
+ if (!_testDone()) return false;
+ if (_action != null) _action();
+ return true;
+ }
+
+ String get _id => (subscription == null) ? "" : "#${subscription.id}";
+}
+
class MismatchEvent extends Event {
MismatchEvent() : super(null);
toString() => "[No event expected]";
}
-class DataEvent extends Event {
+class DataEvent extends SubscriptionEvent {
final data;
- DataEvent(this.data, void action()) : super(action);
+ DataEvent(SubscriptionProtocolTest sub, this.data, void action())
+ : super(sub, action);
bool _testData(var data) => this.data == data;
- String toString() => "[Data: $data]";
+ String toString() => "[Data$_id: $data]";
}
-class ErrorEvent extends Event {
+class ErrorEvent extends SubscriptionEvent {
final error;
- ErrorEvent(this.error, void action()) : super(action);
+ ErrorEvent(SubscriptionProtocolTest sub, this.error, void action())
+ : super(sub, action);
bool _testError(error) => this.error == error;
- String toString() => "[Error: $error]";
+ String toString() => "[Error$_id: $error]";
}
-class DoneEvent extends Event {
- DoneEvent(void action()) : super(action);
+class DoneEvent extends SubscriptionEvent {
+ DoneEvent(SubscriptionProtocolTest sub, void action()) : super(sub, action);
bool _testDone() => true;
- String toString() => "[Done]";
+ String toString() => "[Done$_id]";
}
class PauseCallbackEvent extends Event {
@@ -314,38 +439,47 @@ class CancelCallbackEvent extends Event {
String toString() => "[Cancelled]";
}
-
+/** Event matcher that matches any other event. */
class LogAnyEvent extends Event {
String _actual = "*Not matched yet*";
+
LogAnyEvent(void action()) : super(action);
+
bool _testData(var data) {
_actual = "*[Data $data]";
return true;
}
+
bool _testError(error) {
_actual = "*[Error ${error}]";
return true;
}
+
bool _testDone() {
_actual = "*[Done]";
return true;
}
+
bool _testPause() {
_actual = "*[Paused]";
return true;
}
+
bool _testResume() {
_actual = "*[Resumed]";
return true;
}
+
bool _testSubcribe() {
_actual = "*[Subscribed]";
return true;
}
+
bool _testCancel() {
_actual = "*[Cancelled]";
return true;
}
+ /** Returns a representation of the event it was tested against. */
String toString() => _actual;
}
« no previous file with comments | « tests/lib/async/stream_single_to_multi_subscriber_test.dart ('k') | tests/lib/async/stream_state_nonzero_timer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698