Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: test/forkable_stream_test.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'package:async/async.dart';
8 import 'package:test/test.dart';
9
10 import 'utils.dart';
11
12 void main() {
13 var controller;
14 var stream;
15 setUp(() {
16 var cancelFuture = new Future.value(42);
17 controller = new StreamController<int>(onCancel: () => cancelFuture);
18 stream = new ForkableStream<int>(controller.stream);
19 });
20
21 group("with no forks", () {
22 test("forwards events, errors, and close", () async {
23 var queue = new StreamQueue(stream);
24
25 controller.add(1);
26 expect(await queue.next, equals(1));
27
28 controller.add(2);
29 expect(await queue.next, equals(2));
30
31 controller.addError("error");
32 expect(queue.next, throwsA("error"));
33 await flushMicrotasks();
34
35 controller.add(3);
36 expect(await queue.next, equals(3));
37
38 controller.close();
39 expect(await queue.hasNext, isFalse);
40 });
41
42 test("listens to, pauses, and cancels the controller", () {
43 expect(controller.hasListener, isFalse);
44
45 var sub = stream.listen(null);
46 expect(controller.hasListener, isTrue);
47
48 sub.pause();
49 expect(controller.isPaused, isTrue);
50
51 sub.resume();
52 expect(controller.isPaused, isFalse);
53
54 sub.cancel();
55 expect(controller.hasListener, isFalse);
56 });
57
58 test("unpauses the controller when a fork is listened", () {
59 stream.listen(null).pause();
60 expect(controller.isPaused, isTrue);
61
62 var fork = stream.fork();
63 expect(controller.isPaused, isTrue);
64
65 fork.listen(null);
66 expect(controller.isPaused, isFalse);
67 });
68 });
69
70 group("with a fork created before the stream was listened", () {
71 var fork;
72 setUp(() {
73 fork = stream.fork();
74 });
75
76 test("forwards events, errors, and close to both branches", () async {
77 var queue = new StreamQueue(stream);
78 var forkQueue = new StreamQueue(fork);
79
80 controller.add(1);
81 expect(await queue.next, equals(1));
82 expect(await forkQueue.next, equals(1));
83
84 controller.add(2);
85 expect(await queue.next, equals(2));
86 expect(await forkQueue.next, equals(2));
87
88 controller.addError("error");
89 expect(queue.next, throwsA("error"));
90 expect(forkQueue.next, throwsA("error"));
91 await flushMicrotasks();
92
93 controller.add(3);
94 expect(await queue.next, equals(3));
95 expect(await forkQueue.next, equals(3));
96
97 controller.close();
98 expect(await queue.hasNext, isFalse);
99 expect(await forkQueue.hasNext, isFalse);
100 });
101
102 test('listens to the source when the original is listened', () {
103 expect(controller.hasListener, isFalse);
104 stream.listen(null);
105 expect(controller.hasListener, isTrue);
106 });
107
108 test('listens to the source when the fork is listened', () {
109 expect(controller.hasListener, isFalse);
110 fork.listen(null);
111 expect(controller.hasListener, isTrue);
112 });
113 });
114
115 test("with a fork created after the stream emitted a few events, forwards "
116 "future events, errors, and close to both branches", () async {
117 var queue = new StreamQueue(stream);
118
119 controller.add(1);
120 expect(await queue.next, equals(1));
121
122 controller.add(2);
123 expect(await queue.next, equals(2));
124
125 var fork = stream.fork();
126 var forkQueue = new StreamQueue(fork);
127
128 controller.add(3);
129 expect(await queue.next, equals(3));
130 expect(await forkQueue.next, equals(3));
131
132 controller.addError("error");
133 expect(queue.next, throwsA("error"));
134 expect(forkQueue.next, throwsA("error"));
135 await flushMicrotasks();
136
137 controller.close();
138 expect(await queue.hasNext, isFalse);
139 expect(await forkQueue.hasNext, isFalse);
140 });
141
142 group("with multiple forks", () {
143 var fork1;
144 var fork2;
145 var fork3;
146 var fork4;
147 setUp(() {
148 fork1 = stream.fork();
149 fork2 = stream.fork();
150 fork3 = stream.fork();
151 fork4 = stream.fork();
152 });
153
154 test("forwards events, errors, and close to all branches", () async {
155 var queue1 = new StreamQueue(stream);
156 var queue2 = new StreamQueue(fork1);
157 var queue3 = new StreamQueue(fork2);
158 var queue4 = new StreamQueue(fork3);
159 var queue5 = new StreamQueue(fork4);
160
161 controller.add(1);
162 expect(await queue1.next, equals(1));
163 expect(await queue2.next, equals(1));
164 expect(await queue3.next, equals(1));
165 expect(await queue4.next, equals(1));
166 expect(await queue5.next, equals(1));
167
168 controller.add(2);
169 expect(await queue1.next, equals(2));
170 expect(await queue2.next, equals(2));
171 expect(await queue3.next, equals(2));
172 expect(await queue4.next, equals(2));
173 expect(await queue5.next, equals(2));
174
175 controller.addError("error");
176 expect(queue1.next, throwsA("error"));
177 expect(queue2.next, throwsA("error"));
178 expect(queue3.next, throwsA("error"));
179 expect(queue4.next, throwsA("error"));
180 expect(queue5.next, throwsA("error"));
181 await flushMicrotasks();
182
183 controller.add(3);
184 expect(await queue1.next, equals(3));
185 expect(await queue2.next, equals(3));
186 expect(await queue3.next, equals(3));
187 expect(await queue4.next, equals(3));
188 expect(await queue5.next, equals(3));
189
190 controller.close();
191 expect(await queue1.hasNext, isFalse);
192 expect(await queue2.hasNext, isFalse);
193 expect(await queue3.hasNext, isFalse);
194 expect(await queue4.hasNext, isFalse);
195 expect(await queue5.hasNext, isFalse);
196 });
197
198 test("forwards events in order of forking", () async {
199 var queue1 = new StreamQueue(stream);
200 var queue2 = new StreamQueue(fork1);
201 var queue3 = new StreamQueue(fork2);
202 var queue4 = new StreamQueue(fork3);
203 var queue5 = new StreamQueue(fork4);
204
205 for (var i = 0; i < 4; i++) {
206 controller.add(i);
207
208 var queue1Fired = false;
209 var queue2Fired = false;
210 var queue3Fired = false;
211 var queue4Fired = false;
212 var queue5Fired = false;
213
214 queue5.next.then(expectAsync((_) {
215 queue5Fired = true;
216 expect(queue1Fired, isTrue);
217 expect(queue2Fired, isTrue);
218 expect(queue3Fired, isTrue);
219 expect(queue4Fired, isTrue);
220 }));
221
222 queue1.next.then(expectAsync((_) {
223 queue1Fired = true;
224 expect(queue2Fired, isFalse);
225 expect(queue3Fired, isFalse);
226 expect(queue4Fired, isFalse);
227 expect(queue5Fired, isFalse);
228 }));
229
230 queue4.next.then(expectAsync((_) {
231 queue4Fired = true;
232 expect(queue1Fired, isTrue);
233 expect(queue2Fired, isTrue);
234 expect(queue3Fired, isTrue);
235 expect(queue5Fired, isFalse);
236 }));
237
238 queue2.next.then(expectAsync((_) {
239 queue2Fired = true;
240 expect(queue1Fired, isTrue);
241 expect(queue3Fired, isFalse);
242 expect(queue4Fired, isFalse);
243 expect(queue5Fired, isFalse);
244 }));
245
246 queue3.next.then(expectAsync((_) {
247 queue3Fired = true;
248 expect(queue1Fired, isTrue);
249 expect(queue2Fired, isTrue);
250 expect(queue4Fired, isFalse);
251 expect(queue5Fired, isFalse);
252 }));
253 }
254 });
255
256 test("pauses the source when all forks are paused and/or not listening",
257 () {
258 var sub1 = stream.listen(null);
259 var sub2 = fork1.listen(null);
260 expect(controller.isPaused, isFalse);
261
262 sub1.pause();
263 expect(controller.isPaused, isFalse);
264
265 sub2.pause();
266 expect(controller.isPaused, isTrue);
267
268 var sub3 = fork2.listen(null);
269 expect(controller.isPaused, isFalse);
270
271 sub3.pause();
272 expect(controller.isPaused, isTrue);
273
274 sub2.resume();
275 expect(controller.isPaused, isFalse);
276
277 sub2.cancel();
278 expect(controller.isPaused, isTrue);
279 });
280
281 test("cancels the source when all forks are canceled", () async {
282 var sub1 = stream.listen(null);
283 expect(controller.hasListener, isTrue);
284
285 var sub2 = fork1.listen(null);
286 expect(controller.hasListener, isTrue);
287
288 expect(sub1.cancel(), isNull);
289 await flushMicrotasks();
290 expect(controller.hasListener, isTrue);
291
292 expect(sub2.cancel(), isNull);
293 await flushMicrotasks();
294 expect(controller.hasListener, isTrue);
295
296 expect(fork2.listen(null).cancel(), isNull);
297 await flushMicrotasks();
298 expect(controller.hasListener, isTrue);
299
300 expect(fork3.listen(null).cancel(), isNull);
301 await flushMicrotasks();
302 expect(controller.hasListener, isTrue);
303
304 expect(fork4.listen(null).cancel(), completion(equals(42)));
305 await flushMicrotasks();
306 expect(controller.hasListener, isFalse);
307 });
308 });
309
310 group("modification during dispatch:", () {
311 test("forking during onCancel", () {
312 controller = new StreamController<int>(onCancel: expectAsync(() {
313 expect(stream.fork, throwsStateError);
314 }));
315 stream = new ForkableStream<int>(controller.stream);
316
317 stream.listen(null).cancel();
318 });
319
320 test("forking during onPause", () {
321 controller = new StreamController<int>(onPause: expectAsync(() {
322 stream.fork().listen(null);
323 }));
324 stream = new ForkableStream<int>(controller.stream);
325
326 stream.listen(null).pause();
327
328 // The fork created in onPause should have resumed the stream.
329 expect(controller.isPaused, isFalse);
330 });
331
332 test("forking during onData", () {
333 var sub;
334 sub = stream.listen(expectAsync((value1) {
335 expect(value1, equals(1));
336 stream.fork().listen(expectAsync((value2) {
337 expect(value2, equals(2));
338 }));
339 sub.cancel();
340 }));
341
342 controller.add(1);
343 controller.add(2);
344 });
345
346 test("canceling a fork during onData", () {
347 var fork = stream.fork();
348 var forkSub = fork.listen(expectAsync((_) {}, count: 0));
349
350 stream.listen(expectAsync((_) => forkSub.cancel()));
351 controller.add(null);
352 });
353
354 test("forking during onError", () {
355 var sub;
356 sub = stream.listen(null, onError: expectAsync((error1) {
357 expect(error1, equals("error 1"));
358 stream.fork().listen(null, onError: expectAsync((error2) {
359 expect(error2, equals("error 2"));
360 }));
361 sub.cancel();
362 }));
363
364 controller.addError("error 1");
365 controller.addError("error 2");
366 });
367
368 test("canceling a fork during onError", () {
369 var fork = stream.fork();
370 var forkSub = fork.listen(expectAsync((_) {}, count: 0));
371
372 stream.listen(null, onError: expectAsync((_) => forkSub.cancel()));
373 controller.addError("error");
374 });
375
376 test("forking during onDone", () {
377 stream.listen(null, onDone: expectAsync(() {
378 expect(stream.fork, throwsStateError);
379 }));
380
381 controller.close();
382 });
383
384 test("canceling a fork during onDone", () {
385 var fork = stream.fork();
386 var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0));
387
388 stream.listen(null, onDone: expectAsync(() => forkSub.cancel()));
389 controller.close();
390 });
391 });
392
393 group("throws an error when", () {
394 test("a cancelled stream is forked", () {
395 stream.listen(null).cancel();
396 expect(stream.fork, throwsStateError);
397 });
398
399 test("a cancelled stream is forked even when other forks are alive", () {
400 stream.fork().listen(null);
401 stream.listen(null).cancel();
402
403 expect(controller.hasListener, isTrue);
404 expect(stream.fork, throwsStateError);
405 });
406
407 test("a closed stream is forked", () async {
408 controller.close();
409 await stream.listen(null).asFuture();
410 expect(stream.fork, throwsStateError);
411 });
412 });
413 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698