Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(116)

Side by Side Diff: packages/async/test/stream_splitter_test.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/async/test/stream_queue_test.dart ('k') | packages/async/test/stream_zip_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'package:async/async.dart';
8 import 'package:test/test.dart';
9
10 main() {
11 var controller;
12 var splitter;
13 setUp(() {
14 controller = new StreamController<int>();
15 splitter = new StreamSplitter<int>(controller.stream);
16 });
17
18 test("a branch that's created before the stream starts to replay it",
19 () async {
20 var events = [];
21 var branch = splitter.split();
22 splitter.close();
23 branch.listen(events.add);
24
25 controller.add(1);
26 await flushMicrotasks();
27 expect(events, equals([1]));
28
29 controller.add(2);
30 await flushMicrotasks();
31 expect(events, equals([1, 2]));
32
33 controller.add(3);
34 await flushMicrotasks();
35 expect(events, equals([1, 2, 3]));
36
37 controller.close();
38 });
39
40 test("a branch replays error events as well as data events", () {
41 var branch = splitter.split();
42 splitter.close();
43
44 controller.add(1);
45 controller.addError("error");
46 controller.add(3);
47 controller.close();
48
49 var count = 0;
50 branch.listen(expectAsync((value) {
51 expect(count, anyOf(0, 2));
52 expect(value, equals(count + 1));
53 count++;
54 }, count: 2), onError: expectAsync((error) {
55 expect(count, equals(1));
56 expect(error, equals("error"));
57 count++;
58 }), onDone: expectAsync(() {
59 expect(count, equals(3));
60 }));
61 });
62
63 test("a branch that's created in the middle of a stream replays it", () async {
64 controller.add(1);
65 controller.add(2);
66 await flushMicrotasks();
67
68 var branch = splitter.split();
69 splitter.close();
70
71 controller.add(3);
72 controller.add(4);
73 controller.close();
74
75 expect(branch.toList(), completion(equals([1, 2, 3, 4])));
76 });
77
78 test("a branch that's created after the stream is finished replays it",
79 () async {
80 controller.add(1);
81 controller.add(2);
82 controller.add(3);
83 controller.close();
84 await flushMicrotasks();
85
86 expect(splitter.split().toList(), completion(equals([1, 2, 3])));
87 splitter.close();
88 });
89
90 test("creates single-subscription branches", () async {
91 var branch = splitter.split();
92 expect(branch.isBroadcast, isFalse);
93 branch.listen(null);
94 expect(() => branch.listen(null), throwsStateError);
95 expect(() => branch.listen(null), throwsStateError);
96 });
97
98 // TODO(nweiz): Test that branches have the correct reified type once Dart
99 // 1.11 is released. In 1.10, the stream exposed by a StreamController didn't
100 // have a reified type.
101
102 test("multiple branches each replay the stream", () async {
103 var branch1 = splitter.split();
104 controller.add(1);
105 controller.add(2);
106 await flushMicrotasks();
107
108 var branch2 = splitter.split();
109 controller.add(3);
110 controller.close();
111 await flushMicrotasks();
112
113 var branch3 = splitter.split();
114 splitter.close();
115
116 expect(branch1.toList(), completion(equals([1, 2, 3])));
117 expect(branch2.toList(), completion(equals([1, 2, 3])));
118 expect(branch3.toList(), completion(equals([1, 2, 3])));
119 });
120
121 test("a branch doesn't close until the source stream closes", () async {
122 var branch = splitter.split();
123 splitter.close();
124
125 var closed = false;
126 branch.last.then((_) => closed = true);
127
128 controller.add(1);
129 controller.add(2);
130 controller.add(3);
131 await flushMicrotasks();
132 expect(closed, isFalse);
133
134 controller.close();
135 await flushMicrotasks();
136 expect(closed, isTrue);
137 });
138
139 test("the source stream isn't listened to until a branch is", () async {
140 expect(controller.hasListener, isFalse);
141
142 var branch = splitter.split();
143 splitter.close();
144 await flushMicrotasks();
145 expect(controller.hasListener, isFalse);
146
147 branch.listen(null);
148 await flushMicrotasks();
149 expect(controller.hasListener, isTrue);
150 });
151
152 test("the source stream is paused when all branches are paused", () async {
153 var branch1 = splitter.split();
154 var branch2 = splitter.split();
155 var branch3 = splitter.split();
156 splitter.close();
157
158 var subscription1 = branch1.listen(null);
159 var subscription2 = branch2.listen(null);
160 var subscription3 = branch3.listen(null);
161
162 subscription1.pause();
163 await flushMicrotasks();
164 expect(controller.isPaused, isFalse);
165
166 subscription2.pause();
167 await flushMicrotasks();
168 expect(controller.isPaused, isFalse);
169
170 subscription3.pause();
171 await flushMicrotasks();
172 expect(controller.isPaused, isTrue);
173
174 subscription2.resume();
175 await flushMicrotasks();
176 expect(controller.isPaused, isFalse);
177 });
178
179 test("the source stream is paused when all branches are canceled", () async {
180 var branch1 = splitter.split();
181 var branch2 = splitter.split();
182 var branch3 = splitter.split();
183
184 var subscription1 = branch1.listen(null);
185 var subscription2 = branch2.listen(null);
186 var subscription3 = branch3.listen(null);
187
188 subscription1.cancel();
189 await flushMicrotasks();
190 expect(controller.isPaused, isFalse);
191
192 subscription2.cancel();
193 await flushMicrotasks();
194 expect(controller.isPaused, isFalse);
195
196 subscription3.cancel();
197 await flushMicrotasks();
198 expect(controller.isPaused, isTrue);
199
200 var branch4 = splitter.split();
201 splitter.close();
202 await flushMicrotasks();
203 expect(controller.isPaused, isTrue);
204
205 branch4.listen(null);
206 await flushMicrotasks();
207 expect(controller.isPaused, isFalse);
208 });
209
210 test("the source stream is canceled when it's closed after all branches have "
211 "been canceled", () async {
212 var branch1 = splitter.split();
213 var branch2 = splitter.split();
214 var branch3 = splitter.split();
215
216 var subscription1 = branch1.listen(null);
217 var subscription2 = branch2.listen(null);
218 var subscription3 = branch3.listen(null);
219
220 subscription1.cancel();
221 await flushMicrotasks();
222 expect(controller.hasListener, isTrue);
223
224 subscription2.cancel();
225 await flushMicrotasks();
226 expect(controller.hasListener, isTrue);
227
228 subscription3.cancel();
229 await flushMicrotasks();
230 expect(controller.hasListener, isTrue);
231
232 splitter.close();
233 expect(controller.hasListener, isFalse);
234 });
235
236 test("the source stream is canceled when all branches are canceled after it "
237 "has been closed", () async {
238 var branch1 = splitter.split();
239 var branch2 = splitter.split();
240 var branch3 = splitter.split();
241 splitter.close();
242
243 var subscription1 = branch1.listen(null);
244 var subscription2 = branch2.listen(null);
245 var subscription3 = branch3.listen(null);
246
247 subscription1.cancel();
248 await flushMicrotasks();
249 expect(controller.hasListener, isTrue);
250
251 subscription2.cancel();
252 await flushMicrotasks();
253 expect(controller.hasListener, isTrue);
254
255 subscription3.cancel();
256 await flushMicrotasks();
257 expect(controller.hasListener, isFalse);
258 });
259
260 test("a splitter that's closed before any branches are added never listens "
261 "to the source stream", () {
262 splitter.close();
263
264 // This would throw an error if the stream had already been listened to.
265 controller.stream.listen(null);
266 });
267
268 test("splitFrom splits a source stream into the designated number of "
269 "branches", () {
270 var branches = StreamSplitter.splitFrom(controller.stream, 5);
271
272 controller.add(1);
273 controller.add(2);
274 controller.add(3);
275 controller.close();
276
277 expect(branches[0].toList(), completion(equals([1, 2, 3])));
278 expect(branches[1].toList(), completion(equals([1, 2, 3])));
279 expect(branches[2].toList(), completion(equals([1, 2, 3])));
280 expect(branches[3].toList(), completion(equals([1, 2, 3])));
281 expect(branches[4].toList(), completion(equals([1, 2, 3])));
282 });
283 }
284
285 /// Wait for all microtasks to complete.
286 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW
« no previous file with comments | « packages/async/test/stream_queue_test.dart ('k') | packages/async/test/stream_zip_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698