Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(244)

Side by Side Diff: test/stream_completer_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « pubspec.yaml ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamCompleter;
8 import "package:test/test.dart";
9
10 import "utils.dart";
11
12 main() {
13 test("a stream is linked before listening", () async {
14 var completer = new StreamCompleter();
15 completer.setSourceStream(createStream());
16 expect(completer.stream.toList(), completion([1, 2, 3, 4]));
17 });
18
19 test("listened to before a stream is linked", () async {
20 var completer = new StreamCompleter();
21 var done = completer.stream.toList();
22 await flushMicrotasks();
23 completer.setSourceStream(createStream());
24 expect(done, completion([1, 2, 3, 4]));
25 });
26
27 test("cancel before linking a stream doesn't listen on stream", () async {
28 var completer = new StreamCompleter();
29 var subscription = completer.stream.listen(null);
30 subscription.pause(); // Should be ignored.
31 subscription.cancel();
32 completer.setSourceStream(new UnusableStream()); // Doesn't throw.
33 });
34
35 test("listen and pause before linking stream", () async {
36 var controller = new StreamCompleter();
37 var events = [];
38 var subscription = controller.stream.listen(events.add);
39 var done = subscription.asFuture();
40 subscription.pause();
41 var sourceController = new StreamController();
42 sourceController..add(1)..add(2)..add(3)..add(4);
43 controller.setSourceStream(sourceController.stream);
44 await flushMicrotasks();
45 expect(sourceController.hasListener, isTrue);
46 expect(sourceController.isPaused, isTrue);
47 expect(events, []);
48 subscription.resume();
49 await flushMicrotasks();
50 expect(sourceController.hasListener, isTrue);
51 expect(sourceController.isPaused, isFalse);
52 expect(events, [1, 2, 3, 4]);
53 sourceController.close();
54 await done;
55 expect(events, [1, 2, 3, 4]);
56 });
57
58 test("pause more than once", () async {
59 var completer = new StreamCompleter();
60 var events = [];
61 var subscription = completer.stream.listen(events.add);
62 var done = subscription.asFuture();
63 subscription.pause();
64 subscription.pause();
65 subscription.pause();
66 completer.setSourceStream(createStream());
67 for (int i = 0; i < 3; i++) {
68 await flushMicrotasks();
69 expect(events, []);
70 subscription.resume();
71 }
72 await done;
73 expect(events, [1, 2, 3, 4]);
74 });
75
76 test("cancel new stream before source is done", () async {
77 var completer = new StreamCompleter();
78 var listened = false;
79 var lastEvent = -1;
80 var controller = new StreamController();
81 var subscription;
82 subscription = completer.stream.listen(
83 (value) {
84 expect(value, lessThan(3));
85 lastEvent = value;
86 if (value == 2) {
87 subscription.cancel();
88 }
89 },
90 onError: unreachable("error"),
91 onDone: unreachable("done"),
92 cancelOnError: true);
93 completer.setSourceStream(controller.stream);
94 expect(controller.hasListener, isTrue);
95
96 await flushMicrotasks();
97 expect(controller.hasListener, isTrue);
98 controller.add(1);
99
100 await flushMicrotasks();
101 expect(lastEvent, 1);
102 expect(controller.hasListener, isTrue);
103 controller.add(2);
104
105 await flushMicrotasks();
106 expect(lastEvent, 2);
107 expect(controller.hasListener, isFalse);
108 });
109
110 test("complete with setEmpty before listening", () async {
111 var completer = new StreamCompleter();
112 completer.setEmpty();
113 var done = new Completer();
114 completer.stream.listen(
115 unreachable("data"),
116 onError: unreachable("error"),
117 onDone: done.complete);
118 await done.future;
119 });
120
121 test("complete with setEmpty after listening", () async {
122 var completer = new StreamCompleter();
123 var done = new Completer();
124 completer.stream.listen(
125 unreachable("data"),
126 onError: unreachable("error"),
127 onDone: done.complete);
128 completer.setEmpty();
129 await done.future;
130 });
131
132 test("source stream isn't listened to until completer stream is", () async {
133 var completer = new StreamCompleter();
134 var controller;
135 controller = new StreamController(onListen: () {
136 scheduleMicrotask(controller.close);
137 });
138
139 completer.setSourceStream(controller.stream);
140 await flushMicrotasks();
141 expect(controller.hasListener, isFalse);
142 var subscription = completer.stream.listen(null);
143 expect(controller.hasListener, isTrue);
144 await subscription.asFuture();
145 });
146
147 test("cancelOnError true when listening before linking stream", () async {
148 var completer = new StreamCompleter();
149 var listened = false;
150 var canceled = false;
151 var lastEvent = -1;
152 var controller = new StreamController();
153 var subscription = completer.stream.listen(
154 (value) {
155 expect(value, lessThan(3));
156 lastEvent = value;
157 },
158 onError: (value) {
159 expect(value, "3");
160 lastEvent = value;
161 },
162 onDone: unreachable("done"),
163 cancelOnError: true);
164 completer.setSourceStream(controller.stream);
165 expect(controller.hasListener, isTrue);
166
167 await flushMicrotasks();
168 expect(controller.hasListener, isTrue);
169 controller.add(1);
170
171 await flushMicrotasks();
172 expect(lastEvent, 1);
173 expect(controller.hasListener, isTrue);
174 controller.add(2);
175
176 await flushMicrotasks();
177 expect(lastEvent, 2);
178 expect(controller.hasListener, isTrue);
179 controller.addError("3");
180
181 await flushMicrotasks();
182 expect(lastEvent, "3");
183 expect(controller.hasListener, isFalse);
184 });
185
186 test("cancelOnError true when listening after linking stream", () async {
187 var completer = new StreamCompleter();
188 var lastEvent = -1;
189 var controller = new StreamController();
190 completer.setSourceStream(controller.stream);
191 controller.add(1);
192 expect(controller.hasListener, isFalse);
193
194 var subscription = completer.stream.listen(
195 (value) {
196 expect(value, lessThan(3));
197 lastEvent = value;
198 },
199 onError: (value) {
200 expect(value, "3");
201 lastEvent = value;
202 },
203 onDone: unreachable("done"),
204 cancelOnError: true);
205
206 expect(controller.hasListener, isTrue);
207
208 await flushMicrotasks();
209 expect(lastEvent, 1);
210 expect(controller.hasListener, isTrue);
211 controller.add(2);
212
213 await flushMicrotasks();
214 expect(lastEvent, 2);
215 expect(controller.hasListener, isTrue);
216 controller.addError("3");
217
218 await flushMicrotasks();
219 expect(controller.hasListener, isFalse);
220 });
221
222 test("linking a stream after setSourceStream before listen", () async {
223 var completer = new StreamCompleter();
224 completer.setSourceStream(createStream());
225 expect(() => completer.setSourceStream(createStream()), throws);
226 expect(() => completer.setEmpty(createStream()), throws);
227 await completer.stream.toList();
228 // Still fails after source is done
229 expect(() => completer.setSourceStream(createStream()), throws);
230 expect(() => completer.setEmpty(createStream()), throws);
231 });
232
233 test("linking a stream after setSourceStream after listen", () async {
234 var completer = new StreamCompleter();
235 var list = completer.stream.toList();
236 completer.setSourceStream(createStream());
237 expect(() => completer.setSoureStream(createStream()), throws);
238 expect(() => completer.stEmpty(createStream()), throws);
239 await list;
240 // Still fails after source is done.
241 expect(() => completer.setSoureStream(createStream()), throws);
242 expect(() => completer.stEmpty(createStream()), throws);
243 });
244
245 test("linking a stream after setEmpty before listen", () async {
246 var completer = new StreamCompleter();
247 completer.setEmpty();
248 expect(() => completer.setSoureStream(createStream()), throws);
249 expect(() => completer.stEmpty(createStream()), throws);
250 await completer.stream.toList();
251 // Still fails after source is done
252 expect(() => completer.setSoureStream(createStream()), throws);
253 expect(() => completer.stEmpty(createStream()), throws);
254 });
255
256 test("linking a stream after setEmpty() after listen", () async {
257 var completer = new StreamCompleter();
258 var list = completer.stream.toList();
259 completer.setEmpty();
260 expect(() => completer.setSoureStream(createStream()), throws);
261 expect(() => completer.stEmpty(createStream()), throws);
262 await list;
263 // Still fails after source is done.
264 expect(() => completer.setSoureStream(createStream()), throws);
265 expect(() => completer.stEmpty(createStream()), throws);
266 });
267
268 test("listening more than once after setting stream", () async {
269 var completer = new StreamCompleter();
270 completer.setSourceStream(createStream());
271 var list = completer.stream.toList();
272 expect(() => completer.stream.oList(), throws);
273 await list;
274 expect(() => completer.stream.oList(), throws);
275 });
276
277 test("listening more than once before setting stream", () async {
278 var completer = new StreamCompleter();
279 var list = completer.stream.toList();
280 expect(() => completer.stream.oList(), throws);
281 });
282
283 test("setting onData etc. before and after setting stream", () async {
284 var completer = new StreamCompleter();
285 var controller = new StreamController();
286 var subscription = completer.stream.listen(null);
287 var lastEvent = 0;
288 subscription.onData((value) => lastEvent = value);
289 subscription.onError((value) => lastEvent = "$value");
290 subscription.onDone(() => lastEvent = -1);
291 completer.setSourceStream(controller.stream);
292 await flushMicrotasks();
293 controller.add(1);
294 await flushMicrotasks();
295 expect(lastEvent, 1);
296 controller.addError(2);
297 await flushMicrotasks();
298 expect(lastEvent, "2");
299 subscription.onData((value) => lastEvent = -value);
300 subscription.onError((value) => lastEvent = "${-value}");
301 controller.add(1);
302 await flushMicrotasks();
303 expect(lastEvent, -1);
304 controller.addError(2);
305 await flushMicrotasks();
306 expect(lastEvent, "-2");
307 controller.close();
308 await flushMicrotasks();
309 expect(lastEvent, -1);
310 });
311
312 test("pause w/ resume future accross setting stream", () async {
313 var completer = new StreamCompleter();
314 var resume = new Completer();
315 var subscription = completer.stream.listen(unreachable("data"));
316 var lastEvent = 0;
317 subscription.pause(resume.future);
318 await flushMicrotasks();
319 completer.setSourceStream(createStream());
320 await flushMicrotasks();
321 resume.complete();
322 var events = [];
323 subscription.onData(events.add);
324 await subscription.asFuture();
325 expect(events, [1, 2, 3, 4]);
326 });
327
328 test("asFuture with error accross setting stream", () async {
329 var completer = new StreamCompleter();
330 var controller = new StreamController();
331 var subscription = completer.stream.listen(unreachable("data"),
332 cancelOnError: false);
333 var done = subscription.asFuture();
334 expect(controller.hasListener, isFalse);
335 completer.setSourceStream(controller.stream);
336 await flushMicrotasks();
337 expect(controller.hasListener, isTrue);
338 controller.addError(42);
339 await done.then(unreachable("data"), onError: (error) {
340 expect(error, 42);
341 });
342 expect(controller.hasListener, isFalse);
343 });
344 }
345
346 Stream<int> createStream() async* {
347 yield 1;
348 await flushMicrotasks();
349 yield 2;
350 await flushMicrotasks();
351 yield 3;
352 await flushMicrotasks();
353 yield 4;
354 }
355
356 /// A zero-millisecond timer should wait until after all microtasks.
357 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
358
359 /// A generic unreachable callback function.
360 ///
361 /// Returns a function that fails the test if it is ever called.
362 unreachable(String name) => ([a, b]) => fail("Unreachable: $name");
363
364 /// A badly behaved stream which throws if it's ever listened to.
365 ///
366 /// Can be used to test cases where a stream should not be used.
367 class UnusableStream extends Stream {
368 listen(onData, {onError, onDone, cancelOnError}) {
369 throw new UnimplementedError("Gotcha!");
370 }
371 }
OLDNEW
« no previous file with comments | « pubspec.yaml ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698