Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Unified Diff: tests/lib/async/stream_state_helper.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tests/lib/async/stream_state_helper.dart
diff --git a/tests/lib/async/stream_state_helper.dart b/tests/lib/async/stream_state_helper.dart
index dbc645a47c9029bf4d51fdee7023090b41bfc94c..c433e391cb53ba8fb6fd818cd2d06b69bd810da6 100644
--- a/tests/lib/async/stream_state_helper.dart
+++ b/tests/lib/async/stream_state_helper.dart
@@ -9,6 +9,7 @@ import "dart:async";
import "dart:collection";
class StreamProtocolTest {
+ bool trace = false;
StreamController _controller;
Stream _controllerStream;
StreamSubscription _subscription;
@@ -20,9 +21,8 @@ class StreamProtocolTest {
_controller = new StreamController(
onListen: _onSubcription,
onPause: _onPause,
- onResume: _onPause,
- onCancel: _onSubcription);
- // TODO(lrn): Make it work with multiple subscribers too.
+ onResume: _onResume,
+ onCancel: _onCancel);
if (broadcast) {
_controllerStream = _controller.stream.asBroadcastStream();
} else {
@@ -54,7 +54,7 @@ class StreamProtocolTest {
_subscription.pause(resumeSignal);
}
- void resume([Future resumeSignal]) {
+ void resume() {
if (_subscription == null) throw new StateError("Not subscribed");
_subscription.resume();
}
@@ -67,6 +67,7 @@ class StreamProtocolTest {
// Handling of stream events.
void _onData(var data) {
+ if (trace) print("[Data : $data]");
_withNextExpectation((Event expect) {
if (!expect.matchData(data)) {
_fail("Expected: $expect\n"
@@ -76,15 +77,17 @@ class StreamProtocolTest {
}
void _onError(error) {
+ if (trace) print("[Error : $error]");
_withNextExpectation((Event expect) {
if (!expect.matchError(error)) {
_fail("Expected: $expect\n"
- "Found : [Data: ${error}]");
+ "Found : [Error: ${error}]");
}
});
}
void _onDone() {
+ if (trace) print("[Done]");
_subscription = null;
_withNextExpectation((Event expect) {
if (!expect.matchDone()) {
@@ -95,20 +98,41 @@ class StreamProtocolTest {
}
void _onPause() {
+ if (trace) print("[Pause]");
_withNextExpectation((Event expect) {
- if (!expect.matchPauseChange(_controller)) {
+ if (!expect.matchPause()) {
_fail("Expected: $expect\n"
- "Found : [Paused:${_controller.isPaused}]");
+ "Found : [Paused]");
+ }
+ });
+ }
+
+ void _onResume() {
+ if (trace) print("[Resumed]");
+ _withNextExpectation((Event expect) {
+ if (!expect.matchResume()) {
+ _fail("Expected: $expect\n"
+ "Found : [Resumed]");
}
});
}
void _onSubcription() {
+ if (trace) print("[Subscribed]");
_withNextExpectation((Event expect) {
- if (!expect.matchSubscriptionChange(_controller)) {
+ if (!expect.matchSubscribe()) {
_fail("Expected: $expect\n"
- "Found: [Has listener:${_controller.hasListener}, "
- "Paused:${_controller.isPaused}]");
+ "Found: [Subscribed]");
+ }
+ });
+ }
+
+ void _onCancel() {
+ if (trace) print("[Cancelled]");
+ _withNextExpectation((Event expect) {
+ if (!expect.matchCancel()) {
+ _fail("Expected: $expect\n"
+ "Found: [Cancelled]");
}
});
}
@@ -117,9 +141,10 @@ class StreamProtocolTest {
if (_nextExpectationIndex == _expectations.length) {
action(new MismatchEvent());
} else {
- Event next = _expectations[_nextExpectationIndex++];
+ Event next = _expectations[_nextExpectationIndex];
action(next);
}
+ _nextExpectationIndex++;
_checkDone();
}
@@ -155,18 +180,31 @@ class StreamProtocolTest {
}
_expectations.add(new DoneEvent(action));
}
- void expectPause(bool isPaused, [void action()]) {
+ void expectPause([void action()]) {
+ if (_onComplete == null) {
+ _fail("Adding expectation after completing");
+ }
+ _expectations.add(new PauseCallbackEvent(action));
+ }
+ void expectResume([void action()]) {
+ if (_onComplete == null) {
+ _fail("Adding expectation after completing");
+ }
+ _expectations.add(new ResumeCallbackEvent(action));
+ }
+ void expectSubscription([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
- _expectations.add(new PauseCallbackEvent(isPaused, action));
+ _expectations.add(
+ new SubscriptionCallbackEvent(action));
}
- void expectSubscription(bool hasListener, bool isPaused, [void action()]) {
+ void expectCancel([void action()]) {
if (_onComplete == null) {
_fail("Adding expectation after completing");
}
_expectations.add(
- new SubscriptionCallbackEvent(hasListener, isPaused, action));
+ new CancelCallbackEvent(action));
}
void _fail(String message) {
@@ -178,11 +216,6 @@ class StreamProtocolTest {
}
}
-class EventCollector {
- final Queue<Event> events = new Queue<Event>();
-
-}
-
class Event {
Function _action;
Event(void this._action());
@@ -202,13 +235,23 @@ class Event {
if (_action != null) _action();
return true;
}
- bool matchPauseChange(StreamController c) {
- if (!_testPause(c)) return false;
+ bool matchPause() {
+ if (!_testPause()) return false;
if (_action != null) _action();
return true;
}
- bool matchSubscriptionChange(StreamController c) {
- if (!_testSubscribe(c)) return false;
+ bool matchResume() {
+ if (!_testResume()) return false;
+ if (_action != null) _action();
+ return true;
+ }
+ bool matchSubscribe() {
+ if (!_testSubscribe()) return false;
+ if (_action != null) _action();
+ return true;
+ }
+ bool matchCancel() {
+ if (!_testCancel()) return false;
if (_action != null) _action();
return true;
}
@@ -216,8 +259,10 @@ class Event {
bool _testData(_) => false;
bool _testError(_) => false;
bool _testDone() => false;
- bool _testPause(_) => false;
- bool _testSubscribe(_) => false;
+ bool _testPause() => false;
+ bool _testResume() => false;
+ bool _testSubscribe() => false;
+ bool _testCancel() => false;
}
class MismatchEvent extends Event {
@@ -246,22 +291,27 @@ class DoneEvent extends Event {
}
class PauseCallbackEvent extends Event {
- final bool isPaused;
- PauseCallbackEvent(this.isPaused, void action())
- : super(action);
- bool _testPause(StreamController c) => isPaused == c.isPaused;
- String toString() => "[Paused:$isPaused]";
+ PauseCallbackEvent(void action()) : super(action);
+ bool _testPause() => true;
+ String toString() => "[Paused]";
+}
+
+class ResumeCallbackEvent extends Event {
+ ResumeCallbackEvent(void action()) : super(action);
+ bool _testResume() => true;
+ String toString() => "[Resumed]";
}
class SubscriptionCallbackEvent extends Event {
- final bool hasListener;
- final bool isPaused;
- SubscriptionCallbackEvent(this.hasListener, this.isPaused, void action())
- : super(action);
- bool _testSubscribe(StreamController c) {
- return hasListener == c.hasListener && isPaused == c.isPaused;
- }
- String toString() => "[Has listener:$hasListener, Paused:$isPaused]";
+ SubscriptionCallbackEvent(void action()) : super(action);
+ bool _testSubscribe() => true;
+ String toString() => "[Subscribed]";
+}
+
+class CancelCallbackEvent extends Event {
+ CancelCallbackEvent(void action()) : super(action);
+ bool _testCancel() => true;
+ String toString() => "[Cancelled]";
}
@@ -280,12 +330,20 @@ class LogAnyEvent extends Event {
_actual = "*[Done]";
return true;
}
- bool _testPause(StreamController c) {
- _actual = "*[Paused:${c.isPaused}]";
+ bool _testPause() {
+ _actual = "*[Paused]";
+ return true;
+ }
+ bool _testResume() {
+ _actual = "*[Resumed]";
+ return true;
+ }
+ bool _testSubcribe() {
+ _actual = "*[Subscribed]";
return true;
}
- bool _testSubcribe(StreamController c) {
- _actual = "*[Has listener:${c.hasListener}, Paused:${c.isPaused}]";
+ bool _testCancel() {
+ _actual = "*[Cancelled]";
return true;
}
« no previous file with comments | « tests/lib/async/stream_controller_async_test.dart ('k') | tests/lib/async/stream_state_nonzero_timer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698