OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:async"; | 5 import "dart:async"; |
6 | 6 |
7 import "package:async/async.dart" show SubscriptionStream; | 7 import "package:async/async.dart" show SubscriptionStream; |
8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
9 | 9 |
10 import "utils.dart"; | 10 import "utils.dart"; |
11 | 11 |
12 main() { | 12 main() { |
13 test("subscription stream of an entire subscription", () async { | 13 test("subscription stream of an entire subscription", () async { |
14 var stream = createStream(); | 14 var stream = createStream(); |
15 var subscription = stream.listen(null); | 15 var subscription = stream.listen(null); |
16 var subscriptionStream = new SubscriptionStream<int>(subscription); | 16 var subscriptionStream = new SubscriptionStream<int>(subscription); |
17 await flushMicrotasks(); | 17 await flushMicrotasks(); |
18 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); | 18 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
19 }); | 19 }); |
20 | 20 |
21 test("subscription stream after two events", () async { | 21 test("subscription stream after two events", () async { |
22 var stream = createStream(); | 22 var stream = createStream(); |
23 var skips = 0; | 23 var skips = 0; |
24 var completer = new Completer(); | 24 var completer = new Completer(); |
25 var subscription; | 25 StreamSubscription<int> subscription; |
26 subscription = stream.listen((value) { | 26 subscription = stream.listen((value) { |
27 ++skips; | 27 ++skips; |
28 expect(value, skips); | 28 expect(value, skips); |
29 if (skips == 2) { | 29 if (skips == 2) { |
30 completer.complete(new SubscriptionStream<int>(subscription)); | 30 completer.complete(new SubscriptionStream<int>(subscription)); |
31 } | 31 } |
32 }); | 32 }); |
33 var subscriptionStream = await completer.future; | 33 var subscriptionStream = await completer.future; |
34 await flushMicrotasks(); | 34 await flushMicrotasks(); |
35 expect(subscriptionStream.toList(), completion([3, 4])); | 35 expect(subscriptionStream.toList(), completion([3, 4])); |
(...skipping 30 matching lines...) Expand all Loading... |
66 expect(controller.isPaused, isFalse); | 66 expect(controller.isPaused, isFalse); |
67 | 67 |
68 expect(await subscription.cancel(), 42); | 68 expect(await subscription.cancel(), 42); |
69 expect(controller.hasListener, isFalse); | 69 expect(controller.hasListener, isFalse); |
70 }); | 70 }); |
71 | 71 |
72 group("cancelOnError source:", () { | 72 group("cancelOnError source:", () { |
73 for (var sourceCancels in [false, true]) { | 73 for (var sourceCancels in [false, true]) { |
74 group("${sourceCancels ? "yes" : "no"}:", () { | 74 group("${sourceCancels ? "yes" : "no"}:", () { |
75 var subscriptionStream; | 75 var subscriptionStream; |
76 var onCancel; // Completes if source stream is canceled before done. | 76 var onCancel; // Completes if source stream is canceled before done. |
77 setUp(() { | 77 setUp(() { |
78 var cancelCompleter = new Completer(); | 78 var cancelCompleter = new Completer(); |
79 var source = createErrorStream(cancelCompleter); | 79 var source = createErrorStream(cancelCompleter); |
80 onCancel = cancelCompleter.future; | 80 onCancel = cancelCompleter.future; |
81 var sourceSubscription = source.listen(null, | 81 var sourceSubscription = |
82 cancelOnError: sourceCancels); | 82 source.listen(null, cancelOnError: sourceCancels); |
83 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); | 83 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
84 }); | 84 }); |
85 | 85 |
86 test("- subscriptionStream: no", () async { | 86 test("- subscriptionStream: no", () async { |
87 var done = new Completer(); | 87 var done = new Completer(); |
88 var events = []; | 88 var events = []; |
89 subscriptionStream.listen(events.add, | 89 subscriptionStream.listen(events.add, |
90 onError: events.add, | 90 onError: events.add, onDone: done.complete, cancelOnError: false); |
91 onDone: done.complete, | |
92 cancelOnError: false); | |
93 var expected = [1, 2, "To err is divine!"]; | 91 var expected = [1, 2, "To err is divine!"]; |
94 if (sourceCancels) { | 92 if (sourceCancels) { |
95 await onCancel; | 93 await onCancel; |
96 // And [done] won't complete at all. | 94 // And [done] won't complete at all. |
97 bool isDone = false; | 95 bool isDone = false; |
98 done.future.then((_) { isDone = true; }); | 96 done.future.then((_) { |
| 97 isDone = true; |
| 98 }); |
99 await new Future.delayed(const Duration(milliseconds: 5)); | 99 await new Future.delayed(const Duration(milliseconds: 5)); |
100 expect(isDone, false); | 100 expect(isDone, false); |
101 } else { | 101 } else { |
102 expected.add(4); | 102 expected.add(4); |
103 await done.future; | 103 await done.future; |
104 } | 104 } |
105 expect(events, expected); | 105 expect(events, expected); |
106 }); | 106 }); |
107 | 107 |
108 test("- subscriptionStream: yes", () async { | 108 test("- subscriptionStream: yes", () async { |
109 var completer = new Completer(); | 109 var completer = new Completer(); |
110 var events = []; | 110 var events = []; |
111 subscriptionStream.listen(events.add, | 111 subscriptionStream.listen(events.add, |
112 onError: (value) { | 112 onError: (value) { |
113 events.add(value); | 113 events.add(value); |
114 completer.complete(); | 114 completer.complete(); |
115 }, | 115 }, |
116 onDone: () => throw "should not happen", | 116 onDone: () => throw "should not happen", |
117 cancelOnError: true); | 117 cancelOnError: true); |
118 await completer.future; | 118 await completer.future; |
119 await flushMicrotasks(); | 119 await flushMicrotasks(); |
120 expect(events, [1, 2, "To err is divine!"]); | 120 expect(events, [1, 2, "To err is divine!"]); |
121 }); | 121 }); |
122 }); | 122 }); |
123 } | 123 } |
124 | 124 |
125 for (var cancelOnError in [false, true]) { | 125 for (var cancelOnError in [false, true]) { |
126 group(cancelOnError ? "yes" : "no", () { | 126 group(cancelOnError ? "yes" : "no", () { |
127 test("- no error, value goes to asFuture", () async { | 127 test("- no error, value goes to asFuture", () async { |
128 var stream = createStream(); | 128 var stream = createStream(); |
129 var sourceSubscription = | 129 var sourceSubscription = |
130 stream.listen(null, cancelOnError: cancelOnError); | 130 stream.listen(null, cancelOnError: cancelOnError); |
131 var subscriptionStream = | 131 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
132 new SubscriptionStream(sourceSubscription); | |
133 var subscription = | 132 var subscription = |
134 subscriptionStream.listen(null, cancelOnError: cancelOnError); | 133 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
135 expect(subscription.asFuture(42), completion(42)); | 134 expect(subscription.asFuture(42), completion(42)); |
136 }); | 135 }); |
137 | 136 |
138 test("- error goes to asFuture", () async { | 137 test("- error goes to asFuture", () async { |
139 var stream = createErrorStream(); | 138 var stream = createErrorStream(); |
140 var sourceSubscription = stream.listen(null, | 139 var sourceSubscription = |
141 cancelOnError: cancelOnError); | 140 stream.listen(null, cancelOnError: cancelOnError); |
142 var subscriptionStream = | 141 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
143 new SubscriptionStream(sourceSubscription); | |
144 | 142 |
145 var subscription = | 143 var subscription = |
146 subscriptionStream.listen(null, cancelOnError: cancelOnError); | 144 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
147 expect(subscription.asFuture(), throws); | 145 expect(subscription.asFuture(), throws); |
148 }); | 146 }); |
149 }); | 147 }); |
150 } | 148 } |
151 }); | 149 }); |
152 } | 150 } |
153 | 151 |
154 Stream<int> createStream() async* { | 152 Stream<int> createStream() async* { |
155 yield 1; | 153 yield 1; |
156 await flushMicrotasks(); | 154 await flushMicrotasks(); |
157 yield 2; | 155 yield 2; |
158 await flushMicrotasks(); | 156 await flushMicrotasks(); |
159 yield 3; | 157 yield 3; |
160 await flushMicrotasks(); | 158 await flushMicrotasks(); |
161 yield 4; | 159 yield 4; |
162 } | 160 } |
163 | 161 |
164 Stream<int> createErrorStream([Completer onCancel]) async* { | 162 Stream<int> createErrorStream([Completer onCancel]) async* { |
165 bool canceled = true; | 163 bool canceled = true; |
166 try { | 164 try { |
167 yield 1; | 165 yield 1; |
168 await flushMicrotasks(); | 166 await flushMicrotasks(); |
169 yield 2; | 167 yield 2; |
170 await flushMicrotasks(); | 168 await flushMicrotasks(); |
171 yield* new Future.error("To err is divine!").asStream(); | 169 yield* new Future<int>.error("To err is divine!").asStream(); |
172 await flushMicrotasks(); | 170 await flushMicrotasks(); |
173 yield 4; | 171 yield 4; |
174 await flushMicrotasks(); | 172 await flushMicrotasks(); |
175 canceled = false; | 173 canceled = false; |
176 } finally { | 174 } finally { |
177 // Completes before the "done", but should be after all events. | 175 // Completes before the "done", but should be after all events. |
178 if (canceled && onCancel != null) { | 176 if (canceled && onCancel != null) { |
179 await flushMicrotasks(); | 177 await flushMicrotasks(); |
180 onCancel.complete(); | 178 onCancel.complete(); |
181 } | 179 } |
182 } | 180 } |
183 } | 181 } |
184 | 182 |
185 Stream<int> createLongStream() async* { | 183 Stream<int> createLongStream() async* { |
186 for (int i = 0; i < 200; i++) yield i; | 184 for (int i = 0; i < 200; i++) yield i; |
187 } | 185 } |
OLD | NEW |