| Index: test/util/stream_queue_test.dart | 
| diff --git a/test/util/stream_queue_test.dart b/test/util/stream_queue_test.dart | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..7cc1557cafcbe0c047a687c46ac885b9b080d5d5 | 
| --- /dev/null | 
| +++ b/test/util/stream_queue_test.dart | 
| @@ -0,0 +1,1134 @@ | 
| +// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file | 
| +// for details. All rights reserved. Use of this source code is governed by a | 
| +// BSD-style license that can be found in the LICENSE filevents. | 
| + | 
| +// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ | 
| +// lands. | 
| + | 
| +import "dart:async"; | 
| + | 
| +import "package:test/src/util/stream_queue.dart"; | 
| +import "package:test/test.dart"; | 
| + | 
| +main() { | 
| +  group("source stream", () { | 
| +    test("is listened to on first request, paused between requests", () async { | 
| +      var controller = new StreamController(); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| +      await flushMicrotasks(); | 
| +      expect(controller.hasListener, isFalse); | 
| + | 
| +      var next = events.next; | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isFalse); | 
| + | 
| +      controller.add(1); | 
| + | 
| +      expect(await next, 1); | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isTrue); | 
| + | 
| +      next = events.next; | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isFalse); | 
| + | 
| +      controller.add(2); | 
| + | 
| +      expect(await next, 2); | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isTrue); | 
| + | 
| +      events.cancel(); | 
| +      expect(controller.hasListener, isFalse); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("next operation", () { | 
| +    test("simple sequence of requests", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      for (int i = 1; i <= 4; i++) { | 
| +        expect(await events.next, i); | 
| +      } | 
| +      expect(events.next, throwsStateError); | 
| +    }); | 
| + | 
| +    test("multiple requests at the same time", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var result = await Future.wait( | 
| +          [events.next, events.next, events.next, events.next]); | 
| +      expect(result, [1, 2, 3, 4]); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("sequence of requests with error", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(events.next, throwsA("To err is divine!")); | 
| +      expect(await events.next, 4); | 
| +      await events.cancel(); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("skip operation", () { | 
| +    test("of two elements in the middle of sequence", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.skip(2), 0); | 
| +      expect(await events.next, 4); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("with negative/bad arguments throws", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(() => events.skip(-1), throwsArgumentError); | 
| +      // A non-int throws either a type error or an argument error, | 
| +      // depending on whether it's checked mode or not. | 
| +      expect(await events.next, 1);  // Did not consume event. | 
| +      expect(() => events.skip(-1), throwsArgumentError); | 
| +      expect(await events.next, 2);  // Did not consume event. | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of 0 elements works", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(events.skip(0), completion(0)); | 
| +      expect(events.next, completion(1)); | 
| +      expect(events.skip(0), completion(0)); | 
| +      expect(events.next, completion(2)); | 
| +      expect(events.skip(0), completion(0)); | 
| +      expect(events.next, completion(3)); | 
| +      expect(events.skip(0), completion(0)); | 
| +      expect(events.next, completion(4)); | 
| +      expect(events.skip(0), completion(0)); | 
| +      expect(events.skip(5), completion(5)); | 
| +      expect(events.next, throwsStateError); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of too many events ends at stream start", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.skip(6), 2); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of too many events after some events", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.skip(6), 4); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of too many events ends at stream end", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.next, 4); | 
| +      expect(await events.skip(2), 2); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of events with error", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(events.skip(4), throwsA("To err is divine!")); | 
| +      expect(await events.next, 4); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of events with error, and skip again after", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(events.skip(4), throwsA("To err is divine!")); | 
| +      expect(events.skip(2), completion(1)); | 
| +      await events.cancel(); | 
| +    }); | 
| +    test("multiple skips at same time complete in order.", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var skip1 = events.skip(1); | 
| +      var skip2 = events.skip(0); | 
| +      var skip3 = events.skip(4); | 
| +      var skip4 = events.skip(1); | 
| +      var index = 0; | 
| +      // Check that futures complete in order. | 
| +      sequence(expectedValue, sequenceIndex) => (value) { | 
| +        expect(value, expectedValue); | 
| +        expect(index, sequenceIndex); | 
| +        index++; | 
| +      }; | 
| +      await Future.wait([skip1.then(sequence(0, 0)), | 
| +                         skip2.then(sequence(0, 1)), | 
| +                         skip3.then(sequence(1, 2)), | 
| +                         skip4.then(sequence(1, 3))]); | 
| +      await events.cancel(); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("take operation", () { | 
| +    test("as simple take of events", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.take(2), [2, 3]); | 
| +      expect(await events.next, 4); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of 0 events", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(events.take(0), completion([])); | 
| +      expect(events.next, completion(1)); | 
| +      expect(events.take(0), completion([])); | 
| +      expect(events.next, completion(2)); | 
| +      expect(events.take(0), completion([])); | 
| +      expect(events.next, completion(3)); | 
| +      expect(events.take(0), completion([])); | 
| +      expect(events.next, completion(4)); | 
| +      expect(events.take(0), completion([])); | 
| +      expect(events.take(5), completion([])); | 
| +      expect(events.next, throwsStateError); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("with bad arguments throws", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(() => events.take(-1), throwsArgumentError); | 
| +      expect(await events.next, 1);  // Did not consume event. | 
| +      expect(() => events.take(-1), throwsArgumentError); | 
| +      expect(await events.next, 2);  // Did not consume event. | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("of too many arguments", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.take(6), [1, 2, 3, 4]); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("too large later", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.take(6), [3, 4]); | 
| +      await events.cancel(); | 
| +    }); | 
| + | 
| +    test("error", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(events.take(4), throwsA("To err is divine!")); | 
| +      expect(await events.next, 4); | 
| +      await events.cancel(); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("rest operation", () { | 
| +    test("after single next", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.rest.toList(), [2, 3, 4]); | 
| +    }); | 
| + | 
| +    test("at start", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.rest.toList(), [1, 2, 3, 4]); | 
| +    }); | 
| + | 
| +    test("at end", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.next, 4); | 
| +      expect(await events.rest.toList(), isEmpty); | 
| +    }); | 
| + | 
| +    test("after end", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.next, 4); | 
| +      expect(events.next, throwsStateError); | 
| +      expect(await events.rest.toList(), isEmpty); | 
| +    }); | 
| + | 
| +    test("after receiving done requested before", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var next1 = events.next; | 
| +      var next2 = events.next; | 
| +      var next3 = events.next; | 
| +      var rest = events.rest; | 
| +      for (int i = 0; i < 10; i++) { | 
| +        await flushMicrotasks(); | 
| +      } | 
| +      expect(await next1, 1); | 
| +      expect(await next2, 2); | 
| +      expect(await next3, 3); | 
| +      expect(await rest.toList(), [4]); | 
| +    }); | 
| + | 
| +    test("with an error event error", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(await events.next, 1); | 
| +      var rest = events.rest; | 
| +      var events2 = new StreamQueue(rest); | 
| +      expect(await events2.next, 2); | 
| +      expect(events2.next, throwsA("To err is divine!")); | 
| +      expect(await events2.next, 4); | 
| +    }); | 
| + | 
| +    test("closes the events, prevents other operations", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var stream = events.rest; | 
| +      expect(() => events.next, throwsStateError); | 
| +      expect(() => events.skip(1), throwsStateError); | 
| +      expect(() => events.take(1), throwsStateError); | 
| +      expect(() => events.rest, throwsStateError); | 
| +      expect(() => events.cancel(), throwsStateError); | 
| +      expect(stream.toList(), completion([1, 2, 3, 4])); | 
| +    }); | 
| + | 
| +    test("forwards to underlying stream", () async { | 
| +      var cancel = new Completer(); | 
| +      var controller = new StreamController(onCancel: () => cancel.future); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| +      expect(controller.hasListener, isFalse); | 
| +      var next = events.next; | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isFalse); | 
| + | 
| +      controller.add(1); | 
| +      expect(await next, 1); | 
| +      expect(controller.isPaused, isTrue); | 
| + | 
| +      var rest = events.rest; | 
| +      var subscription = rest.listen(null); | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isFalse); | 
| + | 
| +      var lastEvent; | 
| +      subscription.onData((value) => lastEvent = value); | 
| + | 
| +      controller.add(2); | 
| + | 
| +      await flushMicrotasks(); | 
| +      expect(lastEvent, 2); | 
| +      expect(controller.hasListener, isTrue); | 
| +      expect(controller.isPaused, isFalse); | 
| + | 
| +      subscription.pause(); | 
| +      expect(controller.isPaused, isTrue); | 
| + | 
| +      controller.add(3); | 
| + | 
| +      await flushMicrotasks(); | 
| +      expect(lastEvent, 2); | 
| +      subscription.resume(); | 
| + | 
| +      await flushMicrotasks(); | 
| +      expect(lastEvent, 3); | 
| + | 
| +      var cancelFuture = subscription.cancel(); | 
| +      expect(controller.hasListener, isFalse); | 
| +      cancel.complete(42); | 
| +      expect(cancelFuture, completion(42)); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("cancel operation", () { | 
| +    test("closes the events, prevents any other operation", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      await events.cancel(); | 
| +      expect(() => events.next, throwsStateError); | 
| +      expect(() => events.skip(1), throwsStateError); | 
| +      expect(() => events.take(1), throwsStateError); | 
| +      expect(() => events.rest, throwsStateError); | 
| +      expect(() => events.cancel(), throwsStateError); | 
| +    }); | 
| + | 
| +    test("cancels underlying subscription when called before any event", | 
| +        () async { | 
| +      var cancelFuture = new Future.value(42); | 
| +      var controller = new StreamController(onCancel: () => cancelFuture); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| +      expect(await events.cancel(), 42); | 
| +    }); | 
| + | 
| +    test("cancels underlying subscription, returns result", () async { | 
| +      var cancelFuture = new Future.value(42); | 
| +      var controller = new StreamController(onCancel: () => cancelFuture); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| +      controller.add(1); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.cancel(), 42); | 
| +    }); | 
| + | 
| +    group("with immediate: true", () { | 
| +      test("closes the events, prevents any other operation", () async { | 
| +        var events = new StreamQueue<int>(createStream()); | 
| +        await events.cancel(immediate: true); | 
| +        expect(() => events.next, throwsStateError); | 
| +        expect(() => events.skip(1), throwsStateError); | 
| +        expect(() => events.take(1), throwsStateError); | 
| +        expect(() => events.rest, throwsStateError); | 
| +        expect(() => events.cancel(), throwsStateError); | 
| +      }); | 
| + | 
| +      test("cancels the underlying subscription immediately", () async { | 
| +        var controller = new StreamController(); | 
| +        controller.add(1); | 
| + | 
| +        var events = new StreamQueue<int>(controller.stream); | 
| +        expect(await events.next, 1); | 
| +        expect(controller.hasListener, isTrue); | 
| + | 
| +        events.cancel(immediate: true); | 
| +        await expect(controller.hasListener, isFalse); | 
| +      }); | 
| + | 
| +      test("cancels the underlying subscription when called before any event", | 
| +          () async { | 
| +        var cancelFuture = new Future.value(42); | 
| +        var controller = new StreamController(onCancel: () => cancelFuture); | 
| + | 
| +        var events = new StreamQueue<int>(controller.stream); | 
| +        expect(await events.cancel(immediate: true), 42); | 
| +      }); | 
| + | 
| +      test("closes pending requests", () async { | 
| +        var events = new StreamQueue<int>(createStream()); | 
| +        expect(await events.next, 1); | 
| +        expect(events.next, throwsStateError); | 
| +        expect(events.hasNext, completion(isFalse)); | 
| + | 
| +        await events.cancel(immediate: true); | 
| +      }); | 
| + | 
| +      test("returns the result of closing the underlying subscription", | 
| +          () async { | 
| +        var controller = new StreamController( | 
| +            onCancel: () => new Future.value(42)); | 
| +        var events = new StreamQueue<int>(controller.stream); | 
| +        expect(await events.cancel(immediate: true), 42); | 
| +      }); | 
| + | 
| +      test("listens and then cancels a stream that hasn't been listened to yet", | 
| +          () async { | 
| +        var wasListened = false; | 
| +        var controller = new StreamController( | 
| +            onListen: () => wasListened = true); | 
| +        var events = new StreamQueue<int>(controller.stream); | 
| +        expect(wasListened, isFalse); | 
| +        expect(controller.hasListener, isFalse); | 
| + | 
| +        await events.cancel(immediate: true); | 
| +        expect(wasListened, isTrue); | 
| +        expect(controller.hasListener, isFalse); | 
| +      }); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("hasNext operation", () { | 
| +    test("true at start", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.hasNext, isTrue); | 
| +    }); | 
| + | 
| +    test("true after start", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, isTrue); | 
| +    }); | 
| + | 
| +    test("true at end", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      for (int i = 1; i <= 4; i++) { | 
| +        expect(await events.next, i); | 
| +      } | 
| +      expect(await events.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    test("true when enqueued", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var values = []; | 
| +      for (int i = 1; i <= 3; i++) { | 
| +        events.next.then(values.add); | 
| +      } | 
| +      expect(values, isEmpty); | 
| +      expect(await events.hasNext, isTrue); | 
| +      expect(values, [1, 2, 3]); | 
| +    }); | 
| + | 
| +    test("false when enqueued", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var values = []; | 
| +      for (int i = 1; i <= 4; i++) { | 
| +        events.next.then(values.add); | 
| +      } | 
| +      expect(values, isEmpty); | 
| +      expect(await events.hasNext, isFalse); | 
| +      expect(values, [1, 2, 3, 4]); | 
| +    }); | 
| + | 
| +    test("true when data event", () async { | 
| +      var controller = new StreamController(); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| + | 
| +      var hasNext; | 
| +      events.hasNext.then((result) { hasNext = result; }); | 
| +      await flushMicrotasks(); | 
| +      expect(hasNext, isNull); | 
| +      controller.add(42); | 
| +      expect(hasNext, isNull); | 
| +      await flushMicrotasks(); | 
| +      expect(hasNext, isTrue); | 
| +    }); | 
| + | 
| +    test("true when error event", () async { | 
| +      var controller = new StreamController(); | 
| +      var events = new StreamQueue<int>(controller.stream); | 
| + | 
| +      var hasNext; | 
| +      events.hasNext.then((result) { hasNext = result; }); | 
| +      await flushMicrotasks(); | 
| +      expect(hasNext, isNull); | 
| +      controller.addError("BAD"); | 
| +      expect(hasNext, isNull); | 
| +      await flushMicrotasks(); | 
| +      expect(hasNext, isTrue); | 
| +      expect(events.next, throwsA("BAD")); | 
| +    }); | 
| + | 
| +    test("- hasNext after hasNext", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.next, 4); | 
| +      expect(await events.hasNext, false); | 
| +      expect(await events.hasNext, false); | 
| +    }); | 
| + | 
| +    test("- next after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +    }); | 
| + | 
| +    test("- next after true, enqueued", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      var responses = []; | 
| +      events.next.then(responses.add); | 
| +      events.hasNext.then(responses.add); | 
| +      events.next.then(responses.add); | 
| +      do { | 
| +        await flushMicrotasks(); | 
| +      } while (responses.length < 3); | 
| +      expect(responses, [1, true, 2]); | 
| +    }); | 
| + | 
| +    test("- skip 0 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.skip(0), 0); | 
| +      expect(await events.next, 2); | 
| +    }); | 
| + | 
| +    test("- skip 1 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.skip(1), 0); | 
| +      expect(await events.next, 3); | 
| +    }); | 
| + | 
| +    test("- skip 2 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.skip(2), 0); | 
| +      expect(await events.next, 4); | 
| +    }); | 
| + | 
| +    test("- take 0 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.take(0), isEmpty); | 
| +      expect(await events.next, 2); | 
| +    }); | 
| + | 
| +    test("- take 1 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.take(1), [2]); | 
| +      expect(await events.next, 3); | 
| +    }); | 
| + | 
| +    test("- take 2 after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.take(2), [2, 3]); | 
| +      expect(await events.next, 4); | 
| +    }); | 
| + | 
| +    test("- rest after true", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.hasNext, true); | 
| +      var stream = events.rest; | 
| +      expect(await stream.toList(), [2, 3, 4]); | 
| +    }); | 
| + | 
| +    test("- rest after true, at last", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.hasNext, true); | 
| +      var stream = events.rest; | 
| +      expect(await stream.toList(), [4]); | 
| +    }); | 
| + | 
| +    test("- rest after false", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.next, 3); | 
| +      expect(await events.next, 4); | 
| +      expect(await events.hasNext, false); | 
| +      var stream = events.rest; | 
| +      expect(await stream.toList(), isEmpty); | 
| +    }); | 
| + | 
| +    test("- cancel after true on data", () async { | 
| +      var events = new StreamQueue<int>(createStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.cancel(), null); | 
| +    }); | 
| + | 
| +    test("- cancel after true on error", () async { | 
| +      var events = new StreamQueue<int>(createErrorStream()); | 
| +      expect(await events.next, 1); | 
| +      expect(await events.next, 2); | 
| +      expect(await events.hasNext, true); | 
| +      expect(await events.cancel(), null); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("fork operation", () { | 
| +    test("produces a stream queue with the same events", () async { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| +      var queue2 = queue1.fork(); | 
| + | 
| +      expect(await queue1.next, 1); | 
| +      expect(await queue1.next, 2); | 
| +      expect(await queue1.next, 3); | 
| +      expect(await queue1.next, 4); | 
| +      expect(await queue1.hasNext, isFalse); | 
| + | 
| +      expect(await queue2.next, 1); | 
| +      expect(await queue2.next, 2); | 
| +      expect(await queue2.next, 3); | 
| +      expect(await queue2.next, 4); | 
| +      expect(await queue2.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    test("produces a stream queue with the same errors", () async { | 
| +      var queue1 = new StreamQueue<int>(createErrorStream()); | 
| +      var queue2 = queue1.fork(); | 
| + | 
| +      expect(await queue1.next, 1); | 
| +      expect(await queue1.next, 2); | 
| +      expect(queue1.next, throwsA("To err is divine!")); | 
| +      expect(await queue1.next, 4); | 
| +      expect(await queue1.hasNext, isFalse); | 
| + | 
| +      expect(await queue2.next, 1); | 
| +      expect(await queue2.next, 2); | 
| +      expect(queue2.next, throwsA("To err is divine!")); | 
| +      expect(await queue2.next, 4); | 
| +      expect(await queue2.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    test("forks at the current point in the source queue", () { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| + | 
| +      expect(queue1.next, completion(1)); | 
| +      expect(queue1.next, completion(2)); | 
| + | 
| +      var queue2 = queue1.fork(); | 
| + | 
| +      expect(queue1.next, completion(3)); | 
| +      expect(queue1.next, completion(4)); | 
| +      expect(queue1.hasNext, completion(isFalse)); | 
| + | 
| +      expect(queue2.next, completion(3)); | 
| +      expect(queue2.next, completion(4)); | 
| +      expect(queue2.hasNext, completion(isFalse)); | 
| +    }); | 
| + | 
| +    test("can be created after there are pending values", () async { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var queue2 = queue1.fork(); | 
| +      expect(await queue2.next, 1); | 
| +      expect(await queue2.next, 2); | 
| +      expect(await queue2.next, 3); | 
| +      expect(await queue2.next, 4); | 
| +      expect(await queue2.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    test("multiple forks can be created at different points", () async { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| + | 
| +      var queue2 = queue1.fork(); | 
| +      expect(await queue1.next, 1); | 
| +      expect(await queue2.next, 1); | 
| + | 
| +      var queue3 = queue1.fork(); | 
| +      expect(await queue1.next, 2); | 
| +      expect(await queue2.next, 2); | 
| +      expect(await queue3.next, 2); | 
| + | 
| +      var queue4 = queue1.fork(); | 
| +      expect(await queue1.next, 3); | 
| +      expect(await queue2.next, 3); | 
| +      expect(await queue3.next, 3); | 
| +      expect(await queue4.next, 3); | 
| + | 
| +      var queue5 = queue1.fork(); | 
| +      expect(await queue1.next, 4); | 
| +      expect(await queue2.next, 4); | 
| +      expect(await queue3.next, 4); | 
| +      expect(await queue4.next, 4); | 
| +      expect(await queue5.next, 4); | 
| + | 
| +      var queue6 = queue1.fork(); | 
| +      expect(await queue1.hasNext, isFalse); | 
| +      expect(await queue2.hasNext, isFalse); | 
| +      expect(await queue3.hasNext, isFalse); | 
| +      expect(await queue4.hasNext, isFalse); | 
| +      expect(await queue5.hasNext, isFalse); | 
| +      expect(await queue6.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    test("same-level forks receive data in the order they were created", | 
| +        () async { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| +      var queue2 = queue1.fork(); | 
| +      var queue3 = queue1.fork(); | 
| +      var queue4 = queue1.fork(); | 
| +      var queue5 = queue1.fork(); | 
| + | 
| +      for (var i = 0; i < 4; i++) { | 
| +        var queue1Fired = false; | 
| +        var queue2Fired = false; | 
| +        var queue3Fired = false; | 
| +        var queue4Fired = false; | 
| +        var queue5Fired = false; | 
| + | 
| +        queue5.next.then(expectAsync((_) { | 
| +          queue5Fired = true; | 
| +          expect(queue1Fired, isTrue); | 
| +          expect(queue2Fired, isTrue); | 
| +          expect(queue3Fired, isTrue); | 
| +          expect(queue4Fired, isTrue); | 
| +        })); | 
| + | 
| +        queue1.next.then(expectAsync((_) { | 
| +          queue1Fired = true; | 
| +          expect(queue2Fired, isFalse); | 
| +          expect(queue3Fired, isFalse); | 
| +          expect(queue4Fired, isFalse); | 
| +          expect(queue5Fired, isFalse); | 
| +        })); | 
| + | 
| +        queue4.next.then(expectAsync((_) { | 
| +          queue4Fired = true; | 
| +          expect(queue1Fired, isTrue); | 
| +          expect(queue2Fired, isTrue); | 
| +          expect(queue3Fired, isTrue); | 
| +          expect(queue5Fired, isFalse); | 
| +        })); | 
| + | 
| +        queue2.next.then(expectAsync((_) { | 
| +          queue2Fired = true; | 
| +          expect(queue1Fired, isTrue); | 
| +          expect(queue3Fired, isFalse); | 
| +          expect(queue4Fired, isFalse); | 
| +          expect(queue5Fired, isFalse); | 
| +        })); | 
| + | 
| +        queue3.next.then(expectAsync((_) { | 
| +          queue3Fired = true; | 
| +          expect(queue1Fired, isTrue); | 
| +          expect(queue2Fired, isTrue); | 
| +          expect(queue4Fired, isFalse); | 
| +          expect(queue5Fired, isFalse); | 
| +        })); | 
| +      } | 
| +    }); | 
| + | 
| +    test("forks can be created from forks", () async { | 
| +      var queue1 = new StreamQueue<int>(createStream()); | 
| + | 
| +      var queue2 = queue1.fork(); | 
| +      expect(await queue1.next, 1); | 
| +      expect(await queue2.next, 1); | 
| + | 
| +      var queue3 = queue2.fork(); | 
| +      expect(await queue1.next, 2); | 
| +      expect(await queue2.next, 2); | 
| +      expect(await queue3.next, 2); | 
| + | 
| +      var queue4 = queue3.fork(); | 
| +      expect(await queue1.next, 3); | 
| +      expect(await queue2.next, 3); | 
| +      expect(await queue3.next, 3); | 
| +      expect(await queue4.next, 3); | 
| + | 
| +      var queue5 = queue4.fork(); | 
| +      expect(await queue1.next, 4); | 
| +      expect(await queue2.next, 4); | 
| +      expect(await queue3.next, 4); | 
| +      expect(await queue4.next, 4); | 
| +      expect(await queue5.next, 4); | 
| + | 
| +      var queue6 = queue5.fork(); | 
| +      expect(await queue1.hasNext, isFalse); | 
| +      expect(await queue2.hasNext, isFalse); | 
| +      expect(await queue3.hasNext, isFalse); | 
| +      expect(await queue4.hasNext, isFalse); | 
| +      expect(await queue5.hasNext, isFalse); | 
| +      expect(await queue6.hasNext, isFalse); | 
| +    }); | 
| + | 
| +    group("canceling:", () { | 
| +      test("cancelling a fork doesn't cancel its source", () async { | 
| +        var queue1 = new StreamQueue<int>(createStream()); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        queue2.cancel(); | 
| +        expect(() => queue2.next, throwsStateError); | 
| + | 
| +        expect(await queue1.next, 1); | 
| +        expect(await queue1.next, 2); | 
| +        expect(await queue1.next, 3); | 
| +        expect(await queue1.next, 4); | 
| +        expect(await queue1.hasNext, isFalse); | 
| +      }); | 
| + | 
| +      test("cancelling a source doesn't cancel its unmaterialized fork", | 
| +          () async { | 
| +        var queue1 = new StreamQueue<int>(createStream()); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        queue1.cancel(); | 
| +        expect(() => queue1.next, throwsStateError); | 
| + | 
| +        expect(await queue2.next, 1); | 
| +        expect(await queue2.next, 2); | 
| +        expect(await queue2.next, 3); | 
| +        expect(await queue2.next, 4); | 
| +        expect(await queue2.hasNext, isFalse); | 
| +      }); | 
| + | 
| +      test("cancelling a source doesn't cancel its materialized fork", | 
| +          () async { | 
| +        var queue1 = new StreamQueue<int>(createStream()); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        expect(await queue1.next, 1); | 
| + | 
| +        queue1.cancel(); | 
| +        expect(() => queue1.next, throwsStateError); | 
| + | 
| +        expect(await queue2.next, 1); | 
| +        expect(await queue2.next, 2); | 
| +        expect(await queue2.next, 3); | 
| +        expect(await queue2.next, 4); | 
| +        expect(await queue2.hasNext, isFalse); | 
| +      }); | 
| + | 
| +      test("the underlying stream is only canceled once all forks are canceled", | 
| +          () async { | 
| +        var controller = new StreamController(); | 
| +        var queue1 = new StreamQueue<int>(controller.stream); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isFalse); | 
| + | 
| +        expect(queue1.next, completion(1)); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isTrue); | 
| + | 
| +        queue2.cancel(); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isTrue); | 
| + | 
| +        controller.add(1); | 
| +        queue1.cancel(); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isFalse); | 
| +      }); | 
| + | 
| +      group("with immediate,", () { | 
| +        test("cancelling a fork doesn't cancel its source", () async { | 
| +          var queue1 = new StreamQueue<int>(createStream()); | 
| +          var queue2 = queue1.fork(); | 
| + | 
| +          queue2.cancel(immediate: true); | 
| +          expect(() => queue2.next, throwsStateError); | 
| + | 
| +          expect(await queue1.next, 1); | 
| +          expect(await queue1.next, 2); | 
| +          expect(await queue1.next, 3); | 
| +          expect(await queue1.next, 4); | 
| +          expect(await queue1.hasNext, isFalse); | 
| +        }); | 
| + | 
| +        test("cancelling a source doesn't cancel its unmaterialized fork", | 
| +            () async { | 
| +          var queue1 = new StreamQueue<int>(createStream()); | 
| +          var queue2 = queue1.fork(); | 
| + | 
| +          queue1.cancel(immediate: true); | 
| +          expect(() => queue1.next, throwsStateError); | 
| + | 
| +          expect(await queue2.next, 1); | 
| +          expect(await queue2.next, 2); | 
| +          expect(await queue2.next, 3); | 
| +          expect(await queue2.next, 4); | 
| +          expect(await queue2.hasNext, isFalse); | 
| +        }); | 
| + | 
| +        test("cancelling a source doesn't cancel its materialized fork", | 
| +            () async { | 
| +          var queue1 = new StreamQueue<int>(createStream()); | 
| +          var queue2 = queue1.fork(); | 
| + | 
| +          expect(await queue1.next, 1); | 
| + | 
| +          queue1.cancel(immediate: true); | 
| +          expect(() => queue1.next, throwsStateError); | 
| + | 
| +          expect(await queue2.next, 1); | 
| +          expect(await queue2.next, 2); | 
| +          expect(await queue2.next, 3); | 
| +          expect(await queue2.next, 4); | 
| +          expect(await queue2.hasNext, isFalse); | 
| +        }); | 
| + | 
| +        test("the underlying stream is only canceled once all forks are " | 
| +            "canceled", () async { | 
| +          var controller = new StreamController(); | 
| +          var queue1 = new StreamQueue<int>(controller.stream); | 
| +          var queue2 = queue1.fork(); | 
| + | 
| +          await flushMicrotasks(); | 
| +          expect(controller.hasListener, isFalse); | 
| + | 
| +          expect(queue1.next, throwsStateError); | 
| +          await flushMicrotasks(); | 
| +          expect(controller.hasListener, isTrue); | 
| + | 
| +          queue2.cancel(immediate: true); | 
| +          await flushMicrotasks(); | 
| +          expect(controller.hasListener, isTrue); | 
| + | 
| +          queue1.cancel(immediate: true); | 
| +          await flushMicrotasks(); | 
| +          expect(controller.hasListener, isFalse); | 
| +        }); | 
| +      }); | 
| +    }); | 
| + | 
| +    group("pausing:", () { | 
| +      test("the underlying stream is only implicitly paused when no forks are " | 
| +          "awaiting input", () async { | 
| +        var controller = new StreamController(); | 
| +        var queue1 = new StreamQueue<int>(controller.stream); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        controller.add(1); | 
| +        expect(await queue1.next, 1); | 
| +        expect(controller.hasListener, isTrue); | 
| +        expect(controller.isPaused, isTrue); | 
| + | 
| +        expect(queue1.next, completion(2)); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isFalse); | 
| + | 
| +        controller.add(2); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isTrue); | 
| + | 
| +        expect(queue2.next, completion(1)); | 
| +        expect(queue2.next, completion(2)); | 
| +        expect(queue2.next, completion(3)); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isFalse); | 
| + | 
| +        controller.add(3); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isTrue); | 
| +      }); | 
| + | 
| +      test("pausing a fork doesn't pause its source", () async { | 
| +        var queue1 = new StreamQueue<int>(createStream()); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        queue2.rest.listen(expectAsync((_) {}, count: 0)).pause(); | 
| + | 
| +        expect(await queue1.next, 1); | 
| +        expect(await queue1.next, 2); | 
| +        expect(await queue1.next, 3); | 
| +        expect(await queue1.next, 4); | 
| +        expect(await queue1.hasNext, isFalse); | 
| +      }); | 
| + | 
| +      test("pausing a source doesn't pause its fork", () async { | 
| +        var queue1 = new StreamQueue<int>(createStream()); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        queue1.rest.listen(expectAsync((_) {}, count: 0)).pause(); | 
| + | 
| +        expect(await queue2.next, 1); | 
| +        expect(await queue2.next, 2); | 
| +        expect(await queue2.next, 3); | 
| +        expect(await queue2.next, 4); | 
| +        expect(await queue2.hasNext, isFalse); | 
| +      }); | 
| + | 
| +      test("the underlying stream is only paused when all forks are paused", | 
| +          () async { | 
| +        var controller = new StreamController(); | 
| +        var queue1 = new StreamQueue<int>(controller.stream); | 
| +        var queue2 = queue1.fork(); | 
| + | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isFalse); | 
| + | 
| +        var sub1 = queue1.rest.listen(null); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.hasListener, isTrue); | 
| +        expect(controller.isPaused, isFalse); | 
| + | 
| +        sub1.pause(); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isTrue); | 
| + | 
| +        expect(queue2.next, completion(1)); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isFalse); | 
| + | 
| +        controller.add(1); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isTrue); | 
| + | 
| +        var sub2 = queue2.rest.listen(null); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isFalse); | 
| + | 
| +        sub2.pause(); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isTrue); | 
| + | 
| +        sub1.resume(); | 
| +        await flushMicrotasks(); | 
| +        expect(controller.isPaused, isFalse); | 
| +      }); | 
| +    }); | 
| +  }); | 
| + | 
| +  test("all combinations sequential skip/next/take operations", () async { | 
| +    // Takes all combinations of two of next, skip and take, then ends with | 
| +    // doing rest. Each of the first rounds do 10 events of each type, | 
| +    // the rest does 20 elements. | 
| +    var eventCount = 20 * (3 * 3 + 1); | 
| +    var events = new StreamQueue<int>(createLongStream(eventCount)); | 
| + | 
| +    // Test expecting [startIndex .. startIndex + 9] as events using | 
| +    // `next`. | 
| +    nextTest(startIndex) { | 
| +      for (int i = 0; i < 10; i++) { | 
| +        expect(events.next, completion(startIndex + i)); | 
| +      } | 
| +    } | 
| + | 
| +    // Test expecting 10 events to be skipped. | 
| +    skipTest(startIndex) { | 
| +      expect(events.skip(10), completion(0)); | 
| +    } | 
| + | 
| +    // Test expecting [startIndex .. startIndex + 9] as events using | 
| +    // `take(10)`. | 
| +    takeTest(startIndex) { | 
| +      expect(events.take(10), | 
| +             completion(new List.generate(10, (i) => startIndex + i))); | 
| +    } | 
| +    var tests = [nextTest, skipTest, takeTest]; | 
| + | 
| +    int counter = 0; | 
| +    // Run through all pairs of two tests and run them. | 
| +    for (int i = 0; i < tests.length; i++) { | 
| +      for (int j = 0; j < tests.length; j++) { | 
| +        tests[i](counter); | 
| +        tests[j](counter + 10); | 
| +        counter += 20; | 
| +      } | 
| +    } | 
| +    // Then expect 20 more events as a `rest` call. | 
| +    expect(events.rest.toList(), | 
| +           completion(new List.generate(20, (i) => counter + i))); | 
| +  }); | 
| +} | 
| + | 
| +Stream<int> createStream() async* { | 
| +  yield 1; | 
| +  await flushMicrotasks(); | 
| +  yield 2; | 
| +  await flushMicrotasks(); | 
| +  yield 3; | 
| +  await flushMicrotasks(); | 
| +  yield 4; | 
| +} | 
| + | 
| +Stream<int> createErrorStream() { | 
| +  StreamController controller = new StreamController<int>(); | 
| +  () async { | 
| +    controller.add(1); | 
| +    await flushMicrotasks(); | 
| +    controller.add(2); | 
| +    await flushMicrotasks(); | 
| +    controller.addError("To err is divine!"); | 
| +    await flushMicrotasks(); | 
| +    controller.add(4); | 
| +    await flushMicrotasks(); | 
| +    controller.close(); | 
| +  }(); | 
| +  return controller.stream; | 
| +} | 
| + | 
| +Stream<int> createLongStream(int eventCount) async* { | 
| +  for (int i = 0; i < eventCount; i++) yield i; | 
| +} | 
| + | 
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | 
|  |