| OLD | NEW | 
|---|
| 1 // Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. | 
| 6 library stream_controller_async_test; | 6 library stream_controller_async_test; | 
| 7 | 7 | 
| 8 import "package:expect/expect.dart"; | 8 import "package:expect/expect.dart"; | 
| 9 import 'dart:async'; | 9 import 'dart:async'; | 
| 10 import 'dart:isolate'; | 10 import 'dart:isolate'; | 
| 11 import '../../../pkg/unittest/lib/unittest.dart'; | 11 import '../../../pkg/unittest/lib/unittest.dart'; | 
| 12 import 'event_helper.dart'; | 12 import 'event_helper.dart'; | 
|  | 13 import 'stream_state_helper.dart'; | 
| 13 | 14 | 
| 14 testController() { | 15 testController() { | 
| 15   // Test fold | 16   // Test fold | 
| 16   test("StreamController.fold", () { | 17   test("StreamController.fold", () { | 
| 17     StreamController c = new StreamController(); | 18     StreamController c = new StreamController(); | 
| 18     Stream stream = c.stream.asBroadcastStream(); | 19     Stream stream = c.stream.asBroadcastStream(); | 
| 19     stream.fold(0, (a,b) => a + b) | 20     stream.fold(0, (a,b) => a + b) | 
| 20      .then(expectAsync1((int v) { | 21      .then(expectAsync1((int v) { | 
| 21         Expect.equals(42, v); | 22         Expect.equals(42, v); | 
| 22     })); | 23     })); | 
| (...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 259     Future f = c.stream.drain(); | 260     Future f = c.stream.drain(); | 
| 260     f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 261     f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 
| 261     Events errorEvents = new Events()..error("error")..error("error2")..close(); | 262     Events errorEvents = new Events()..error("error")..error("error2")..close(); | 
| 262     errorEvents.replay(c); | 263     errorEvents.replay(c); | 
| 263   }); | 264   }); | 
| 264 | 265 | 
| 265 } | 266 } | 
| 266 | 267 | 
| 267 testPause() { | 268 testPause() { | 
| 268   test("pause event-unpause", () { | 269   test("pause event-unpause", () { | 
| 269     StreamController c = new StreamController(); | 270     StreamProtocolTest test = new StreamProtocolTest(); | 
| 270     Events actualEvents = new Events.capture(c.stream); |  | 
| 271     Events expectedEvents = new Events(); |  | 
| 272     expectedEvents.add(42); |  | 
| 273     c.add(42); |  | 
| 274     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 275     Completer completer = new Completer(); | 271     Completer completer = new Completer(); | 
| 276     actualEvents.pause(completer.future); | 272     test..expectListen() | 
| 277     c..add(43)..add(44)..close(); | 273         ..expectData(42, () { test.pause(completer.future); }) | 
| 278     Expect.listEquals(expectedEvents.events, actualEvents.events); | 274         ..expectPause(() { | 
| 279     completer.complete(); | 275             completer.complete(null); | 
| 280     expectedEvents..add(43)..add(44)..close(); | 276           }) | 
| 281     actualEvents.onDone(expectAsync0(() { | 277         ..expectData(43) | 
| 282       Expect.listEquals(expectedEvents.events, actualEvents.events); | 278         ..expectData(44) | 
| 283     })); | 279         ..expectDone() | 
|  | 280         ..expectCancel(); | 
|  | 281     test.listen(); | 
|  | 282     test.add(42); | 
|  | 283     test.add(43); | 
|  | 284     test.add(44); | 
|  | 285     test.close(); | 
| 284   }); | 286   }); | 
| 285 | 287 | 
| 286   test("pause twice event-unpause", () { | 288   test("pause twice event-unpause", () { | 
| 287     StreamController c = new StreamController(); | 289     StreamProtocolTest test = new StreamProtocolTest(); | 
| 288     Events actualEvents = new Events.capture(c.stream); |  | 
| 289     Events expectedEvents = new Events(); |  | 
| 290     expectedEvents.add(42); |  | 
| 291     c.add(42); |  | 
| 292     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 293     Completer completer = new Completer(); | 290     Completer completer = new Completer(); | 
| 294     Completer completer2 = new Completer(); | 291     Completer completer2 = new Completer(); | 
| 295     actualEvents.pause(completer.future); | 292     test..expectListen() | 
| 296     actualEvents.pause(completer2.future); | 293         ..expectData(42, () { | 
| 297     c..add(43)..add(44)..close(); | 294             test.pause(completer.future); | 
| 298     Expect.listEquals(expectedEvents.events, actualEvents.events); | 295             test.pause(completer2.future); | 
| 299     completer.complete(); | 296           }) | 
| 300     Expect.listEquals(expectedEvents.events, actualEvents.events); | 297         ..expectPause(() { | 
| 301     completer2.complete(); | 298             completer.future.then(completer2.complete); | 
| 302     expectedEvents..add(43)..add(44)..close(); | 299             completer.complete(null); | 
| 303     actualEvents.onDone(expectAsync0((){ | 300           }) | 
| 304       Expect.listEquals(expectedEvents.events, actualEvents.events); | 301         ..expectData(43) | 
| 305     })); | 302         ..expectData(44) | 
|  | 303         ..expectDone() | 
|  | 304         ..expectCancel(); | 
|  | 305     test..listen() | 
|  | 306         ..add(42) | 
|  | 307         ..add(43) | 
|  | 308         ..add(44) | 
|  | 309         ..close(); | 
| 306   }); | 310   }); | 
| 307 | 311 | 
| 308   test("pause twice direct-unpause", () { | 312   test("pause twice direct-unpause", () { | 
| 309     StreamController c = new StreamController(); | 313     StreamProtocolTest test = new StreamProtocolTest(); | 
| 310     Events actualEvents = new Events.capture(c.stream); | 314     test..expectListen() | 
| 311     Events expectedEvents = new Events(); | 315         ..expectData(42, () { | 
| 312     expectedEvents.add(42); | 316             test.pause(); | 
| 313     c.add(42); | 317             test.pause(); | 
| 314     Expect.listEquals(expectedEvents.events, actualEvents.events); | 318           }) | 
| 315     actualEvents.pause(); | 319         ..expectPause(() { | 
| 316     actualEvents.pause(); | 320             test.resume(); | 
| 317     c.add(43); | 321             test.resume(); | 
| 318     c.add(44); | 322           }) | 
| 319     c.close(); | 323         ..expectData(43) | 
| 320     Expect.listEquals(expectedEvents.events, actualEvents.events); | 324         ..expectData(44) | 
| 321     actualEvents.resume(); | 325         ..expectDone() | 
| 322     Expect.listEquals(expectedEvents.events, actualEvents.events); | 326         ..expectCancel(); | 
| 323     expectedEvents..add(43)..add(44)..close(); | 327     test..listen() | 
| 324     actualEvents.onDone(expectAsync0(() { | 328         ..add(42) | 
| 325       Expect.listEquals(expectedEvents.events, actualEvents.events); | 329         ..add(43) | 
| 326     })); | 330         ..add(44) | 
| 327     actualEvents.resume(); | 331         ..close(); | 
| 328   }); | 332   }); | 
| 329 | 333 | 
| 330   test("pause twice direct-event-unpause", () { | 334   test("pause twice direct-event-unpause", () { | 
| 331     StreamController c = new StreamController(); | 335     StreamProtocolTest test = new StreamProtocolTest(); | 
| 332     Events actualEvents = new Events.capture(c.stream); |  | 
| 333     Events expectedEvents = new Events(); |  | 
| 334     expectedEvents.add(42); |  | 
| 335     c.add(42); |  | 
| 336     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 337     Completer completer = new Completer(); | 336     Completer completer = new Completer(); | 
| 338     actualEvents.pause(completer.future); | 337     test..expectListen() | 
| 339     actualEvents.pause(); | 338         ..expectData(42, () { | 
| 340     c.add(43); | 339             test.pause(); | 
| 341     c.add(44); | 340             test.pause(completer.future); | 
| 342     c.close(); | 341             test.add(43); | 
| 343     Expect.listEquals(expectedEvents.events, actualEvents.events); | 342             test.add(44); | 
| 344     actualEvents.resume(); | 343             test.close(); | 
| 345     Expect.listEquals(expectedEvents.events, actualEvents.events); | 344           }) | 
| 346     expectedEvents..add(43)..add(44)..close(); | 345         ..expectPause(() { | 
| 347     actualEvents.onDone(expectAsync0(() { | 346             completer.future.then((v) => test.resume()); | 
| 348       Expect.listEquals(expectedEvents.events, actualEvents.events); | 347             completer.complete(null); | 
| 349     })); | 348           }) | 
| 350     completer.complete(); | 349         ..expectData(43) | 
| 351   }); | 350         ..expectData(44) | 
| 352 | 351         ..expectDone() | 
| 353   test("pause twice direct-unpause", () { | 352         ..expectCancel(); | 
| 354     StreamController c = new StreamController(); | 353     test..listen() | 
| 355     Events actualEvents = new Events.capture(c.stream); | 354         ..add(42); | 
| 356     Events expectedEvents = new Events(); |  | 
| 357     expectedEvents.add(42); |  | 
| 358     c.add(42); |  | 
| 359     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 360     Completer completer = new Completer(); |  | 
| 361     actualEvents.pause(completer.future); |  | 
| 362     actualEvents.pause(); |  | 
| 363     c.add(43); |  | 
| 364     c.add(44); |  | 
| 365     c.close(); |  | 
| 366     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 367     completer.complete(); |  | 
| 368     Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 369     expectedEvents..add(43)..add(44)..close(); |  | 
| 370     actualEvents.onDone(expectAsync0(() { |  | 
| 371       Expect.listEquals(expectedEvents.events, actualEvents.events); |  | 
| 372     })); |  | 
| 373     actualEvents.resume(); |  | 
| 374   }); | 355   }); | 
| 375 } | 356 } | 
| 376 | 357 | 
| 377 class TestError { const TestError(); } | 358 class TestError { const TestError(); } | 
| 378 | 359 | 
| 379 testRethrow() { | 360 testRethrow() { | 
| 380   TestError error = const TestError(); | 361   TestError error = const TestError(); | 
| 381 | 362 | 
| 382 | 363 | 
| 383   testStream(name, streamValueTransform) { | 364   testStream(name, streamValueTransform) { | 
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 424   testFuture("forEach", (s, act) => s.forEach(act)); | 405   testFuture("forEach", (s, act) => s.forEach(act)); | 
| 425   testFuture("every", (s, act) => s.every(act)); | 406   testFuture("every", (s, act) => s.every(act)); | 
| 426   testFuture("any", (s, act) => s.any(act)); | 407   testFuture("any", (s, act) => s.any(act)); | 
| 427   testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); | 408   testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); | 
| 428   testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); | 409   testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); | 
| 429   testFuture("drain", (s, act) => s.drain().then(act)); | 410   testFuture("drain", (s, act) => s.drain().then(act)); | 
| 430 } | 411 } | 
| 431 | 412 | 
| 432 void testBroadcastController() { | 413 void testBroadcastController() { | 
| 433   test("broadcast-controller-basic", () { | 414   test("broadcast-controller-basic", () { | 
| 434     StreamController<int> c = new StreamController.broadcast( | 415     StreamProtocolTest test = new StreamProtocolTest.broadcast(); | 
| 435       onListen: expectAsync0(() {}), | 416     test..expectListen() | 
| 436       onCancel: expectAsync0(() {}) | 417         ..expectData(42) | 
| 437     ); | 418         ..expectDone() | 
| 438     Stream<int> s = c.stream; | 419         ..expectCancel(test.terminate); | 
| 439     s.listen(expectAsync1((x) { expect(x, equals(42)); })); | 420     test..listen() | 
| 440     c.add(42); | 421         ..add(42) | 
| 441     c.close(); | 422         ..close(); | 
| 442   }); | 423   }); | 
| 443 | 424 | 
| 444   test("broadcast-controller-listen-twice", () { | 425   test("broadcast-controller-listen-twice", () { | 
| 445     StreamController<int> c = new StreamController.broadcast( | 426     StreamProtocolTest test = new StreamProtocolTest.broadcast(); | 
| 446       onListen: expectAsync0(() {}), | 427     test..expectListen() | 
| 447       onCancel: expectAsync0(() {}) | 428         ..expectData(42, () { | 
| 448     ); | 429             test.listen(); | 
| 449     c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, count: 2)); | 430             test.add(37); | 
| 450     c.add(42); | 431             test.close(); | 
| 451     c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 432           }) | 
| 452     c.add(42); | 433       // Order is not guaranteed between subscriptions if not sync. | 
| 453     c.close(); | 434         ..expectData(37) | 
|  | 435         ..expectData(37) | 
|  | 436         ..expectDone() | 
|  | 437         ..expectDone() | 
|  | 438         ..expectCancel(test.terminate); | 
|  | 439     test.listen(); | 
|  | 440     test.add(42); | 
| 454   }); | 441   }); | 
| 455 | 442 | 
| 456   test("broadcast-controller-listen-twice-non-overlap", () { | 443   test("broadcast-controller-listen-twice-non-overlap", () { | 
| 457     StreamController<int> c = new StreamController.broadcast( | 444     StreamProtocolTest test = new StreamProtocolTest.broadcast(); | 
| 458       onListen: expectAsync0(() {}, count: 2), | 445     test | 
| 459       onCancel: expectAsync0(() {}, count: 2) | 446     ..expectListen(() { | 
| 460     ); | 447       test.add(42); | 
| 461     var sub = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 448     }) | 
| 462     c.add(42); | 449     ..expectData(42, () { | 
| 463     sub.cancel(); | 450       test.cancel(); | 
| 464     c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 451     }) | 
| 465     c.add(42); | 452     ..expectCancel(() { | 
| 466     c.close(); | 453       test.listen(); | 
|  | 454     })..expectListen(() { | 
|  | 455       test.add(37); | 
|  | 456     }) | 
|  | 457     ..expectData(37, () { | 
|  | 458       test.close(); | 
|  | 459     }) | 
|  | 460     ..expectDone() | 
|  | 461     ..expectCancel(test.terminate); | 
|  | 462     test.listen(); | 
| 467   }); | 463   }); | 
| 468 | 464 | 
| 469   test("broadcast-controller-individual-pause", () { | 465   test("broadcast-controller-individual-pause", () { | 
| 470     StreamController<int> c = new StreamController.broadcast( | 466     StreamProtocolTest test = new StreamProtocolTest.broadcast(); | 
| 471       onListen: expectAsync0(() {}), | 467     test.trace = true; | 
| 472       onCancel: expectAsync0(() {}) | 468     var sub1; | 
| 473     ); | 469     test..expectListen() | 
| 474     var sub1 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 470         ..expectData(42) | 
| 475     var sub2 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, | 471         ..expectData(42, () { sub1.pause(); }) | 
| 476                                             count: 3)); | 472         ..expectData(43, () { | 
| 477     c.add(42); | 473       sub1.cancel(); | 
| 478     sub1.pause(); | 474       test.listen(); | 
| 479     c.add(42); | 475       test.add(44); | 
| 480     sub1.cancel(); | 476       test.expectData(44); | 
| 481     var sub3 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 477       test.expectData(44, test.terminate); | 
| 482     c.add(42); | 478     }); | 
| 483     c.close(); | 479     sub1 = test.listen(); | 
|  | 480     test.listen(); | 
|  | 481     test.add(42); | 
|  | 482     test.add(43); | 
| 484   }); | 483   }); | 
| 485 | 484 | 
| 486   test("broadcast-controller-add-in-callback", () { | 485   test("broadcast-controller-add-in-callback", () { | 
| 487     StreamController<int> c; | 486     StreamProtocolTest test = new StreamProtocolTest.broadcast(); | 
| 488     c = new StreamController( | 487     test.expectListen(); | 
| 489       onListen: expectAsync0(() {}), | 488     var sub = test.listen(); | 
| 490       onCancel: expectAsync0(() { | 489     test.add(42); | 
| 491         c.add(42); | 490     sub.expectData(42, () { | 
| 492       }) | 491       test.add(87); | 
| 493     ); |  | 
| 494     var sub; |  | 
| 495     sub = c.stream.asBroadcastStream().listen(expectAsync1((v) { |  | 
| 496       Expect.equals(37, v); |  | 
| 497       c.add(21); |  | 
| 498       sub.cancel(); | 492       sub.cancel(); | 
| 499     })); | 493     }); | 
| 500     c.add(37);  // Triggers listener, which adds 21 and removes itself. | 494     test.expectCancel(() { | 
| 501     // Removing listener triggers onCancel which adds another 42. | 495       test.add(37); | 
| 502     // Both 21 and 42 are lost because there are no listeners. | 496       test.terminate(); | 
|  | 497     }); | 
| 503   }); | 498   }); | 
| 504 } | 499 } | 
| 505 | 500 | 
| 506 main() { | 501 main() { | 
| 507   testController(); | 502   testController(); | 
| 508   testSingleController(); | 503   testSingleController(); | 
| 509   testExtraMethods(); | 504   testExtraMethods(); | 
| 510   testPause(); | 505   testPause(); | 
| 511   testRethrow(); | 506   testRethrow(); | 
| 512   testBroadcastController(); | 507   testBroadcastController(); | 
| 513 } | 508 } | 
| OLD | NEW | 
|---|