OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. |
6 library stream_controller_async_test; | 6 library stream_controller_async_test; |
7 | 7 |
8 import "package:expect/expect.dart"; | 8 import "package:expect/expect.dart"; |
9 import 'dart:async'; | 9 import 'dart:async'; |
10 import 'dart:isolate'; | 10 import 'dart:isolate'; |
11 import '../../../pkg/unittest/lib/unittest.dart'; | 11 import '../../../pkg/unittest/lib/unittest.dart'; |
12 import 'event_helper.dart'; | 12 import 'event_helper.dart'; |
13 import 'stream_state_helper.dart'; | 13 import 'stream_state_helper.dart'; |
14 | 14 |
| 15 void cancelSub(StreamSubscription sub) { sub.cancel(); } |
| 16 |
15 testController() { | 17 testController() { |
16 // Test fold | 18 // Test fold |
17 test("StreamController.fold", () { | 19 test("StreamController.fold", () { |
18 StreamController c = new StreamController(); | 20 StreamController c = new StreamController(); |
19 Stream stream = c.stream.asBroadcastStream(); | 21 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
20 stream.fold(0, (a,b) => a + b) | 22 stream.fold(0, (a,b) => a + b) |
21 .then(expectAsync1((int v) { | 23 .then(expectAsync1((int v) { |
22 Expect.equals(42, v); | 24 Expect.equals(42, v); |
23 })); | 25 })); |
24 c.add(10); | 26 c.add(10); |
25 c.add(32); | 27 c.add(32); |
26 c.close(); | 28 c.close(); |
27 }); | 29 }); |
28 | 30 |
29 test("StreamController.fold throws", () { | 31 test("StreamController.fold throws", () { |
30 StreamController c = new StreamController(); | 32 StreamController c = new StreamController(); |
31 Stream stream = c.stream.asBroadcastStream(); | 33 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
32 stream.fold(0, (a,b) { throw "Fnyf!"; }) | 34 stream.fold(0, (a,b) { throw "Fnyf!"; }) |
33 .catchError(expectAsync1((error) { Expect.equals("Fnyf!", error); })); | 35 .catchError(expectAsync1((error) { Expect.equals("Fnyf!", error); })); |
34 c.add(42); | 36 c.add(42); |
35 }); | 37 }); |
36 } | 38 } |
37 | 39 |
38 testSingleController() { | 40 testSingleController() { |
39 test("Single-subscription StreamController.fold", () { | 41 test("Single-subscription StreamController.fold", () { |
40 StreamController c = new StreamController(); | 42 StreamController c = new StreamController(); |
41 Stream stream = c.stream; | 43 Stream stream = c.stream; |
(...skipping 218 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
260 Future f = c.stream.drain(); | 262 Future f = c.stream.drain(); |
261 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 263 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); |
262 Events errorEvents = new Events()..error("error")..error("error2")..close(); | 264 Events errorEvents = new Events()..error("error")..error("error2")..close(); |
263 errorEvents.replay(c); | 265 errorEvents.replay(c); |
264 }); | 266 }); |
265 | 267 |
266 } | 268 } |
267 | 269 |
268 testPause() { | 270 testPause() { |
269 test("pause event-unpause", () { | 271 test("pause event-unpause", () { |
| 272 |
270 StreamProtocolTest test = new StreamProtocolTest(); | 273 StreamProtocolTest test = new StreamProtocolTest(); |
271 Completer completer = new Completer(); | 274 Completer completer = new Completer(); |
272 test..expectListen() | 275 test..expectListen() |
273 ..expectData(42, () { test.pause(completer.future); }) | 276 ..expectData(42, () { test.pause(completer.future); }) |
274 ..expectPause(() { | 277 ..expectPause(() { |
275 completer.complete(null); | 278 completer.complete(null); |
276 }) | 279 }) |
277 ..expectData(43) | 280 ..expectData(43) |
278 ..expectData(44) | 281 ..expectData(44) |
279 ..expectDone() | 282 ..expectDone() |
280 ..expectCancel(); | 283 ..expectCancel(test.terminate); |
281 test.listen(); | 284 test.listen(); |
282 test.add(42); | 285 test.add(42); |
283 test.add(43); | 286 test.add(43); |
284 test.add(44); | 287 test.add(44); |
285 test.close(); | 288 test.close(); |
286 }); | 289 }); |
287 | 290 |
288 test("pause twice event-unpause", () { | 291 test("pause twice event-unpause", () { |
289 StreamProtocolTest test = new StreamProtocolTest(); | 292 StreamProtocolTest test = new StreamProtocolTest(); |
290 Completer completer = new Completer(); | 293 Completer completer = new Completer(); |
291 Completer completer2 = new Completer(); | 294 Completer completer2 = new Completer(); |
292 test..expectListen() | 295 test..expectListen() |
293 ..expectData(42, () { | 296 ..expectData(42, () { |
294 test.pause(completer.future); | 297 test.pause(completer.future); |
295 test.pause(completer2.future); | 298 test.pause(completer2.future); |
296 }) | 299 }) |
297 ..expectPause(() { | 300 ..expectPause(() { |
298 completer.future.then(completer2.complete); | 301 completer.future.then(completer2.complete); |
299 completer.complete(null); | 302 completer.complete(null); |
300 }) | 303 }) |
301 ..expectData(43) | 304 ..expectData(43) |
302 ..expectData(44) | 305 ..expectData(44) |
303 ..expectDone() | 306 ..expectDone() |
304 ..expectCancel(); | 307 ..expectCancel(test.terminate); |
305 test..listen() | 308 test..listen() |
306 ..add(42) | 309 ..add(42) |
307 ..add(43) | 310 ..add(43) |
308 ..add(44) | 311 ..add(44) |
309 ..close(); | 312 ..close(); |
310 }); | 313 }); |
311 | 314 |
312 test("pause twice direct-unpause", () { | 315 test("pause twice direct-unpause", () { |
313 StreamProtocolTest test = new StreamProtocolTest(); | 316 StreamProtocolTest test = new StreamProtocolTest(); |
314 test..expectListen() | 317 test..expectListen() |
315 ..expectData(42, () { | 318 ..expectData(42, () { |
316 test.pause(); | 319 test.pause(); |
317 test.pause(); | 320 test.pause(); |
318 }) | 321 }) |
319 ..expectPause(() { | 322 ..expectPause(() { |
320 test.resume(); | 323 test.resume(); |
321 test.resume(); | 324 test.resume(); |
322 }) | 325 }) |
323 ..expectData(43) | 326 ..expectData(43) |
324 ..expectData(44) | 327 ..expectData(44) |
325 ..expectDone() | 328 ..expectDone() |
326 ..expectCancel(); | 329 ..expectCancel(test.terminate); |
327 test..listen() | 330 test..listen() |
328 ..add(42) | 331 ..add(42) |
329 ..add(43) | 332 ..add(43) |
330 ..add(44) | 333 ..add(44) |
331 ..close(); | 334 ..close(); |
332 }); | 335 }); |
333 | 336 |
334 test("pause twice direct-event-unpause", () { | 337 test("pause twice direct-event-unpause", () { |
335 StreamProtocolTest test = new StreamProtocolTest(); | 338 StreamProtocolTest test = new StreamProtocolTest(); |
336 Completer completer = new Completer(); | 339 Completer completer = new Completer(); |
337 test..expectListen() | 340 test..expectListen() |
338 ..expectData(42, () { | 341 ..expectData(42, () { |
339 test.pause(); | 342 test.pause(); |
340 test.pause(completer.future); | 343 test.pause(completer.future); |
341 test.add(43); | 344 test.add(43); |
342 test.add(44); | 345 test.add(44); |
343 test.close(); | 346 test.close(); |
344 }) | 347 }) |
345 ..expectPause(() { | 348 ..expectPause(() { |
346 completer.future.then((v) => test.resume()); | 349 completer.future.then((v) => test.resume()); |
347 completer.complete(null); | 350 completer.complete(null); |
348 }) | 351 }) |
349 ..expectData(43) | 352 ..expectData(43) |
350 ..expectData(44) | 353 ..expectData(44) |
351 ..expectDone() | 354 ..expectDone() |
352 ..expectCancel(); | 355 ..expectCancel(test.terminate); |
353 test..listen() | 356 test..listen() |
354 ..add(42); | 357 ..add(42); |
355 }); | 358 }); |
356 } | 359 } |
357 | 360 |
358 class TestError { const TestError(); } | 361 class TestError { const TestError(); } |
359 | 362 |
360 testRethrow() { | 363 testRethrow() { |
361 TestError error = const TestError(); | 364 TestError error = const TestError(); |
362 | 365 |
363 | |
364 testStream(name, streamValueTransform) { | 366 testStream(name, streamValueTransform) { |
365 test("rethrow-$name-value", () { | 367 test("rethrow-$name-value", () { |
366 StreamController c = new StreamController(); | 368 StreamController c = new StreamController(); |
367 Stream s = streamValueTransform(c.stream, (v) { throw error; }); | 369 Stream s = streamValueTransform(c.stream, (v) { throw error; }); |
368 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync1( | 370 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync1( |
369 (e) { Expect.identical(error, e); })); | 371 (e) { Expect.identical(error, e); })); |
370 c.add(null); | 372 c.add(null); |
371 c.close(); | 373 c.close(); |
372 }); | 374 }); |
373 } | 375 } |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
491 test.add(87); | 493 test.add(87); |
492 sub.cancel(); | 494 sub.cancel(); |
493 }); | 495 }); |
494 test.expectCancel(() { | 496 test.expectCancel(() { |
495 test.add(37); | 497 test.add(37); |
496 test.terminate(); | 498 test.terminate(); |
497 }); | 499 }); |
498 }); | 500 }); |
499 } | 501 } |
500 | 502 |
| 503 void testAsBroadcast() { |
| 504 test("asBroadcast-not-canceled", () { |
| 505 StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| 506 var sub; |
| 507 test..expectListen() |
| 508 ..expectBroadcastListen((_) { |
| 509 test.add(42); |
| 510 }) |
| 511 ..expectData(42, () { |
| 512 sub.cancel(); |
| 513 }) |
| 514 ..expectBroadcastCancel((_) { |
| 515 sub = test.listen(); |
| 516 }) |
| 517 ..expectBroadcastListen((_) { |
| 518 test.terminate(); |
| 519 }); |
| 520 sub = test.listen(); |
| 521 }); |
| 522 |
| 523 test("asBroadcast-canceled", () { |
| 524 StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| 525 var sub; |
| 526 test..expectListen() |
| 527 ..expectBroadcastListen((_) { |
| 528 test.add(42); |
| 529 }) |
| 530 ..expectData(42, () { |
| 531 sub.cancel(); |
| 532 }) |
| 533 ..expectBroadcastCancel((originalSub) { |
| 534 originalSub.cancel(); |
| 535 }) |
| 536 ..expectCancel(test.terminate); |
| 537 sub = test.listen(); |
| 538 }); |
| 539 |
| 540 test("asBroadcast-pause-original", () { |
| 541 StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| 542 var sub; |
| 543 test..expectListen() |
| 544 ..expectBroadcastListen((_) { |
| 545 test.add(42); |
| 546 test.add(43); |
| 547 }) |
| 548 ..expectData(42, () { |
| 549 sub.cancel(); |
| 550 }) |
| 551 ..expectBroadcastCancel((originalSub) { |
| 552 originalSub.pause(); // Pause before sending 43 from original sub. |
| 553 }) |
| 554 ..expectPause(() { |
| 555 sub = test.listen(); |
| 556 }) |
| 557 ..expectBroadcastListen((originalSub) { |
| 558 originalSub.resume(); |
| 559 }) |
| 560 ..expectData(43) |
| 561 ..expectResume(() { |
| 562 test.close(); |
| 563 }) |
| 564 ..expectDone() |
| 565 ..expectBroadcastCancel() |
| 566 ..expectCancel(test.terminate); |
| 567 sub = test.listen(); |
| 568 }); |
| 569 } |
| 570 |
501 main() { | 571 main() { |
502 testController(); | 572 testController(); |
503 testSingleController(); | 573 testSingleController(); |
504 testExtraMethods(); | 574 testExtraMethods(); |
505 testPause(); | 575 testPause(); |
506 testRethrow(); | 576 testRethrow(); |
507 testBroadcastController(); | 577 testBroadcastController(); |
| 578 testAsBroadcast(); |
508 } | 579 } |
OLD | NEW |