Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: test/stream_events_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address remaining comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamEvents;
8 import "package:test/test.dart";
9
10 main() {
11 group("source stream", () {
12 test("is listened to on first request, paused between requests", () async {
13 var controller = new StreamController();
14 var events = new StreamEvents<int>(controller.stream);
15 await flushMicrotasks();
16 expect(controller.hasListener, isFalse);
17 var next = events.next;
18 expect(controller.hasListener, isTrue);
19 expect(controller.isPaused, isFalse);
20 controller.add(1);
21 expect(await next, 1);
22 expect(controller.hasListener, isTrue);
23 expect(controller.isPaused, isTrue);
24 next = events.next;
25 expect(controller.hasListener, isTrue);
26 expect(controller.isPaused, isFalse);
27 controller.add(2);
28 expect(await next, 2);
29 expect(controller.hasListener, isTrue);
30 expect(controller.isPaused, isTrue);
31 var cancel = events.cancel();
32 expect(controller.hasListener, isFalse);
33 });
34 });
35
36 group("next operation", () {
37 test("simple sequence of requests", () async {
38 var events = new StreamEvents<int>(createStream());
39 for (int i = 1; i <= 4; i++) {
40 expect(await events.next, i);
41 }
42 expect(events.next, throwsStateError);
43 });
44
45 test("multiple requests at the same time", () async {
46 var events = new StreamEvents<int>(createStream());
47 var result = await Future.wait(
48 [events.next, events.next, events.next, events.next]);
49 expect(result, [1, 2, 3, 4]);
50 await events.cancel();
51 });
52
53 test("sequence of requests with error", () async {
54 var events = new StreamEvents<int>(createErrorStream());
55 expect(await events.next, 1);
56 expect(await events.next, 2);
57 expect(events.next, throws);
58 expect(await events.next, 4);
59 await events.cancel();
60 });
61 });
62
63 group("skip operation", () {
64 test("of two elements in the middle of sequence", () async {
65 var events = new StreamEvents<int>(createStream());
66 expect(await events.next, 1);
67 expect(await events.skip(2), 0);
68 expect(await events.next, 4);
69 await events.cancel();
70 });
71
72 test("with negative/bad arguments throws", () async {
73 var events = new StreamEvents<int>(createStream());
74 expect(() => events.skip(-1), throwsArgumentError);
75 // A non-int throws either a type error or an argument error,
76 // depending on whether it's checked mode or not.
77 expect(await events.next, 1); // Did not consume event.
78 expect(() => events.skip(-1), throwsArgumentError);
79 expect(await events.next, 2); // Did not consume event.
80 await events.cancel();
81 });
82
83 test("of 0 elements works", () async {
84 var events = new StreamEvents<int>(createStream());
85 expect(events.skip(0), completion(0));
86 expect(events.next, completion(1));
87 expect(events.skip(0), completion(0));
88 expect(events.next, completion(2));
89 expect(events.skip(0), completion(0));
90 expect(events.next, completion(3));
91 expect(events.skip(0), completion(0));
92 expect(events.next, completion(4));
93 expect(events.skip(0), completion(0));
94 expect(events.skip(5), completion(5));
95 expect(events.next, throwsStateError);
96 await events.cancel();
97 });
98
99 test("of too many events ends at stream start", () async {
100 var events = new StreamEvents<int>(createStream());
101 expect(await events.skip(6), 2);
102 await events.cancel();
103 });
104
105 test("of too many events after some events", () async {
106 var events = new StreamEvents<int>(createStream());
107 expect(await events.next, 1);
108 expect(await events.next, 2);
109 expect(await events.skip(6), 4);
110 await events.cancel();
111 });
112
113 test("of too many events ends at stream end", () async {
114 var events = new StreamEvents<int>(createStream());
115 expect(await events.next, 1);
116 expect(await events.next, 2);
117 expect(await events.next, 3);
118 expect(await events.next, 4);
119 expect(await events.skip(2), 2);
120 await events.cancel();
121 });
122
123 test("of events with error", () async {
124 var events = new StreamEvents<int>(createErrorStream());
125 expect(events.skip(4), throws);
126 expect(await events.next, 4);
127 await events.cancel();
128 });
129
130 test("of events with error, and skip again after", () async {
131 var events = new StreamEvents<int>(createErrorStream());
132 expect(events.skip(4), throws);
133 expect(events.skip(2), completion(1));
134 await events.cancel();
135 });
136
137 test("multiple skips at same time complete in order.", () async {
138 var events = new StreamEvents<int>(createStream());
139 var skip1 = events.skip(1);
140 var skip2 = events.skip(0);
141 var skip3 = events.skip(4);
142 var skip4 = events.skip(1);
143 var index = 0;
144 // Check that futures complete in order.
145 sequence(expectedValue, sequenceIndex) => (v) {
146 expect(v, expectedValue);
147 expect(index, sequenceIndex);
148 index++;
149 }
150 await Future.wait([skip1.then(sequence(0, 0)),
151 skip2.then(sequence(0, 1)),
152 skip3.then(sequence(1, 2)),
153 skip4.then(sequence(1, 3))]);
154 // Complete when they are all done.
155 await Future.wait([skip1, skip2, skip3, skip4]);
156 await events.cancel();
157 });
158 });
159
160 group("take operation", () {
161 test("as simple take of events", () async {
162 var events = new StreamEvents<int>(createStream());
163 expect(await events.next, 1);
164 expect(await events.take(2), [2, 3]);
165 expect(await events.next, 4);
166 await events.cancel();
167 });
168
169 test("of 0 events", () async {
170 var events = new StreamEvents<int>(createStream());
171 expect(events.take(0), completion([]));
172 expect(events.next, completion(1));
173 expect(events.take(0), completion([]));
174 expect(events.next, completion(2));
175 expect(events.take(0), completion([]));
176 expect(events.next, completion(3));
177 expect(events.take(0), completion([]));
178 expect(events.next, completion(4));
179 expect(events.take(0), completion([]));
180 expect(events.take(5), completion([]));
181 expect(events.next, throwsStateError);
182 await events.cancel();
183 });
184
185 test("with bad arguments throws", () async {
186 var events = new StreamEvents<int>(createStream());
187 expect(() => events.take(-1), throwsArgumentError);
188 expect(await events.next, 1); // Did not consume event.
189 expect(() => events.take(-1), throwsArgumentError);
190 expect(await events.next, 2); // Did not consume event.
191 await events.cancel();
192 });
193
194 test("of too many arguments", () async {
195 var events = new StreamEvents<int>(createStream());
196 expect(await events.take(6), [1, 2, 3, 4]);
197 await events.cancel();
198 });
199
200 test("too large later", () async {
201 var events = new StreamEvents<int>(createStream());
202 expect(await events.next, 1);
203 expect(await events.next, 2);
204 expect(await events.take(6), [3, 4]);
205 await events.cancel();
206 });
207
208 test("error", () async {
209 var events = new StreamEvents<int>(createErrorStream());
210 expect(events.take(4), throws);
211 expect(await events.next, 4);
212 await events.cancel();
213 });
214 });
215
216 group("rest operation", () {
217 test("after single next", () async {
218 var events = new StreamEvents<int>(createStream());
219 expect(await events.next, 1);
220 expect(await events.rest.toList(), [2, 3, 4]);
221 });
222
223 test("at start", () async {
224 var events = new StreamEvents<int>(createStream());
225 expect(await events.rest.toList(), [1, 2, 3, 4]);
226 });
227
228 test("at end", () async {
229 var events = new StreamEvents<int>(createStream());
230 expect(await events.next, 1);
231 expect(await events.next, 2);
232 expect(await events.next, 3);
233 expect(await events.next, 4);
234 expect(await events.rest.toList(), []);
235 });
236
237 test("after end", () async {
238 var events = new StreamEvents<int>(createStream());
239 expect(await events.next, 1);
240 expect(await events.next, 2);
241 expect(await events.next, 3);
242 expect(await events.next, 4);
243 expect(events.next, throwsStateError);
244 expect(await events.rest.toList(), []);
245 });
246
247 test("with an error event error", () async {
248 var events = new StreamEvents<int>(createErrorStream());
249 expect(await events.next, 1);
250 var rest = events.rest;
251 var events2 = new StreamEvents(rest);
252 expect(await events2.next, 2);
253 expect(events2.next, throws);
254 expect(await events2.next, 4);
255 });
256
257 test("closes the events, prevents other operations", () async {
258 var events = new StreamEvents<int>(createStream());
259 var stream = events.rest;
260 expect(() => events.next, throwsStateError);
261 expect(() => events.skip(1), throwsStateError);
262 expect(() => events.take(1), throwsStateError);
263 expect(() => events.rest, throwsStateError);
264 expect(() => events.cancel(), throwsStateError);
265 });
266
267 test("forwards to underlying stream", () async {
268 var cancel = new Completer();
269 var controller = new StreamController(onCancel: () => cancel.future);
270 var events = new StreamEvents<int>(controller.stream);
271 expect(controller.hasListener, isFalse);
272 var next = events.next;
273 expect(controller.hasListener, isTrue);
274 expect(controller.isPaused, isFalse);
275 controller.add(1);
276 expect(await next, 1);
277 expect(controller.isPaused, isTrue);
278 var rest = events.rest;
279 var subscription = rest.listen(null);
280 expect(controller.hasListener, isTrue);
281 expect(controller.isPaused, isFalse);
282 var lastEvent;
283 subscription.onData((v) { lastEvent = v; });
284 controller.add(2);
285 await flushMicrotasks();
286 expect(lastEvent, 2);
287 expect(controller.hasListener, isTrue);
288 expect(controller.isPaused, isFalse);
289 subscription.pause();
290 expect(controller.isPaused, isTrue);
291 controller.add(3);
292 await flushMicrotasks();
293 expect(lastEvent, 2);
294 subscription.resume();
295 await flushMicrotasks();
296 expect(lastEvent, 3);
297 var cancelFuture = subscription.cancel();
298 expect(controller.hasListener, isFalse);
299 cancel.complete(42);
300 expect(cancelFuture, completion(42));
301 });
302 });
303
304 group("close operation", () {
305 test("closes the events, prevents any other operation", () async {
306 var events = new StreamEvents<int>(createStream());
307 await events.cancel();
308 expect(() => events.next, throwsStateError);
309 expect(() => events.skip(1), throwsStateError);
310 expect(() => events.take(1), throwsStateError);
311 expect(() => events.rest, throwsStateError);
312 expect(() => events.cancel(), throwsStateError);
313 });
314
315 test("cancels underlying subscription, returns result", () async {
316 var cancelFuture = new Future.value(42);
317 var controller = new StreamController(onCancel: () => cancelFuture);
318 var events = new StreamEvents<int>(controller.stream);
319 controller.add(1);
320 expect(await events.next, 1);
321 expect(await events.cancel(), 42);
322 });
323 });
324
325 test("all combinations sequential skip/next/take operations", () async {
326 // Takes all combinations of two of next, skip and take, then ends with
327 // doing rest. Each of the first rounds do 10 events of each type,
328 // the rest does 20 elements.
329 var eventCount = 20 * (3 * 3 + 1);
330 var events = new StreamEvents<int>(createLongStream(eventCount));
331
332 // Test expecting [startIndex .. startIndex + 9] as events using
333 // `next`.
334 nextTest(startIndex) {
335 for (int i = 0; i < 10; i++) {
336 expect(events.next, completion(startIndex + i));
337 }
338 }
339
340 // Test expecting 10 events to be skipped.
341 skipTest(startIndex) {
342 expect(events.skip(10), completion(0));
343 }
344
345 // Test expecting [startIndex .. startIndex + 9] as events using
346 // `take(10)`.
347 takeTest(startIndex) {
348 expect(events.take(10),
349 completion(new List.generate(10, (i) => startIndex + i)));
350 }
351 var tests = [nextTest, skipTest, takeTest];
352
353 int counter = 0;
354 // Run through all pairs of two tests and run them.
355 for (int i = 0; i < tests.length; i++) {
356 for (int j = 0; j < tests.length; j++) {
357 tests[i](counter);
358 tests[j](counter + 10);
359 counter += 20;
360 }
361 }
362 // Then expect 20 more events as a `rest` call.
363 expect(events.rest.toList(),
364 completion(new List.generate(20, (i) => counter + i)));
365 });
366 }
367
368 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
369
370 Stream<int> createStream() async* {
371 yield 1;
372 await flushMicrotasks();
373 yield 2;
374 await flushMicrotasks();
375 yield 3;
376 await flushMicrotasks();
377 yield 4;
378 }
379
380 Stream<int> createErrorStream() {
381 StreamController controller = new StreamController<int>();
382 () async {
383 controller.add(1);
384 await flushMicrotasks();
385 controller.add(2);
386 await flushMicrotasks();
387 controller.addError("To err is divine!");
388 await flushMicrotasks();
389 controller.add(4);
390 await flushMicrotasks();
391 controller.close();
392 }();
393 return controller.stream;
394 }
395
396 Stream<int> createLongStream(int eventCount) async* {
397 for (int i = 0; i < eventCount; i++) yield i;
398 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698