OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import "dart:async"; |
| 6 |
| 7 import "package:async/async.dart" show StreamCompleter; |
| 8 import "package:test/test.dart"; |
| 9 |
| 10 import "utils.dart"; |
| 11 |
| 12 main() { |
| 13 test("a stream is linked before listening", () async { |
| 14 var completer = new StreamCompleter(); |
| 15 completer.setSourceStream(createStream()); |
| 16 expect(completer.stream.toList(), completion([1, 2, 3, 4])); |
| 17 }); |
| 18 |
| 19 test("listened to before a stream is linked", () async { |
| 20 var completer = new StreamCompleter(); |
| 21 var done = completer.stream.toList(); |
| 22 await flushMicrotasks(); |
| 23 completer.setSourceStream(createStream()); |
| 24 expect(done, completion([1, 2, 3, 4])); |
| 25 }); |
| 26 |
| 27 test("cancel before linking a stream doesn't listen on stream", () async { |
| 28 var completer = new StreamCompleter(); |
| 29 var subscription = completer.stream.listen(null); |
| 30 subscription.pause(); // Should be ignored. |
| 31 subscription.cancel(); |
| 32 completer.setSourceStream(new UnusableStream()); // Doesn't throw. |
| 33 }); |
| 34 |
| 35 test("listen and pause before linking stream", () async { |
| 36 var controller = new StreamCompleter(); |
| 37 var events = []; |
| 38 var subscription = controller.stream.listen(events.add); |
| 39 var done = subscription.asFuture(); |
| 40 subscription.pause(); |
| 41 var sourceController = new StreamController(); |
| 42 sourceController..add(1)..add(2)..add(3)..add(4); |
| 43 controller.setSourceStream(sourceController.stream); |
| 44 await flushMicrotasks(); |
| 45 expect(sourceController.hasListener, isTrue); |
| 46 expect(sourceController.isPaused, isTrue); |
| 47 expect(events, []); |
| 48 subscription.resume(); |
| 49 await flushMicrotasks(); |
| 50 expect(sourceController.hasListener, isTrue); |
| 51 expect(sourceController.isPaused, isFalse); |
| 52 expect(events, [1, 2, 3, 4]); |
| 53 sourceController.close(); |
| 54 await done; |
| 55 expect(events, [1, 2, 3, 4]); |
| 56 }); |
| 57 |
| 58 test("pause more than once", () async { |
| 59 var completer = new StreamCompleter(); |
| 60 var events = []; |
| 61 var subscription = completer.stream.listen(events.add); |
| 62 var done = subscription.asFuture(); |
| 63 subscription.pause(); |
| 64 subscription.pause(); |
| 65 subscription.pause(); |
| 66 completer.setSourceStream(createStream()); |
| 67 for (int i = 0; i < 3; i++) { |
| 68 await flushMicrotasks(); |
| 69 expect(events, []); |
| 70 subscription.resume(); |
| 71 } |
| 72 await done; |
| 73 expect(events, [1, 2, 3, 4]); |
| 74 }); |
| 75 |
| 76 test("cancel new stream before source is done", () async { |
| 77 var completer = new StreamCompleter(); |
| 78 var listened = false; |
| 79 var lastEvent = -1; |
| 80 var controller = new StreamController(); |
| 81 var subscription; |
| 82 subscription = completer.stream.listen( |
| 83 (value) { |
| 84 expect(value, lessThan(3)); |
| 85 lastEvent = value; |
| 86 if (value == 2) { |
| 87 subscription.cancel(); |
| 88 } |
| 89 }, |
| 90 onError: unreachable("error"), |
| 91 onDone: unreachable("done"), |
| 92 cancelOnError: true); |
| 93 completer.setSourceStream(controller.stream); |
| 94 expect(controller.hasListener, isTrue); |
| 95 |
| 96 await flushMicrotasks(); |
| 97 expect(controller.hasListener, isTrue); |
| 98 controller.add(1); |
| 99 |
| 100 await flushMicrotasks(); |
| 101 expect(lastEvent, 1); |
| 102 expect(controller.hasListener, isTrue); |
| 103 controller.add(2); |
| 104 |
| 105 await flushMicrotasks(); |
| 106 expect(lastEvent, 2); |
| 107 expect(controller.hasListener, isFalse); |
| 108 }); |
| 109 |
| 110 test("complete with setEmpty before listening", () async { |
| 111 var completer = new StreamCompleter(); |
| 112 completer.setEmpty(); |
| 113 var done = new Completer(); |
| 114 completer.stream.listen( |
| 115 unreachable("data"), |
| 116 onError: unreachable("error"), |
| 117 onDone: done.complete); |
| 118 await done.future; |
| 119 }); |
| 120 |
| 121 test("complete with setEmpty after listening", () async { |
| 122 var completer = new StreamCompleter(); |
| 123 var done = new Completer(); |
| 124 completer.stream.listen( |
| 125 unreachable("data"), |
| 126 onError: unreachable("error"), |
| 127 onDone: done.complete); |
| 128 completer.setEmpty(); |
| 129 await done.future; |
| 130 }); |
| 131 |
| 132 test("source stream isn't listened to until completer stream is", () async { |
| 133 var completer = new StreamCompleter(); |
| 134 var controller; |
| 135 controller = new StreamController(onListen: () { |
| 136 scheduleMicrotask(controller.close); |
| 137 }); |
| 138 |
| 139 completer.setSourceStream(controller.stream); |
| 140 await flushMicrotasks(); |
| 141 expect(controller.hasListener, isFalse); |
| 142 var subscription = completer.stream.listen(null); |
| 143 expect(controller.hasListener, isTrue); |
| 144 await subscription.asFuture(); |
| 145 }); |
| 146 |
| 147 test("cancelOnError true when listening before linking stream", () async { |
| 148 var completer = new StreamCompleter(); |
| 149 var listened = false; |
| 150 var canceled = false; |
| 151 var lastEvent = -1; |
| 152 var controller = new StreamController(); |
| 153 var subscription = completer.stream.listen( |
| 154 (value) { |
| 155 expect(value, lessThan(3)); |
| 156 lastEvent = value; |
| 157 }, |
| 158 onError: (value) { |
| 159 expect(value, "3"); |
| 160 lastEvent = value; |
| 161 }, |
| 162 onDone: unreachable("done"), |
| 163 cancelOnError: true); |
| 164 completer.setSourceStream(controller.stream); |
| 165 expect(controller.hasListener, isTrue); |
| 166 |
| 167 await flushMicrotasks(); |
| 168 expect(controller.hasListener, isTrue); |
| 169 controller.add(1); |
| 170 |
| 171 await flushMicrotasks(); |
| 172 expect(lastEvent, 1); |
| 173 expect(controller.hasListener, isTrue); |
| 174 controller.add(2); |
| 175 |
| 176 await flushMicrotasks(); |
| 177 expect(lastEvent, 2); |
| 178 expect(controller.hasListener, isTrue); |
| 179 controller.addError("3"); |
| 180 |
| 181 await flushMicrotasks(); |
| 182 expect(lastEvent, "3"); |
| 183 expect(controller.hasListener, isFalse); |
| 184 }); |
| 185 |
| 186 test("cancelOnError true when listening after linking stream", () async { |
| 187 var completer = new StreamCompleter(); |
| 188 var lastEvent = -1; |
| 189 var controller = new StreamController(); |
| 190 completer.setSourceStream(controller.stream); |
| 191 controller.add(1); |
| 192 expect(controller.hasListener, isFalse); |
| 193 |
| 194 var subscription = completer.stream.listen( |
| 195 (value) { |
| 196 expect(value, lessThan(3)); |
| 197 lastEvent = value; |
| 198 }, |
| 199 onError: (value) { |
| 200 expect(value, "3"); |
| 201 lastEvent = value; |
| 202 }, |
| 203 onDone: unreachable("done"), |
| 204 cancelOnError: true); |
| 205 |
| 206 expect(controller.hasListener, isTrue); |
| 207 |
| 208 await flushMicrotasks(); |
| 209 expect(lastEvent, 1); |
| 210 expect(controller.hasListener, isTrue); |
| 211 controller.add(2); |
| 212 |
| 213 await flushMicrotasks(); |
| 214 expect(lastEvent, 2); |
| 215 expect(controller.hasListener, isTrue); |
| 216 controller.addError("3"); |
| 217 |
| 218 await flushMicrotasks(); |
| 219 expect(controller.hasListener, isFalse); |
| 220 }); |
| 221 |
| 222 test("linking a stream after setSourceStream before listen", () async { |
| 223 var completer = new StreamCompleter(); |
| 224 completer.setSourceStream(createStream()); |
| 225 expect(() => completer.setSourceStream(createStream()), throws); |
| 226 expect(() => completer.setEmpty(createStream()), throws); |
| 227 await completer.stream.toList(); |
| 228 // Still fails after source is done |
| 229 expect(() => completer.setSourceStream(createStream()), throws); |
| 230 expect(() => completer.setEmpty(createStream()), throws); |
| 231 }); |
| 232 |
| 233 test("linking a stream after setSourceStream after listen", () async { |
| 234 var completer = new StreamCompleter(); |
| 235 var list = completer.stream.toList(); |
| 236 completer.setSourceStream(createStream()); |
| 237 expect(() => completer.setSoureStream(createStream()), throws); |
| 238 expect(() => completer.stEmpty(createStream()), throws); |
| 239 await list; |
| 240 // Still fails after source is done. |
| 241 expect(() => completer.setSoureStream(createStream()), throws); |
| 242 expect(() => completer.stEmpty(createStream()), throws); |
| 243 }); |
| 244 |
| 245 test("linking a stream after setEmpty before listen", () async { |
| 246 var completer = new StreamCompleter(); |
| 247 completer.setEmpty(); |
| 248 expect(() => completer.setSoureStream(createStream()), throws); |
| 249 expect(() => completer.stEmpty(createStream()), throws); |
| 250 await completer.stream.toList(); |
| 251 // Still fails after source is done |
| 252 expect(() => completer.setSoureStream(createStream()), throws); |
| 253 expect(() => completer.stEmpty(createStream()), throws); |
| 254 }); |
| 255 |
| 256 test("linking a stream after setEmpty() after listen", () async { |
| 257 var completer = new StreamCompleter(); |
| 258 var list = completer.stream.toList(); |
| 259 completer.setEmpty(); |
| 260 expect(() => completer.setSoureStream(createStream()), throws); |
| 261 expect(() => completer.stEmpty(createStream()), throws); |
| 262 await list; |
| 263 // Still fails after source is done. |
| 264 expect(() => completer.setSoureStream(createStream()), throws); |
| 265 expect(() => completer.stEmpty(createStream()), throws); |
| 266 }); |
| 267 |
| 268 test("listening more than once after setting stream", () async { |
| 269 var completer = new StreamCompleter(); |
| 270 completer.setSourceStream(createStream()); |
| 271 var list = completer.stream.toList(); |
| 272 expect(() => completer.stream.oList(), throws); |
| 273 await list; |
| 274 expect(() => completer.stream.oList(), throws); |
| 275 }); |
| 276 |
| 277 test("listening more than once before setting stream", () async { |
| 278 var completer = new StreamCompleter(); |
| 279 var list = completer.stream.toList(); |
| 280 expect(() => completer.stream.oList(), throws); |
| 281 }); |
| 282 |
| 283 test("setting onData etc. before and after setting stream", () async { |
| 284 var completer = new StreamCompleter(); |
| 285 var controller = new StreamController(); |
| 286 var subscription = completer.stream.listen(null); |
| 287 var lastEvent = 0; |
| 288 subscription.onData((value) => lastEvent = value); |
| 289 subscription.onError((value) => lastEvent = "$value"); |
| 290 subscription.onDone(() => lastEvent = -1); |
| 291 completer.setSourceStream(controller.stream); |
| 292 await flushMicrotasks(); |
| 293 controller.add(1); |
| 294 await flushMicrotasks(); |
| 295 expect(lastEvent, 1); |
| 296 controller.addError(2); |
| 297 await flushMicrotasks(); |
| 298 expect(lastEvent, "2"); |
| 299 subscription.onData((value) => lastEvent = -value); |
| 300 subscription.onError((value) => lastEvent = "${-value}"); |
| 301 controller.add(1); |
| 302 await flushMicrotasks(); |
| 303 expect(lastEvent, -1); |
| 304 controller.addError(2); |
| 305 await flushMicrotasks(); |
| 306 expect(lastEvent, "-2"); |
| 307 controller.close(); |
| 308 await flushMicrotasks(); |
| 309 expect(lastEvent, -1); |
| 310 }); |
| 311 |
| 312 test("pause w/ resume future accross setting stream", () async { |
| 313 var completer = new StreamCompleter(); |
| 314 var resume = new Completer(); |
| 315 var subscription = completer.stream.listen(unreachable("data")); |
| 316 var lastEvent = 0; |
| 317 subscription.pause(resume.future); |
| 318 await flushMicrotasks(); |
| 319 completer.setSourceStream(createStream()); |
| 320 await flushMicrotasks(); |
| 321 resume.complete(); |
| 322 var events = []; |
| 323 subscription.onData(events.add); |
| 324 await subscription.asFuture(); |
| 325 expect(events, [1, 2, 3, 4]); |
| 326 }); |
| 327 |
| 328 test("asFuture with error accross setting stream", () async { |
| 329 var completer = new StreamCompleter(); |
| 330 var controller = new StreamController(); |
| 331 var subscription = completer.stream.listen(unreachable("data"), |
| 332 cancelOnError: false); |
| 333 var done = subscription.asFuture(); |
| 334 expect(controller.hasListener, isFalse); |
| 335 completer.setSourceStream(controller.stream); |
| 336 await flushMicrotasks(); |
| 337 expect(controller.hasListener, isTrue); |
| 338 controller.addError(42); |
| 339 await done.then(unreachable("data"), onError: (error) { |
| 340 expect(error, 42); |
| 341 }); |
| 342 expect(controller.hasListener, isFalse); |
| 343 }); |
| 344 } |
| 345 |
| 346 Stream<int> createStream() async* { |
| 347 yield 1; |
| 348 await flushMicrotasks(); |
| 349 yield 2; |
| 350 await flushMicrotasks(); |
| 351 yield 3; |
| 352 await flushMicrotasks(); |
| 353 yield 4; |
| 354 } |
| 355 |
| 356 /// A zero-millisecond timer should wait until after all microtasks. |
| 357 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
| 358 |
| 359 /// A generic unreachable callback function. |
| 360 /// |
| 361 /// Returns a function that fails the test if it is ever called. |
| 362 unreachable(String name) => ([a, b]) => fail("Unreachable: $name"); |
| 363 |
| 364 /// A badly behaved stream which throws if it's ever listened to. |
| 365 /// |
| 366 /// Can be used to test cases where a stream should not be used. |
| 367 class UnusableStream extends Stream { |
| 368 listen(onData, {onError, onDone, cancelOnError}) { |
| 369 throw new UnimplementedError("Gotcha!"); |
| 370 } |
| 371 } |
OLD | NEW |