OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'package:async/async.dart'; |
| 8 import 'package:test/test.dart'; |
| 9 |
| 10 main() { |
| 11 var controller; |
| 12 var splitter; |
| 13 setUp(() { |
| 14 controller = new StreamController<int>(); |
| 15 splitter = new StreamSplitter<int>(controller.stream); |
| 16 }); |
| 17 |
| 18 test("a branch that's created before the stream starts to replay it", |
| 19 () async { |
| 20 var events = []; |
| 21 var branch = splitter.split(); |
| 22 splitter.close(); |
| 23 branch.listen(events.add); |
| 24 |
| 25 controller.add(1); |
| 26 await flushMicrotasks(); |
| 27 expect(events, equals([1])); |
| 28 |
| 29 controller.add(2); |
| 30 await flushMicrotasks(); |
| 31 expect(events, equals([1, 2])); |
| 32 |
| 33 controller.add(3); |
| 34 await flushMicrotasks(); |
| 35 expect(events, equals([1, 2, 3])); |
| 36 |
| 37 controller.close(); |
| 38 }); |
| 39 |
| 40 test("a branch replays error events as well as data events", () { |
| 41 var branch = splitter.split(); |
| 42 splitter.close(); |
| 43 |
| 44 controller.add(1); |
| 45 controller.addError("error"); |
| 46 controller.add(3); |
| 47 controller.close(); |
| 48 |
| 49 var count = 0; |
| 50 branch.listen(expectAsync((value) { |
| 51 expect(count, anyOf(0, 2)); |
| 52 expect(value, equals(count + 1)); |
| 53 count++; |
| 54 }, count: 2), onError: expectAsync((error) { |
| 55 expect(count, equals(1)); |
| 56 expect(error, equals("error")); |
| 57 count++; |
| 58 }), onDone: expectAsync(() { |
| 59 expect(count, equals(3)); |
| 60 })); |
| 61 }); |
| 62 |
| 63 test("a branch that's created in the middle of a stream replays it", () async
{ |
| 64 controller.add(1); |
| 65 controller.add(2); |
| 66 await flushMicrotasks(); |
| 67 |
| 68 var branch = splitter.split(); |
| 69 splitter.close(); |
| 70 |
| 71 controller.add(3); |
| 72 controller.add(4); |
| 73 controller.close(); |
| 74 |
| 75 expect(branch.toList(), completion(equals([1, 2, 3, 4]))); |
| 76 }); |
| 77 |
| 78 test("a branch that's created after the stream is finished replays it", |
| 79 () async { |
| 80 controller.add(1); |
| 81 controller.add(2); |
| 82 controller.add(3); |
| 83 controller.close(); |
| 84 await flushMicrotasks(); |
| 85 |
| 86 expect(splitter.split().toList(), completion(equals([1, 2, 3]))); |
| 87 splitter.close(); |
| 88 }); |
| 89 |
| 90 test("creates single-subscription branches", () async { |
| 91 var branch = splitter.split(); |
| 92 expect(branch.isBroadcast, isFalse); |
| 93 branch.listen(null); |
| 94 expect(() => branch.listen(null), throwsStateError); |
| 95 expect(() => branch.listen(null), throwsStateError); |
| 96 }); |
| 97 |
| 98 // TODO(nweiz): Test that branches have the correct reified type once Dart |
| 99 // 1.11 is released. In 1.10, the stream exposed by a StreamController didn't |
| 100 // have a reified type. |
| 101 |
| 102 test("multiple branches each replay the stream", () async { |
| 103 var branch1 = splitter.split(); |
| 104 controller.add(1); |
| 105 controller.add(2); |
| 106 await flushMicrotasks(); |
| 107 |
| 108 var branch2 = splitter.split(); |
| 109 controller.add(3); |
| 110 controller.close(); |
| 111 await flushMicrotasks(); |
| 112 |
| 113 var branch3 = splitter.split(); |
| 114 splitter.close(); |
| 115 |
| 116 expect(branch1.toList(), completion(equals([1, 2, 3]))); |
| 117 expect(branch2.toList(), completion(equals([1, 2, 3]))); |
| 118 expect(branch3.toList(), completion(equals([1, 2, 3]))); |
| 119 }); |
| 120 |
| 121 test("a branch doesn't close until the source stream closes", () async { |
| 122 var branch = splitter.split(); |
| 123 splitter.close(); |
| 124 |
| 125 var closed = false; |
| 126 branch.last.then((_) => closed = true); |
| 127 |
| 128 controller.add(1); |
| 129 controller.add(2); |
| 130 controller.add(3); |
| 131 await flushMicrotasks(); |
| 132 expect(closed, isFalse); |
| 133 |
| 134 controller.close(); |
| 135 await flushMicrotasks(); |
| 136 expect(closed, isTrue); |
| 137 }); |
| 138 |
| 139 test("the source stream isn't listened to until a branch is", () async { |
| 140 expect(controller.hasListener, isFalse); |
| 141 |
| 142 var branch = splitter.split(); |
| 143 splitter.close(); |
| 144 await flushMicrotasks(); |
| 145 expect(controller.hasListener, isFalse); |
| 146 |
| 147 branch.listen(null); |
| 148 await flushMicrotasks(); |
| 149 expect(controller.hasListener, isTrue); |
| 150 }); |
| 151 |
| 152 test("the source stream is paused when all branches are paused", () async { |
| 153 var branch1 = splitter.split(); |
| 154 var branch2 = splitter.split(); |
| 155 var branch3 = splitter.split(); |
| 156 splitter.close(); |
| 157 |
| 158 var subscription1 = branch1.listen(null); |
| 159 var subscription2 = branch2.listen(null); |
| 160 var subscription3 = branch3.listen(null); |
| 161 |
| 162 subscription1.pause(); |
| 163 await flushMicrotasks(); |
| 164 expect(controller.isPaused, isFalse); |
| 165 |
| 166 subscription2.pause(); |
| 167 await flushMicrotasks(); |
| 168 expect(controller.isPaused, isFalse); |
| 169 |
| 170 subscription3.pause(); |
| 171 await flushMicrotasks(); |
| 172 expect(controller.isPaused, isTrue); |
| 173 |
| 174 subscription2.resume(); |
| 175 await flushMicrotasks(); |
| 176 expect(controller.isPaused, isFalse); |
| 177 }); |
| 178 |
| 179 test("the source stream is paused when all branches are canceled", () async { |
| 180 var branch1 = splitter.split(); |
| 181 var branch2 = splitter.split(); |
| 182 var branch3 = splitter.split(); |
| 183 |
| 184 var subscription1 = branch1.listen(null); |
| 185 var subscription2 = branch2.listen(null); |
| 186 var subscription3 = branch3.listen(null); |
| 187 |
| 188 subscription1.cancel(); |
| 189 await flushMicrotasks(); |
| 190 expect(controller.isPaused, isFalse); |
| 191 |
| 192 subscription2.cancel(); |
| 193 await flushMicrotasks(); |
| 194 expect(controller.isPaused, isFalse); |
| 195 |
| 196 subscription3.cancel(); |
| 197 await flushMicrotasks(); |
| 198 expect(controller.isPaused, isTrue); |
| 199 |
| 200 var branch4 = splitter.split(); |
| 201 splitter.close(); |
| 202 await flushMicrotasks(); |
| 203 expect(controller.isPaused, isTrue); |
| 204 |
| 205 branch4.listen(null); |
| 206 await flushMicrotasks(); |
| 207 expect(controller.isPaused, isFalse); |
| 208 }); |
| 209 |
| 210 test("the source stream is canceled when it's closed after all branches have " |
| 211 "been canceled", () async { |
| 212 var branch1 = splitter.split(); |
| 213 var branch2 = splitter.split(); |
| 214 var branch3 = splitter.split(); |
| 215 |
| 216 var subscription1 = branch1.listen(null); |
| 217 var subscription2 = branch2.listen(null); |
| 218 var subscription3 = branch3.listen(null); |
| 219 |
| 220 subscription1.cancel(); |
| 221 await flushMicrotasks(); |
| 222 expect(controller.hasListener, isTrue); |
| 223 |
| 224 subscription2.cancel(); |
| 225 await flushMicrotasks(); |
| 226 expect(controller.hasListener, isTrue); |
| 227 |
| 228 subscription3.cancel(); |
| 229 await flushMicrotasks(); |
| 230 expect(controller.hasListener, isTrue); |
| 231 |
| 232 splitter.close(); |
| 233 expect(controller.hasListener, isFalse); |
| 234 }); |
| 235 |
| 236 test("the source stream is canceled when all branches are canceled after it " |
| 237 "has been closed", () async { |
| 238 var branch1 = splitter.split(); |
| 239 var branch2 = splitter.split(); |
| 240 var branch3 = splitter.split(); |
| 241 splitter.close(); |
| 242 |
| 243 var subscription1 = branch1.listen(null); |
| 244 var subscription2 = branch2.listen(null); |
| 245 var subscription3 = branch3.listen(null); |
| 246 |
| 247 subscription1.cancel(); |
| 248 await flushMicrotasks(); |
| 249 expect(controller.hasListener, isTrue); |
| 250 |
| 251 subscription2.cancel(); |
| 252 await flushMicrotasks(); |
| 253 expect(controller.hasListener, isTrue); |
| 254 |
| 255 subscription3.cancel(); |
| 256 await flushMicrotasks(); |
| 257 expect(controller.hasListener, isFalse); |
| 258 }); |
| 259 |
| 260 test("a splitter that's closed before any branches are added never listens " |
| 261 "to the source stream", () { |
| 262 splitter.close(); |
| 263 |
| 264 // This would throw an error if the stream had already been listened to. |
| 265 controller.stream.listen(null); |
| 266 }); |
| 267 |
| 268 test("splitFrom splits a source stream into the designated number of " |
| 269 "branches", () { |
| 270 var branches = StreamSplitter.splitFrom(controller.stream, 5); |
| 271 |
| 272 controller.add(1); |
| 273 controller.add(2); |
| 274 controller.add(3); |
| 275 controller.close(); |
| 276 |
| 277 expect(branches[0].toList(), completion(equals([1, 2, 3]))); |
| 278 expect(branches[1].toList(), completion(equals([1, 2, 3]))); |
| 279 expect(branches[2].toList(), completion(equals([1, 2, 3]))); |
| 280 expect(branches[3].toList(), completion(equals([1, 2, 3]))); |
| 281 expect(branches[4].toList(), completion(equals([1, 2, 3]))); |
| 282 }); |
| 283 } |
| 284 |
| 285 /// Wait for all microtasks to complete. |
| 286 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
OLD | NEW |