| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE filevents. | 3 // BSD-style license that can be found in the LICENSE filevents. |
| 4 | 4 |
| 5 import "dart:async"; | 5 import "dart:async"; |
| 6 | 6 |
| 7 import "package:async/async.dart"; | 7 import "package:async/async.dart"; |
| 8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
| 9 | 9 |
| 10 import "utils.dart"; | 10 import "utils.dart"; |
| (...skipping 24 matching lines...) Expand all Loading... |
| 35 | 35 |
| 36 expect(await next, 2); | 36 expect(await next, 2); |
| 37 expect(controller.hasListener, isTrue); | 37 expect(controller.hasListener, isTrue); |
| 38 expect(controller.isPaused, isTrue); | 38 expect(controller.isPaused, isTrue); |
| 39 | 39 |
| 40 events.cancel(); | 40 events.cancel(); |
| 41 expect(controller.hasListener, isFalse); | 41 expect(controller.hasListener, isFalse); |
| 42 }); | 42 }); |
| 43 }); | 43 }); |
| 44 | 44 |
| 45 group("lookAhead operation", () { |
| 46 test("as simple list of events", () async { |
| 47 var events = new StreamQueue<int>(createStream()); |
| 48 expect(await events.lookAhead(4), [1, 2, 3, 4]); |
| 49 expect(await events.next, 1); |
| 50 expect(await events.lookAhead(2), [2, 3]); |
| 51 expect(await events.take(2), [2, 3]); |
| 52 expect(await events.next, 4); |
| 53 await events.cancel(); |
| 54 }); |
| 55 |
| 56 test("of 0 events", () async { |
| 57 var events = new StreamQueue<int>(createStream()); |
| 58 expect(events.lookAhead(0), completion([])); |
| 59 expect(events.next, completion(1)); |
| 60 expect(events.lookAhead(0), completion([])); |
| 61 expect(events.next, completion(2)); |
| 62 expect(events.lookAhead(0), completion([])); |
| 63 expect(events.next, completion(3)); |
| 64 expect(events.lookAhead(0), completion([])); |
| 65 expect(events.next, completion(4)); |
| 66 expect(events.lookAhead(0), completion([])); |
| 67 expect(events.lookAhead(5), completion([])); |
| 68 expect(events.next, throwsStateError); |
| 69 await events.cancel(); |
| 70 }); |
| 71 |
| 72 test("with bad arguments throws", () async { |
| 73 var events = new StreamQueue<int>(createStream()); |
| 74 expect(() => events.lookAhead(-1), throwsArgumentError); |
| 75 expect(await events.next, 1); // Did not consume event. |
| 76 expect(() => events.lookAhead(-1), throwsArgumentError); |
| 77 expect(await events.next, 2); // Did not consume event. |
| 78 await events.cancel(); |
| 79 }); |
| 80 |
| 81 test("of too many arguments", () async { |
| 82 var events = new StreamQueue<int>(createStream()); |
| 83 expect(await events.lookAhead(6), [1, 2, 3, 4]); |
| 84 await events.cancel(); |
| 85 }); |
| 86 |
| 87 test("too large later", () async { |
| 88 var events = new StreamQueue<int>(createStream()); |
| 89 expect(await events.next, 1); |
| 90 expect(await events.next, 2); |
| 91 expect(await events.lookAhead(6), [3, 4]); |
| 92 await events.cancel(); |
| 93 }); |
| 94 |
| 95 test("error", () async { |
| 96 var events = new StreamQueue<int>(createErrorStream()); |
| 97 expect(events.lookAhead(4), throwsA("To err is divine!")); |
| 98 expect(events.take(4), throwsA("To err is divine!")); |
| 99 expect(await events.next, 4); |
| 100 await events.cancel(); |
| 101 }); |
| 102 }); |
| 103 |
| 45 group("next operation", () { | 104 group("next operation", () { |
| 46 test("simple sequence of requests", () async { | 105 test("simple sequence of requests", () async { |
| 47 var events = new StreamQueue<int>(createStream()); | 106 var events = new StreamQueue<int>(createStream()); |
| 48 for (int i = 1; i <= 4; i++) { | 107 for (int i = 1; i <= 4; i++) { |
| 49 expect(await events.next, i); | 108 expect(await events.next, i); |
| 50 } | 109 } |
| 51 expect(events.next, throwsStateError); | 110 expect(events.next, throwsStateError); |
| 52 }); | 111 }); |
| 53 | 112 |
| 54 test("multiple requests at the same time", () async { | 113 test("multiple requests at the same time", () async { |
| (...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 326 await flushMicrotasks(); | 385 await flushMicrotasks(); |
| 327 expect(lastEvent, 3); | 386 expect(lastEvent, 3); |
| 328 | 387 |
| 329 var cancelFuture = subscription.cancel(); | 388 var cancelFuture = subscription.cancel(); |
| 330 expect(controller.hasListener, isFalse); | 389 expect(controller.hasListener, isFalse); |
| 331 cancel.complete(42); | 390 cancel.complete(42); |
| 332 expect(cancelFuture, completion(42)); | 391 expect(cancelFuture, completion(42)); |
| 333 }); | 392 }); |
| 334 }); | 393 }); |
| 335 | 394 |
| 395 group("peek operation", () { |
| 396 test("peeks one event", () async { |
| 397 var events = new StreamQueue<int>(createStream()); |
| 398 expect(await events.peek, 1); |
| 399 expect(await events.next, 1); |
| 400 expect(await events.peek, 2); |
| 401 expect(await events.take(2), [2, 3]); |
| 402 expect(await events.peek, 4); |
| 403 expect(await events.next, 4); |
| 404 // Throws at end. |
| 405 expect(events.peek, throws); |
| 406 await events.cancel(); |
| 407 }); |
| 408 test("multiple requests at the same time", () async { |
| 409 var events = new StreamQueue<int>(createStream()); |
| 410 var result = await Future.wait( |
| 411 [events.peek, events.peek, events.next, events.peek, events.peek]); |
| 412 expect(result, [1, 1, 1, 2, 2]); |
| 413 await events.cancel(); |
| 414 }); |
| 415 test("sequence of requests with error", () async { |
| 416 var events = new StreamQueue<int>(createErrorStream()); |
| 417 expect(await events.next, 1); |
| 418 expect(await events.next, 2); |
| 419 expect(events.peek, throwsA("To err is divine!")); |
| 420 // Error stays in queue. |
| 421 expect(events.peek, throwsA("To err is divine!")); |
| 422 expect(events.next, throwsA("To err is divine!")); |
| 423 expect(await events.next, 4); |
| 424 await events.cancel(); |
| 425 }); |
| 426 }); |
| 427 |
| 336 group("cancel operation", () { | 428 group("cancel operation", () { |
| 337 test("closes the events, prevents any other operation", () async { | 429 test("closes the events, prevents any other operation", () async { |
| 338 var events = new StreamQueue<int>(createStream()); | 430 var events = new StreamQueue<int>(createStream()); |
| 339 await events.cancel(); | 431 await events.cancel(); |
| 432 expect(() => events.lookAhead(1), throwsStateError); |
| 340 expect(() => events.next, throwsStateError); | 433 expect(() => events.next, throwsStateError); |
| 434 expect(() => events.peek, throwsStateError); |
| 341 expect(() => events.skip(1), throwsStateError); | 435 expect(() => events.skip(1), throwsStateError); |
| 342 expect(() => events.take(1), throwsStateError); | 436 expect(() => events.take(1), throwsStateError); |
| 343 expect(() => events.rest, throwsStateError); | 437 expect(() => events.rest, throwsStateError); |
| 344 expect(() => events.cancel(), throwsStateError); | 438 expect(() => events.cancel(), throwsStateError); |
| 345 }); | 439 }); |
| 346 | 440 |
| 347 test("cancels underlying subscription when called before any event", | 441 test("cancels underlying subscription when called before any event", |
| 348 () async { | 442 () async { |
| 349 var cancelFuture = new Future.value(42); | 443 var cancelFuture = new Future.value(42); |
| 350 var controller = new StreamController<int>(onCancel: () => cancelFuture); | 444 var controller = new StreamController<int>(onCancel: () => cancelFuture); |
| (...skipping 616 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 967 controller.add(4); | 1061 controller.add(4); |
| 968 await flushMicrotasks(); | 1062 await flushMicrotasks(); |
| 969 controller.close(); | 1063 controller.close(); |
| 970 }(); | 1064 }(); |
| 971 return controller.stream; | 1065 return controller.stream; |
| 972 } | 1066 } |
| 973 | 1067 |
| 974 Stream<int> createLongStream(int eventCount) async* { | 1068 Stream<int> createLongStream(int eventCount) async* { |
| 975 for (int i = 0; i < eventCount; i++) yield i; | 1069 for (int i = 0; i < eventCount; i++) yield i; |
| 976 } | 1070 } |
| OLD | NEW |