| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import "dart:async"; | 5 import "dart:async"; |
| 6 | 6 |
| 7 import "package:async/async.dart" show SubscriptionStream; | 7 import "package:async/async.dart" show SubscriptionStream; |
| 8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
| 9 | 9 |
| 10 import "utils.dart"; | 10 import "utils.dart"; |
| 11 | 11 |
| 12 main() { | 12 main() { |
| 13 test("subscription stream of an entire subscription", () async { | 13 test("subscription stream of an entire subscription", () async { |
| 14 var stream = createStream(); | 14 var stream = createStream(); |
| 15 var subscription = stream.listen(null); | 15 var subscription = stream.listen(null); |
| 16 var subscriptionStream = new SubscriptionStream<int>(subscription); | 16 var subscriptionStream = new SubscriptionStream<int>(subscription); |
| 17 await flushMicrotasks(); | 17 await flushMicrotasks(); |
| 18 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); | 18 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
| 19 }); | 19 }); |
| 20 | 20 |
| 21 test("subscription stream after two events", () async { | 21 test("subscription stream after two events", () async { |
| 22 var stream = createStream(); | 22 var stream = createStream(); |
| 23 var skips = 0; | 23 var skips = 0; |
| 24 var completer = new Completer(); | 24 var completer = new Completer(); |
| 25 var subscription; | 25 StreamSubscription<int> subscription; |
| 26 subscription = stream.listen((value) { | 26 subscription = stream.listen((value) { |
| 27 ++skips; | 27 ++skips; |
| 28 expect(value, skips); | 28 expect(value, skips); |
| 29 if (skips == 2) { | 29 if (skips == 2) { |
| 30 completer.complete(new SubscriptionStream<int>(subscription)); | 30 completer.complete(new SubscriptionStream<int>(subscription)); |
| 31 } | 31 } |
| 32 }); | 32 }); |
| 33 var subscriptionStream = await completer.future; | 33 var subscriptionStream = await completer.future; |
| 34 await flushMicrotasks(); | 34 await flushMicrotasks(); |
| 35 expect(subscriptionStream.toList(), completion([3, 4])); | 35 expect(subscriptionStream.toList(), completion([3, 4])); |
| (...skipping 30 matching lines...) Expand all Loading... |
| 66 expect(controller.isPaused, isFalse); | 66 expect(controller.isPaused, isFalse); |
| 67 | 67 |
| 68 expect(await subscription.cancel(), 42); | 68 expect(await subscription.cancel(), 42); |
| 69 expect(controller.hasListener, isFalse); | 69 expect(controller.hasListener, isFalse); |
| 70 }); | 70 }); |
| 71 | 71 |
| 72 group("cancelOnError source:", () { | 72 group("cancelOnError source:", () { |
| 73 for (var sourceCancels in [false, true]) { | 73 for (var sourceCancels in [false, true]) { |
| 74 group("${sourceCancels ? "yes" : "no"}:", () { | 74 group("${sourceCancels ? "yes" : "no"}:", () { |
| 75 var subscriptionStream; | 75 var subscriptionStream; |
| 76 var onCancel; // Completes if source stream is canceled before done. | 76 var onCancel; // Completes if source stream is canceled before done. |
| 77 setUp(() { | 77 setUp(() { |
| 78 var cancelCompleter = new Completer(); | 78 var cancelCompleter = new Completer(); |
| 79 var source = createErrorStream(cancelCompleter); | 79 var source = createErrorStream(cancelCompleter); |
| 80 onCancel = cancelCompleter.future; | 80 onCancel = cancelCompleter.future; |
| 81 var sourceSubscription = source.listen(null, | 81 var sourceSubscription = |
| 82 cancelOnError: sourceCancels); | 82 source.listen(null, cancelOnError: sourceCancels); |
| 83 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); | 83 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
| 84 }); | 84 }); |
| 85 | 85 |
| 86 test("- subscriptionStream: no", () async { | 86 test("- subscriptionStream: no", () async { |
| 87 var done = new Completer(); | 87 var done = new Completer(); |
| 88 var events = []; | 88 var events = []; |
| 89 subscriptionStream.listen(events.add, | 89 subscriptionStream.listen(events.add, |
| 90 onError: events.add, | 90 onError: events.add, onDone: done.complete, cancelOnError: false); |
| 91 onDone: done.complete, | |
| 92 cancelOnError: false); | |
| 93 var expected = [1, 2, "To err is divine!"]; | 91 var expected = [1, 2, "To err is divine!"]; |
| 94 if (sourceCancels) { | 92 if (sourceCancels) { |
| 95 await onCancel; | 93 await onCancel; |
| 96 // And [done] won't complete at all. | 94 // And [done] won't complete at all. |
| 97 bool isDone = false; | 95 bool isDone = false; |
| 98 done.future.then((_) { isDone = true; }); | 96 done.future.then((_) { |
| 97 isDone = true; |
| 98 }); |
| 99 await new Future.delayed(const Duration(milliseconds: 5)); | 99 await new Future.delayed(const Duration(milliseconds: 5)); |
| 100 expect(isDone, false); | 100 expect(isDone, false); |
| 101 } else { | 101 } else { |
| 102 expected.add(4); | 102 expected.add(4); |
| 103 await done.future; | 103 await done.future; |
| 104 } | 104 } |
| 105 expect(events, expected); | 105 expect(events, expected); |
| 106 }); | 106 }); |
| 107 | 107 |
| 108 test("- subscriptionStream: yes", () async { | 108 test("- subscriptionStream: yes", () async { |
| 109 var completer = new Completer(); | 109 var completer = new Completer(); |
| 110 var events = []; | 110 var events = []; |
| 111 subscriptionStream.listen(events.add, | 111 subscriptionStream.listen(events.add, |
| 112 onError: (value) { | 112 onError: (value) { |
| 113 events.add(value); | 113 events.add(value); |
| 114 completer.complete(); | 114 completer.complete(); |
| 115 }, | 115 }, |
| 116 onDone: () => throw "should not happen", | 116 onDone: () => throw "should not happen", |
| 117 cancelOnError: true); | 117 cancelOnError: true); |
| 118 await completer.future; | 118 await completer.future; |
| 119 await flushMicrotasks(); | 119 await flushMicrotasks(); |
| 120 expect(events, [1, 2, "To err is divine!"]); | 120 expect(events, [1, 2, "To err is divine!"]); |
| 121 }); | 121 }); |
| 122 }); | 122 }); |
| 123 } | 123 } |
| 124 | 124 |
| 125 for (var cancelOnError in [false, true]) { | 125 for (var cancelOnError in [false, true]) { |
| 126 group(cancelOnError ? "yes" : "no", () { | 126 group(cancelOnError ? "yes" : "no", () { |
| 127 test("- no error, value goes to asFuture", () async { | 127 test("- no error, value goes to asFuture", () async { |
| 128 var stream = createStream(); | 128 var stream = createStream(); |
| 129 var sourceSubscription = | 129 var sourceSubscription = |
| 130 stream.listen(null, cancelOnError: cancelOnError); | 130 stream.listen(null, cancelOnError: cancelOnError); |
| 131 var subscriptionStream = | 131 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
| 132 new SubscriptionStream(sourceSubscription); | |
| 133 var subscription = | 132 var subscription = |
| 134 subscriptionStream.listen(null, cancelOnError: cancelOnError); | 133 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 135 expect(subscription.asFuture(42), completion(42)); | 134 expect(subscription.asFuture(42), completion(42)); |
| 136 }); | 135 }); |
| 137 | 136 |
| 138 test("- error goes to asFuture", () async { | 137 test("- error goes to asFuture", () async { |
| 139 var stream = createErrorStream(); | 138 var stream = createErrorStream(); |
| 140 var sourceSubscription = stream.listen(null, | 139 var sourceSubscription = |
| 141 cancelOnError: cancelOnError); | 140 stream.listen(null, cancelOnError: cancelOnError); |
| 142 var subscriptionStream = | 141 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
| 143 new SubscriptionStream(sourceSubscription); | |
| 144 | 142 |
| 145 var subscription = | 143 var subscription = |
| 146 subscriptionStream.listen(null, cancelOnError: cancelOnError); | 144 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 147 expect(subscription.asFuture(), throws); | 145 expect(subscription.asFuture(), throws); |
| 148 }); | 146 }); |
| 149 }); | 147 }); |
| 150 } | 148 } |
| 151 }); | 149 }); |
| 152 } | 150 } |
| 153 | 151 |
| 154 Stream<int> createStream() async* { | 152 Stream<int> createStream() async* { |
| 155 yield 1; | 153 yield 1; |
| 156 await flushMicrotasks(); | 154 await flushMicrotasks(); |
| 157 yield 2; | 155 yield 2; |
| 158 await flushMicrotasks(); | 156 await flushMicrotasks(); |
| 159 yield 3; | 157 yield 3; |
| 160 await flushMicrotasks(); | 158 await flushMicrotasks(); |
| 161 yield 4; | 159 yield 4; |
| 162 } | 160 } |
| 163 | 161 |
| 164 Stream<int> createErrorStream([Completer onCancel]) async* { | 162 Stream<int> createErrorStream([Completer onCancel]) async* { |
| 165 bool canceled = true; | 163 bool canceled = true; |
| 166 try { | 164 try { |
| 167 yield 1; | 165 yield 1; |
| 168 await flushMicrotasks(); | 166 await flushMicrotasks(); |
| 169 yield 2; | 167 yield 2; |
| 170 await flushMicrotasks(); | 168 await flushMicrotasks(); |
| 171 yield* new Future.error("To err is divine!").asStream(); | 169 yield* new Future<int>.error("To err is divine!").asStream(); |
| 172 await flushMicrotasks(); | 170 await flushMicrotasks(); |
| 173 yield 4; | 171 yield 4; |
| 174 await flushMicrotasks(); | 172 await flushMicrotasks(); |
| 175 canceled = false; | 173 canceled = false; |
| 176 } finally { | 174 } finally { |
| 177 // Completes before the "done", but should be after all events. | 175 // Completes before the "done", but should be after all events. |
| 178 if (canceled && onCancel != null) { | 176 if (canceled && onCancel != null) { |
| 179 await flushMicrotasks(); | 177 await flushMicrotasks(); |
| 180 onCancel.complete(); | 178 onCancel.complete(); |
| 181 } | 179 } |
| 182 } | 180 } |
| 183 } | 181 } |
| 184 | 182 |
| 185 Stream<int> createLongStream() async* { | 183 Stream<int> createLongStream() async* { |
| 186 for (int i = 0; i < 200; i++) yield i; | 184 for (int i = 0; i < 200; i++) yield i; |
| 187 } | 185 } |
| OLD | NEW |