| Index: packages/async/test/stream_splitter_test.dart | 
| diff --git a/packages/async/test/stream_splitter_test.dart b/packages/async/test/stream_splitter_test.dart | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..ffd0c87afb2899a0f6decf1ac804e93dcc67e447 | 
| --- /dev/null | 
| +++ b/packages/async/test/stream_splitter_test.dart | 
| @@ -0,0 +1,286 @@ | 
| +// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file | 
| +// for details. All rights reserved. Use of this source code is governed by a | 
| +// BSD-style license that can be found in the LICENSE file. | 
| + | 
| +import 'dart:async'; | 
| + | 
| +import 'package:async/async.dart'; | 
| +import 'package:test/test.dart'; | 
| + | 
| +main() { | 
| +  var controller; | 
| +  var splitter; | 
| +  setUp(() { | 
| +    controller = new StreamController<int>(); | 
| +    splitter = new StreamSplitter<int>(controller.stream); | 
| +  }); | 
| + | 
| +  test("a branch that's created before the stream starts to replay it", | 
| +      () async { | 
| +    var events = []; | 
| +    var branch = splitter.split(); | 
| +    splitter.close(); | 
| +    branch.listen(events.add); | 
| + | 
| +    controller.add(1); | 
| +    await flushMicrotasks(); | 
| +    expect(events, equals([1])); | 
| + | 
| +    controller.add(2); | 
| +    await flushMicrotasks(); | 
| +    expect(events, equals([1, 2])); | 
| + | 
| +    controller.add(3); | 
| +    await flushMicrotasks(); | 
| +    expect(events, equals([1, 2, 3])); | 
| + | 
| +    controller.close(); | 
| +  }); | 
| + | 
| +  test("a branch replays error events as well as data events", () { | 
| +    var branch = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    controller.add(1); | 
| +    controller.addError("error"); | 
| +    controller.add(3); | 
| +    controller.close(); | 
| + | 
| +    var count = 0; | 
| +    branch.listen(expectAsync((value) { | 
| +      expect(count, anyOf(0, 2)); | 
| +      expect(value, equals(count + 1)); | 
| +      count++; | 
| +    }, count: 2), onError: expectAsync((error) { | 
| +      expect(count, equals(1)); | 
| +      expect(error, equals("error")); | 
| +      count++; | 
| +    }), onDone: expectAsync(() { | 
| +      expect(count, equals(3)); | 
| +    })); | 
| +  }); | 
| + | 
| +  test("a branch that's created in the middle of a stream replays it", () async { | 
| +    controller.add(1); | 
| +    controller.add(2); | 
| +    await flushMicrotasks(); | 
| + | 
| +    var branch = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    controller.add(3); | 
| +    controller.add(4); | 
| +    controller.close(); | 
| + | 
| +    expect(branch.toList(), completion(equals([1, 2, 3, 4]))); | 
| +  }); | 
| + | 
| +  test("a branch that's created after the stream is finished replays it", | 
| +      () async { | 
| +    controller.add(1); | 
| +    controller.add(2); | 
| +    controller.add(3); | 
| +    controller.close(); | 
| +    await flushMicrotasks(); | 
| + | 
| +    expect(splitter.split().toList(), completion(equals([1, 2, 3]))); | 
| +    splitter.close(); | 
| +  }); | 
| + | 
| +  test("creates single-subscription branches", () async { | 
| +    var branch = splitter.split(); | 
| +    expect(branch.isBroadcast, isFalse); | 
| +    branch.listen(null); | 
| +    expect(() => branch.listen(null), throwsStateError); | 
| +    expect(() => branch.listen(null), throwsStateError); | 
| +  }); | 
| + | 
| +  // TODO(nweiz): Test that branches have the correct reified type once Dart | 
| +  // 1.11 is released. In 1.10, the stream exposed by a StreamController didn't | 
| +  // have a reified type. | 
| + | 
| +  test("multiple branches each replay the stream", () async { | 
| +    var branch1 = splitter.split(); | 
| +    controller.add(1); | 
| +    controller.add(2); | 
| +    await flushMicrotasks(); | 
| + | 
| +    var branch2 = splitter.split(); | 
| +    controller.add(3); | 
| +    controller.close(); | 
| +    await flushMicrotasks(); | 
| + | 
| +    var branch3 = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    expect(branch1.toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branch2.toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branch3.toList(), completion(equals([1, 2, 3]))); | 
| +  }); | 
| + | 
| +  test("a branch doesn't close until the source stream closes", () async { | 
| +    var branch = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    var closed = false; | 
| +    branch.last.then((_) => closed = true); | 
| + | 
| +    controller.add(1); | 
| +    controller.add(2); | 
| +    controller.add(3); | 
| +    await flushMicrotasks(); | 
| +    expect(closed, isFalse); | 
| + | 
| +    controller.close(); | 
| +    await flushMicrotasks(); | 
| +    expect(closed, isTrue); | 
| +  }); | 
| + | 
| +  test("the source stream isn't listened to until a branch is", () async { | 
| +    expect(controller.hasListener, isFalse); | 
| + | 
| +    var branch = splitter.split(); | 
| +    splitter.close(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isFalse); | 
| + | 
| +    branch.listen(null); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| +  }); | 
| + | 
| +  test("the source stream is paused when all branches are paused", () async { | 
| +    var branch1 = splitter.split(); | 
| +    var branch2 = splitter.split(); | 
| +    var branch3 = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    var subscription1 = branch1.listen(null); | 
| +    var subscription2 = branch2.listen(null); | 
| +    var subscription3 = branch3.listen(null); | 
| + | 
| +    subscription1.pause(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| + | 
| +    subscription2.pause(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| + | 
| +    subscription3.pause(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isTrue); | 
| + | 
| +    subscription2.resume(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| +  }); | 
| + | 
| +  test("the source stream is paused when all branches are canceled", () async { | 
| +    var branch1 = splitter.split(); | 
| +    var branch2 = splitter.split(); | 
| +    var branch3 = splitter.split(); | 
| + | 
| +    var subscription1 = branch1.listen(null); | 
| +    var subscription2 = branch2.listen(null); | 
| +    var subscription3 = branch3.listen(null); | 
| + | 
| +    subscription1.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| + | 
| +    subscription2.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| + | 
| +    subscription3.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isTrue); | 
| + | 
| +    var branch4 = splitter.split(); | 
| +    splitter.close(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isTrue); | 
| + | 
| +    branch4.listen(null); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.isPaused, isFalse); | 
| +  }); | 
| + | 
| +  test("the source stream is canceled when it's closed after all branches have " | 
| +      "been canceled", () async { | 
| +    var branch1 = splitter.split(); | 
| +    var branch2 = splitter.split(); | 
| +    var branch3 = splitter.split(); | 
| + | 
| +    var subscription1 = branch1.listen(null); | 
| +    var subscription2 = branch2.listen(null); | 
| +    var subscription3 = branch3.listen(null); | 
| + | 
| +    subscription1.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| + | 
| +    subscription2.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| + | 
| +    subscription3.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| + | 
| +    splitter.close(); | 
| +    expect(controller.hasListener, isFalse); | 
| +  }); | 
| + | 
| +  test("the source stream is canceled when all branches are canceled after it " | 
| +      "has been closed", () async { | 
| +    var branch1 = splitter.split(); | 
| +    var branch2 = splitter.split(); | 
| +    var branch3 = splitter.split(); | 
| +    splitter.close(); | 
| + | 
| +    var subscription1 = branch1.listen(null); | 
| +    var subscription2 = branch2.listen(null); | 
| +    var subscription3 = branch3.listen(null); | 
| + | 
| +    subscription1.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| + | 
| +    subscription2.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isTrue); | 
| + | 
| +    subscription3.cancel(); | 
| +    await flushMicrotasks(); | 
| +    expect(controller.hasListener, isFalse); | 
| +  }); | 
| + | 
| +  test("a splitter that's closed before any branches are added never listens " | 
| +      "to the source stream", () { | 
| +    splitter.close(); | 
| + | 
| +    // This would throw an error if the stream had already been listened to. | 
| +    controller.stream.listen(null); | 
| +  }); | 
| + | 
| +  test("splitFrom splits a source stream into the designated number of " | 
| +      "branches", () { | 
| +    var branches = StreamSplitter.splitFrom(controller.stream, 5); | 
| + | 
| +    controller.add(1); | 
| +    controller.add(2); | 
| +    controller.add(3); | 
| +    controller.close(); | 
| + | 
| +    expect(branches[0].toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branches[1].toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branches[2].toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branches[3].toList(), completion(equals([1, 2, 3]))); | 
| +    expect(branches[4].toList(), completion(equals([1, 2, 3]))); | 
| +  }); | 
| +} | 
| + | 
| +/// Wait for all microtasks to complete. | 
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | 
|  |