OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE filevents. |
| 4 |
| 5 import "dart:async"; |
| 6 |
| 7 import "package:async/async.dart" show StreamQueue; |
| 8 import "package:test/test.dart"; |
| 9 |
| 10 import "utils.dart"; |
| 11 |
| 12 main() { |
| 13 group("source stream", () { |
| 14 test("is listened to on first request, paused between requests", () async { |
| 15 var controller = new StreamController(); |
| 16 var events = new StreamQueue<int>(controller.stream); |
| 17 await flushMicrotasks(); |
| 18 expect(controller.hasListener, isFalse); |
| 19 |
| 20 var next = events.next; |
| 21 expect(controller.hasListener, isTrue); |
| 22 expect(controller.isPaused, isFalse); |
| 23 |
| 24 controller.add(1); |
| 25 |
| 26 expect(await next, 1); |
| 27 expect(controller.hasListener, isTrue); |
| 28 expect(controller.isPaused, isTrue); |
| 29 |
| 30 next = events.next; |
| 31 expect(controller.hasListener, isTrue); |
| 32 expect(controller.isPaused, isFalse); |
| 33 |
| 34 controller.add(2); |
| 35 |
| 36 expect(await next, 2); |
| 37 expect(controller.hasListener, isTrue); |
| 38 expect(controller.isPaused, isTrue); |
| 39 |
| 40 events.cancel(); |
| 41 expect(controller.hasListener, isFalse); |
| 42 }); |
| 43 }); |
| 44 |
| 45 group("next operation", () { |
| 46 test("simple sequence of requests", () async { |
| 47 var events = new StreamQueue<int>(createStream()); |
| 48 for (int i = 1; i <= 4; i++) { |
| 49 expect(await events.next, i); |
| 50 } |
| 51 expect(events.next, throwsStateError); |
| 52 }); |
| 53 |
| 54 test("multiple requests at the same time", () async { |
| 55 var events = new StreamQueue<int>(createStream()); |
| 56 var result = await Future.wait( |
| 57 [events.next, events.next, events.next, events.next]); |
| 58 expect(result, [1, 2, 3, 4]); |
| 59 await events.cancel(); |
| 60 }); |
| 61 |
| 62 test("sequence of requests with error", () async { |
| 63 var events = new StreamQueue<int>(createErrorStream()); |
| 64 expect(await events.next, 1); |
| 65 expect(await events.next, 2); |
| 66 expect(events.next, throwsA("To err is divine!")); |
| 67 expect(await events.next, 4); |
| 68 await events.cancel(); |
| 69 }); |
| 70 }); |
| 71 |
| 72 group("skip operation", () { |
| 73 test("of two elements in the middle of sequence", () async { |
| 74 var events = new StreamQueue<int>(createStream()); |
| 75 expect(await events.next, 1); |
| 76 expect(await events.skip(2), 0); |
| 77 expect(await events.next, 4); |
| 78 await events.cancel(); |
| 79 }); |
| 80 |
| 81 test("with negative/bad arguments throws", () async { |
| 82 var events = new StreamQueue<int>(createStream()); |
| 83 expect(() => events.skip(-1), throwsArgumentError); |
| 84 // A non-int throws either a type error or an argument error, |
| 85 // depending on whether it's checked mode or not. |
| 86 expect(await events.next, 1); // Did not consume event. |
| 87 expect(() => events.skip(-1), throwsArgumentError); |
| 88 expect(await events.next, 2); // Did not consume event. |
| 89 await events.cancel(); |
| 90 }); |
| 91 |
| 92 test("of 0 elements works", () async { |
| 93 var events = new StreamQueue<int>(createStream()); |
| 94 expect(events.skip(0), completion(0)); |
| 95 expect(events.next, completion(1)); |
| 96 expect(events.skip(0), completion(0)); |
| 97 expect(events.next, completion(2)); |
| 98 expect(events.skip(0), completion(0)); |
| 99 expect(events.next, completion(3)); |
| 100 expect(events.skip(0), completion(0)); |
| 101 expect(events.next, completion(4)); |
| 102 expect(events.skip(0), completion(0)); |
| 103 expect(events.skip(5), completion(5)); |
| 104 expect(events.next, throwsStateError); |
| 105 await events.cancel(); |
| 106 }); |
| 107 |
| 108 test("of too many events ends at stream start", () async { |
| 109 var events = new StreamQueue<int>(createStream()); |
| 110 expect(await events.skip(6), 2); |
| 111 await events.cancel(); |
| 112 }); |
| 113 |
| 114 test("of too many events after some events", () async { |
| 115 var events = new StreamQueue<int>(createStream()); |
| 116 expect(await events.next, 1); |
| 117 expect(await events.next, 2); |
| 118 expect(await events.skip(6), 4); |
| 119 await events.cancel(); |
| 120 }); |
| 121 |
| 122 test("of too many events ends at stream end", () async { |
| 123 var events = new StreamQueue<int>(createStream()); |
| 124 expect(await events.next, 1); |
| 125 expect(await events.next, 2); |
| 126 expect(await events.next, 3); |
| 127 expect(await events.next, 4); |
| 128 expect(await events.skip(2), 2); |
| 129 await events.cancel(); |
| 130 }); |
| 131 |
| 132 test("of events with error", () async { |
| 133 var events = new StreamQueue<int>(createErrorStream()); |
| 134 expect(events.skip(4), throwsA("To err is divine!")); |
| 135 expect(await events.next, 4); |
| 136 await events.cancel(); |
| 137 }); |
| 138 |
| 139 test("of events with error, and skip again after", () async { |
| 140 var events = new StreamQueue<int>(createErrorStream()); |
| 141 expect(events.skip(4), throwsA("To err is divine!")); |
| 142 expect(events.skip(2), completion(1)); |
| 143 await events.cancel(); |
| 144 }); |
| 145 test("multiple skips at same time complete in order.", () async { |
| 146 var events = new StreamQueue<int>(createStream()); |
| 147 var skip1 = events.skip(1); |
| 148 var skip2 = events.skip(0); |
| 149 var skip3 = events.skip(4); |
| 150 var skip4 = events.skip(1); |
| 151 var index = 0; |
| 152 // Check that futures complete in order. |
| 153 sequence(expectedValue, sequenceIndex) => (value) { |
| 154 expect(value, expectedValue); |
| 155 expect(index, sequenceIndex); |
| 156 index++; |
| 157 }; |
| 158 await Future.wait([skip1.then(sequence(0, 0)), |
| 159 skip2.then(sequence(0, 1)), |
| 160 skip3.then(sequence(1, 2)), |
| 161 skip4.then(sequence(1, 3))]); |
| 162 await events.cancel(); |
| 163 }); |
| 164 }); |
| 165 |
| 166 group("take operation", () { |
| 167 test("as simple take of events", () async { |
| 168 var events = new StreamQueue<int>(createStream()); |
| 169 expect(await events.next, 1); |
| 170 expect(await events.take(2), [2, 3]); |
| 171 expect(await events.next, 4); |
| 172 await events.cancel(); |
| 173 }); |
| 174 |
| 175 test("of 0 events", () async { |
| 176 var events = new StreamQueue<int>(createStream()); |
| 177 expect(events.take(0), completion([])); |
| 178 expect(events.next, completion(1)); |
| 179 expect(events.take(0), completion([])); |
| 180 expect(events.next, completion(2)); |
| 181 expect(events.take(0), completion([])); |
| 182 expect(events.next, completion(3)); |
| 183 expect(events.take(0), completion([])); |
| 184 expect(events.next, completion(4)); |
| 185 expect(events.take(0), completion([])); |
| 186 expect(events.take(5), completion([])); |
| 187 expect(events.next, throwsStateError); |
| 188 await events.cancel(); |
| 189 }); |
| 190 |
| 191 test("with bad arguments throws", () async { |
| 192 var events = new StreamQueue<int>(createStream()); |
| 193 expect(() => events.take(-1), throwsArgumentError); |
| 194 expect(await events.next, 1); // Did not consume event. |
| 195 expect(() => events.take(-1), throwsArgumentError); |
| 196 expect(await events.next, 2); // Did not consume event. |
| 197 await events.cancel(); |
| 198 }); |
| 199 |
| 200 test("of too many arguments", () async { |
| 201 var events = new StreamQueue<int>(createStream()); |
| 202 expect(await events.take(6), [1, 2, 3, 4]); |
| 203 await events.cancel(); |
| 204 }); |
| 205 |
| 206 test("too large later", () async { |
| 207 var events = new StreamQueue<int>(createStream()); |
| 208 expect(await events.next, 1); |
| 209 expect(await events.next, 2); |
| 210 expect(await events.take(6), [3, 4]); |
| 211 await events.cancel(); |
| 212 }); |
| 213 |
| 214 test("error", () async { |
| 215 var events = new StreamQueue<int>(createErrorStream()); |
| 216 expect(events.take(4), throwsA("To err is divine!")); |
| 217 expect(await events.next, 4); |
| 218 await events.cancel(); |
| 219 }); |
| 220 }); |
| 221 |
| 222 group("rest operation", () { |
| 223 test("after single next", () async { |
| 224 var events = new StreamQueue<int>(createStream()); |
| 225 expect(await events.next, 1); |
| 226 expect(await events.rest.toList(), [2, 3, 4]); |
| 227 }); |
| 228 |
| 229 test("at start", () async { |
| 230 var events = new StreamQueue<int>(createStream()); |
| 231 expect(await events.rest.toList(), [1, 2, 3, 4]); |
| 232 }); |
| 233 |
| 234 test("at end", () async { |
| 235 var events = new StreamQueue<int>(createStream()); |
| 236 expect(await events.next, 1); |
| 237 expect(await events.next, 2); |
| 238 expect(await events.next, 3); |
| 239 expect(await events.next, 4); |
| 240 expect(await events.rest.toList(), isEmpty); |
| 241 }); |
| 242 |
| 243 test("after end", () async { |
| 244 var events = new StreamQueue<int>(createStream()); |
| 245 expect(await events.next, 1); |
| 246 expect(await events.next, 2); |
| 247 expect(await events.next, 3); |
| 248 expect(await events.next, 4); |
| 249 expect(events.next, throwsStateError); |
| 250 expect(await events.rest.toList(), isEmpty); |
| 251 }); |
| 252 |
| 253 test("after receiving done requested before", () async { |
| 254 var events = new StreamQueue<int>(createStream()); |
| 255 var next1 = events.next; |
| 256 var next2 = events.next; |
| 257 var next3 = events.next; |
| 258 var rest = events.rest; |
| 259 for (int i = 0; i < 10; i++) { |
| 260 await flushMicrotasks(); |
| 261 } |
| 262 expect(await next1, 1); |
| 263 expect(await next2, 2); |
| 264 expect(await next3, 3); |
| 265 expect(await rest.toList(), [4]); |
| 266 }); |
| 267 |
| 268 test("with an error event error", () async { |
| 269 var events = new StreamQueue<int>(createErrorStream()); |
| 270 expect(await events.next, 1); |
| 271 var rest = events.rest; |
| 272 var events2 = new StreamQueue(rest); |
| 273 expect(await events2.next, 2); |
| 274 expect(events2.next, throwsA("To err is divine!")); |
| 275 expect(await events2.next, 4); |
| 276 }); |
| 277 |
| 278 test("closes the events, prevents other operations", () async { |
| 279 var events = new StreamQueue<int>(createStream()); |
| 280 var stream = events.rest; |
| 281 expect(() => events.next, throwsStateError); |
| 282 expect(() => events.skip(1), throwsStateError); |
| 283 expect(() => events.take(1), throwsStateError); |
| 284 expect(() => events.rest, throwsStateError); |
| 285 expect(() => events.cancel(), throwsStateError); |
| 286 expect(stream.toList(), completion([1, 2, 3, 4])); |
| 287 }); |
| 288 |
| 289 test("forwards to underlying stream", () async { |
| 290 var cancel = new Completer(); |
| 291 var controller = new StreamController(onCancel: () => cancel.future); |
| 292 var events = new StreamQueue<int>(controller.stream); |
| 293 expect(controller.hasListener, isFalse); |
| 294 var next = events.next; |
| 295 expect(controller.hasListener, isTrue); |
| 296 expect(controller.isPaused, isFalse); |
| 297 |
| 298 controller.add(1); |
| 299 expect(await next, 1); |
| 300 expect(controller.isPaused, isTrue); |
| 301 |
| 302 var rest = events.rest; |
| 303 var subscription = rest.listen(null); |
| 304 expect(controller.hasListener, isTrue); |
| 305 expect(controller.isPaused, isFalse); |
| 306 |
| 307 var lastEvent; |
| 308 subscription.onData((value) => lastEvent = value); |
| 309 |
| 310 controller.add(2); |
| 311 |
| 312 await flushMicrotasks(); |
| 313 expect(lastEvent, 2); |
| 314 expect(controller.hasListener, isTrue); |
| 315 expect(controller.isPaused, isFalse); |
| 316 |
| 317 subscription.pause(); |
| 318 expect(controller.isPaused, isTrue); |
| 319 |
| 320 controller.add(3); |
| 321 |
| 322 await flushMicrotasks(); |
| 323 expect(lastEvent, 2); |
| 324 subscription.resume(); |
| 325 |
| 326 await flushMicrotasks(); |
| 327 expect(lastEvent, 3); |
| 328 |
| 329 var cancelFuture = subscription.cancel(); |
| 330 expect(controller.hasListener, isFalse); |
| 331 cancel.complete(42); |
| 332 expect(cancelFuture, completion(42)); |
| 333 }); |
| 334 }); |
| 335 |
| 336 group("cancel operation", () { |
| 337 test("closes the events, prevents any other operation", () async { |
| 338 var events = new StreamQueue<int>(createStream()); |
| 339 await events.cancel(); |
| 340 expect(() => events.next, throwsStateError); |
| 341 expect(() => events.skip(1), throwsStateError); |
| 342 expect(() => events.take(1), throwsStateError); |
| 343 expect(() => events.rest, throwsStateError); |
| 344 expect(() => events.cancel(), throwsStateError); |
| 345 }); |
| 346 |
| 347 test("cancels underlying subscription when called before any event", |
| 348 () async { |
| 349 var cancelFuture = new Future.value(42); |
| 350 var controller = new StreamController(onCancel: () => cancelFuture); |
| 351 var events = new StreamQueue<int>(controller.stream); |
| 352 expect(await events.cancel(), 42); |
| 353 }); |
| 354 |
| 355 test("cancels underlying subscription, returns result", () async { |
| 356 var cancelFuture = new Future.value(42); |
| 357 var controller = new StreamController(onCancel: () => cancelFuture); |
| 358 var events = new StreamQueue<int>(controller.stream); |
| 359 controller.add(1); |
| 360 expect(await events.next, 1); |
| 361 expect(await events.cancel(), 42); |
| 362 }); |
| 363 |
| 364 group("with immediate: true", () { |
| 365 test("closes the events, prevents any other operation", () async { |
| 366 var events = new StreamQueue<int>(createStream()); |
| 367 await events.cancel(immediate: true); |
| 368 expect(() => events.next, throwsStateError); |
| 369 expect(() => events.skip(1), throwsStateError); |
| 370 expect(() => events.take(1), throwsStateError); |
| 371 expect(() => events.rest, throwsStateError); |
| 372 expect(() => events.cancel(), throwsStateError); |
| 373 }); |
| 374 |
| 375 test("cancels the underlying subscription immediately", () async { |
| 376 var controller = new StreamController(); |
| 377 controller.add(1); |
| 378 |
| 379 var events = new StreamQueue<int>(controller.stream); |
| 380 expect(await events.next, 1); |
| 381 expect(controller.hasListener, isTrue); |
| 382 |
| 383 events.cancel(immediate: true); |
| 384 await expect(controller.hasListener, isFalse); |
| 385 }); |
| 386 |
| 387 test("cancels the underlying subscription when called before any event", |
| 388 () async { |
| 389 var cancelFuture = new Future.value(42); |
| 390 var controller = new StreamController(onCancel: () => cancelFuture); |
| 391 |
| 392 var events = new StreamQueue<int>(controller.stream); |
| 393 expect(await events.cancel(immediate: true), 42); |
| 394 }); |
| 395 |
| 396 test("closes pending requests", () async { |
| 397 var events = new StreamQueue<int>(createStream()); |
| 398 expect(await events.next, 1); |
| 399 expect(events.next, throwsStateError); |
| 400 expect(events.hasNext, completion(isFalse)); |
| 401 |
| 402 await events.cancel(immediate: true); |
| 403 }); |
| 404 |
| 405 test("returns the result of closing the underlying subscription", |
| 406 () async { |
| 407 var controller = new StreamController( |
| 408 onCancel: () => new Future.value(42)); |
| 409 var events = new StreamQueue<int>(controller.stream); |
| 410 expect(await events.cancel(immediate: true), 42); |
| 411 }); |
| 412 |
| 413 test("listens and then cancels a stream that hasn't been listened to yet", |
| 414 () async { |
| 415 var wasListened = false; |
| 416 var controller = new StreamController( |
| 417 onListen: () => wasListened = true); |
| 418 var events = new StreamQueue<int>(controller.stream); |
| 419 expect(wasListened, isFalse); |
| 420 expect(controller.hasListener, isFalse); |
| 421 |
| 422 await events.cancel(immediate: true); |
| 423 expect(wasListened, isTrue); |
| 424 expect(controller.hasListener, isFalse); |
| 425 }); |
| 426 }); |
| 427 }); |
| 428 |
| 429 group("hasNext operation", () { |
| 430 test("true at start", () async { |
| 431 var events = new StreamQueue<int>(createStream()); |
| 432 expect(await events.hasNext, isTrue); |
| 433 }); |
| 434 |
| 435 test("true after start", () async { |
| 436 var events = new StreamQueue<int>(createStream()); |
| 437 expect(await events.next, 1); |
| 438 expect(await events.hasNext, isTrue); |
| 439 }); |
| 440 |
| 441 test("true at end", () async { |
| 442 var events = new StreamQueue<int>(createStream()); |
| 443 for (int i = 1; i <= 4; i++) { |
| 444 expect(await events.next, i); |
| 445 } |
| 446 expect(await events.hasNext, isFalse); |
| 447 }); |
| 448 |
| 449 test("true when enqueued", () async { |
| 450 var events = new StreamQueue<int>(createStream()); |
| 451 var values = []; |
| 452 for (int i = 1; i <= 3; i++) { |
| 453 events.next.then(values.add); |
| 454 } |
| 455 expect(values, isEmpty); |
| 456 expect(await events.hasNext, isTrue); |
| 457 expect(values, [1, 2, 3]); |
| 458 }); |
| 459 |
| 460 test("false when enqueued", () async { |
| 461 var events = new StreamQueue<int>(createStream()); |
| 462 var values = []; |
| 463 for (int i = 1; i <= 4; i++) { |
| 464 events.next.then(values.add); |
| 465 } |
| 466 expect(values, isEmpty); |
| 467 expect(await events.hasNext, isFalse); |
| 468 expect(values, [1, 2, 3, 4]); |
| 469 }); |
| 470 |
| 471 test("true when data event", () async { |
| 472 var controller = new StreamController(); |
| 473 var events = new StreamQueue<int>(controller.stream); |
| 474 |
| 475 var hasNext; |
| 476 events.hasNext.then((result) { hasNext = result; }); |
| 477 await flushMicrotasks(); |
| 478 expect(hasNext, isNull); |
| 479 controller.add(42); |
| 480 expect(hasNext, isNull); |
| 481 await flushMicrotasks(); |
| 482 expect(hasNext, isTrue); |
| 483 }); |
| 484 |
| 485 test("true when error event", () async { |
| 486 var controller = new StreamController(); |
| 487 var events = new StreamQueue<int>(controller.stream); |
| 488 |
| 489 var hasNext; |
| 490 events.hasNext.then((result) { hasNext = result; }); |
| 491 await flushMicrotasks(); |
| 492 expect(hasNext, isNull); |
| 493 controller.addError("BAD"); |
| 494 expect(hasNext, isNull); |
| 495 await flushMicrotasks(); |
| 496 expect(hasNext, isTrue); |
| 497 expect(events.next, throwsA("BAD")); |
| 498 }); |
| 499 |
| 500 test("- hasNext after hasNext", () async { |
| 501 var events = new StreamQueue<int>(createStream()); |
| 502 expect(await events.hasNext, true); |
| 503 expect(await events.hasNext, true); |
| 504 expect(await events.next, 1); |
| 505 expect(await events.hasNext, true); |
| 506 expect(await events.hasNext, true); |
| 507 expect(await events.next, 2); |
| 508 expect(await events.hasNext, true); |
| 509 expect(await events.hasNext, true); |
| 510 expect(await events.next, 3); |
| 511 expect(await events.hasNext, true); |
| 512 expect(await events.hasNext, true); |
| 513 expect(await events.next, 4); |
| 514 expect(await events.hasNext, false); |
| 515 expect(await events.hasNext, false); |
| 516 }); |
| 517 |
| 518 test("- next after true", () async { |
| 519 var events = new StreamQueue<int>(createStream()); |
| 520 expect(await events.next, 1); |
| 521 expect(await events.hasNext, true); |
| 522 expect(await events.next, 2); |
| 523 expect(await events.next, 3); |
| 524 }); |
| 525 |
| 526 test("- next after true, enqueued", () async { |
| 527 var events = new StreamQueue<int>(createStream()); |
| 528 var responses = []; |
| 529 events.next.then(responses.add); |
| 530 events.hasNext.then(responses.add); |
| 531 events.next.then(responses.add); |
| 532 do { |
| 533 await flushMicrotasks(); |
| 534 } while (responses.length < 3); |
| 535 expect(responses, [1, true, 2]); |
| 536 }); |
| 537 |
| 538 test("- skip 0 after true", () async { |
| 539 var events = new StreamQueue<int>(createStream()); |
| 540 expect(await events.next, 1); |
| 541 expect(await events.hasNext, true); |
| 542 expect(await events.skip(0), 0); |
| 543 expect(await events.next, 2); |
| 544 }); |
| 545 |
| 546 test("- skip 1 after true", () async { |
| 547 var events = new StreamQueue<int>(createStream()); |
| 548 expect(await events.next, 1); |
| 549 expect(await events.hasNext, true); |
| 550 expect(await events.skip(1), 0); |
| 551 expect(await events.next, 3); |
| 552 }); |
| 553 |
| 554 test("- skip 2 after true", () async { |
| 555 var events = new StreamQueue<int>(createStream()); |
| 556 expect(await events.next, 1); |
| 557 expect(await events.hasNext, true); |
| 558 expect(await events.skip(2), 0); |
| 559 expect(await events.next, 4); |
| 560 }); |
| 561 |
| 562 test("- take 0 after true", () async { |
| 563 var events = new StreamQueue<int>(createStream()); |
| 564 expect(await events.next, 1); |
| 565 expect(await events.hasNext, true); |
| 566 expect(await events.take(0), isEmpty); |
| 567 expect(await events.next, 2); |
| 568 }); |
| 569 |
| 570 test("- take 1 after true", () async { |
| 571 var events = new StreamQueue<int>(createStream()); |
| 572 expect(await events.next, 1); |
| 573 expect(await events.hasNext, true); |
| 574 expect(await events.take(1), [2]); |
| 575 expect(await events.next, 3); |
| 576 }); |
| 577 |
| 578 test("- take 2 after true", () async { |
| 579 var events = new StreamQueue<int>(createStream()); |
| 580 expect(await events.next, 1); |
| 581 expect(await events.hasNext, true); |
| 582 expect(await events.take(2), [2, 3]); |
| 583 expect(await events.next, 4); |
| 584 }); |
| 585 |
| 586 test("- rest after true", () async { |
| 587 var events = new StreamQueue<int>(createStream()); |
| 588 expect(await events.next, 1); |
| 589 expect(await events.hasNext, true); |
| 590 var stream = events.rest; |
| 591 expect(await stream.toList(), [2, 3, 4]); |
| 592 }); |
| 593 |
| 594 test("- rest after true, at last", () async { |
| 595 var events = new StreamQueue<int>(createStream()); |
| 596 expect(await events.next, 1); |
| 597 expect(await events.next, 2); |
| 598 expect(await events.next, 3); |
| 599 expect(await events.hasNext, true); |
| 600 var stream = events.rest; |
| 601 expect(await stream.toList(), [4]); |
| 602 }); |
| 603 |
| 604 test("- rest after false", () async { |
| 605 var events = new StreamQueue<int>(createStream()); |
| 606 expect(await events.next, 1); |
| 607 expect(await events.next, 2); |
| 608 expect(await events.next, 3); |
| 609 expect(await events.next, 4); |
| 610 expect(await events.hasNext, false); |
| 611 var stream = events.rest; |
| 612 expect(await stream.toList(), isEmpty); |
| 613 }); |
| 614 |
| 615 test("- cancel after true on data", () async { |
| 616 var events = new StreamQueue<int>(createStream()); |
| 617 expect(await events.next, 1); |
| 618 expect(await events.next, 2); |
| 619 expect(await events.hasNext, true); |
| 620 expect(await events.cancel(), null); |
| 621 }); |
| 622 |
| 623 test("- cancel after true on error", () async { |
| 624 var events = new StreamQueue<int>(createErrorStream()); |
| 625 expect(await events.next, 1); |
| 626 expect(await events.next, 2); |
| 627 expect(await events.hasNext, true); |
| 628 expect(await events.cancel(), null); |
| 629 }); |
| 630 }); |
| 631 |
| 632 test("all combinations sequential skip/next/take operations", () async { |
| 633 // Takes all combinations of two of next, skip and take, then ends with |
| 634 // doing rest. Each of the first rounds do 10 events of each type, |
| 635 // the rest does 20 elements. |
| 636 var eventCount = 20 * (3 * 3 + 1); |
| 637 var events = new StreamQueue<int>(createLongStream(eventCount)); |
| 638 |
| 639 // Test expecting [startIndex .. startIndex + 9] as events using |
| 640 // `next`. |
| 641 nextTest(startIndex) { |
| 642 for (int i = 0; i < 10; i++) { |
| 643 expect(events.next, completion(startIndex + i)); |
| 644 } |
| 645 } |
| 646 |
| 647 // Test expecting 10 events to be skipped. |
| 648 skipTest(startIndex) { |
| 649 expect(events.skip(10), completion(0)); |
| 650 } |
| 651 |
| 652 // Test expecting [startIndex .. startIndex + 9] as events using |
| 653 // `take(10)`. |
| 654 takeTest(startIndex) { |
| 655 expect(events.take(10), |
| 656 completion(new List.generate(10, (i) => startIndex + i))); |
| 657 } |
| 658 var tests = [nextTest, skipTest, takeTest]; |
| 659 |
| 660 int counter = 0; |
| 661 // Run through all pairs of two tests and run them. |
| 662 for (int i = 0; i < tests.length; i++) { |
| 663 for (int j = 0; j < tests.length; j++) { |
| 664 tests[i](counter); |
| 665 tests[j](counter + 10); |
| 666 counter += 20; |
| 667 } |
| 668 } |
| 669 // Then expect 20 more events as a `rest` call. |
| 670 expect(events.rest.toList(), |
| 671 completion(new List.generate(20, (i) => counter + i))); |
| 672 }); |
| 673 } |
| 674 |
| 675 Stream<int> createStream() async* { |
| 676 yield 1; |
| 677 await flushMicrotasks(); |
| 678 yield 2; |
| 679 await flushMicrotasks(); |
| 680 yield 3; |
| 681 await flushMicrotasks(); |
| 682 yield 4; |
| 683 } |
| 684 |
| 685 Stream<int> createErrorStream() { |
| 686 StreamController controller = new StreamController<int>(); |
| 687 () async { |
| 688 controller.add(1); |
| 689 await flushMicrotasks(); |
| 690 controller.add(2); |
| 691 await flushMicrotasks(); |
| 692 controller.addError("To err is divine!"); |
| 693 await flushMicrotasks(); |
| 694 controller.add(4); |
| 695 await flushMicrotasks(); |
| 696 controller.close(); |
| 697 }(); |
| 698 return controller.stream; |
| 699 } |
| 700 |
| 701 Stream<int> createLongStream(int eventCount) async* { |
| 702 for (int i = 0; i < eventCount; i++) yield i; |
| 703 } |
OLD | NEW |