OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE filevents. |
| 4 |
| 5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ |
| 6 // lands. |
| 7 |
| 8 import "dart:async"; |
| 9 |
| 10 import "package:test/src/util/stream_queue.dart"; |
| 11 import "package:test/test.dart"; |
| 12 |
| 13 main() { |
| 14 group("source stream", () { |
| 15 test("is listened to on first request, paused between requests", () async { |
| 16 var controller = new StreamController(); |
| 17 var events = new StreamQueue<int>(controller.stream); |
| 18 await flushMicrotasks(); |
| 19 expect(controller.hasListener, isFalse); |
| 20 |
| 21 var next = events.next; |
| 22 expect(controller.hasListener, isTrue); |
| 23 expect(controller.isPaused, isFalse); |
| 24 |
| 25 controller.add(1); |
| 26 |
| 27 expect(await next, 1); |
| 28 expect(controller.hasListener, isTrue); |
| 29 expect(controller.isPaused, isTrue); |
| 30 |
| 31 next = events.next; |
| 32 expect(controller.hasListener, isTrue); |
| 33 expect(controller.isPaused, isFalse); |
| 34 |
| 35 controller.add(2); |
| 36 |
| 37 expect(await next, 2); |
| 38 expect(controller.hasListener, isTrue); |
| 39 expect(controller.isPaused, isTrue); |
| 40 |
| 41 events.cancel(); |
| 42 expect(controller.hasListener, isFalse); |
| 43 }); |
| 44 }); |
| 45 |
| 46 group("next operation", () { |
| 47 test("simple sequence of requests", () async { |
| 48 var events = new StreamQueue<int>(createStream()); |
| 49 for (int i = 1; i <= 4; i++) { |
| 50 expect(await events.next, i); |
| 51 } |
| 52 expect(events.next, throwsStateError); |
| 53 }); |
| 54 |
| 55 test("multiple requests at the same time", () async { |
| 56 var events = new StreamQueue<int>(createStream()); |
| 57 var result = await Future.wait( |
| 58 [events.next, events.next, events.next, events.next]); |
| 59 expect(result, [1, 2, 3, 4]); |
| 60 await events.cancel(); |
| 61 }); |
| 62 |
| 63 test("sequence of requests with error", () async { |
| 64 var events = new StreamQueue<int>(createErrorStream()); |
| 65 expect(await events.next, 1); |
| 66 expect(await events.next, 2); |
| 67 expect(events.next, throwsA("To err is divine!")); |
| 68 expect(await events.next, 4); |
| 69 await events.cancel(); |
| 70 }); |
| 71 }); |
| 72 |
| 73 group("skip operation", () { |
| 74 test("of two elements in the middle of sequence", () async { |
| 75 var events = new StreamQueue<int>(createStream()); |
| 76 expect(await events.next, 1); |
| 77 expect(await events.skip(2), 0); |
| 78 expect(await events.next, 4); |
| 79 await events.cancel(); |
| 80 }); |
| 81 |
| 82 test("with negative/bad arguments throws", () async { |
| 83 var events = new StreamQueue<int>(createStream()); |
| 84 expect(() => events.skip(-1), throwsArgumentError); |
| 85 // A non-int throws either a type error or an argument error, |
| 86 // depending on whether it's checked mode or not. |
| 87 expect(await events.next, 1); // Did not consume event. |
| 88 expect(() => events.skip(-1), throwsArgumentError); |
| 89 expect(await events.next, 2); // Did not consume event. |
| 90 await events.cancel(); |
| 91 }); |
| 92 |
| 93 test("of 0 elements works", () async { |
| 94 var events = new StreamQueue<int>(createStream()); |
| 95 expect(events.skip(0), completion(0)); |
| 96 expect(events.next, completion(1)); |
| 97 expect(events.skip(0), completion(0)); |
| 98 expect(events.next, completion(2)); |
| 99 expect(events.skip(0), completion(0)); |
| 100 expect(events.next, completion(3)); |
| 101 expect(events.skip(0), completion(0)); |
| 102 expect(events.next, completion(4)); |
| 103 expect(events.skip(0), completion(0)); |
| 104 expect(events.skip(5), completion(5)); |
| 105 expect(events.next, throwsStateError); |
| 106 await events.cancel(); |
| 107 }); |
| 108 |
| 109 test("of too many events ends at stream start", () async { |
| 110 var events = new StreamQueue<int>(createStream()); |
| 111 expect(await events.skip(6), 2); |
| 112 await events.cancel(); |
| 113 }); |
| 114 |
| 115 test("of too many events after some events", () async { |
| 116 var events = new StreamQueue<int>(createStream()); |
| 117 expect(await events.next, 1); |
| 118 expect(await events.next, 2); |
| 119 expect(await events.skip(6), 4); |
| 120 await events.cancel(); |
| 121 }); |
| 122 |
| 123 test("of too many events ends at stream end", () async { |
| 124 var events = new StreamQueue<int>(createStream()); |
| 125 expect(await events.next, 1); |
| 126 expect(await events.next, 2); |
| 127 expect(await events.next, 3); |
| 128 expect(await events.next, 4); |
| 129 expect(await events.skip(2), 2); |
| 130 await events.cancel(); |
| 131 }); |
| 132 |
| 133 test("of events with error", () async { |
| 134 var events = new StreamQueue<int>(createErrorStream()); |
| 135 expect(events.skip(4), throwsA("To err is divine!")); |
| 136 expect(await events.next, 4); |
| 137 await events.cancel(); |
| 138 }); |
| 139 |
| 140 test("of events with error, and skip again after", () async { |
| 141 var events = new StreamQueue<int>(createErrorStream()); |
| 142 expect(events.skip(4), throwsA("To err is divine!")); |
| 143 expect(events.skip(2), completion(1)); |
| 144 await events.cancel(); |
| 145 }); |
| 146 test("multiple skips at same time complete in order.", () async { |
| 147 var events = new StreamQueue<int>(createStream()); |
| 148 var skip1 = events.skip(1); |
| 149 var skip2 = events.skip(0); |
| 150 var skip3 = events.skip(4); |
| 151 var skip4 = events.skip(1); |
| 152 var index = 0; |
| 153 // Check that futures complete in order. |
| 154 sequence(expectedValue, sequenceIndex) => (value) { |
| 155 expect(value, expectedValue); |
| 156 expect(index, sequenceIndex); |
| 157 index++; |
| 158 }; |
| 159 await Future.wait([skip1.then(sequence(0, 0)), |
| 160 skip2.then(sequence(0, 1)), |
| 161 skip3.then(sequence(1, 2)), |
| 162 skip4.then(sequence(1, 3))]); |
| 163 await events.cancel(); |
| 164 }); |
| 165 }); |
| 166 |
| 167 group("take operation", () { |
| 168 test("as simple take of events", () async { |
| 169 var events = new StreamQueue<int>(createStream()); |
| 170 expect(await events.next, 1); |
| 171 expect(await events.take(2), [2, 3]); |
| 172 expect(await events.next, 4); |
| 173 await events.cancel(); |
| 174 }); |
| 175 |
| 176 test("of 0 events", () async { |
| 177 var events = new StreamQueue<int>(createStream()); |
| 178 expect(events.take(0), completion([])); |
| 179 expect(events.next, completion(1)); |
| 180 expect(events.take(0), completion([])); |
| 181 expect(events.next, completion(2)); |
| 182 expect(events.take(0), completion([])); |
| 183 expect(events.next, completion(3)); |
| 184 expect(events.take(0), completion([])); |
| 185 expect(events.next, completion(4)); |
| 186 expect(events.take(0), completion([])); |
| 187 expect(events.take(5), completion([])); |
| 188 expect(events.next, throwsStateError); |
| 189 await events.cancel(); |
| 190 }); |
| 191 |
| 192 test("with bad arguments throws", () async { |
| 193 var events = new StreamQueue<int>(createStream()); |
| 194 expect(() => events.take(-1), throwsArgumentError); |
| 195 expect(await events.next, 1); // Did not consume event. |
| 196 expect(() => events.take(-1), throwsArgumentError); |
| 197 expect(await events.next, 2); // Did not consume event. |
| 198 await events.cancel(); |
| 199 }); |
| 200 |
| 201 test("of too many arguments", () async { |
| 202 var events = new StreamQueue<int>(createStream()); |
| 203 expect(await events.take(6), [1, 2, 3, 4]); |
| 204 await events.cancel(); |
| 205 }); |
| 206 |
| 207 test("too large later", () async { |
| 208 var events = new StreamQueue<int>(createStream()); |
| 209 expect(await events.next, 1); |
| 210 expect(await events.next, 2); |
| 211 expect(await events.take(6), [3, 4]); |
| 212 await events.cancel(); |
| 213 }); |
| 214 |
| 215 test("error", () async { |
| 216 var events = new StreamQueue<int>(createErrorStream()); |
| 217 expect(events.take(4), throwsA("To err is divine!")); |
| 218 expect(await events.next, 4); |
| 219 await events.cancel(); |
| 220 }); |
| 221 }); |
| 222 |
| 223 group("rest operation", () { |
| 224 test("after single next", () async { |
| 225 var events = new StreamQueue<int>(createStream()); |
| 226 expect(await events.next, 1); |
| 227 expect(await events.rest.toList(), [2, 3, 4]); |
| 228 }); |
| 229 |
| 230 test("at start", () async { |
| 231 var events = new StreamQueue<int>(createStream()); |
| 232 expect(await events.rest.toList(), [1, 2, 3, 4]); |
| 233 }); |
| 234 |
| 235 test("at end", () async { |
| 236 var events = new StreamQueue<int>(createStream()); |
| 237 expect(await events.next, 1); |
| 238 expect(await events.next, 2); |
| 239 expect(await events.next, 3); |
| 240 expect(await events.next, 4); |
| 241 expect(await events.rest.toList(), isEmpty); |
| 242 }); |
| 243 |
| 244 test("after end", () async { |
| 245 var events = new StreamQueue<int>(createStream()); |
| 246 expect(await events.next, 1); |
| 247 expect(await events.next, 2); |
| 248 expect(await events.next, 3); |
| 249 expect(await events.next, 4); |
| 250 expect(events.next, throwsStateError); |
| 251 expect(await events.rest.toList(), isEmpty); |
| 252 }); |
| 253 |
| 254 test("after receiving done requested before", () async { |
| 255 var events = new StreamQueue<int>(createStream()); |
| 256 var next1 = events.next; |
| 257 var next2 = events.next; |
| 258 var next3 = events.next; |
| 259 var rest = events.rest; |
| 260 for (int i = 0; i < 10; i++) { |
| 261 await flushMicrotasks(); |
| 262 } |
| 263 expect(await next1, 1); |
| 264 expect(await next2, 2); |
| 265 expect(await next3, 3); |
| 266 expect(await rest.toList(), [4]); |
| 267 }); |
| 268 |
| 269 test("with an error event error", () async { |
| 270 var events = new StreamQueue<int>(createErrorStream()); |
| 271 expect(await events.next, 1); |
| 272 var rest = events.rest; |
| 273 var events2 = new StreamQueue(rest); |
| 274 expect(await events2.next, 2); |
| 275 expect(events2.next, throwsA("To err is divine!")); |
| 276 expect(await events2.next, 4); |
| 277 }); |
| 278 |
| 279 test("closes the events, prevents other operations", () async { |
| 280 var events = new StreamQueue<int>(createStream()); |
| 281 var stream = events.rest; |
| 282 expect(() => events.next, throwsStateError); |
| 283 expect(() => events.skip(1), throwsStateError); |
| 284 expect(() => events.take(1), throwsStateError); |
| 285 expect(() => events.rest, throwsStateError); |
| 286 expect(() => events.cancel(), throwsStateError); |
| 287 expect(stream.toList(), completion([1, 2, 3, 4])); |
| 288 }); |
| 289 |
| 290 test("forwards to underlying stream", () async { |
| 291 var cancel = new Completer(); |
| 292 var controller = new StreamController(onCancel: () => cancel.future); |
| 293 var events = new StreamQueue<int>(controller.stream); |
| 294 expect(controller.hasListener, isFalse); |
| 295 var next = events.next; |
| 296 expect(controller.hasListener, isTrue); |
| 297 expect(controller.isPaused, isFalse); |
| 298 |
| 299 controller.add(1); |
| 300 expect(await next, 1); |
| 301 expect(controller.isPaused, isTrue); |
| 302 |
| 303 var rest = events.rest; |
| 304 var subscription = rest.listen(null); |
| 305 expect(controller.hasListener, isTrue); |
| 306 expect(controller.isPaused, isFalse); |
| 307 |
| 308 var lastEvent; |
| 309 subscription.onData((value) => lastEvent = value); |
| 310 |
| 311 controller.add(2); |
| 312 |
| 313 await flushMicrotasks(); |
| 314 expect(lastEvent, 2); |
| 315 expect(controller.hasListener, isTrue); |
| 316 expect(controller.isPaused, isFalse); |
| 317 |
| 318 subscription.pause(); |
| 319 expect(controller.isPaused, isTrue); |
| 320 |
| 321 controller.add(3); |
| 322 |
| 323 await flushMicrotasks(); |
| 324 expect(lastEvent, 2); |
| 325 subscription.resume(); |
| 326 |
| 327 await flushMicrotasks(); |
| 328 expect(lastEvent, 3); |
| 329 |
| 330 var cancelFuture = subscription.cancel(); |
| 331 expect(controller.hasListener, isFalse); |
| 332 cancel.complete(42); |
| 333 expect(cancelFuture, completion(42)); |
| 334 }); |
| 335 }); |
| 336 |
| 337 group("cancel operation", () { |
| 338 test("closes the events, prevents any other operation", () async { |
| 339 var events = new StreamQueue<int>(createStream()); |
| 340 await events.cancel(); |
| 341 expect(() => events.next, throwsStateError); |
| 342 expect(() => events.skip(1), throwsStateError); |
| 343 expect(() => events.take(1), throwsStateError); |
| 344 expect(() => events.rest, throwsStateError); |
| 345 expect(() => events.cancel(), throwsStateError); |
| 346 }); |
| 347 |
| 348 test("cancels underlying subscription when called before any event", |
| 349 () async { |
| 350 var cancelFuture = new Future.value(42); |
| 351 var controller = new StreamController(onCancel: () => cancelFuture); |
| 352 var events = new StreamQueue<int>(controller.stream); |
| 353 expect(await events.cancel(), 42); |
| 354 }); |
| 355 |
| 356 test("cancels underlying subscription, returns result", () async { |
| 357 var cancelFuture = new Future.value(42); |
| 358 var controller = new StreamController(onCancel: () => cancelFuture); |
| 359 var events = new StreamQueue<int>(controller.stream); |
| 360 controller.add(1); |
| 361 expect(await events.next, 1); |
| 362 expect(await events.cancel(), 42); |
| 363 }); |
| 364 |
| 365 group("with immediate: true", () { |
| 366 test("closes the events, prevents any other operation", () async { |
| 367 var events = new StreamQueue<int>(createStream()); |
| 368 await events.cancel(immediate: true); |
| 369 expect(() => events.next, throwsStateError); |
| 370 expect(() => events.skip(1), throwsStateError); |
| 371 expect(() => events.take(1), throwsStateError); |
| 372 expect(() => events.rest, throwsStateError); |
| 373 expect(() => events.cancel(), throwsStateError); |
| 374 }); |
| 375 |
| 376 test("cancels the underlying subscription immediately", () async { |
| 377 var controller = new StreamController(); |
| 378 controller.add(1); |
| 379 |
| 380 var events = new StreamQueue<int>(controller.stream); |
| 381 expect(await events.next, 1); |
| 382 expect(controller.hasListener, isTrue); |
| 383 |
| 384 events.cancel(immediate: true); |
| 385 await expect(controller.hasListener, isFalse); |
| 386 }); |
| 387 |
| 388 test("cancels the underlying subscription when called before any event", |
| 389 () async { |
| 390 var cancelFuture = new Future.value(42); |
| 391 var controller = new StreamController(onCancel: () => cancelFuture); |
| 392 |
| 393 var events = new StreamQueue<int>(controller.stream); |
| 394 expect(await events.cancel(immediate: true), 42); |
| 395 }); |
| 396 |
| 397 test("closes pending requests", () async { |
| 398 var events = new StreamQueue<int>(createStream()); |
| 399 expect(await events.next, 1); |
| 400 expect(events.next, throwsStateError); |
| 401 expect(events.hasNext, completion(isFalse)); |
| 402 |
| 403 await events.cancel(immediate: true); |
| 404 }); |
| 405 |
| 406 test("returns the result of closing the underlying subscription", |
| 407 () async { |
| 408 var controller = new StreamController( |
| 409 onCancel: () => new Future.value(42)); |
| 410 var events = new StreamQueue<int>(controller.stream); |
| 411 expect(await events.cancel(immediate: true), 42); |
| 412 }); |
| 413 |
| 414 test("listens and then cancels a stream that hasn't been listened to yet", |
| 415 () async { |
| 416 var wasListened = false; |
| 417 var controller = new StreamController( |
| 418 onListen: () => wasListened = true); |
| 419 var events = new StreamQueue<int>(controller.stream); |
| 420 expect(wasListened, isFalse); |
| 421 expect(controller.hasListener, isFalse); |
| 422 |
| 423 await events.cancel(immediate: true); |
| 424 expect(wasListened, isTrue); |
| 425 expect(controller.hasListener, isFalse); |
| 426 }); |
| 427 }); |
| 428 }); |
| 429 |
| 430 group("hasNext operation", () { |
| 431 test("true at start", () async { |
| 432 var events = new StreamQueue<int>(createStream()); |
| 433 expect(await events.hasNext, isTrue); |
| 434 }); |
| 435 |
| 436 test("true after start", () async { |
| 437 var events = new StreamQueue<int>(createStream()); |
| 438 expect(await events.next, 1); |
| 439 expect(await events.hasNext, isTrue); |
| 440 }); |
| 441 |
| 442 test("true at end", () async { |
| 443 var events = new StreamQueue<int>(createStream()); |
| 444 for (int i = 1; i <= 4; i++) { |
| 445 expect(await events.next, i); |
| 446 } |
| 447 expect(await events.hasNext, isFalse); |
| 448 }); |
| 449 |
| 450 test("true when enqueued", () async { |
| 451 var events = new StreamQueue<int>(createStream()); |
| 452 var values = []; |
| 453 for (int i = 1; i <= 3; i++) { |
| 454 events.next.then(values.add); |
| 455 } |
| 456 expect(values, isEmpty); |
| 457 expect(await events.hasNext, isTrue); |
| 458 expect(values, [1, 2, 3]); |
| 459 }); |
| 460 |
| 461 test("false when enqueued", () async { |
| 462 var events = new StreamQueue<int>(createStream()); |
| 463 var values = []; |
| 464 for (int i = 1; i <= 4; i++) { |
| 465 events.next.then(values.add); |
| 466 } |
| 467 expect(values, isEmpty); |
| 468 expect(await events.hasNext, isFalse); |
| 469 expect(values, [1, 2, 3, 4]); |
| 470 }); |
| 471 |
| 472 test("true when data event", () async { |
| 473 var controller = new StreamController(); |
| 474 var events = new StreamQueue<int>(controller.stream); |
| 475 |
| 476 var hasNext; |
| 477 events.hasNext.then((result) { hasNext = result; }); |
| 478 await flushMicrotasks(); |
| 479 expect(hasNext, isNull); |
| 480 controller.add(42); |
| 481 expect(hasNext, isNull); |
| 482 await flushMicrotasks(); |
| 483 expect(hasNext, isTrue); |
| 484 }); |
| 485 |
| 486 test("true when error event", () async { |
| 487 var controller = new StreamController(); |
| 488 var events = new StreamQueue<int>(controller.stream); |
| 489 |
| 490 var hasNext; |
| 491 events.hasNext.then((result) { hasNext = result; }); |
| 492 await flushMicrotasks(); |
| 493 expect(hasNext, isNull); |
| 494 controller.addError("BAD"); |
| 495 expect(hasNext, isNull); |
| 496 await flushMicrotasks(); |
| 497 expect(hasNext, isTrue); |
| 498 expect(events.next, throwsA("BAD")); |
| 499 }); |
| 500 |
| 501 test("- hasNext after hasNext", () async { |
| 502 var events = new StreamQueue<int>(createStream()); |
| 503 expect(await events.hasNext, true); |
| 504 expect(await events.hasNext, true); |
| 505 expect(await events.next, 1); |
| 506 expect(await events.hasNext, true); |
| 507 expect(await events.hasNext, true); |
| 508 expect(await events.next, 2); |
| 509 expect(await events.hasNext, true); |
| 510 expect(await events.hasNext, true); |
| 511 expect(await events.next, 3); |
| 512 expect(await events.hasNext, true); |
| 513 expect(await events.hasNext, true); |
| 514 expect(await events.next, 4); |
| 515 expect(await events.hasNext, false); |
| 516 expect(await events.hasNext, false); |
| 517 }); |
| 518 |
| 519 test("- next after true", () async { |
| 520 var events = new StreamQueue<int>(createStream()); |
| 521 expect(await events.next, 1); |
| 522 expect(await events.hasNext, true); |
| 523 expect(await events.next, 2); |
| 524 expect(await events.next, 3); |
| 525 }); |
| 526 |
| 527 test("- next after true, enqueued", () async { |
| 528 var events = new StreamQueue<int>(createStream()); |
| 529 var responses = []; |
| 530 events.next.then(responses.add); |
| 531 events.hasNext.then(responses.add); |
| 532 events.next.then(responses.add); |
| 533 do { |
| 534 await flushMicrotasks(); |
| 535 } while (responses.length < 3); |
| 536 expect(responses, [1, true, 2]); |
| 537 }); |
| 538 |
| 539 test("- skip 0 after true", () async { |
| 540 var events = new StreamQueue<int>(createStream()); |
| 541 expect(await events.next, 1); |
| 542 expect(await events.hasNext, true); |
| 543 expect(await events.skip(0), 0); |
| 544 expect(await events.next, 2); |
| 545 }); |
| 546 |
| 547 test("- skip 1 after true", () async { |
| 548 var events = new StreamQueue<int>(createStream()); |
| 549 expect(await events.next, 1); |
| 550 expect(await events.hasNext, true); |
| 551 expect(await events.skip(1), 0); |
| 552 expect(await events.next, 3); |
| 553 }); |
| 554 |
| 555 test("- skip 2 after true", () async { |
| 556 var events = new StreamQueue<int>(createStream()); |
| 557 expect(await events.next, 1); |
| 558 expect(await events.hasNext, true); |
| 559 expect(await events.skip(2), 0); |
| 560 expect(await events.next, 4); |
| 561 }); |
| 562 |
| 563 test("- take 0 after true", () async { |
| 564 var events = new StreamQueue<int>(createStream()); |
| 565 expect(await events.next, 1); |
| 566 expect(await events.hasNext, true); |
| 567 expect(await events.take(0), isEmpty); |
| 568 expect(await events.next, 2); |
| 569 }); |
| 570 |
| 571 test("- take 1 after true", () async { |
| 572 var events = new StreamQueue<int>(createStream()); |
| 573 expect(await events.next, 1); |
| 574 expect(await events.hasNext, true); |
| 575 expect(await events.take(1), [2]); |
| 576 expect(await events.next, 3); |
| 577 }); |
| 578 |
| 579 test("- take 2 after true", () async { |
| 580 var events = new StreamQueue<int>(createStream()); |
| 581 expect(await events.next, 1); |
| 582 expect(await events.hasNext, true); |
| 583 expect(await events.take(2), [2, 3]); |
| 584 expect(await events.next, 4); |
| 585 }); |
| 586 |
| 587 test("- rest after true", () async { |
| 588 var events = new StreamQueue<int>(createStream()); |
| 589 expect(await events.next, 1); |
| 590 expect(await events.hasNext, true); |
| 591 var stream = events.rest; |
| 592 expect(await stream.toList(), [2, 3, 4]); |
| 593 }); |
| 594 |
| 595 test("- rest after true, at last", () async { |
| 596 var events = new StreamQueue<int>(createStream()); |
| 597 expect(await events.next, 1); |
| 598 expect(await events.next, 2); |
| 599 expect(await events.next, 3); |
| 600 expect(await events.hasNext, true); |
| 601 var stream = events.rest; |
| 602 expect(await stream.toList(), [4]); |
| 603 }); |
| 604 |
| 605 test("- rest after false", () async { |
| 606 var events = new StreamQueue<int>(createStream()); |
| 607 expect(await events.next, 1); |
| 608 expect(await events.next, 2); |
| 609 expect(await events.next, 3); |
| 610 expect(await events.next, 4); |
| 611 expect(await events.hasNext, false); |
| 612 var stream = events.rest; |
| 613 expect(await stream.toList(), isEmpty); |
| 614 }); |
| 615 |
| 616 test("- cancel after true on data", () async { |
| 617 var events = new StreamQueue<int>(createStream()); |
| 618 expect(await events.next, 1); |
| 619 expect(await events.next, 2); |
| 620 expect(await events.hasNext, true); |
| 621 expect(await events.cancel(), null); |
| 622 }); |
| 623 |
| 624 test("- cancel after true on error", () async { |
| 625 var events = new StreamQueue<int>(createErrorStream()); |
| 626 expect(await events.next, 1); |
| 627 expect(await events.next, 2); |
| 628 expect(await events.hasNext, true); |
| 629 expect(await events.cancel(), null); |
| 630 }); |
| 631 }); |
| 632 |
| 633 group("fork operation", () { |
| 634 test("produces a stream queue with the same events", () async { |
| 635 var queue1 = new StreamQueue<int>(createStream()); |
| 636 var queue2 = queue1.fork(); |
| 637 |
| 638 expect(await queue1.next, 1); |
| 639 expect(await queue1.next, 2); |
| 640 expect(await queue1.next, 3); |
| 641 expect(await queue1.next, 4); |
| 642 expect(await queue1.hasNext, isFalse); |
| 643 |
| 644 expect(await queue2.next, 1); |
| 645 expect(await queue2.next, 2); |
| 646 expect(await queue2.next, 3); |
| 647 expect(await queue2.next, 4); |
| 648 expect(await queue2.hasNext, isFalse); |
| 649 }); |
| 650 |
| 651 test("produces a stream queue with the same errors", () async { |
| 652 var queue1 = new StreamQueue<int>(createErrorStream()); |
| 653 var queue2 = queue1.fork(); |
| 654 |
| 655 expect(await queue1.next, 1); |
| 656 expect(await queue1.next, 2); |
| 657 expect(queue1.next, throwsA("To err is divine!")); |
| 658 expect(await queue1.next, 4); |
| 659 expect(await queue1.hasNext, isFalse); |
| 660 |
| 661 expect(await queue2.next, 1); |
| 662 expect(await queue2.next, 2); |
| 663 expect(queue2.next, throwsA("To err is divine!")); |
| 664 expect(await queue2.next, 4); |
| 665 expect(await queue2.hasNext, isFalse); |
| 666 }); |
| 667 |
| 668 test("forks at the current point in the source queue", () { |
| 669 var queue1 = new StreamQueue<int>(createStream()); |
| 670 |
| 671 expect(queue1.next, completion(1)); |
| 672 expect(queue1.next, completion(2)); |
| 673 |
| 674 var queue2 = queue1.fork(); |
| 675 |
| 676 expect(queue1.next, completion(3)); |
| 677 expect(queue1.next, completion(4)); |
| 678 expect(queue1.hasNext, completion(isFalse)); |
| 679 |
| 680 expect(queue2.next, completion(3)); |
| 681 expect(queue2.next, completion(4)); |
| 682 expect(queue2.hasNext, completion(isFalse)); |
| 683 }); |
| 684 |
| 685 test("can be created after there are pending values", () async { |
| 686 var queue1 = new StreamQueue<int>(createStream()); |
| 687 await flushMicrotasks(); |
| 688 |
| 689 var queue2 = queue1.fork(); |
| 690 expect(await queue2.next, 1); |
| 691 expect(await queue2.next, 2); |
| 692 expect(await queue2.next, 3); |
| 693 expect(await queue2.next, 4); |
| 694 expect(await queue2.hasNext, isFalse); |
| 695 }); |
| 696 |
| 697 test("multiple forks can be created at different points", () async { |
| 698 var queue1 = new StreamQueue<int>(createStream()); |
| 699 |
| 700 var queue2 = queue1.fork(); |
| 701 expect(await queue1.next, 1); |
| 702 expect(await queue2.next, 1); |
| 703 |
| 704 var queue3 = queue1.fork(); |
| 705 expect(await queue1.next, 2); |
| 706 expect(await queue2.next, 2); |
| 707 expect(await queue3.next, 2); |
| 708 |
| 709 var queue4 = queue1.fork(); |
| 710 expect(await queue1.next, 3); |
| 711 expect(await queue2.next, 3); |
| 712 expect(await queue3.next, 3); |
| 713 expect(await queue4.next, 3); |
| 714 |
| 715 var queue5 = queue1.fork(); |
| 716 expect(await queue1.next, 4); |
| 717 expect(await queue2.next, 4); |
| 718 expect(await queue3.next, 4); |
| 719 expect(await queue4.next, 4); |
| 720 expect(await queue5.next, 4); |
| 721 |
| 722 var queue6 = queue1.fork(); |
| 723 expect(await queue1.hasNext, isFalse); |
| 724 expect(await queue2.hasNext, isFalse); |
| 725 expect(await queue3.hasNext, isFalse); |
| 726 expect(await queue4.hasNext, isFalse); |
| 727 expect(await queue5.hasNext, isFalse); |
| 728 expect(await queue6.hasNext, isFalse); |
| 729 }); |
| 730 |
| 731 test("same-level forks receive data in the order they were created", |
| 732 () async { |
| 733 var queue1 = new StreamQueue<int>(createStream()); |
| 734 var queue2 = queue1.fork(); |
| 735 var queue3 = queue1.fork(); |
| 736 var queue4 = queue1.fork(); |
| 737 var queue5 = queue1.fork(); |
| 738 |
| 739 for (var i = 0; i < 4; i++) { |
| 740 var queue1Fired = false; |
| 741 var queue2Fired = false; |
| 742 var queue3Fired = false; |
| 743 var queue4Fired = false; |
| 744 var queue5Fired = false; |
| 745 |
| 746 queue5.next.then(expectAsync((_) { |
| 747 queue5Fired = true; |
| 748 expect(queue1Fired, isTrue); |
| 749 expect(queue2Fired, isTrue); |
| 750 expect(queue3Fired, isTrue); |
| 751 expect(queue4Fired, isTrue); |
| 752 })); |
| 753 |
| 754 queue1.next.then(expectAsync((_) { |
| 755 queue1Fired = true; |
| 756 expect(queue2Fired, isFalse); |
| 757 expect(queue3Fired, isFalse); |
| 758 expect(queue4Fired, isFalse); |
| 759 expect(queue5Fired, isFalse); |
| 760 })); |
| 761 |
| 762 queue4.next.then(expectAsync((_) { |
| 763 queue4Fired = true; |
| 764 expect(queue1Fired, isTrue); |
| 765 expect(queue2Fired, isTrue); |
| 766 expect(queue3Fired, isTrue); |
| 767 expect(queue5Fired, isFalse); |
| 768 })); |
| 769 |
| 770 queue2.next.then(expectAsync((_) { |
| 771 queue2Fired = true; |
| 772 expect(queue1Fired, isTrue); |
| 773 expect(queue3Fired, isFalse); |
| 774 expect(queue4Fired, isFalse); |
| 775 expect(queue5Fired, isFalse); |
| 776 })); |
| 777 |
| 778 queue3.next.then(expectAsync((_) { |
| 779 queue3Fired = true; |
| 780 expect(queue1Fired, isTrue); |
| 781 expect(queue2Fired, isTrue); |
| 782 expect(queue4Fired, isFalse); |
| 783 expect(queue5Fired, isFalse); |
| 784 })); |
| 785 } |
| 786 }); |
| 787 |
| 788 test("forks can be created from forks", () async { |
| 789 var queue1 = new StreamQueue<int>(createStream()); |
| 790 |
| 791 var queue2 = queue1.fork(); |
| 792 expect(await queue1.next, 1); |
| 793 expect(await queue2.next, 1); |
| 794 |
| 795 var queue3 = queue2.fork(); |
| 796 expect(await queue1.next, 2); |
| 797 expect(await queue2.next, 2); |
| 798 expect(await queue3.next, 2); |
| 799 |
| 800 var queue4 = queue3.fork(); |
| 801 expect(await queue1.next, 3); |
| 802 expect(await queue2.next, 3); |
| 803 expect(await queue3.next, 3); |
| 804 expect(await queue4.next, 3); |
| 805 |
| 806 var queue5 = queue4.fork(); |
| 807 expect(await queue1.next, 4); |
| 808 expect(await queue2.next, 4); |
| 809 expect(await queue3.next, 4); |
| 810 expect(await queue4.next, 4); |
| 811 expect(await queue5.next, 4); |
| 812 |
| 813 var queue6 = queue5.fork(); |
| 814 expect(await queue1.hasNext, isFalse); |
| 815 expect(await queue2.hasNext, isFalse); |
| 816 expect(await queue3.hasNext, isFalse); |
| 817 expect(await queue4.hasNext, isFalse); |
| 818 expect(await queue5.hasNext, isFalse); |
| 819 expect(await queue6.hasNext, isFalse); |
| 820 }); |
| 821 |
| 822 group("canceling:", () { |
| 823 test("cancelling a fork doesn't cancel its source", () async { |
| 824 var queue1 = new StreamQueue<int>(createStream()); |
| 825 var queue2 = queue1.fork(); |
| 826 |
| 827 queue2.cancel(); |
| 828 expect(() => queue2.next, throwsStateError); |
| 829 |
| 830 expect(await queue1.next, 1); |
| 831 expect(await queue1.next, 2); |
| 832 expect(await queue1.next, 3); |
| 833 expect(await queue1.next, 4); |
| 834 expect(await queue1.hasNext, isFalse); |
| 835 }); |
| 836 |
| 837 test("cancelling a source doesn't cancel its unmaterialized fork", |
| 838 () async { |
| 839 var queue1 = new StreamQueue<int>(createStream()); |
| 840 var queue2 = queue1.fork(); |
| 841 |
| 842 queue1.cancel(); |
| 843 expect(() => queue1.next, throwsStateError); |
| 844 |
| 845 expect(await queue2.next, 1); |
| 846 expect(await queue2.next, 2); |
| 847 expect(await queue2.next, 3); |
| 848 expect(await queue2.next, 4); |
| 849 expect(await queue2.hasNext, isFalse); |
| 850 }); |
| 851 |
| 852 test("cancelling a source doesn't cancel its materialized fork", |
| 853 () async { |
| 854 var queue1 = new StreamQueue<int>(createStream()); |
| 855 var queue2 = queue1.fork(); |
| 856 |
| 857 expect(await queue1.next, 1); |
| 858 |
| 859 queue1.cancel(); |
| 860 expect(() => queue1.next, throwsStateError); |
| 861 |
| 862 expect(await queue2.next, 1); |
| 863 expect(await queue2.next, 2); |
| 864 expect(await queue2.next, 3); |
| 865 expect(await queue2.next, 4); |
| 866 expect(await queue2.hasNext, isFalse); |
| 867 }); |
| 868 |
| 869 test("the underlying stream is only canceled once all forks are canceled", |
| 870 () async { |
| 871 var controller = new StreamController(); |
| 872 var queue1 = new StreamQueue<int>(controller.stream); |
| 873 var queue2 = queue1.fork(); |
| 874 |
| 875 await flushMicrotasks(); |
| 876 expect(controller.hasListener, isFalse); |
| 877 |
| 878 expect(queue1.next, completion(1)); |
| 879 await flushMicrotasks(); |
| 880 expect(controller.hasListener, isTrue); |
| 881 |
| 882 queue2.cancel(); |
| 883 await flushMicrotasks(); |
| 884 expect(controller.hasListener, isTrue); |
| 885 |
| 886 controller.add(1); |
| 887 queue1.cancel(); |
| 888 await flushMicrotasks(); |
| 889 expect(controller.hasListener, isFalse); |
| 890 }); |
| 891 |
| 892 group("with immediate,", () { |
| 893 test("cancelling a fork doesn't cancel its source", () async { |
| 894 var queue1 = new StreamQueue<int>(createStream()); |
| 895 var queue2 = queue1.fork(); |
| 896 |
| 897 queue2.cancel(immediate: true); |
| 898 expect(() => queue2.next, throwsStateError); |
| 899 |
| 900 expect(await queue1.next, 1); |
| 901 expect(await queue1.next, 2); |
| 902 expect(await queue1.next, 3); |
| 903 expect(await queue1.next, 4); |
| 904 expect(await queue1.hasNext, isFalse); |
| 905 }); |
| 906 |
| 907 test("cancelling a source doesn't cancel its unmaterialized fork", |
| 908 () async { |
| 909 var queue1 = new StreamQueue<int>(createStream()); |
| 910 var queue2 = queue1.fork(); |
| 911 |
| 912 queue1.cancel(immediate: true); |
| 913 expect(() => queue1.next, throwsStateError); |
| 914 |
| 915 expect(await queue2.next, 1); |
| 916 expect(await queue2.next, 2); |
| 917 expect(await queue2.next, 3); |
| 918 expect(await queue2.next, 4); |
| 919 expect(await queue2.hasNext, isFalse); |
| 920 }); |
| 921 |
| 922 test("cancelling a source doesn't cancel its materialized fork", |
| 923 () async { |
| 924 var queue1 = new StreamQueue<int>(createStream()); |
| 925 var queue2 = queue1.fork(); |
| 926 |
| 927 expect(await queue1.next, 1); |
| 928 |
| 929 queue1.cancel(immediate: true); |
| 930 expect(() => queue1.next, throwsStateError); |
| 931 |
| 932 expect(await queue2.next, 1); |
| 933 expect(await queue2.next, 2); |
| 934 expect(await queue2.next, 3); |
| 935 expect(await queue2.next, 4); |
| 936 expect(await queue2.hasNext, isFalse); |
| 937 }); |
| 938 |
| 939 test("the underlying stream is only canceled once all forks are " |
| 940 "canceled", () async { |
| 941 var controller = new StreamController(); |
| 942 var queue1 = new StreamQueue<int>(controller.stream); |
| 943 var queue2 = queue1.fork(); |
| 944 |
| 945 await flushMicrotasks(); |
| 946 expect(controller.hasListener, isFalse); |
| 947 |
| 948 expect(queue1.next, throwsStateError); |
| 949 await flushMicrotasks(); |
| 950 expect(controller.hasListener, isTrue); |
| 951 |
| 952 queue2.cancel(immediate: true); |
| 953 await flushMicrotasks(); |
| 954 expect(controller.hasListener, isTrue); |
| 955 |
| 956 queue1.cancel(immediate: true); |
| 957 await flushMicrotasks(); |
| 958 expect(controller.hasListener, isFalse); |
| 959 }); |
| 960 }); |
| 961 }); |
| 962 |
| 963 group("pausing:", () { |
| 964 test("the underlying stream is only implicitly paused when no forks are " |
| 965 "awaiting input", () async { |
| 966 var controller = new StreamController(); |
| 967 var queue1 = new StreamQueue<int>(controller.stream); |
| 968 var queue2 = queue1.fork(); |
| 969 |
| 970 controller.add(1); |
| 971 expect(await queue1.next, 1); |
| 972 expect(controller.hasListener, isTrue); |
| 973 expect(controller.isPaused, isTrue); |
| 974 |
| 975 expect(queue1.next, completion(2)); |
| 976 await flushMicrotasks(); |
| 977 expect(controller.isPaused, isFalse); |
| 978 |
| 979 controller.add(2); |
| 980 await flushMicrotasks(); |
| 981 expect(controller.isPaused, isTrue); |
| 982 |
| 983 expect(queue2.next, completion(1)); |
| 984 expect(queue2.next, completion(2)); |
| 985 expect(queue2.next, completion(3)); |
| 986 await flushMicrotasks(); |
| 987 expect(controller.isPaused, isFalse); |
| 988 |
| 989 controller.add(3); |
| 990 await flushMicrotasks(); |
| 991 expect(controller.isPaused, isTrue); |
| 992 }); |
| 993 |
| 994 test("pausing a fork doesn't pause its source", () async { |
| 995 var queue1 = new StreamQueue<int>(createStream()); |
| 996 var queue2 = queue1.fork(); |
| 997 |
| 998 queue2.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
| 999 |
| 1000 expect(await queue1.next, 1); |
| 1001 expect(await queue1.next, 2); |
| 1002 expect(await queue1.next, 3); |
| 1003 expect(await queue1.next, 4); |
| 1004 expect(await queue1.hasNext, isFalse); |
| 1005 }); |
| 1006 |
| 1007 test("pausing a source doesn't pause its fork", () async { |
| 1008 var queue1 = new StreamQueue<int>(createStream()); |
| 1009 var queue2 = queue1.fork(); |
| 1010 |
| 1011 queue1.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
| 1012 |
| 1013 expect(await queue2.next, 1); |
| 1014 expect(await queue2.next, 2); |
| 1015 expect(await queue2.next, 3); |
| 1016 expect(await queue2.next, 4); |
| 1017 expect(await queue2.hasNext, isFalse); |
| 1018 }); |
| 1019 |
| 1020 test("the underlying stream is only paused when all forks are paused", |
| 1021 () async { |
| 1022 var controller = new StreamController(); |
| 1023 var queue1 = new StreamQueue<int>(controller.stream); |
| 1024 var queue2 = queue1.fork(); |
| 1025 |
| 1026 await flushMicrotasks(); |
| 1027 expect(controller.hasListener, isFalse); |
| 1028 |
| 1029 var sub1 = queue1.rest.listen(null); |
| 1030 await flushMicrotasks(); |
| 1031 expect(controller.hasListener, isTrue); |
| 1032 expect(controller.isPaused, isFalse); |
| 1033 |
| 1034 sub1.pause(); |
| 1035 await flushMicrotasks(); |
| 1036 expect(controller.isPaused, isTrue); |
| 1037 |
| 1038 expect(queue2.next, completion(1)); |
| 1039 await flushMicrotasks(); |
| 1040 expect(controller.isPaused, isFalse); |
| 1041 |
| 1042 controller.add(1); |
| 1043 await flushMicrotasks(); |
| 1044 expect(controller.isPaused, isTrue); |
| 1045 |
| 1046 var sub2 = queue2.rest.listen(null); |
| 1047 await flushMicrotasks(); |
| 1048 expect(controller.isPaused, isFalse); |
| 1049 |
| 1050 sub2.pause(); |
| 1051 await flushMicrotasks(); |
| 1052 expect(controller.isPaused, isTrue); |
| 1053 |
| 1054 sub1.resume(); |
| 1055 await flushMicrotasks(); |
| 1056 expect(controller.isPaused, isFalse); |
| 1057 }); |
| 1058 }); |
| 1059 }); |
| 1060 |
| 1061 test("all combinations sequential skip/next/take operations", () async { |
| 1062 // Takes all combinations of two of next, skip and take, then ends with |
| 1063 // doing rest. Each of the first rounds do 10 events of each type, |
| 1064 // the rest does 20 elements. |
| 1065 var eventCount = 20 * (3 * 3 + 1); |
| 1066 var events = new StreamQueue<int>(createLongStream(eventCount)); |
| 1067 |
| 1068 // Test expecting [startIndex .. startIndex + 9] as events using |
| 1069 // `next`. |
| 1070 nextTest(startIndex) { |
| 1071 for (int i = 0; i < 10; i++) { |
| 1072 expect(events.next, completion(startIndex + i)); |
| 1073 } |
| 1074 } |
| 1075 |
| 1076 // Test expecting 10 events to be skipped. |
| 1077 skipTest(startIndex) { |
| 1078 expect(events.skip(10), completion(0)); |
| 1079 } |
| 1080 |
| 1081 // Test expecting [startIndex .. startIndex + 9] as events using |
| 1082 // `take(10)`. |
| 1083 takeTest(startIndex) { |
| 1084 expect(events.take(10), |
| 1085 completion(new List.generate(10, (i) => startIndex + i))); |
| 1086 } |
| 1087 var tests = [nextTest, skipTest, takeTest]; |
| 1088 |
| 1089 int counter = 0; |
| 1090 // Run through all pairs of two tests and run them. |
| 1091 for (int i = 0; i < tests.length; i++) { |
| 1092 for (int j = 0; j < tests.length; j++) { |
| 1093 tests[i](counter); |
| 1094 tests[j](counter + 10); |
| 1095 counter += 20; |
| 1096 } |
| 1097 } |
| 1098 // Then expect 20 more events as a `rest` call. |
| 1099 expect(events.rest.toList(), |
| 1100 completion(new List.generate(20, (i) => counter + i))); |
| 1101 }); |
| 1102 } |
| 1103 |
| 1104 Stream<int> createStream() async* { |
| 1105 yield 1; |
| 1106 await flushMicrotasks(); |
| 1107 yield 2; |
| 1108 await flushMicrotasks(); |
| 1109 yield 3; |
| 1110 await flushMicrotasks(); |
| 1111 yield 4; |
| 1112 } |
| 1113 |
| 1114 Stream<int> createErrorStream() { |
| 1115 StreamController controller = new StreamController<int>(); |
| 1116 () async { |
| 1117 controller.add(1); |
| 1118 await flushMicrotasks(); |
| 1119 controller.add(2); |
| 1120 await flushMicrotasks(); |
| 1121 controller.addError("To err is divine!"); |
| 1122 await flushMicrotasks(); |
| 1123 controller.add(4); |
| 1124 await flushMicrotasks(); |
| 1125 controller.close(); |
| 1126 }(); |
| 1127 return controller.stream; |
| 1128 } |
| 1129 |
| 1130 Stream<int> createLongStream(int eventCount) async* { |
| 1131 for (int i = 0; i < eventCount; i++) yield i; |
| 1132 } |
| 1133 |
| 1134 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
OLD | NEW |