OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. |
6 library stream_controller_async_test; | 6 library stream_controller_async_test; |
7 | 7 |
8 import 'dart:async'; | 8 import 'dart:async'; |
9 import 'dart:isolate'; | 9 import 'dart:isolate'; |
10 import "package:expect/expect.dart"; | 10 import "package:expect/expect.dart"; |
11 import '../../../pkg/unittest/lib/unittest.dart'; | 11 import '../../../pkg/unittest/lib/unittest.dart'; |
12 import 'event_helper.dart'; | 12 import 'event_helper.dart'; |
13 import 'stream_state_helper.dart'; | 13 import 'stream_state_helper.dart'; |
14 | 14 |
15 void cancelSub(StreamSubscription sub) { sub.cancel(); } | 15 void cancelSub(StreamSubscription sub) { sub.cancel(); } |
16 | 16 |
17 testController() { | 17 testController() { |
18 // Test fold | 18 // Test fold |
19 test("StreamController.fold", () { | 19 test("StreamController.fold", () { |
20 StreamController c = new StreamController(); | 20 StreamController c = new StreamController(); |
21 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); | 21 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
22 stream.fold(0, (a,b) => a + b) | 22 stream.fold(0, (a,b) => a + b) |
23 .then(expectAsync1((int v) { | 23 .then(expectAsync((int v) { |
24 Expect.equals(42, v); | 24 Expect.equals(42, v); |
25 })); | 25 })); |
26 c.add(10); | 26 c.add(10); |
27 c.add(32); | 27 c.add(32); |
28 c.close(); | 28 c.close(); |
29 }); | 29 }); |
30 | 30 |
31 test("StreamController.fold throws", () { | 31 test("StreamController.fold throws", () { |
32 StreamController c = new StreamController(); | 32 StreamController c = new StreamController(); |
33 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); | 33 Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
34 stream.fold(0, (a,b) { throw "Fnyf!"; }) | 34 stream.fold(0, (a,b) { throw "Fnyf!"; }) |
35 .catchError(expectAsync1((error) { Expect.equals("Fnyf!", error); })); | 35 .catchError(expectAsync((error) { Expect.equals("Fnyf!", error); })); |
36 c.add(42); | 36 c.add(42); |
37 }); | 37 }); |
38 } | 38 } |
39 | 39 |
40 testSingleController() { | 40 testSingleController() { |
41 test("Single-subscription StreamController.fold", () { | 41 test("Single-subscription StreamController.fold", () { |
42 StreamController c = new StreamController(); | 42 StreamController c = new StreamController(); |
43 Stream stream = c.stream; | 43 Stream stream = c.stream; |
44 stream.fold(0, (a,b) => a + b) | 44 stream.fold(0, (a,b) => a + b) |
45 .then(expectAsync1((int v) { Expect.equals(42, v); })); | 45 .then(expectAsync((int v) { Expect.equals(42, v); })); |
46 c.add(10); | 46 c.add(10); |
47 c.add(32); | 47 c.add(32); |
48 c.close(); | 48 c.close(); |
49 }); | 49 }); |
50 | 50 |
51 test("Single-subscription StreamController.fold throws", () { | 51 test("Single-subscription StreamController.fold throws", () { |
52 StreamController c = new StreamController(); | 52 StreamController c = new StreamController(); |
53 Stream stream = c.stream; | 53 Stream stream = c.stream; |
54 stream.fold(0, (a,b) { throw "Fnyf!"; }) | 54 stream.fold(0, (a,b) { throw "Fnyf!"; }) |
55 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); | 55 .catchError(expectAsync((e) { Expect.equals("Fnyf!", e); })); |
56 c.add(42); | 56 c.add(42); |
57 }); | 57 }); |
58 | 58 |
59 test("Single-subscription StreamController events are buffered when" | 59 test("Single-subscription StreamController events are buffered when" |
60 " there is no subscriber", | 60 " there is no subscriber", |
61 () { | 61 () { |
62 StreamController c = new StreamController(); | 62 StreamController c = new StreamController(); |
63 EventSink sink = c.sink; | 63 EventSink sink = c.sink; |
64 Stream stream = c.stream; | 64 Stream stream = c.stream; |
65 int counter = 0; | 65 int counter = 0; |
66 sink.add(1); | 66 sink.add(1); |
67 sink.add(2); | 67 sink.add(2); |
68 sink.close(); | 68 sink.close(); |
69 stream.listen( | 69 stream.listen( |
70 (data) { | 70 (data) { |
71 counter += data; | 71 counter += data; |
72 }, | 72 }, |
73 onDone: expectAsync0(() { | 73 onDone: expectAsync(() { |
74 Expect.equals(3, counter); | 74 Expect.equals(3, counter); |
75 })); | 75 })); |
76 }); | 76 }); |
77 } | 77 } |
78 | 78 |
79 testExtraMethods() { | 79 testExtraMethods() { |
80 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); | 80 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); |
81 | 81 |
82 test("forEach", () { | 82 test("forEach", () { |
83 StreamController c = new StreamController(); | 83 StreamController c = new StreamController(); |
84 Events actualEvents = new Events(); | 84 Events actualEvents = new Events(); |
85 Future f = c.stream.forEach(actualEvents.add); | 85 Future f = c.stream.forEach(actualEvents.add); |
86 f.then(expectAsync1((_) { | 86 f.then(expectAsync((_) { |
87 actualEvents.close(); | 87 actualEvents.close(); |
88 Expect.listEquals(sentEvents.events, actualEvents.events); | 88 Expect.listEquals(sentEvents.events, actualEvents.events); |
89 })); | 89 })); |
90 sentEvents.replay(c); | 90 sentEvents.replay(c); |
91 }); | 91 }); |
92 | 92 |
93 test("forEachError", () { | 93 test("forEachError", () { |
94 Events sentEvents = new Events()..add(7)..error("bad")..add(87)..close(); | 94 Events sentEvents = new Events()..add(7)..error("bad")..add(87)..close(); |
95 StreamController c = new StreamController(); | 95 StreamController c = new StreamController(); |
96 Events actualEvents = new Events(); | 96 Events actualEvents = new Events(); |
97 Future f = c.stream.forEach(actualEvents.add); | 97 Future f = c.stream.forEach(actualEvents.add); |
98 f.catchError(expectAsync1((error) { | 98 f.catchError(expectAsync((error) { |
99 Expect.equals("bad", error); | 99 Expect.equals("bad", error); |
100 Expect.listEquals((new Events()..add(7)).events, actualEvents.events); | 100 Expect.listEquals((new Events()..add(7)).events, actualEvents.events); |
101 })); | 101 })); |
102 sentEvents.replay(c); | 102 sentEvents.replay(c); |
103 }); | 103 }); |
104 | 104 |
105 test("forEachError2", () { | 105 test("forEachError2", () { |
106 Events sentEvents = new Events()..add(7)..add(9)..add(87)..close(); | 106 Events sentEvents = new Events()..add(7)..add(9)..add(87)..close(); |
107 StreamController c = new StreamController(); | 107 StreamController c = new StreamController(); |
108 Events actualEvents = new Events(); | 108 Events actualEvents = new Events(); |
109 Future f = c.stream.forEach((x) { | 109 Future f = c.stream.forEach((x) { |
110 if (x == 9) throw "bad"; | 110 if (x == 9) throw "bad"; |
111 actualEvents.add(x); | 111 actualEvents.add(x); |
112 }); | 112 }); |
113 f.catchError(expectAsync1((error) { | 113 f.catchError(expectAsync((error) { |
114 Expect.equals("bad", error); | 114 Expect.equals("bad", error); |
115 Expect.listEquals((new Events()..add(7)).events, actualEvents.events); | 115 Expect.listEquals((new Events()..add(7)).events, actualEvents.events); |
116 })); | 116 })); |
117 sentEvents.replay(c); | 117 sentEvents.replay(c); |
118 }); | 118 }); |
119 | 119 |
120 test("firstWhere", () { | 120 test("firstWhere", () { |
121 StreamController c = new StreamController(); | 121 StreamController c = new StreamController(); |
122 Future f = c.stream.firstWhere((x) => (x % 3) == 0); | 122 Future f = c.stream.firstWhere((x) => (x % 3) == 0); |
123 f.then(expectAsync1((v) { Expect.equals(9, v); })); | 123 f.then(expectAsync((v) { Expect.equals(9, v); })); |
124 sentEvents.replay(c); | 124 sentEvents.replay(c); |
125 }); | 125 }); |
126 | 126 |
127 test("firstWhere 2", () { | 127 test("firstWhere 2", () { |
128 StreamController c = new StreamController(); | 128 StreamController c = new StreamController(); |
129 Future f = c.stream.firstWhere((x) => (x % 4) == 0); | 129 Future f = c.stream.firstWhere((x) => (x % 4) == 0); |
130 f.catchError(expectAsync1((e) {})); | 130 f.catchError(expectAsync((e) {})); |
131 sentEvents.replay(c); | 131 sentEvents.replay(c); |
132 }); | 132 }); |
133 | 133 |
134 test("firstWhere 3", () { | 134 test("firstWhere 3", () { |
135 StreamController c = new StreamController(); | 135 StreamController c = new StreamController(); |
136 Future f = c.stream.firstWhere((x) => (x % 4) == 0, defaultValue: () => 999)
; | 136 Future f = c.stream.firstWhere((x) => (x % 4) == 0, defaultValue: () => 999)
; |
137 f.then(expectAsync1((v) { Expect.equals(999, v); })); | 137 f.then(expectAsync((v) { Expect.equals(999, v); })); |
138 sentEvents.replay(c); | 138 sentEvents.replay(c); |
139 }); | 139 }); |
140 | 140 |
141 | 141 |
142 test("lastWhere", () { | 142 test("lastWhere", () { |
143 StreamController c = new StreamController(); | 143 StreamController c = new StreamController(); |
144 Future f = c.stream.lastWhere((x) => (x % 3) == 0); | 144 Future f = c.stream.lastWhere((x) => (x % 3) == 0); |
145 f.then(expectAsync1((v) { Expect.equals(87, v); })); | 145 f.then(expectAsync((v) { Expect.equals(87, v); })); |
146 sentEvents.replay(c); | 146 sentEvents.replay(c); |
147 }); | 147 }); |
148 | 148 |
149 test("lastWhere 2", () { | 149 test("lastWhere 2", () { |
150 StreamController c = new StreamController(); | 150 StreamController c = new StreamController(); |
151 Future f = c.stream.lastWhere((x) => (x % 4) == 0); | 151 Future f = c.stream.lastWhere((x) => (x % 4) == 0); |
152 f.catchError(expectAsync1((e) {})); | 152 f.catchError(expectAsync((e) {})); |
153 sentEvents.replay(c); | 153 sentEvents.replay(c); |
154 }); | 154 }); |
155 | 155 |
156 test("lastWhere 3", () { | 156 test("lastWhere 3", () { |
157 StreamController c = new StreamController(); | 157 StreamController c = new StreamController(); |
158 Future f = c.stream.lastWhere((x) => (x % 4) == 0, defaultValue: () => 999); | 158 Future f = c.stream.lastWhere((x) => (x % 4) == 0, defaultValue: () => 999); |
159 f.then(expectAsync1((v) { Expect.equals(999, v); })); | 159 f.then(expectAsync((v) { Expect.equals(999, v); })); |
160 sentEvents.replay(c); | 160 sentEvents.replay(c); |
161 }); | 161 }); |
162 | 162 |
163 test("singleWhere", () { | 163 test("singleWhere", () { |
164 StreamController c = new StreamController(); | 164 StreamController c = new StreamController(); |
165 Future f = c.stream.singleWhere((x) => (x % 9) == 0); | 165 Future f = c.stream.singleWhere((x) => (x % 9) == 0); |
166 f.then(expectAsync1((v) { Expect.equals(9, v); })); | 166 f.then(expectAsync((v) { Expect.equals(9, v); })); |
167 sentEvents.replay(c); | 167 sentEvents.replay(c); |
168 }); | 168 }); |
169 | 169 |
170 test("singleWhere 2", () { | 170 test("singleWhere 2", () { |
171 StreamController c = new StreamController(); | 171 StreamController c = new StreamController(); |
172 Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87.. | 172 Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87.. |
173 f.catchError(expectAsync1((error) { Expect.isTrue(error is StateError); })); | 173 f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
174 sentEvents.replay(c); | 174 sentEvents.replay(c); |
175 }); | 175 }); |
176 | 176 |
177 test("first", () { | 177 test("first", () { |
178 StreamController c = new StreamController(); | 178 StreamController c = new StreamController(); |
179 Future f = c.stream.first; | 179 Future f = c.stream.first; |
180 f.then(expectAsync1((v) { Expect.equals(7, v);})); | 180 f.then(expectAsync((v) { Expect.equals(7, v);})); |
181 sentEvents.replay(c); | 181 sentEvents.replay(c); |
182 }); | 182 }); |
183 | 183 |
184 test("first empty", () { | 184 test("first empty", () { |
185 StreamController c = new StreamController(); | 185 StreamController c = new StreamController(); |
186 Future f = c.stream.first; | 186 Future f = c.stream.first; |
187 f.catchError(expectAsync1((error) { Expect.isTrue(error is StateError); })); | 187 f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
188 Events emptyEvents = new Events()..close(); | 188 Events emptyEvents = new Events()..close(); |
189 emptyEvents.replay(c); | 189 emptyEvents.replay(c); |
190 }); | 190 }); |
191 | 191 |
192 test("first error", () { | 192 test("first error", () { |
193 StreamController c = new StreamController(); | 193 StreamController c = new StreamController(); |
194 Future f = c.stream.first; | 194 Future f = c.stream.first; |
195 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 195 f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
196 Events errorEvents = new Events()..error("error")..close(); | 196 Events errorEvents = new Events()..error("error")..close(); |
197 errorEvents.replay(c); | 197 errorEvents.replay(c); |
198 }); | 198 }); |
199 | 199 |
200 test("first error 2", () { | 200 test("first error 2", () { |
201 StreamController c = new StreamController(); | 201 StreamController c = new StreamController(); |
202 Future f = c.stream.first; | 202 Future f = c.stream.first; |
203 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 203 f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
204 Events errorEvents = new Events()..error("error")..error("error2")..close(); | 204 Events errorEvents = new Events()..error("error")..error("error2")..close(); |
205 errorEvents.replay(c); | 205 errorEvents.replay(c); |
206 }); | 206 }); |
207 | 207 |
208 test("last", () { | 208 test("last", () { |
209 StreamController c = new StreamController(); | 209 StreamController c = new StreamController(); |
210 Future f = c.stream.last; | 210 Future f = c.stream.last; |
211 f.then(expectAsync1((v) { Expect.equals(87, v);})); | 211 f.then(expectAsync((v) { Expect.equals(87, v);})); |
212 sentEvents.replay(c); | 212 sentEvents.replay(c); |
213 }); | 213 }); |
214 | 214 |
215 test("last empty", () { | 215 test("last empty", () { |
216 StreamController c = new StreamController(); | 216 StreamController c = new StreamController(); |
217 Future f = c.stream.last; | 217 Future f = c.stream.last; |
218 f.catchError(expectAsync1((error) { Expect.isTrue(error is StateError); })); | 218 f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
219 Events emptyEvents = new Events()..close(); | 219 Events emptyEvents = new Events()..close(); |
220 emptyEvents.replay(c); | 220 emptyEvents.replay(c); |
221 }); | 221 }); |
222 | 222 |
223 test("last error", () { | 223 test("last error", () { |
224 StreamController c = new StreamController(); | 224 StreamController c = new StreamController(); |
225 Future f = c.stream.last; | 225 Future f = c.stream.last; |
226 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 226 f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
227 Events errorEvents = new Events()..error("error")..close(); | 227 Events errorEvents = new Events()..error("error")..close(); |
228 errorEvents.replay(c); | 228 errorEvents.replay(c); |
229 }); | 229 }); |
230 | 230 |
231 test("last error 2", () { | 231 test("last error 2", () { |
232 StreamController c = new StreamController(); | 232 StreamController c = new StreamController(); |
233 Future f = c.stream.last; | 233 Future f = c.stream.last; |
234 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 234 f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
235 Events errorEvents = new Events()..error("error")..error("error2")..close(); | 235 Events errorEvents = new Events()..error("error")..error("error2")..close(); |
236 errorEvents.replay(c); | 236 errorEvents.replay(c); |
237 }); | 237 }); |
238 | 238 |
239 test("elementAt", () { | 239 test("elementAt", () { |
240 StreamController c = new StreamController(); | 240 StreamController c = new StreamController(); |
241 Future f = c.stream.elementAt(2); | 241 Future f = c.stream.elementAt(2); |
242 f.then(expectAsync1((v) { Expect.equals(13, v);})); | 242 f.then(expectAsync((v) { Expect.equals(13, v);})); |
243 sentEvents.replay(c); | 243 sentEvents.replay(c); |
244 }); | 244 }); |
245 | 245 |
246 test("elementAt 2", () { | 246 test("elementAt 2", () { |
247 StreamController c = new StreamController(); | 247 StreamController c = new StreamController(); |
248 Future f = c.stream.elementAt(20); | 248 Future f = c.stream.elementAt(20); |
249 f.catchError(expectAsync1((error) { Expect.isTrue(error is RangeError); })); | 249 f.catchError(expectAsync((error) { Expect.isTrue(error is RangeError); })); |
250 sentEvents.replay(c); | 250 sentEvents.replay(c); |
251 }); | 251 }); |
252 | 252 |
253 test("drain", () { | 253 test("drain", () { |
254 StreamController c = new StreamController(); | 254 StreamController c = new StreamController(); |
255 Future f = c.stream.drain(); | 255 Future f = c.stream.drain(); |
256 f.then(expectAsync1((v) { Expect.equals(null, v);})); | 256 f.then(expectAsync((v) { Expect.equals(null, v);})); |
257 sentEvents.replay(c); | 257 sentEvents.replay(c); |
258 }); | 258 }); |
259 | 259 |
260 test("drain error", () { | 260 test("drain error", () { |
261 StreamController c = new StreamController(); | 261 StreamController c = new StreamController(); |
262 Future f = c.stream.drain(); | 262 Future f = c.stream.drain(); |
263 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 263 f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
264 Events errorEvents = new Events()..error("error")..error("error2")..close(); | 264 Events errorEvents = new Events()..error("error")..error("error2")..close(); |
265 errorEvents.replay(c); | 265 errorEvents.replay(c); |
266 }); | 266 }); |
267 | 267 |
268 } | 268 } |
269 | 269 |
270 testPause() { | 270 testPause() { |
271 test("pause event-unpause", () { | 271 test("pause event-unpause", () { |
272 | 272 |
273 StreamProtocolTest test = new StreamProtocolTest(); | 273 StreamProtocolTest test = new StreamProtocolTest(); |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
360 | 360 |
361 class TestError { const TestError(); } | 361 class TestError { const TestError(); } |
362 | 362 |
363 testRethrow() { | 363 testRethrow() { |
364 TestError error = const TestError(); | 364 TestError error = const TestError(); |
365 | 365 |
366 testStream(name, streamValueTransform) { | 366 testStream(name, streamValueTransform) { |
367 test("rethrow-$name-value", () { | 367 test("rethrow-$name-value", () { |
368 StreamController c = new StreamController(); | 368 StreamController c = new StreamController(); |
369 Stream s = streamValueTransform(c.stream, (v) { throw error; }); | 369 Stream s = streamValueTransform(c.stream, (v) { throw error; }); |
370 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync1( | 370 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync( |
371 (e) { Expect.identical(error, e); })); | 371 (e) { Expect.identical(error, e); })); |
372 c.add(null); | 372 c.add(null); |
373 c.close(); | 373 c.close(); |
374 }); | 374 }); |
375 } | 375 } |
376 | 376 |
377 testStreamError(name, streamErrorTransform) { | 377 testStreamError(name, streamErrorTransform) { |
378 test("rethrow-$name-error", () { | 378 test("rethrow-$name-error", () { |
379 StreamController c = new StreamController(); | 379 StreamController c = new StreamController(); |
380 Stream s = streamErrorTransform(c.stream, (e) { throw error; }); | 380 Stream s = streamErrorTransform(c.stream, (e) { throw error; }); |
381 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync1( | 381 s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync( |
382 (e) { Expect.identical(error, e); })); | 382 (e) { Expect.identical(error, e); })); |
383 c.addError(null); | 383 c.addError(null); |
384 c.close(); | 384 c.close(); |
385 }); | 385 }); |
386 } | 386 } |
387 | 387 |
388 testFuture(name, streamValueTransform) { | 388 testFuture(name, streamValueTransform) { |
389 test("rethrow-$name-value", () { | 389 test("rethrow-$name-value", () { |
390 StreamController c = new StreamController(); | 390 StreamController c = new StreamController(); |
391 Future f = streamValueTransform(c.stream, (v) { throw error; }); | 391 Future f = streamValueTransform(c.stream, (v) { throw error; }); |
392 f.then((v) { Expect.fail("unreachable"); }, | 392 f.then((v) { Expect.fail("unreachable"); }, |
393 onError: expectAsync1((e) { Expect.identical(error, e); })); | 393 onError: expectAsync((e) { Expect.identical(error, e); })); |
394 // Need two values to trigger compare for reduce. | 394 // Need two values to trigger compare for reduce. |
395 c.add(0); | 395 c.add(0); |
396 c.add(1); | 396 c.add(1); |
397 c.close(); | 397 c.close(); |
398 }); | 398 }); |
399 } | 399 } |
400 | 400 |
401 testStream("where", (s, act) => s.where(act)); | 401 testStream("where", (s, act) => s.where(act)); |
402 testStream("map", (s, act) => s.map(act)); | 402 testStream("map", (s, act) => s.map(act)); |
403 testStream("expand", (s, act) => s.expand(act)); | 403 testStream("expand", (s, act) => s.expand(act)); |
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
563 ..expectCancel() | 563 ..expectCancel() |
564 ..expectDone() | 564 ..expectDone() |
565 ..expectBroadcastCancel((_) => test.terminate()); | 565 ..expectBroadcastCancel((_) => test.terminate()); |
566 sub = test.listen(); | 566 sub = test.listen(); |
567 }); | 567 }); |
568 } | 568 } |
569 | 569 |
570 void testSink({bool sync, bool broadcast, bool asBroadcast}) { | 570 void testSink({bool sync, bool broadcast, bool asBroadcast}) { |
571 String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}"; | 571 String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}"; |
572 test("$type-controller-sink", () { | 572 test("$type-controller-sink", () { |
573 var done = expectAsync0((){}); | 573 var done = expectAsync((){}); |
574 var c = broadcast ? new StreamController.broadcast(sync: sync) | 574 var c = broadcast ? new StreamController.broadcast(sync: sync) |
575 : new StreamController(sync: sync); | 575 : new StreamController(sync: sync); |
576 var expected = new Events() | 576 var expected = new Events() |
577 ..add(42)..error("error") | 577 ..add(42)..error("error") |
578 ..add(1)..add(2)..add(3)..add(4)..add(5) | 578 ..add(1)..add(2)..add(3)..add(4)..add(5) |
579 ..add(43)..close(); | 579 ..add(43)..close(); |
580 var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream() | 580 var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream() |
581 : c.stream); | 581 : c.stream); |
582 var sink = c.sink; | 582 var sink = c.sink; |
583 sink.add(42); | 583 sink.add(42); |
584 sink.addError("error"); | 584 sink.addError("error"); |
585 sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])) | 585 sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])) |
586 .then((_) { | 586 .then((_) { |
587 sink.add(43); | 587 sink.add(43); |
588 return sink.close(); | 588 return sink.close(); |
589 }) | 589 }) |
590 .then((_) { | 590 .then((_) { |
591 Expect.listEquals(expected.events, actual.events); | 591 Expect.listEquals(expected.events, actual.events); |
592 done(); | 592 done(); |
593 }); | 593 }); |
594 }); | 594 }); |
595 | 595 |
596 test("$type-controller-sink-canceled", () { | 596 test("$type-controller-sink-canceled", () { |
597 var done = expectAsync0((){}); | 597 var done = expectAsync((){}); |
598 var c = broadcast ? new StreamController.broadcast(sync: sync) | 598 var c = broadcast ? new StreamController.broadcast(sync: sync) |
599 : new StreamController(sync: sync); | 599 : new StreamController(sync: sync); |
600 var expected = new Events() | 600 var expected = new Events() |
601 ..add(42)..error("error") | 601 ..add(42)..error("error") |
602 ..add(1)..add(2)..add(3); | 602 ..add(1)..add(2)..add(3); |
603 var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; | 603 var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
604 var actual = new Events(); | 604 var actual = new Events(); |
605 var sub; | 605 var sub; |
606 // Cancel subscription after receiving "3" event. | 606 // Cancel subscription after receiving "3" event. |
607 sub = stream.listen((v) { | 607 sub = stream.listen((v) { |
(...skipping 14 matching lines...) Expand all Loading... |
622 return doneFuture; | 622 return doneFuture; |
623 }) | 623 }) |
624 .then((_) { | 624 .then((_) { |
625 // No change in events. | 625 // No change in events. |
626 Expect.listEquals(expected.events, actual.events); | 626 Expect.listEquals(expected.events, actual.events); |
627 done(); | 627 done(); |
628 }); | 628 }); |
629 }); | 629 }); |
630 | 630 |
631 test("$type-controller-sink-paused", () { | 631 test("$type-controller-sink-paused", () { |
632 var done = expectAsync0((){}); | 632 var done = expectAsync((){}); |
633 var c = broadcast ? new StreamController.broadcast(sync: sync) | 633 var c = broadcast ? new StreamController.broadcast(sync: sync) |
634 : new StreamController(sync: sync); | 634 : new StreamController(sync: sync); |
635 var expected = new Events() | 635 var expected = new Events() |
636 ..add(42)..error("error") | 636 ..add(42)..error("error") |
637 ..add(1)..add(2)..add(3) | 637 ..add(1)..add(2)..add(3) |
638 ..add(4)..add(5)..add(43)..close(); | 638 ..add(4)..add(5)..add(43)..close(); |
639 var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; | 639 var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
640 var actual = new Events(); | 640 var actual = new Events(); |
641 var sub; | 641 var sub; |
642 var pauseIsDone = false; | 642 var pauseIsDone = false; |
(...skipping 28 matching lines...) Expand all Loading... |
671 }); | 671 }); |
672 } else { | 672 } else { |
673 Expect.listEquals(expected.events, actual.events); | 673 Expect.listEquals(expected.events, actual.events); |
674 done(); | 674 done(); |
675 } | 675 } |
676 }); | 676 }); |
677 }); | 677 }); |
678 | 678 |
679 test("$type-controller-addstream-error-stop", () { | 679 test("$type-controller-addstream-error-stop", () { |
680 // Check that addStream defaults to ending after the first error. | 680 // Check that addStream defaults to ending after the first error. |
681 var done = expectAsync0((){}); | 681 var done = expectAsync((){}); |
682 StreamController c = broadcast | 682 StreamController c = broadcast |
683 ? new StreamController.broadcast(sync: sync) | 683 ? new StreamController.broadcast(sync: sync) |
684 : new StreamController(sync: sync); | 684 : new StreamController(sync: sync); |
685 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; | 685 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
686 var actual = new Events.capture(stream); | 686 var actual = new Events.capture(stream); |
687 | 687 |
688 var source = new Events(); | 688 var source = new Events(); |
689 source..add(1)..add(2)..error("BAD")..add(3)..error("FAIL")..close(); | 689 source..add(1)..add(2)..error("BAD")..add(3)..error("FAIL")..close(); |
690 | 690 |
691 var expected = new Events()..add(1)..add(2)..error("BAD")..close(); | 691 var expected = new Events()..add(1)..add(2)..error("BAD")..close(); |
692 StreamController sourceController = new StreamController(); | 692 StreamController sourceController = new StreamController(); |
693 c.addStream(sourceController.stream).then((_) { | 693 c.addStream(sourceController.stream).then((_) { |
694 c.close().then((_) { | 694 c.close().then((_) { |
695 Expect.listEquals(expected.events, actual.events); | 695 Expect.listEquals(expected.events, actual.events); |
696 done(); | 696 done(); |
697 }); | 697 }); |
698 }); | 698 }); |
699 | 699 |
700 source.replay(sourceController); | 700 source.replay(sourceController); |
701 }); | 701 }); |
702 | 702 |
703 test("$type-controller-addstream-error-forward", () { | 703 test("$type-controller-addstream-error-forward", () { |
704 // Check that addStream with cancelOnError:false passes all data and errors | 704 // Check that addStream with cancelOnError:false passes all data and errors |
705 // to the controller. | 705 // to the controller. |
706 var done = expectAsync0((){}); | 706 var done = expectAsync((){}); |
707 StreamController c = broadcast | 707 StreamController c = broadcast |
708 ? new StreamController.broadcast(sync: sync) | 708 ? new StreamController.broadcast(sync: sync) |
709 : new StreamController(sync: sync); | 709 : new StreamController(sync: sync); |
710 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; | 710 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
711 var actual = new Events.capture(stream); | 711 var actual = new Events.capture(stream); |
712 | 712 |
713 var source = new Events(); | 713 var source = new Events(); |
714 source..add(1)..add(2)..addError("BAD")..add(3)..addError("FAIL")..close(); | 714 source..add(1)..add(2)..addError("BAD")..add(3)..addError("FAIL")..close(); |
715 | 715 |
716 StreamController sourceController = new StreamController(); | 716 StreamController sourceController = new StreamController(); |
717 c.addStream(sourceController.stream, cancelOnError: false).then((_) { | 717 c.addStream(sourceController.stream, cancelOnError: false).then((_) { |
718 c.close().then((_) { | 718 c.close().then((_) { |
719 Expect.listEquals(source.events, actual.events); | 719 Expect.listEquals(source.events, actual.events); |
720 done(); | 720 done(); |
721 }); | 721 }); |
722 }); | 722 }); |
723 | 723 |
724 source.replay(sourceController); | 724 source.replay(sourceController); |
725 }); | 725 }); |
726 | 726 |
727 test("$type-controller-addstream-twice", () { | 727 test("$type-controller-addstream-twice", () { |
728 // Using addStream twice on the same stream | 728 // Using addStream twice on the same stream |
729 var done = expectAsync0((){}); | 729 var done = expectAsync((){}); |
730 StreamController c = broadcast | 730 StreamController c = broadcast |
731 ? new StreamController.broadcast(sync: sync) | 731 ? new StreamController.broadcast(sync: sync) |
732 : new StreamController(sync: sync); | 732 : new StreamController(sync: sync); |
733 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; | 733 Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
734 var actual = new Events.capture(stream); | 734 var actual = new Events.capture(stream); |
735 | 735 |
736 // Streams of five events, throws on 3. | 736 // Streams of five events, throws on 3. |
737 Stream s1 = new Stream.fromIterable([1,2,3,4,5]) | 737 Stream s1 = new Stream.fromIterable([1,2,3,4,5]) |
738 .map((x) => (x == 3 ? throw x : x)); | 738 .map((x) => (x == 3 ? throw x : x)); |
739 Stream s2 = new Stream.fromIterable([1,2,3,4,5]) | 739 Stream s2 = new Stream.fromIterable([1,2,3,4,5]) |
(...skipping 23 matching lines...) Expand all Loading... |
763 testRethrow(); | 763 testRethrow(); |
764 testBroadcastController(); | 764 testBroadcastController(); |
765 testAsBroadcast(); | 765 testAsBroadcast(); |
766 testSink(sync: true, broadcast: false, asBroadcast: false); | 766 testSink(sync: true, broadcast: false, asBroadcast: false); |
767 testSink(sync: true, broadcast: false, asBroadcast: true); | 767 testSink(sync: true, broadcast: false, asBroadcast: true); |
768 testSink(sync: true, broadcast: true, asBroadcast: false); | 768 testSink(sync: true, broadcast: true, asBroadcast: false); |
769 testSink(sync: false, broadcast: false, asBroadcast: false); | 769 testSink(sync: false, broadcast: false, asBroadcast: false); |
770 testSink(sync: false, broadcast: false, asBroadcast: true); | 770 testSink(sync: false, broadcast: false, asBroadcast: true); |
771 testSink(sync: false, broadcast: true, asBroadcast: false); | 771 testSink(sync: false, broadcast: true, asBroadcast: false); |
772 } | 772 } |
OLD | NEW |