Index: test/util/stream_queue_test.dart |
diff --git a/test/util/stream_queue_test.dart b/test/util/stream_queue_test.dart |
deleted file mode 100644 |
index 7cc1557cafcbe0c047a687c46ac885b9b080d5d5..0000000000000000000000000000000000000000 |
--- a/test/util/stream_queue_test.dart |
+++ /dev/null |
@@ -1,1134 +0,0 @@ |
-// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
-// for details. All rights reserved. Use of this source code is governed by a |
-// BSD-style license that can be found in the LICENSE filevents. |
- |
-// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ |
-// lands. |
- |
-import "dart:async"; |
- |
-import "package:test/src/util/stream_queue.dart"; |
-import "package:test/test.dart"; |
- |
-main() { |
- group("source stream", () { |
- test("is listened to on first request, paused between requests", () async { |
- var controller = new StreamController(); |
- var events = new StreamQueue<int>(controller.stream); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- |
- var next = events.next; |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(1); |
- |
- expect(await next, 1); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isTrue); |
- |
- next = events.next; |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(2); |
- |
- expect(await next, 2); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isTrue); |
- |
- events.cancel(); |
- expect(controller.hasListener, isFalse); |
- }); |
- }); |
- |
- group("next operation", () { |
- test("simple sequence of requests", () async { |
- var events = new StreamQueue<int>(createStream()); |
- for (int i = 1; i <= 4; i++) { |
- expect(await events.next, i); |
- } |
- expect(events.next, throwsStateError); |
- }); |
- |
- test("multiple requests at the same time", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var result = await Future.wait( |
- [events.next, events.next, events.next, events.next]); |
- expect(result, [1, 2, 3, 4]); |
- await events.cancel(); |
- }); |
- |
- test("sequence of requests with error", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(events.next, throwsA("To err is divine!")); |
- expect(await events.next, 4); |
- await events.cancel(); |
- }); |
- }); |
- |
- group("skip operation", () { |
- test("of two elements in the middle of sequence", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.skip(2), 0); |
- expect(await events.next, 4); |
- await events.cancel(); |
- }); |
- |
- test("with negative/bad arguments throws", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(() => events.skip(-1), throwsArgumentError); |
- // A non-int throws either a type error or an argument error, |
- // depending on whether it's checked mode or not. |
- expect(await events.next, 1); // Did not consume event. |
- expect(() => events.skip(-1), throwsArgumentError); |
- expect(await events.next, 2); // Did not consume event. |
- await events.cancel(); |
- }); |
- |
- test("of 0 elements works", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(events.skip(0), completion(0)); |
- expect(events.next, completion(1)); |
- expect(events.skip(0), completion(0)); |
- expect(events.next, completion(2)); |
- expect(events.skip(0), completion(0)); |
- expect(events.next, completion(3)); |
- expect(events.skip(0), completion(0)); |
- expect(events.next, completion(4)); |
- expect(events.skip(0), completion(0)); |
- expect(events.skip(5), completion(5)); |
- expect(events.next, throwsStateError); |
- await events.cancel(); |
- }); |
- |
- test("of too many events ends at stream start", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.skip(6), 2); |
- await events.cancel(); |
- }); |
- |
- test("of too many events after some events", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.skip(6), 4); |
- await events.cancel(); |
- }); |
- |
- test("of too many events ends at stream end", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- expect(await events.next, 4); |
- expect(await events.skip(2), 2); |
- await events.cancel(); |
- }); |
- |
- test("of events with error", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(events.skip(4), throwsA("To err is divine!")); |
- expect(await events.next, 4); |
- await events.cancel(); |
- }); |
- |
- test("of events with error, and skip again after", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(events.skip(4), throwsA("To err is divine!")); |
- expect(events.skip(2), completion(1)); |
- await events.cancel(); |
- }); |
- test("multiple skips at same time complete in order.", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var skip1 = events.skip(1); |
- var skip2 = events.skip(0); |
- var skip3 = events.skip(4); |
- var skip4 = events.skip(1); |
- var index = 0; |
- // Check that futures complete in order. |
- sequence(expectedValue, sequenceIndex) => (value) { |
- expect(value, expectedValue); |
- expect(index, sequenceIndex); |
- index++; |
- }; |
- await Future.wait([skip1.then(sequence(0, 0)), |
- skip2.then(sequence(0, 1)), |
- skip3.then(sequence(1, 2)), |
- skip4.then(sequence(1, 3))]); |
- await events.cancel(); |
- }); |
- }); |
- |
- group("take operation", () { |
- test("as simple take of events", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.take(2), [2, 3]); |
- expect(await events.next, 4); |
- await events.cancel(); |
- }); |
- |
- test("of 0 events", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(events.take(0), completion([])); |
- expect(events.next, completion(1)); |
- expect(events.take(0), completion([])); |
- expect(events.next, completion(2)); |
- expect(events.take(0), completion([])); |
- expect(events.next, completion(3)); |
- expect(events.take(0), completion([])); |
- expect(events.next, completion(4)); |
- expect(events.take(0), completion([])); |
- expect(events.take(5), completion([])); |
- expect(events.next, throwsStateError); |
- await events.cancel(); |
- }); |
- |
- test("with bad arguments throws", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(() => events.take(-1), throwsArgumentError); |
- expect(await events.next, 1); // Did not consume event. |
- expect(() => events.take(-1), throwsArgumentError); |
- expect(await events.next, 2); // Did not consume event. |
- await events.cancel(); |
- }); |
- |
- test("of too many arguments", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.take(6), [1, 2, 3, 4]); |
- await events.cancel(); |
- }); |
- |
- test("too large later", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.take(6), [3, 4]); |
- await events.cancel(); |
- }); |
- |
- test("error", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(events.take(4), throwsA("To err is divine!")); |
- expect(await events.next, 4); |
- await events.cancel(); |
- }); |
- }); |
- |
- group("rest operation", () { |
- test("after single next", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.rest.toList(), [2, 3, 4]); |
- }); |
- |
- test("at start", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.rest.toList(), [1, 2, 3, 4]); |
- }); |
- |
- test("at end", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- expect(await events.next, 4); |
- expect(await events.rest.toList(), isEmpty); |
- }); |
- |
- test("after end", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- expect(await events.next, 4); |
- expect(events.next, throwsStateError); |
- expect(await events.rest.toList(), isEmpty); |
- }); |
- |
- test("after receiving done requested before", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var next1 = events.next; |
- var next2 = events.next; |
- var next3 = events.next; |
- var rest = events.rest; |
- for (int i = 0; i < 10; i++) { |
- await flushMicrotasks(); |
- } |
- expect(await next1, 1); |
- expect(await next2, 2); |
- expect(await next3, 3); |
- expect(await rest.toList(), [4]); |
- }); |
- |
- test("with an error event error", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(await events.next, 1); |
- var rest = events.rest; |
- var events2 = new StreamQueue(rest); |
- expect(await events2.next, 2); |
- expect(events2.next, throwsA("To err is divine!")); |
- expect(await events2.next, 4); |
- }); |
- |
- test("closes the events, prevents other operations", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var stream = events.rest; |
- expect(() => events.next, throwsStateError); |
- expect(() => events.skip(1), throwsStateError); |
- expect(() => events.take(1), throwsStateError); |
- expect(() => events.rest, throwsStateError); |
- expect(() => events.cancel(), throwsStateError); |
- expect(stream.toList(), completion([1, 2, 3, 4])); |
- }); |
- |
- test("forwards to underlying stream", () async { |
- var cancel = new Completer(); |
- var controller = new StreamController(onCancel: () => cancel.future); |
- var events = new StreamQueue<int>(controller.stream); |
- expect(controller.hasListener, isFalse); |
- var next = events.next; |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(1); |
- expect(await next, 1); |
- expect(controller.isPaused, isTrue); |
- |
- var rest = events.rest; |
- var subscription = rest.listen(null); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- var lastEvent; |
- subscription.onData((value) => lastEvent = value); |
- |
- controller.add(2); |
- |
- await flushMicrotasks(); |
- expect(lastEvent, 2); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- subscription.pause(); |
- expect(controller.isPaused, isTrue); |
- |
- controller.add(3); |
- |
- await flushMicrotasks(); |
- expect(lastEvent, 2); |
- subscription.resume(); |
- |
- await flushMicrotasks(); |
- expect(lastEvent, 3); |
- |
- var cancelFuture = subscription.cancel(); |
- expect(controller.hasListener, isFalse); |
- cancel.complete(42); |
- expect(cancelFuture, completion(42)); |
- }); |
- }); |
- |
- group("cancel operation", () { |
- test("closes the events, prevents any other operation", () async { |
- var events = new StreamQueue<int>(createStream()); |
- await events.cancel(); |
- expect(() => events.next, throwsStateError); |
- expect(() => events.skip(1), throwsStateError); |
- expect(() => events.take(1), throwsStateError); |
- expect(() => events.rest, throwsStateError); |
- expect(() => events.cancel(), throwsStateError); |
- }); |
- |
- test("cancels underlying subscription when called before any event", |
- () async { |
- var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
- var events = new StreamQueue<int>(controller.stream); |
- expect(await events.cancel(), 42); |
- }); |
- |
- test("cancels underlying subscription, returns result", () async { |
- var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
- var events = new StreamQueue<int>(controller.stream); |
- controller.add(1); |
- expect(await events.next, 1); |
- expect(await events.cancel(), 42); |
- }); |
- |
- group("with immediate: true", () { |
- test("closes the events, prevents any other operation", () async { |
- var events = new StreamQueue<int>(createStream()); |
- await events.cancel(immediate: true); |
- expect(() => events.next, throwsStateError); |
- expect(() => events.skip(1), throwsStateError); |
- expect(() => events.take(1), throwsStateError); |
- expect(() => events.rest, throwsStateError); |
- expect(() => events.cancel(), throwsStateError); |
- }); |
- |
- test("cancels the underlying subscription immediately", () async { |
- var controller = new StreamController(); |
- controller.add(1); |
- |
- var events = new StreamQueue<int>(controller.stream); |
- expect(await events.next, 1); |
- expect(controller.hasListener, isTrue); |
- |
- events.cancel(immediate: true); |
- await expect(controller.hasListener, isFalse); |
- }); |
- |
- test("cancels the underlying subscription when called before any event", |
- () async { |
- var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
- |
- var events = new StreamQueue<int>(controller.stream); |
- expect(await events.cancel(immediate: true), 42); |
- }); |
- |
- test("closes pending requests", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(events.next, throwsStateError); |
- expect(events.hasNext, completion(isFalse)); |
- |
- await events.cancel(immediate: true); |
- }); |
- |
- test("returns the result of closing the underlying subscription", |
- () async { |
- var controller = new StreamController( |
- onCancel: () => new Future.value(42)); |
- var events = new StreamQueue<int>(controller.stream); |
- expect(await events.cancel(immediate: true), 42); |
- }); |
- |
- test("listens and then cancels a stream that hasn't been listened to yet", |
- () async { |
- var wasListened = false; |
- var controller = new StreamController( |
- onListen: () => wasListened = true); |
- var events = new StreamQueue<int>(controller.stream); |
- expect(wasListened, isFalse); |
- expect(controller.hasListener, isFalse); |
- |
- await events.cancel(immediate: true); |
- expect(wasListened, isTrue); |
- expect(controller.hasListener, isFalse); |
- }); |
- }); |
- }); |
- |
- group("hasNext operation", () { |
- test("true at start", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.hasNext, isTrue); |
- }); |
- |
- test("true after start", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, isTrue); |
- }); |
- |
- test("true at end", () async { |
- var events = new StreamQueue<int>(createStream()); |
- for (int i = 1; i <= 4; i++) { |
- expect(await events.next, i); |
- } |
- expect(await events.hasNext, isFalse); |
- }); |
- |
- test("true when enqueued", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var values = []; |
- for (int i = 1; i <= 3; i++) { |
- events.next.then(values.add); |
- } |
- expect(values, isEmpty); |
- expect(await events.hasNext, isTrue); |
- expect(values, [1, 2, 3]); |
- }); |
- |
- test("false when enqueued", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var values = []; |
- for (int i = 1; i <= 4; i++) { |
- events.next.then(values.add); |
- } |
- expect(values, isEmpty); |
- expect(await events.hasNext, isFalse); |
- expect(values, [1, 2, 3, 4]); |
- }); |
- |
- test("true when data event", () async { |
- var controller = new StreamController(); |
- var events = new StreamQueue<int>(controller.stream); |
- |
- var hasNext; |
- events.hasNext.then((result) { hasNext = result; }); |
- await flushMicrotasks(); |
- expect(hasNext, isNull); |
- controller.add(42); |
- expect(hasNext, isNull); |
- await flushMicrotasks(); |
- expect(hasNext, isTrue); |
- }); |
- |
- test("true when error event", () async { |
- var controller = new StreamController(); |
- var events = new StreamQueue<int>(controller.stream); |
- |
- var hasNext; |
- events.hasNext.then((result) { hasNext = result; }); |
- await flushMicrotasks(); |
- expect(hasNext, isNull); |
- controller.addError("BAD"); |
- expect(hasNext, isNull); |
- await flushMicrotasks(); |
- expect(hasNext, isTrue); |
- expect(events.next, throwsA("BAD")); |
- }); |
- |
- test("- hasNext after hasNext", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.hasNext, true); |
- expect(await events.hasNext, true); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.hasNext, true); |
- expect(await events.next, 2); |
- expect(await events.hasNext, true); |
- expect(await events.hasNext, true); |
- expect(await events.next, 3); |
- expect(await events.hasNext, true); |
- expect(await events.hasNext, true); |
- expect(await events.next, 4); |
- expect(await events.hasNext, false); |
- expect(await events.hasNext, false); |
- }); |
- |
- test("- next after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- }); |
- |
- test("- next after true, enqueued", () async { |
- var events = new StreamQueue<int>(createStream()); |
- var responses = []; |
- events.next.then(responses.add); |
- events.hasNext.then(responses.add); |
- events.next.then(responses.add); |
- do { |
- await flushMicrotasks(); |
- } while (responses.length < 3); |
- expect(responses, [1, true, 2]); |
- }); |
- |
- test("- skip 0 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.skip(0), 0); |
- expect(await events.next, 2); |
- }); |
- |
- test("- skip 1 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.skip(1), 0); |
- expect(await events.next, 3); |
- }); |
- |
- test("- skip 2 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.skip(2), 0); |
- expect(await events.next, 4); |
- }); |
- |
- test("- take 0 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.take(0), isEmpty); |
- expect(await events.next, 2); |
- }); |
- |
- test("- take 1 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.take(1), [2]); |
- expect(await events.next, 3); |
- }); |
- |
- test("- take 2 after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- expect(await events.take(2), [2, 3]); |
- expect(await events.next, 4); |
- }); |
- |
- test("- rest after true", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.hasNext, true); |
- var stream = events.rest; |
- expect(await stream.toList(), [2, 3, 4]); |
- }); |
- |
- test("- rest after true, at last", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- expect(await events.hasNext, true); |
- var stream = events.rest; |
- expect(await stream.toList(), [4]); |
- }); |
- |
- test("- rest after false", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.next, 3); |
- expect(await events.next, 4); |
- expect(await events.hasNext, false); |
- var stream = events.rest; |
- expect(await stream.toList(), isEmpty); |
- }); |
- |
- test("- cancel after true on data", () async { |
- var events = new StreamQueue<int>(createStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.hasNext, true); |
- expect(await events.cancel(), null); |
- }); |
- |
- test("- cancel after true on error", () async { |
- var events = new StreamQueue<int>(createErrorStream()); |
- expect(await events.next, 1); |
- expect(await events.next, 2); |
- expect(await events.hasNext, true); |
- expect(await events.cancel(), null); |
- }); |
- }); |
- |
- group("fork operation", () { |
- test("produces a stream queue with the same events", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- expect(await queue1.next, 1); |
- expect(await queue1.next, 2); |
- expect(await queue1.next, 3); |
- expect(await queue1.next, 4); |
- expect(await queue1.hasNext, isFalse); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("produces a stream queue with the same errors", () async { |
- var queue1 = new StreamQueue<int>(createErrorStream()); |
- var queue2 = queue1.fork(); |
- |
- expect(await queue1.next, 1); |
- expect(await queue1.next, 2); |
- expect(queue1.next, throwsA("To err is divine!")); |
- expect(await queue1.next, 4); |
- expect(await queue1.hasNext, isFalse); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(queue2.next, throwsA("To err is divine!")); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("forks at the current point in the source queue", () { |
- var queue1 = new StreamQueue<int>(createStream()); |
- |
- expect(queue1.next, completion(1)); |
- expect(queue1.next, completion(2)); |
- |
- var queue2 = queue1.fork(); |
- |
- expect(queue1.next, completion(3)); |
- expect(queue1.next, completion(4)); |
- expect(queue1.hasNext, completion(isFalse)); |
- |
- expect(queue2.next, completion(3)); |
- expect(queue2.next, completion(4)); |
- expect(queue2.hasNext, completion(isFalse)); |
- }); |
- |
- test("can be created after there are pending values", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- await flushMicrotasks(); |
- |
- var queue2 = queue1.fork(); |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("multiple forks can be created at different points", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- |
- var queue2 = queue1.fork(); |
- expect(await queue1.next, 1); |
- expect(await queue2.next, 1); |
- |
- var queue3 = queue1.fork(); |
- expect(await queue1.next, 2); |
- expect(await queue2.next, 2); |
- expect(await queue3.next, 2); |
- |
- var queue4 = queue1.fork(); |
- expect(await queue1.next, 3); |
- expect(await queue2.next, 3); |
- expect(await queue3.next, 3); |
- expect(await queue4.next, 3); |
- |
- var queue5 = queue1.fork(); |
- expect(await queue1.next, 4); |
- expect(await queue2.next, 4); |
- expect(await queue3.next, 4); |
- expect(await queue4.next, 4); |
- expect(await queue5.next, 4); |
- |
- var queue6 = queue1.fork(); |
- expect(await queue1.hasNext, isFalse); |
- expect(await queue2.hasNext, isFalse); |
- expect(await queue3.hasNext, isFalse); |
- expect(await queue4.hasNext, isFalse); |
- expect(await queue5.hasNext, isFalse); |
- expect(await queue6.hasNext, isFalse); |
- }); |
- |
- test("same-level forks receive data in the order they were created", |
- () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- var queue3 = queue1.fork(); |
- var queue4 = queue1.fork(); |
- var queue5 = queue1.fork(); |
- |
- for (var i = 0; i < 4; i++) { |
- var queue1Fired = false; |
- var queue2Fired = false; |
- var queue3Fired = false; |
- var queue4Fired = false; |
- var queue5Fired = false; |
- |
- queue5.next.then(expectAsync((_) { |
- queue5Fired = true; |
- expect(queue1Fired, isTrue); |
- expect(queue2Fired, isTrue); |
- expect(queue3Fired, isTrue); |
- expect(queue4Fired, isTrue); |
- })); |
- |
- queue1.next.then(expectAsync((_) { |
- queue1Fired = true; |
- expect(queue2Fired, isFalse); |
- expect(queue3Fired, isFalse); |
- expect(queue4Fired, isFalse); |
- expect(queue5Fired, isFalse); |
- })); |
- |
- queue4.next.then(expectAsync((_) { |
- queue4Fired = true; |
- expect(queue1Fired, isTrue); |
- expect(queue2Fired, isTrue); |
- expect(queue3Fired, isTrue); |
- expect(queue5Fired, isFalse); |
- })); |
- |
- queue2.next.then(expectAsync((_) { |
- queue2Fired = true; |
- expect(queue1Fired, isTrue); |
- expect(queue3Fired, isFalse); |
- expect(queue4Fired, isFalse); |
- expect(queue5Fired, isFalse); |
- })); |
- |
- queue3.next.then(expectAsync((_) { |
- queue3Fired = true; |
- expect(queue1Fired, isTrue); |
- expect(queue2Fired, isTrue); |
- expect(queue4Fired, isFalse); |
- expect(queue5Fired, isFalse); |
- })); |
- } |
- }); |
- |
- test("forks can be created from forks", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- |
- var queue2 = queue1.fork(); |
- expect(await queue1.next, 1); |
- expect(await queue2.next, 1); |
- |
- var queue3 = queue2.fork(); |
- expect(await queue1.next, 2); |
- expect(await queue2.next, 2); |
- expect(await queue3.next, 2); |
- |
- var queue4 = queue3.fork(); |
- expect(await queue1.next, 3); |
- expect(await queue2.next, 3); |
- expect(await queue3.next, 3); |
- expect(await queue4.next, 3); |
- |
- var queue5 = queue4.fork(); |
- expect(await queue1.next, 4); |
- expect(await queue2.next, 4); |
- expect(await queue3.next, 4); |
- expect(await queue4.next, 4); |
- expect(await queue5.next, 4); |
- |
- var queue6 = queue5.fork(); |
- expect(await queue1.hasNext, isFalse); |
- expect(await queue2.hasNext, isFalse); |
- expect(await queue3.hasNext, isFalse); |
- expect(await queue4.hasNext, isFalse); |
- expect(await queue5.hasNext, isFalse); |
- expect(await queue6.hasNext, isFalse); |
- }); |
- |
- group("canceling:", () { |
- test("cancelling a fork doesn't cancel its source", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue2.cancel(); |
- expect(() => queue2.next, throwsStateError); |
- |
- expect(await queue1.next, 1); |
- expect(await queue1.next, 2); |
- expect(await queue1.next, 3); |
- expect(await queue1.next, 4); |
- expect(await queue1.hasNext, isFalse); |
- }); |
- |
- test("cancelling a source doesn't cancel its unmaterialized fork", |
- () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue1.cancel(); |
- expect(() => queue1.next, throwsStateError); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("cancelling a source doesn't cancel its materialized fork", |
- () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- expect(await queue1.next, 1); |
- |
- queue1.cancel(); |
- expect(() => queue1.next, throwsStateError); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("the underlying stream is only canceled once all forks are canceled", |
- () async { |
- var controller = new StreamController(); |
- var queue1 = new StreamQueue<int>(controller.stream); |
- var queue2 = queue1.fork(); |
- |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- |
- expect(queue1.next, completion(1)); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isTrue); |
- |
- queue2.cancel(); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isTrue); |
- |
- controller.add(1); |
- queue1.cancel(); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- }); |
- |
- group("with immediate,", () { |
- test("cancelling a fork doesn't cancel its source", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue2.cancel(immediate: true); |
- expect(() => queue2.next, throwsStateError); |
- |
- expect(await queue1.next, 1); |
- expect(await queue1.next, 2); |
- expect(await queue1.next, 3); |
- expect(await queue1.next, 4); |
- expect(await queue1.hasNext, isFalse); |
- }); |
- |
- test("cancelling a source doesn't cancel its unmaterialized fork", |
- () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue1.cancel(immediate: true); |
- expect(() => queue1.next, throwsStateError); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("cancelling a source doesn't cancel its materialized fork", |
- () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- expect(await queue1.next, 1); |
- |
- queue1.cancel(immediate: true); |
- expect(() => queue1.next, throwsStateError); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("the underlying stream is only canceled once all forks are " |
- "canceled", () async { |
- var controller = new StreamController(); |
- var queue1 = new StreamQueue<int>(controller.stream); |
- var queue2 = queue1.fork(); |
- |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- |
- expect(queue1.next, throwsStateError); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isTrue); |
- |
- queue2.cancel(immediate: true); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isTrue); |
- |
- queue1.cancel(immediate: true); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- }); |
- }); |
- }); |
- |
- group("pausing:", () { |
- test("the underlying stream is only implicitly paused when no forks are " |
- "awaiting input", () async { |
- var controller = new StreamController(); |
- var queue1 = new StreamQueue<int>(controller.stream); |
- var queue2 = queue1.fork(); |
- |
- controller.add(1); |
- expect(await queue1.next, 1); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isTrue); |
- |
- expect(queue1.next, completion(2)); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(2); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isTrue); |
- |
- expect(queue2.next, completion(1)); |
- expect(queue2.next, completion(2)); |
- expect(queue2.next, completion(3)); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(3); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isTrue); |
- }); |
- |
- test("pausing a fork doesn't pause its source", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue2.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
- |
- expect(await queue1.next, 1); |
- expect(await queue1.next, 2); |
- expect(await queue1.next, 3); |
- expect(await queue1.next, 4); |
- expect(await queue1.hasNext, isFalse); |
- }); |
- |
- test("pausing a source doesn't pause its fork", () async { |
- var queue1 = new StreamQueue<int>(createStream()); |
- var queue2 = queue1.fork(); |
- |
- queue1.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
- |
- expect(await queue2.next, 1); |
- expect(await queue2.next, 2); |
- expect(await queue2.next, 3); |
- expect(await queue2.next, 4); |
- expect(await queue2.hasNext, isFalse); |
- }); |
- |
- test("the underlying stream is only paused when all forks are paused", |
- () async { |
- var controller = new StreamController(); |
- var queue1 = new StreamQueue<int>(controller.stream); |
- var queue2 = queue1.fork(); |
- |
- await flushMicrotasks(); |
- expect(controller.hasListener, isFalse); |
- |
- var sub1 = queue1.rest.listen(null); |
- await flushMicrotasks(); |
- expect(controller.hasListener, isTrue); |
- expect(controller.isPaused, isFalse); |
- |
- sub1.pause(); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isTrue); |
- |
- expect(queue2.next, completion(1)); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isFalse); |
- |
- controller.add(1); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isTrue); |
- |
- var sub2 = queue2.rest.listen(null); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isFalse); |
- |
- sub2.pause(); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isTrue); |
- |
- sub1.resume(); |
- await flushMicrotasks(); |
- expect(controller.isPaused, isFalse); |
- }); |
- }); |
- }); |
- |
- test("all combinations sequential skip/next/take operations", () async { |
- // Takes all combinations of two of next, skip and take, then ends with |
- // doing rest. Each of the first rounds do 10 events of each type, |
- // the rest does 20 elements. |
- var eventCount = 20 * (3 * 3 + 1); |
- var events = new StreamQueue<int>(createLongStream(eventCount)); |
- |
- // Test expecting [startIndex .. startIndex + 9] as events using |
- // `next`. |
- nextTest(startIndex) { |
- for (int i = 0; i < 10; i++) { |
- expect(events.next, completion(startIndex + i)); |
- } |
- } |
- |
- // Test expecting 10 events to be skipped. |
- skipTest(startIndex) { |
- expect(events.skip(10), completion(0)); |
- } |
- |
- // Test expecting [startIndex .. startIndex + 9] as events using |
- // `take(10)`. |
- takeTest(startIndex) { |
- expect(events.take(10), |
- completion(new List.generate(10, (i) => startIndex + i))); |
- } |
- var tests = [nextTest, skipTest, takeTest]; |
- |
- int counter = 0; |
- // Run through all pairs of two tests and run them. |
- for (int i = 0; i < tests.length; i++) { |
- for (int j = 0; j < tests.length; j++) { |
- tests[i](counter); |
- tests[j](counter + 10); |
- counter += 20; |
- } |
- } |
- // Then expect 20 more events as a `rest` call. |
- expect(events.rest.toList(), |
- completion(new List.generate(20, (i) => counter + i))); |
- }); |
-} |
- |
-Stream<int> createStream() async* { |
- yield 1; |
- await flushMicrotasks(); |
- yield 2; |
- await flushMicrotasks(); |
- yield 3; |
- await flushMicrotasks(); |
- yield 4; |
-} |
- |
-Stream<int> createErrorStream() { |
- StreamController controller = new StreamController<int>(); |
- () async { |
- controller.add(1); |
- await flushMicrotasks(); |
- controller.add(2); |
- await flushMicrotasks(); |
- controller.addError("To err is divine!"); |
- await flushMicrotasks(); |
- controller.add(4); |
- await flushMicrotasks(); |
- controller.close(); |
- }(); |
- return controller.stream; |
-} |
- |
-Stream<int> createLongStream(int eventCount) async* { |
- for (int i = 0; i < eventCount; i++) yield i; |
-} |
- |
-Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |