| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import "dart:async"; | 5 import "dart:async"; |
| 6 | 6 |
| 7 import "package:async/async.dart" show StreamCompleter; | 7 import "package:async/async.dart" show StreamCompleter; |
| 8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
| 9 | 9 |
| 10 import "utils.dart"; | 10 import "utils.dart"; |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 68 await flushMicrotasks(); | 68 await flushMicrotasks(); |
| 69 expect(events, []); | 69 expect(events, []); |
| 70 subscription.resume(); | 70 subscription.resume(); |
| 71 } | 71 } |
| 72 await done; | 72 await done; |
| 73 expect(events, [1, 2, 3, 4]); | 73 expect(events, [1, 2, 3, 4]); |
| 74 }); | 74 }); |
| 75 | 75 |
| 76 test("cancel new stream before source is done", () async { | 76 test("cancel new stream before source is done", () async { |
| 77 var completer = new StreamCompleter(); | 77 var completer = new StreamCompleter(); |
| 78 var listened = false; | |
| 79 var lastEvent = -1; | 78 var lastEvent = -1; |
| 80 var controller = new StreamController(); | 79 var controller = new StreamController(); |
| 81 var subscription; | 80 var subscription; |
| 82 subscription = completer.stream.listen( | 81 subscription = completer.stream.listen( |
| 83 (value) { | 82 (value) { |
| 84 expect(value, lessThan(3)); | 83 expect(value, lessThan(3)); |
| 85 lastEvent = value; | 84 lastEvent = value; |
| 86 if (value == 2) { | 85 if (value == 2) { |
| 87 subscription.cancel(); | 86 subscription.cancel(); |
| 88 } | 87 } |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 139 completer.setSourceStream(controller.stream); | 138 completer.setSourceStream(controller.stream); |
| 140 await flushMicrotasks(); | 139 await flushMicrotasks(); |
| 141 expect(controller.hasListener, isFalse); | 140 expect(controller.hasListener, isFalse); |
| 142 var subscription = completer.stream.listen(null); | 141 var subscription = completer.stream.listen(null); |
| 143 expect(controller.hasListener, isTrue); | 142 expect(controller.hasListener, isTrue); |
| 144 await subscription.asFuture(); | 143 await subscription.asFuture(); |
| 145 }); | 144 }); |
| 146 | 145 |
| 147 test("cancelOnError true when listening before linking stream", () async { | 146 test("cancelOnError true when listening before linking stream", () async { |
| 148 var completer = new StreamCompleter(); | 147 var completer = new StreamCompleter(); |
| 149 var listened = false; | |
| 150 var canceled = false; | |
| 151 var lastEvent = -1; | 148 var lastEvent = -1; |
| 152 var controller = new StreamController(); | 149 var controller = new StreamController(); |
| 153 var subscription = completer.stream.listen( | 150 completer.stream.listen( |
| 154 (value) { | 151 (value) { |
| 155 expect(value, lessThan(3)); | 152 expect(value, lessThan(3)); |
| 156 lastEvent = value; | 153 lastEvent = value; |
| 157 }, | 154 }, |
| 158 onError: (value) { | 155 onError: (value) { |
| 159 expect(value, "3"); | 156 expect(value, "3"); |
| 160 lastEvent = value; | 157 lastEvent = value; |
| 161 }, | 158 }, |
| 162 onDone: unreachable("done"), | 159 onDone: unreachable("done"), |
| 163 cancelOnError: true); | 160 cancelOnError: true); |
| (...skipping 20 matching lines...) Expand all Loading... |
| 184 }); | 181 }); |
| 185 | 182 |
| 186 test("cancelOnError true when listening after linking stream", () async { | 183 test("cancelOnError true when listening after linking stream", () async { |
| 187 var completer = new StreamCompleter(); | 184 var completer = new StreamCompleter(); |
| 188 var lastEvent = -1; | 185 var lastEvent = -1; |
| 189 var controller = new StreamController(); | 186 var controller = new StreamController(); |
| 190 completer.setSourceStream(controller.stream); | 187 completer.setSourceStream(controller.stream); |
| 191 controller.add(1); | 188 controller.add(1); |
| 192 expect(controller.hasListener, isFalse); | 189 expect(controller.hasListener, isFalse); |
| 193 | 190 |
| 194 var subscription = completer.stream.listen( | 191 completer.stream.listen( |
| 195 (value) { | 192 (value) { |
| 196 expect(value, lessThan(3)); | 193 expect(value, lessThan(3)); |
| 197 lastEvent = value; | 194 lastEvent = value; |
| 198 }, | 195 }, |
| 199 onError: (value) { | 196 onError: (value) { |
| 200 expect(value, "3"); | 197 expect(value, "3"); |
| 201 lastEvent = value; | 198 lastEvent = value; |
| 202 }, | 199 }, |
| 203 onDone: unreachable("done"), | 200 onDone: unreachable("done"), |
| 204 cancelOnError: true); | 201 cancelOnError: true); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 215 expect(controller.hasListener, isTrue); | 212 expect(controller.hasListener, isTrue); |
| 216 controller.addError("3"); | 213 controller.addError("3"); |
| 217 | 214 |
| 218 await flushMicrotasks(); | 215 await flushMicrotasks(); |
| 219 expect(controller.hasListener, isFalse); | 216 expect(controller.hasListener, isFalse); |
| 220 }); | 217 }); |
| 221 | 218 |
| 222 test("linking a stream after setSourceStream before listen", () async { | 219 test("linking a stream after setSourceStream before listen", () async { |
| 223 var completer = new StreamCompleter(); | 220 var completer = new StreamCompleter(); |
| 224 completer.setSourceStream(createStream()); | 221 completer.setSourceStream(createStream()); |
| 225 expect(() => completer.setSourceStream(createStream()), throws); | 222 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 226 expect(() => completer.setEmpty(createStream()), throws); | 223 expect(() => completer.setEmpty(), throwsStateError); |
| 227 await completer.stream.toList(); | 224 await completer.stream.toList(); |
| 228 // Still fails after source is done | 225 // Still fails after source is done |
| 229 expect(() => completer.setSourceStream(createStream()), throws); | 226 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 230 expect(() => completer.setEmpty(createStream()), throws); | 227 expect(() => completer.setEmpty(), throwsStateError); |
| 231 }); | 228 }); |
| 232 | 229 |
| 233 test("linking a stream after setSourceStream after listen", () async { | 230 test("linking a stream after setSourceStream after listen", () async { |
| 234 var completer = new StreamCompleter(); | 231 var completer = new StreamCompleter(); |
| 235 var list = completer.stream.toList(); | 232 var list = completer.stream.toList(); |
| 236 completer.setSourceStream(createStream()); | 233 completer.setSourceStream(createStream()); |
| 237 expect(() => completer.setSoureStream(createStream()), throws); | 234 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 238 expect(() => completer.stEmpty(createStream()), throws); | 235 expect(() => completer.setEmpty(), throwsStateError); |
| 239 await list; | 236 await list; |
| 240 // Still fails after source is done. | 237 // Still fails after source is done. |
| 241 expect(() => completer.setSoureStream(createStream()), throws); | 238 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 242 expect(() => completer.stEmpty(createStream()), throws); | 239 expect(() => completer.setEmpty(), throwsStateError); |
| 243 }); | 240 }); |
| 244 | 241 |
| 245 test("linking a stream after setEmpty before listen", () async { | 242 test("linking a stream after setEmpty before listen", () async { |
| 246 var completer = new StreamCompleter(); | 243 var completer = new StreamCompleter(); |
| 247 completer.setEmpty(); | 244 completer.setEmpty(); |
| 248 expect(() => completer.setSoureStream(createStream()), throws); | 245 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 249 expect(() => completer.stEmpty(createStream()), throws); | 246 expect(() => completer.setEmpty(), throwsStateError); |
| 250 await completer.stream.toList(); | 247 await completer.stream.toList(); |
| 251 // Still fails after source is done | 248 // Still fails after source is done |
| 252 expect(() => completer.setSoureStream(createStream()), throws); | 249 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 253 expect(() => completer.stEmpty(createStream()), throws); | 250 expect(() => completer.setEmpty(), throwsStateError); |
| 254 }); | 251 }); |
| 255 | 252 |
| 256 test("linking a stream after setEmpty() after listen", () async { | 253 test("linking a stream after setEmpty() after listen", () async { |
| 257 var completer = new StreamCompleter(); | 254 var completer = new StreamCompleter(); |
| 258 var list = completer.stream.toList(); | 255 var list = completer.stream.toList(); |
| 259 completer.setEmpty(); | 256 completer.setEmpty(); |
| 260 expect(() => completer.setSoureStream(createStream()), throws); | 257 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 261 expect(() => completer.stEmpty(createStream()), throws); | 258 expect(() => completer.setEmpty(), throwsStateError); |
| 262 await list; | 259 await list; |
| 263 // Still fails after source is done. | 260 // Still fails after source is done. |
| 264 expect(() => completer.setSoureStream(createStream()), throws); | 261 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
| 265 expect(() => completer.stEmpty(createStream()), throws); | 262 expect(() => completer.setEmpty(), throwsStateError); |
| 266 }); | 263 }); |
| 267 | 264 |
| 268 test("listening more than once after setting stream", () async { | 265 test("listening more than once after setting stream", () async { |
| 269 var completer = new StreamCompleter(); | 266 var completer = new StreamCompleter(); |
| 270 completer.setSourceStream(createStream()); | 267 completer.setSourceStream(createStream()); |
| 271 var list = completer.stream.toList(); | 268 var list = completer.stream.toList(); |
| 272 expect(() => completer.stream.oList(), throws); | 269 expect(() => completer.stream.toList(), throwsStateError); |
| 273 await list; | 270 await list; |
| 274 expect(() => completer.stream.oList(), throws); | 271 expect(() => completer.stream.toList(), throwsStateError); |
| 275 }); | 272 }); |
| 276 | 273 |
| 277 test("listening more than once before setting stream", () async { | 274 test("listening more than once before setting stream", () async { |
| 278 var completer = new StreamCompleter(); | 275 var completer = new StreamCompleter(); |
| 279 var list = completer.stream.toList(); | 276 completer.stream.toList(); |
| 280 expect(() => completer.stream.oList(), throws); | 277 expect(() => completer.stream.toList(), throwsStateError); |
| 281 }); | 278 }); |
| 282 | 279 |
| 283 test("setting onData etc. before and after setting stream", () async { | 280 test("setting onData etc. before and after setting stream", () async { |
| 284 var completer = new StreamCompleter(); | 281 var completer = new StreamCompleter(); |
| 285 var controller = new StreamController(); | 282 var controller = new StreamController(); |
| 286 var subscription = completer.stream.listen(null); | 283 var subscription = completer.stream.listen(null); |
| 287 var lastEvent = 0; | 284 var lastEvent = 0; |
| 288 subscription.onData((value) => lastEvent = value); | 285 subscription.onData((value) => lastEvent = value); |
| 289 subscription.onError((value) => lastEvent = "$value"); | 286 subscription.onError((value) => lastEvent = "$value"); |
| 290 subscription.onDone(() => lastEvent = -1); | 287 subscription.onDone(() => lastEvent = -1); |
| (...skipping 15 matching lines...) Expand all Loading... |
| 306 expect(lastEvent, "-2"); | 303 expect(lastEvent, "-2"); |
| 307 controller.close(); | 304 controller.close(); |
| 308 await flushMicrotasks(); | 305 await flushMicrotasks(); |
| 309 expect(lastEvent, -1); | 306 expect(lastEvent, -1); |
| 310 }); | 307 }); |
| 311 | 308 |
| 312 test("pause w/ resume future accross setting stream", () async { | 309 test("pause w/ resume future accross setting stream", () async { |
| 313 var completer = new StreamCompleter(); | 310 var completer = new StreamCompleter(); |
| 314 var resume = new Completer(); | 311 var resume = new Completer(); |
| 315 var subscription = completer.stream.listen(unreachable("data")); | 312 var subscription = completer.stream.listen(unreachable("data")); |
| 316 var lastEvent = 0; | |
| 317 subscription.pause(resume.future); | 313 subscription.pause(resume.future); |
| 318 await flushMicrotasks(); | 314 await flushMicrotasks(); |
| 319 completer.setSourceStream(createStream()); | 315 completer.setSourceStream(createStream()); |
| 320 await flushMicrotasks(); | 316 await flushMicrotasks(); |
| 321 resume.complete(); | 317 resume.complete(); |
| 322 var events = []; | 318 var events = []; |
| 323 subscription.onData(events.add); | 319 subscription.onData(events.add); |
| 324 await subscription.asFuture(); | 320 await subscription.asFuture(); |
| 325 expect(events, [1, 2, 3, 4]); | 321 expect(events, [1, 2, 3, 4]); |
| 326 }); | 322 }); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 345 | 341 |
| 346 Stream<int> createStream() async* { | 342 Stream<int> createStream() async* { |
| 347 yield 1; | 343 yield 1; |
| 348 await flushMicrotasks(); | 344 await flushMicrotasks(); |
| 349 yield 2; | 345 yield 2; |
| 350 await flushMicrotasks(); | 346 await flushMicrotasks(); |
| 351 yield 3; | 347 yield 3; |
| 352 await flushMicrotasks(); | 348 await flushMicrotasks(); |
| 353 yield 4; | 349 yield 4; |
| 354 } | 350 } |
| 355 | |
| 356 /// A zero-millisecond timer should wait until after all microtasks. | |
| 357 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
| 358 | |
| 359 /// A generic unreachable callback function. | |
| 360 /// | |
| 361 /// Returns a function that fails the test if it is ever called. | |
| 362 unreachable(String name) => ([a, b]) => fail("Unreachable: $name"); | |
| 363 | |
| 364 /// A badly behaved stream which throws if it's ever listened to. | |
| 365 /// | |
| 366 /// Can be used to test cases where a stream should not be used. | |
| 367 class UnusableStream extends Stream { | |
| 368 listen(onData, {onError, onDone, cancelOnError}) { | |
| 369 throw new UnimplementedError("Gotcha!"); | |
| 370 } | |
| 371 } | |
| OLD | NEW |