Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(439)

Side by Side Diff: test/util/forkable_stream_test.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
6 // lands.
7
8 import 'dart:async';
9
10 import 'package:test/test.dart';
11 import 'package:test/src/util/forkable_stream.dart';
12 import 'package:test/src/util/stream_queue.dart';
13
14 void main() {
15 var controller;
16 var stream;
17 setUp(() {
18 var cancelFuture = new Future.value(42);
19 controller = new StreamController<int>(onCancel: () => cancelFuture);
20 stream = new ForkableStream<int>(controller.stream);
21 });
22
23 group("with no forks", () {
24 test("forwards events, errors, and close", () async {
25 var queue = new StreamQueue(stream);
26
27 controller.add(1);
28 expect(await queue.next, equals(1));
29
30 controller.add(2);
31 expect(await queue.next, equals(2));
32
33 controller.addError("error");
34 expect(queue.next, throwsA("error"));
35 await flushMicrotasks();
36
37 controller.add(3);
38 expect(await queue.next, equals(3));
39
40 controller.close();
41 expect(await queue.hasNext, isFalse);
42 });
43
44 test("listens to, pauses, and cancels the controller", () {
45 expect(controller.hasListener, isFalse);
46
47 var sub = stream.listen(null);
48 expect(controller.hasListener, isTrue);
49
50 sub.pause();
51 expect(controller.isPaused, isTrue);
52
53 sub.resume();
54 expect(controller.isPaused, isFalse);
55
56 sub.cancel();
57 expect(controller.hasListener, isFalse);
58 });
59
60 test("unpauses the controller when a fork is listened", () {
61 stream.listen(null).pause();
62 expect(controller.isPaused, isTrue);
63
64 var fork = stream.fork();
65 expect(controller.isPaused, isTrue);
66
67 fork.listen(null);
68 expect(controller.isPaused, isFalse);
69 });
70 });
71
72 group("with a fork created before the stream was listened", () {
73 var fork;
74 setUp(() {
75 fork = stream.fork();
76 });
77
78 test("forwards events, errors, and close to both branches", () async {
79 var queue = new StreamQueue(stream);
80 var forkQueue = new StreamQueue(fork);
81
82 controller.add(1);
83 expect(await queue.next, equals(1));
84 expect(await forkQueue.next, equals(1));
85
86 controller.add(2);
87 expect(await queue.next, equals(2));
88 expect(await forkQueue.next, equals(2));
89
90 controller.addError("error");
91 expect(queue.next, throwsA("error"));
92 expect(forkQueue.next, throwsA("error"));
93 await flushMicrotasks();
94
95 controller.add(3);
96 expect(await queue.next, equals(3));
97 expect(await forkQueue.next, equals(3));
98
99 controller.close();
100 expect(await queue.hasNext, isFalse);
101 expect(await forkQueue.hasNext, isFalse);
102 });
103
104 test('listens to the source when the original is listened', () {
105 expect(controller.hasListener, isFalse);
106 stream.listen(null);
107 expect(controller.hasListener, isTrue);
108 });
109
110 test('listens to the source when the fork is listened', () {
111 expect(controller.hasListener, isFalse);
112 fork.listen(null);
113 expect(controller.hasListener, isTrue);
114 });
115 });
116
117 test("with a fork created after the stream emitted a few events, forwards "
118 "future events, errors, and close to both branches", () async {
119 var queue = new StreamQueue(stream);
120
121 controller.add(1);
122 expect(await queue.next, equals(1));
123
124 controller.add(2);
125 expect(await queue.next, equals(2));
126
127 var fork = stream.fork();
128 var forkQueue = new StreamQueue(fork);
129
130 controller.add(3);
131 expect(await queue.next, equals(3));
132 expect(await forkQueue.next, equals(3));
133
134 controller.addError("error");
135 expect(queue.next, throwsA("error"));
136 expect(forkQueue.next, throwsA("error"));
137 await flushMicrotasks();
138
139 controller.close();
140 expect(await queue.hasNext, isFalse);
141 expect(await forkQueue.hasNext, isFalse);
142 });
143
144 group("with multiple forks", () {
145 var fork1;
146 var fork2;
147 var fork3;
148 var fork4;
149 setUp(() {
150 fork1 = stream.fork();
151 fork2 = stream.fork();
152 fork3 = stream.fork();
153 fork4 = stream.fork();
154 });
155
156 test("forwards events, errors, and close to all branches", () async {
157 var queue1 = new StreamQueue(stream);
158 var queue2 = new StreamQueue(fork1);
159 var queue3 = new StreamQueue(fork2);
160 var queue4 = new StreamQueue(fork3);
161 var queue5 = new StreamQueue(fork4);
162
163 controller.add(1);
164 expect(await queue1.next, equals(1));
165 expect(await queue2.next, equals(1));
166 expect(await queue3.next, equals(1));
167 expect(await queue4.next, equals(1));
168 expect(await queue5.next, equals(1));
169
170 controller.add(2);
171 expect(await queue1.next, equals(2));
172 expect(await queue2.next, equals(2));
173 expect(await queue3.next, equals(2));
174 expect(await queue4.next, equals(2));
175 expect(await queue5.next, equals(2));
176
177 controller.addError("error");
178 expect(queue1.next, throwsA("error"));
179 expect(queue2.next, throwsA("error"));
180 expect(queue3.next, throwsA("error"));
181 expect(queue4.next, throwsA("error"));
182 expect(queue5.next, throwsA("error"));
183 await flushMicrotasks();
184
185 controller.add(3);
186 expect(await queue1.next, equals(3));
187 expect(await queue2.next, equals(3));
188 expect(await queue3.next, equals(3));
189 expect(await queue4.next, equals(3));
190 expect(await queue5.next, equals(3));
191
192 controller.close();
193 expect(await queue1.hasNext, isFalse);
194 expect(await queue2.hasNext, isFalse);
195 expect(await queue3.hasNext, isFalse);
196 expect(await queue4.hasNext, isFalse);
197 expect(await queue5.hasNext, isFalse);
198 });
199
200 test("forwards events in order of forking", () async {
201 var queue1 = new StreamQueue(stream);
202 var queue2 = new StreamQueue(fork1);
203 var queue3 = new StreamQueue(fork2);
204 var queue4 = new StreamQueue(fork3);
205 var queue5 = new StreamQueue(fork4);
206
207 for (var i = 0; i < 4; i++) {
208 controller.add(i);
209
210 var queue1Fired = false;
211 var queue2Fired = false;
212 var queue3Fired = false;
213 var queue4Fired = false;
214 var queue5Fired = false;
215
216 queue5.next.then(expectAsync((_) {
217 queue5Fired = true;
218 expect(queue1Fired, isTrue);
219 expect(queue2Fired, isTrue);
220 expect(queue3Fired, isTrue);
221 expect(queue4Fired, isTrue);
222 }));
223
224 queue1.next.then(expectAsync((_) {
225 queue1Fired = true;
226 expect(queue2Fired, isFalse);
227 expect(queue3Fired, isFalse);
228 expect(queue4Fired, isFalse);
229 expect(queue5Fired, isFalse);
230 }));
231
232 queue4.next.then(expectAsync((_) {
233 queue4Fired = true;
234 expect(queue1Fired, isTrue);
235 expect(queue2Fired, isTrue);
236 expect(queue3Fired, isTrue);
237 expect(queue5Fired, isFalse);
238 }));
239
240 queue2.next.then(expectAsync((_) {
241 queue2Fired = true;
242 expect(queue1Fired, isTrue);
243 expect(queue3Fired, isFalse);
244 expect(queue4Fired, isFalse);
245 expect(queue5Fired, isFalse);
246 }));
247
248 queue3.next.then(expectAsync((_) {
249 queue3Fired = true;
250 expect(queue1Fired, isTrue);
251 expect(queue2Fired, isTrue);
252 expect(queue4Fired, isFalse);
253 expect(queue5Fired, isFalse);
254 }));
255 }
256 });
257
258 test("pauses the source when all forks are paused and/or not listening",
259 () {
260 var sub1 = stream.listen(null);
261 var sub2 = fork1.listen(null);
262 expect(controller.isPaused, isFalse);
263
264 sub1.pause();
265 expect(controller.isPaused, isFalse);
266
267 sub2.pause();
268 expect(controller.isPaused, isTrue);
269
270 var sub3 = fork2.listen(null);
271 expect(controller.isPaused, isFalse);
272
273 sub3.pause();
274 expect(controller.isPaused, isTrue);
275
276 sub2.resume();
277 expect(controller.isPaused, isFalse);
278
279 sub2.cancel();
280 expect(controller.isPaused, isTrue);
281 });
282
283 test("cancels the source when all forks are canceled", () async {
284 var sub1 = stream.listen(null);
285 expect(controller.hasListener, isTrue);
286
287 var sub2 = fork1.listen(null);
288 expect(controller.hasListener, isTrue);
289
290 expect(sub1.cancel(), isNull);
291 await flushMicrotasks();
292 expect(controller.hasListener, isTrue);
293
294 expect(sub2.cancel(), isNull);
295 await flushMicrotasks();
296 expect(controller.hasListener, isTrue);
297
298 expect(fork2.listen(null).cancel(), isNull);
299 await flushMicrotasks();
300 expect(controller.hasListener, isTrue);
301
302 expect(fork3.listen(null).cancel(), isNull);
303 await flushMicrotasks();
304 expect(controller.hasListener, isTrue);
305
306 expect(fork4.listen(null).cancel(), completion(equals(42)));
307 await flushMicrotasks();
308 expect(controller.hasListener, isFalse);
309 });
310 });
311
312 group("modification during dispatch:", () {
313 test("forking during onCancel", () {
314 controller = new StreamController<int>(onCancel: expectAsync(() {
315 expect(stream.fork().toList(), completion(isEmpty));
316 }));
317 stream = new ForkableStream<int>(controller.stream);
318
319 stream.listen(null).cancel();
320 });
321
322 test("forking during onPause", () {
323 controller = new StreamController<int>(onPause: expectAsync(() {
324 stream.fork().listen(null);
325 }));
326 stream = new ForkableStream<int>(controller.stream);
327
328 stream.listen(null).pause();
329
330 // The fork created in onPause should have resumed the stream.
331 expect(controller.isPaused, isFalse);
332 });
333
334 test("forking during onData", () {
335 var sub;
336 sub = stream.listen(expectAsync((value1) {
337 expect(value1, equals(1));
338 stream.fork().listen(expectAsync((value2) {
339 expect(value2, equals(2));
340 }));
341 sub.cancel();
342 }));
343
344 controller.add(1);
345 controller.add(2);
346 });
347
348 test("canceling a fork during onData", () {
349 var fork = stream.fork();
350 var forkSub = fork.listen(expectAsync((_) {}, count: 0));
351
352 stream.listen(expectAsync((_) => forkSub.cancel()));
353 controller.add(null);
354 });
355
356 test("forking during onError", () {
357 var sub;
358 sub = stream.listen(null, onError: expectAsync((error1) {
359 expect(error1, equals("error 1"));
360 stream.fork().listen(null, onError: expectAsync((error2) {
361 expect(error2, equals("error 2"));
362 }));
363 sub.cancel();
364 }));
365
366 controller.addError("error 1");
367 controller.addError("error 2");
368 });
369
370 test("canceling a fork during onError", () {
371 var fork = stream.fork();
372 var forkSub = fork.listen(expectAsync((_) {}, count: 0));
373
374 stream.listen(null, onError: expectAsync((_) => forkSub.cancel()));
375 controller.addError("error");
376 });
377
378 test("forking during onDone", () {
379 stream.listen(null, onDone: expectAsync(() {
380 expect(stream.fork().toList(), completion(isEmpty));
381 }));
382
383 controller.close();
384 });
385
386 test("canceling a fork during onDone", () {
387 var fork = stream.fork();
388 var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0));
389
390 stream.listen(null, onDone: expectAsync(() => forkSub.cancel()));
391 controller.close();
392 });
393 });
394
395 group("throws an error when", () {
396 test("a cancelled stream is forked", () {
397 stream.listen(null).cancel();
398 expect(stream.fork().toList(), completion(isEmpty));
399 });
400
401 test("a cancelled stream is forked even when other forks are alive", () {
402 stream.fork().listen(null);
403 stream.listen(null).cancel();
404
405 expect(controller.hasListener, isTrue);
406 expect(stream.fork().toList(), completion(isEmpty));
407 });
408
409 test("a closed stream is forked", () async {
410 controller.close();
411 await stream.listen(null).asFuture();
412 expect(stream.fork().toList(), completion(isEmpty));
413 });
414 });
415 }
416
417 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698