Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(561)

Side by Side Diff: test/stream_completer_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamCompleter;
8 import "package:test/test.dart";
9
10 main() {
11 test("a stream is linked before listening", () async {
12 var completer = new StreamCompleter();
13 completer.setSourceStream(createStream());
14 expect(completer.stream.toList(), completion([1, 2, 3, 4]));
15 });
16
17 test("listened to before a stream is linked", () async {
18 var completer = new StreamCompleter();
19 Future done = completer.stream.toList();
nweiz 2015/06/18 23:44:27 Nit: "var"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 I have actually found full typing to catch bugs in
20 await flushMicrotasks();
21 completer.setSourceStream(createStream());
22 expect(done, completion([1, 2, 3, 4]));
23 });
24
25 test("cancel before linking a stream doesn't listen on stream", () async {
26 var completer = new StreamCompleter();
27 var subscription = completer.stream.listen(null);
28 subscription.pause(); // Should be ignored.
29 subscription.cancel();
30 completer.setSourceStream(new UnusableStream()); // Doesn't throw.
31 });
32
33 test("listen and pause before linking stream", () async {
nweiz 2015/06/18 23:44:27 Also test that this pause triggers "onPause" in th
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
34 var controller = new StreamCompleter();
35 var events = [];
36 var subscription = controller.stream.listen(events.add);
37 var done = subscription.asFuture();
38 subscription.pause();
39 controller.setSourceStream(createStream());
40 await flushMicrotasks();
41 expect(events, []);
42 subscription.resume();
43 await done;
44 expect(events, [1, 2, 3, 4]);
45 });
46
47 test("pause more than once", () async {
48 var completer = new StreamCompleter();
49 var events = [];
50 var subscription = completer.stream.listen(events.add);
51 Future done = subscription.asFuture();
52 subscription.pause();
53 subscription.pause();
54 subscription.pause();
55 completer.setSourceStream(createStream());
56 for (int i = 0; i < 3; i++) {
57 await flushMicrotasks();
58 expect(events, []);
59 subscription.resume();
60 }
61 await done;
62 expect(events, [1, 2, 3, 4]);
63 });
64
65 test("cancel new stream before source is done.", () async {
66 var completer = new StreamCompleter();
67 var listened = false;
68 var canceled = false;
69 var lastEvent = -1;
70 var controller = new StreamController(onListen: () { listened = true; },
nweiz 2015/06/18 23:44:27 It's not actually in the style guide right now, bu
Lasse Reichstein Nielsen 2015/06/30 10:34:14 That's silly. I'll rewrite it to () => listened
71 onCancel: () { canceled = true; });
72 var subscription;
73 subscription = completer.stream.listen(
74 (v) {
nweiz 2015/06/18 23:44:27 "v" -> "value"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
75 expect(v, lessThan(3));
76 lastEvent = v;
77 if (v == 2) {
78 subscription.cancel();
79 }
80 },
81 onError: unreachable("error"),
82 onDone: unreachable("done"),
83 cancelOnError: true);
84 completer.setSourceStream(controller.stream);
85 expect(listened, isTrue);
86
87 await flushMicrotasks();
88 expect(canceled, isFalse);
89 controller.add(1);
90
91 await flushMicrotasks();
92 expect(lastEvent, 1);
93 expect(canceled, isFalse);
94 controller.add(2);
95
96 await flushMicrotasks();
97 expect(lastEvent, 2);
98 expect(canceled, isTrue);
99 });
100
101 test("complete with setEmpty before listening", () async {
102 var completer = new StreamCompleter();
103 completer.setEmpty();
104 var done = new Completer();
105 completer.stream.listen(
106 unreachable("data"),
107 onError: unreachable("error"),
108 onDone: done.complete);
109 await done.future;
110 });
111
112 test("complete with setEmpty after listening", () async {
113 var completer = new StreamCompleter();
114 var done = new Completer();
115 completer.stream.listen(
116 unreachable("data"),
117 onError: unreachable("error"),
118 onDone: done.complete);
119 completer.setEmpty();
120 await done.future;
121 });
122
123 test("source stream isn't listened to until completer stream is.", () async {
nweiz 2015/06/18 23:44:27 Nit: no period
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
124 var completer = new StreamCompleter();
125 bool listened = false;
nweiz 2015/06/18 23:44:27 Nit: "var"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
126 var controller;
127 controller = new StreamController(onListen: () {
128 listened = true;
129 () async { controller.close(); } (); // In later microtask.
nweiz 2015/06/18 23:44:27 I think "scheduleMicrotask" is a little more expli
Lasse Reichstein Nielsen 2015/06/30 10:34:14 And it has exactly the same length. Scary.
130 });
131
132 completer.setSourceStream(controller.stream);
133 await flushMicrotasks();
134 expect(listened, isFalse);
135 var subscription = completer.stream.listen(null);
136 expect(listened, isTrue);
137 await subscription.asFuture();
138 });
139
140 test("cancelOnError true when listening before linking stream", () async {
141 var completer = new StreamCompleter();
142 var listened = false;
143 var canceled = false;
144 var lastEvent = -1;
145 var controller = new StreamController(onListen: () { listened = true; },
146 onCancel: () { canceled = true; });
147 var subscription = completer.stream.listen(
148 (v) {
149 expect(v, lessThan(3));
150 lastEvent = v;
151 },
152 onError: (v) {
153 expect(v, "3");
154 lastEvent = v;
155 },
156 onDone: unreachable("done"),
157 cancelOnError: true);
158 completer.setSourceStream(controller.stream);
159 expect(listened, isTrue);
160
161 await flushMicrotasks();
162 expect(canceled, isFalse);
163 controller.add(1);
164
165 await flushMicrotasks();
166 expect(lastEvent, 1);
167 expect(canceled, isFalse);
168 controller.add(2);
169
170 await flushMicrotasks();
171 expect(lastEvent, 2);
172 expect(canceled, isFalse);
173 controller.addError("3");
174
175 await flushMicrotasks();
176 expect(lastEvent, "3");
177 expect(canceled, isTrue);
178 });
179
180 test("cancelOnError true when listening after linking stream", () async {
181 var completer = new StreamCompleter();
182 var listened = false;
183 var canceled = false;
184 var lastEvent = -1;
185 var controller = new StreamController(onListen: () { listened = true; },
186 onCancel: () { canceled = true; });
187 completer.setSourceStream(controller.stream);
188 controller.add(1);
189 expect(listened, isFalse);
190
191 var subscription = completer.stream.listen(
192 (v) {
193 expect(v, lessThan(3));
194 lastEvent = v;
195 },
196 onError: (v) {
197 expect(v, "3");
198 lastEvent = v;
199 },
200 onDone: unreachable("done"),
201 cancelOnError: true);
202
203 expect(listened, isTrue);
204
205 await flushMicrotasks();
206 expect(lastEvent, 1);
207 expect(canceled, isFalse);
208 controller.add(2);
209
210 await flushMicrotasks();
211 expect(lastEvent, 2);
212 expect(canceled, isFalse);
213 controller.addError("3");
214
215 await flushMicrotasks();
216 expect(canceled, isTrue);
217 });
218
219 test("linking a stream after setSourceStream before listen", () async {
220 var completer = new StreamCompleter();
221 completer.setSourceStream(createStream());
222 expect(() { completer.setSourceStream(createStream()); }, throws);
223 expect(() { completer.setEmpty(createStream()); }, throws);
224 await completer.stream.toList();
225 // Still fails after source is done
226 expect(() { completer.setSourceStream(createStream()); }, throws);
227 expect(() { completer.setEmpty(createStream()); }, throws);
228 });
229
230 test("linking a stream after setSourceStream after listen", () async {
231 var completer = new StreamCompleter();
232 var list = completer.stream.toList();
233 completer.setSourceStream(createStream());
234 expect(() { completer.setSourceStream(createStream()); }, throws);
235 expect(() { completer.setEmpty(createStream()); }, throws);
236 await list;
237 // Still fails after source is done.
238 expect(() { completer.setSourceStream(createStream()); }, throws);
239 expect(() { completer.setEmpty(createStream()); }, throws);
240 });
241
242 test("linking a stream after setEmpty before listen", () async {
243 var completer = new StreamCompleter();
244 completer.setEmpty();
245 expect(() { completer.setSourceStream(createStream()); }, throws);
246 expect(() { completer.setEmpty(createStream()); }, throws);
247 await completer.stream.toList();
248 // Still fails after source is done
249 expect(() { completer.setSourceStream(createStream()); }, throws);
250 expect(() { completer.setEmpty(createStream()); }, throws);
251 });
252
253 test("linking a stream after setEmpty() after listen", () async {
254 var completer = new StreamCompleter();
255 var list = completer.stream.toList();
256 completer.setEmpty();
257 expect(() { completer.setSourceStream(createStream()); }, throws);
258 expect(() { completer.setEmpty(createStream()); }, throws);
259 await list;
260 // Still fails after source is done.
261 expect(() { completer.setSourceStream(createStream()); }, throws);
262 expect(() { completer.setEmpty(createStream()); }, throws);
263 });
264
265 test("Listening more than once after setting stream", () async {
nweiz 2015/06/18 23:44:27 Nit: "listening" (also below)
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
266 var completer = new StreamCompleter();
267 completer.setSourceStream(createStream());
268 var list = completer.stream.toList();
269 expect(() { completer.stream.toList(); }, throws);
270 await list;
271 expect(() { completer.stream.toList(); }, throws);
272 });
273
274 test("Listening more than once before setting stream", () async {
275 var completer = new StreamCompleter();
276 var list = completer.stream.toList();
277 expect(() { completer.stream.toList(); }, throws);
278 });
279
280 test("setting onData etc. before and after setting stream", () async {
281 var completer = new StreamCompleter();
282 var controller = new StreamController();
283 var subscription = completer.stream.listen(null);
284 var lastEvent = 0;
285 subscription.onData((v) { lastEvent = v; });
286 subscription.onError((v) { lastEvent = "$v"; });
287 subscription.onDone(() { lastEvent = -1; });
288 completer.setSourceStream(controller.stream);
289 await flushMicrotasks();
290 controller.add(1);
291 await flushMicrotasks();
292 expect(lastEvent, 1);
293 controller.addError(2);
294 await flushMicrotasks();
295 expect(lastEvent, "2");
296 subscription.onData((v) { lastEvent = -v; });
297 subscription.onError((v) { lastEvent = "${-v}"; });
298 controller.add(1);
299 await flushMicrotasks();
300 expect(lastEvent, -1);
301 controller.addError(2);
302 await flushMicrotasks();
303 expect(lastEvent, "-2");
304 controller.close();
305 await flushMicrotasks();
306 expect(lastEvent, -1);
307 });
308
309 test("pause w/ resume future accross setting stream", () async {
310 var completer = new StreamCompleter();
311 var resume = new Completer();
312 var subscription = completer.stream.listen(unreachable("data"));
313 var lastEvent = 0;
314 subscription.pause(resume.future);
315 await flushMicrotasks();
316 completer.setSourceStream(createStream());
317 await flushMicrotasks();
318 resume.complete();
319 var events = [];
320 subscription.onData(events.add);
321 await subscription.asFuture();
322 expect(events, [1, 2, 3, 4]);
323 });
324
325 test("asFuture with error accross setting stream", () async {
326 var completer = new StreamCompleter();
327 var controller = new StreamController();
328 var subscription = completer.stream.listen(unreachable("data"),
329 cancelOnError: false);
330 var done = subscription.asFuture();
331 expect(controller.hasListener, isFalse);
332 completer.setSourceStream(controller.stream);
333 await flushMicrotasks();
334 expect(controller.hasListener, isTrue);
335 controller.addError(42);
336 await done.then(unreachable("data"), onError: (error) {
337 expect(error, 42);
338 });
339 expect(controller.hasListener, isFalse);
340 });
341 }
342
343 /// A zero-millisecond timer should wait until after all microtasks.
344 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
nweiz 2015/06/18 23:44:27 Consider moving this (and maybe [unreachable] and
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
345
346 Stream<int> createStream() async* {
347 yield 1;
348 await flushMicrotasks();
349 yield 2;
350 await flushMicrotasks();
351 yield 3;
352 await flushMicrotasks();
353 yield 4;
354 }
355
356 unreachable(String name) => ([a, b]) { fail("Unreachable: $name"); };
357
358 class UnusableStream extends Stream {
359 listen(onData, {onError, onDone, cancelOnError}) {
360 throw new UnimplementedError("Gotcha!");
361 }
362 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698