| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE filevents. | 3 // BSD-style license that can be found in the LICENSE filevents. |
| 4 | 4 |
| 5 import "dart:async"; | 5 import "dart:async"; |
| 6 | 6 |
| 7 import "package:async/async.dart" show StreamQueue; | 7 import "package:async/async.dart" show StreamQueue; |
| 8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
| 9 | 9 |
| 10 import "utils.dart"; | 10 import "utils.dart"; |
| 11 | 11 |
| 12 main() { | 12 main() { |
| 13 group("source stream", () { | 13 group("source stream", () { |
| 14 test("is listened to on first request, paused between requests", () async { | 14 test("is listened to on first request, paused between requests", () async { |
| 15 var controller = new StreamController(); | 15 var controller = new StreamController<int>(); |
| 16 var events = new StreamQueue<int>(controller.stream); | 16 var events = new StreamQueue<int>(controller.stream); |
| 17 await flushMicrotasks(); | 17 await flushMicrotasks(); |
| 18 expect(controller.hasListener, isFalse); | 18 expect(controller.hasListener, isFalse); |
| 19 | 19 |
| 20 var next = events.next; | 20 var next = events.next; |
| 21 expect(controller.hasListener, isTrue); | 21 expect(controller.hasListener, isTrue); |
| 22 expect(controller.isPaused, isFalse); | 22 expect(controller.isPaused, isFalse); |
| 23 | 23 |
| 24 controller.add(1); | 24 controller.add(1); |
| 25 | 25 |
| (...skipping 255 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 281 expect(() => events.next, throwsStateError); | 281 expect(() => events.next, throwsStateError); |
| 282 expect(() => events.skip(1), throwsStateError); | 282 expect(() => events.skip(1), throwsStateError); |
| 283 expect(() => events.take(1), throwsStateError); | 283 expect(() => events.take(1), throwsStateError); |
| 284 expect(() => events.rest, throwsStateError); | 284 expect(() => events.rest, throwsStateError); |
| 285 expect(() => events.cancel(), throwsStateError); | 285 expect(() => events.cancel(), throwsStateError); |
| 286 expect(stream.toList(), completion([1, 2, 3, 4])); | 286 expect(stream.toList(), completion([1, 2, 3, 4])); |
| 287 }); | 287 }); |
| 288 | 288 |
| 289 test("forwards to underlying stream", () async { | 289 test("forwards to underlying stream", () async { |
| 290 var cancel = new Completer(); | 290 var cancel = new Completer(); |
| 291 var controller = new StreamController(onCancel: () => cancel.future); | 291 var controller = new StreamController<int>(onCancel: () => cancel.future); |
| 292 var events = new StreamQueue<int>(controller.stream); | 292 var events = new StreamQueue<int>(controller.stream); |
| 293 expect(controller.hasListener, isFalse); | 293 expect(controller.hasListener, isFalse); |
| 294 var next = events.next; | 294 var next = events.next; |
| 295 expect(controller.hasListener, isTrue); | 295 expect(controller.hasListener, isTrue); |
| 296 expect(controller.isPaused, isFalse); | 296 expect(controller.isPaused, isFalse); |
| 297 | 297 |
| 298 controller.add(1); | 298 controller.add(1); |
| 299 expect(await next, 1); | 299 expect(await next, 1); |
| 300 expect(controller.isPaused, isTrue); | 300 expect(controller.isPaused, isTrue); |
| 301 | 301 |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 340 expect(() => events.next, throwsStateError); | 340 expect(() => events.next, throwsStateError); |
| 341 expect(() => events.skip(1), throwsStateError); | 341 expect(() => events.skip(1), throwsStateError); |
| 342 expect(() => events.take(1), throwsStateError); | 342 expect(() => events.take(1), throwsStateError); |
| 343 expect(() => events.rest, throwsStateError); | 343 expect(() => events.rest, throwsStateError); |
| 344 expect(() => events.cancel(), throwsStateError); | 344 expect(() => events.cancel(), throwsStateError); |
| 345 }); | 345 }); |
| 346 | 346 |
| 347 test("cancels underlying subscription when called before any event", | 347 test("cancels underlying subscription when called before any event", |
| 348 () async { | 348 () async { |
| 349 var cancelFuture = new Future.value(42); | 349 var cancelFuture = new Future.value(42); |
| 350 var controller = new StreamController(onCancel: () => cancelFuture); | 350 var controller = new StreamController<int>(onCancel: () => cancelFuture); |
| 351 var events = new StreamQueue<int>(controller.stream); | 351 var events = new StreamQueue<int>(controller.stream); |
| 352 expect(await events.cancel(), 42); | 352 expect(await events.cancel(), 42); |
| 353 }); | 353 }); |
| 354 | 354 |
| 355 test("cancels underlying subscription, returns result", () async { | 355 test("cancels underlying subscription, returns result", () async { |
| 356 var cancelFuture = new Future.value(42); | 356 var cancelFuture = new Future.value(42); |
| 357 var controller = new StreamController(onCancel: () => cancelFuture); | 357 var controller = new StreamController<int>(onCancel: () => cancelFuture); |
| 358 var events = new StreamQueue<int>(controller.stream); | 358 var events = new StreamQueue<int>(controller.stream); |
| 359 controller.add(1); | 359 controller.add(1); |
| 360 expect(await events.next, 1); | 360 expect(await events.next, 1); |
| 361 expect(await events.cancel(), 42); | 361 expect(await events.cancel(), 42); |
| 362 }); | 362 }); |
| 363 | 363 |
| 364 group("with immediate: true", () { | 364 group("with immediate: true", () { |
| 365 test("closes the events, prevents any other operation", () async { | 365 test("closes the events, prevents any other operation", () async { |
| 366 var events = new StreamQueue<int>(createStream()); | 366 var events = new StreamQueue<int>(createStream()); |
| 367 await events.cancel(immediate: true); | 367 await events.cancel(immediate: true); |
| 368 expect(() => events.next, throwsStateError); | 368 expect(() => events.next, throwsStateError); |
| 369 expect(() => events.skip(1), throwsStateError); | 369 expect(() => events.skip(1), throwsStateError); |
| 370 expect(() => events.take(1), throwsStateError); | 370 expect(() => events.take(1), throwsStateError); |
| 371 expect(() => events.rest, throwsStateError); | 371 expect(() => events.rest, throwsStateError); |
| 372 expect(() => events.cancel(), throwsStateError); | 372 expect(() => events.cancel(), throwsStateError); |
| 373 }); | 373 }); |
| 374 | 374 |
| 375 test("cancels the underlying subscription immediately", () async { | 375 test("cancels the underlying subscription immediately", () async { |
| 376 var controller = new StreamController(); | 376 var controller = new StreamController<int>(); |
| 377 controller.add(1); | 377 controller.add(1); |
| 378 | 378 |
| 379 var events = new StreamQueue<int>(controller.stream); | 379 var events = new StreamQueue<int>(controller.stream); |
| 380 expect(await events.next, 1); | 380 expect(await events.next, 1); |
| 381 expect(controller.hasListener, isTrue); | 381 expect(controller.hasListener, isTrue); |
| 382 | 382 |
| 383 events.cancel(immediate: true); | 383 events.cancel(immediate: true); |
| 384 await expect(controller.hasListener, isFalse); | 384 await expect(controller.hasListener, isFalse); |
| 385 }); | 385 }); |
| 386 | 386 |
| 387 test("cancels the underlying subscription when called before any event", | 387 test("cancels the underlying subscription when called before any event", |
| 388 () async { | 388 () async { |
| 389 var cancelFuture = new Future.value(42); | 389 var cancelFuture = new Future.value(42); |
| 390 var controller = new StreamController(onCancel: () => cancelFuture); | 390 var controller = new StreamController<int>(onCancel: () => cancelFuture)
; |
| 391 | 391 |
| 392 var events = new StreamQueue<int>(controller.stream); | 392 var events = new StreamQueue<int>(controller.stream); |
| 393 expect(await events.cancel(immediate: true), 42); | 393 expect(await events.cancel(immediate: true), 42); |
| 394 }); | 394 }); |
| 395 | 395 |
| 396 test("closes pending requests", () async { | 396 test("closes pending requests", () async { |
| 397 var events = new StreamQueue<int>(createStream()); | 397 var events = new StreamQueue<int>(createStream()); |
| 398 expect(await events.next, 1); | 398 expect(await events.next, 1); |
| 399 expect(events.next, throwsStateError); | 399 expect(events.next, throwsStateError); |
| 400 expect(events.hasNext, completion(isFalse)); | 400 expect(events.hasNext, completion(isFalse)); |
| 401 | 401 |
| 402 await events.cancel(immediate: true); | 402 await events.cancel(immediate: true); |
| 403 }); | 403 }); |
| 404 | 404 |
| 405 test("returns the result of closing the underlying subscription", | 405 test("returns the result of closing the underlying subscription", |
| 406 () async { | 406 () async { |
| 407 var controller = new StreamController( | 407 var controller = new StreamController<int>( |
| 408 onCancel: () => new Future.value(42)); | 408 onCancel: () => new Future.value(42)); |
| 409 var events = new StreamQueue<int>(controller.stream); | 409 var events = new StreamQueue<int>(controller.stream); |
| 410 expect(await events.cancel(immediate: true), 42); | 410 expect(await events.cancel(immediate: true), 42); |
| 411 }); | 411 }); |
| 412 | 412 |
| 413 test("listens and then cancels a stream that hasn't been listened to yet", | 413 test("listens and then cancels a stream that hasn't been listened to yet", |
| 414 () async { | 414 () async { |
| 415 var wasListened = false; | 415 var wasListened = false; |
| 416 var controller = new StreamController( | 416 var controller = new StreamController<int>( |
| 417 onListen: () => wasListened = true); | 417 onListen: () => wasListened = true); |
| 418 var events = new StreamQueue<int>(controller.stream); | 418 var events = new StreamQueue<int>(controller.stream); |
| 419 expect(wasListened, isFalse); | 419 expect(wasListened, isFalse); |
| 420 expect(controller.hasListener, isFalse); | 420 expect(controller.hasListener, isFalse); |
| 421 | 421 |
| 422 await events.cancel(immediate: true); | 422 await events.cancel(immediate: true); |
| 423 expect(wasListened, isTrue); | 423 expect(wasListened, isTrue); |
| 424 expect(controller.hasListener, isFalse); | 424 expect(controller.hasListener, isFalse); |
| 425 }); | 425 }); |
| 426 }); | 426 }); |
| (...skipping 14 matching lines...) Expand all Loading... |
| 441 test("true at end", () async { | 441 test("true at end", () async { |
| 442 var events = new StreamQueue<int>(createStream()); | 442 var events = new StreamQueue<int>(createStream()); |
| 443 for (int i = 1; i <= 4; i++) { | 443 for (int i = 1; i <= 4; i++) { |
| 444 expect(await events.next, i); | 444 expect(await events.next, i); |
| 445 } | 445 } |
| 446 expect(await events.hasNext, isFalse); | 446 expect(await events.hasNext, isFalse); |
| 447 }); | 447 }); |
| 448 | 448 |
| 449 test("true when enqueued", () async { | 449 test("true when enqueued", () async { |
| 450 var events = new StreamQueue<int>(createStream()); | 450 var events = new StreamQueue<int>(createStream()); |
| 451 var values = []; | 451 var values = <int>[]; |
| 452 for (int i = 1; i <= 3; i++) { | 452 for (int i = 1; i <= 3; i++) { |
| 453 events.next.then(values.add); | 453 events.next.then(values.add); |
| 454 } | 454 } |
| 455 expect(values, isEmpty); | 455 expect(values, isEmpty); |
| 456 expect(await events.hasNext, isTrue); | 456 expect(await events.hasNext, isTrue); |
| 457 expect(values, [1, 2, 3]); | 457 expect(values, [1, 2, 3]); |
| 458 }); | 458 }); |
| 459 | 459 |
| 460 test("false when enqueued", () async { | 460 test("false when enqueued", () async { |
| 461 var events = new StreamQueue<int>(createStream()); | 461 var events = new StreamQueue<int>(createStream()); |
| 462 var values = []; | 462 var values = <int>[]; |
| 463 for (int i = 1; i <= 4; i++) { | 463 for (int i = 1; i <= 4; i++) { |
| 464 events.next.then(values.add); | 464 events.next.then(values.add); |
| 465 } | 465 } |
| 466 expect(values, isEmpty); | 466 expect(values, isEmpty); |
| 467 expect(await events.hasNext, isFalse); | 467 expect(await events.hasNext, isFalse); |
| 468 expect(values, [1, 2, 3, 4]); | 468 expect(values, [1, 2, 3, 4]); |
| 469 }); | 469 }); |
| 470 | 470 |
| 471 test("true when data event", () async { | 471 test("true when data event", () async { |
| 472 var controller = new StreamController(); | 472 var controller = new StreamController<int>(); |
| 473 var events = new StreamQueue<int>(controller.stream); | 473 var events = new StreamQueue<int>(controller.stream); |
| 474 | 474 |
| 475 var hasNext; | 475 var hasNext; |
| 476 events.hasNext.then((result) { hasNext = result; }); | 476 events.hasNext.then((result) { hasNext = result; }); |
| 477 await flushMicrotasks(); | 477 await flushMicrotasks(); |
| 478 expect(hasNext, isNull); | 478 expect(hasNext, isNull); |
| 479 controller.add(42); | 479 controller.add(42); |
| 480 expect(hasNext, isNull); | 480 expect(hasNext, isNull); |
| 481 await flushMicrotasks(); | 481 await flushMicrotasks(); |
| 482 expect(hasNext, isTrue); | 482 expect(hasNext, isTrue); |
| 483 }); | 483 }); |
| 484 | 484 |
| 485 test("true when error event", () async { | 485 test("true when error event", () async { |
| 486 var controller = new StreamController(); | 486 var controller = new StreamController<int>(); |
| 487 var events = new StreamQueue<int>(controller.stream); | 487 var events = new StreamQueue<int>(controller.stream); |
| 488 | 488 |
| 489 var hasNext; | 489 var hasNext; |
| 490 events.hasNext.then((result) { hasNext = result; }); | 490 events.hasNext.then((result) { hasNext = result; }); |
| 491 await flushMicrotasks(); | 491 await flushMicrotasks(); |
| 492 expect(hasNext, isNull); | 492 expect(hasNext, isNull); |
| 493 controller.addError("BAD"); | 493 controller.addError("BAD"); |
| 494 expect(hasNext, isNull); | 494 expect(hasNext, isNull); |
| 495 await flushMicrotasks(); | 495 await flushMicrotasks(); |
| 496 expect(hasNext, isTrue); | 496 expect(hasNext, isTrue); |
| (...skipping 21 matching lines...) Expand all Loading... |
| 518 test("- next after true", () async { | 518 test("- next after true", () async { |
| 519 var events = new StreamQueue<int>(createStream()); | 519 var events = new StreamQueue<int>(createStream()); |
| 520 expect(await events.next, 1); | 520 expect(await events.next, 1); |
| 521 expect(await events.hasNext, true); | 521 expect(await events.hasNext, true); |
| 522 expect(await events.next, 2); | 522 expect(await events.next, 2); |
| 523 expect(await events.next, 3); | 523 expect(await events.next, 3); |
| 524 }); | 524 }); |
| 525 | 525 |
| 526 test("- next after true, enqueued", () async { | 526 test("- next after true, enqueued", () async { |
| 527 var events = new StreamQueue<int>(createStream()); | 527 var events = new StreamQueue<int>(createStream()); |
| 528 var responses = []; | 528 var responses = <Object>[]; |
| 529 events.next.then(responses.add); | 529 events.next.then(responses.add); |
| 530 events.hasNext.then(responses.add); | 530 events.hasNext.then(responses.add); |
| 531 events.next.then(responses.add); | 531 events.next.then(responses.add); |
| 532 do { | 532 do { |
| 533 await flushMicrotasks(); | 533 await flushMicrotasks(); |
| 534 } while (responses.length < 3); | 534 } while (responses.length < 3); |
| 535 expect(responses, [1, true, 2]); | 535 expect(responses, [1, true, 2]); |
| 536 }); | 536 }); |
| 537 | 537 |
| 538 test("- skip 0 after true", () async { | 538 test("- skip 0 after true", () async { |
| (...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 676 yield 1; | 676 yield 1; |
| 677 await flushMicrotasks(); | 677 await flushMicrotasks(); |
| 678 yield 2; | 678 yield 2; |
| 679 await flushMicrotasks(); | 679 await flushMicrotasks(); |
| 680 yield 3; | 680 yield 3; |
| 681 await flushMicrotasks(); | 681 await flushMicrotasks(); |
| 682 yield 4; | 682 yield 4; |
| 683 } | 683 } |
| 684 | 684 |
| 685 Stream<int> createErrorStream() { | 685 Stream<int> createErrorStream() { |
| 686 StreamController controller = new StreamController<int>(); | 686 var controller = new StreamController<int>(); |
| 687 () async { | 687 () async { |
| 688 controller.add(1); | 688 controller.add(1); |
| 689 await flushMicrotasks(); | 689 await flushMicrotasks(); |
| 690 controller.add(2); | 690 controller.add(2); |
| 691 await flushMicrotasks(); | 691 await flushMicrotasks(); |
| 692 controller.addError("To err is divine!"); | 692 controller.addError("To err is divine!"); |
| 693 await flushMicrotasks(); | 693 await flushMicrotasks(); |
| 694 controller.add(4); | 694 controller.add(4); |
| 695 await flushMicrotasks(); | 695 await flushMicrotasks(); |
| 696 controller.close(); | 696 controller.close(); |
| 697 }(); | 697 }(); |
| 698 return controller.stream; | 698 return controller.stream; |
| 699 } | 699 } |
| 700 | 700 |
| 701 Stream<int> createLongStream(int eventCount) async* { | 701 Stream<int> createLongStream(int eventCount) async* { |
| 702 for (int i = 0; i < eventCount; i++) yield i; | 702 for (int i = 0; i < eventCount; i++) yield i; |
| 703 } | 703 } |
| OLD | NEW |