Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(89)

Side by Side Diff: test/multi_channel_test.dart

Issue 1686263002: Make MultiChannel follow the stream channel rules. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « pubspec.yaml ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'package:stream_channel/stream_channel.dart'; 7 import 'package:stream_channel/stream_channel.dart';
8 import 'package:test/test.dart'; 8 import 'package:test/test.dart';
9 9
10 import 'utils.dart'; 10 import 'utils.dart';
11 11
12 void main() { 12 void main() {
13 var oneToTwo; 13 var controller;
14 var twoToOne;
15 var channel1; 14 var channel1;
16 var channel2; 15 var channel2;
17 setUp(() { 16 setUp(() {
18 oneToTwo = new StreamController(); 17 controller = new StreamChannelController();
19 twoToOne = new StreamController(); 18 channel1 = new MultiChannel(controller.local);
20 channel1 = new MultiChannel( 19 channel2 = new MultiChannel(controller.foreign);
21 new StreamChannel(twoToOne.stream, oneToTwo.sink));
22 channel2 = new MultiChannel(
23 new StreamChannel(oneToTwo.stream, twoToOne.sink));
24 }); 20 });
25 21
26 group("the default virtual channel", () { 22 group("the default virtual channel", () {
27 test("begins connected", () { 23 test("begins connected", () {
28 var first = true; 24 var first = true;
29 channel2.stream.listen(expectAsync((message) { 25 channel2.stream.listen(expectAsync((message) {
30 if (first) { 26 if (first) {
31 expect(message, equals("hello")); 27 expect(message, equals("hello"));
32 first = false; 28 first = false;
33 } else { 29 } else {
(...skipping 25 matching lines...) Expand all
59 55
60 channel1.stream.listen((_) {}).cancel(); 56 channel1.stream.listen((_) {}).cancel();
61 57
62 // Ensure that there's enough time for the channel to close if it's going 58 // Ensure that there's enough time for the channel to close if it's going
63 // to. 59 // to.
64 return pumpEventQueue(); 60 return pumpEventQueue();
65 }); 61 });
66 62
67 test("closes the underlying channel when it closes without any other " 63 test("closes the underlying channel when it closes without any other "
68 "virtual channels", () { 64 "virtual channels", () {
69 expect(oneToTwo.done, completes); 65 expect(controller.local.sink.done, completes);
70 expect(twoToOne.done, completes); 66 expect(controller.foreign.sink.done, completes);
71 67
72 channel1.sink.close(); 68 channel1.sink.close();
73 }); 69 });
74 70
75 test("doesn't close the underlying channel when it closes with other " 71 test("doesn't close the underlying channel when it closes with other "
76 "virtual channels", () { 72 "virtual channels", () {
77 oneToTwo.done.then(expectAsync((_) {}, count: 0)); 73 controller.local.sink.done.then(expectAsync((_) {}, count: 0));
78 twoToOne.done.then(expectAsync((_) {}, count: 0)); 74 controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
79 75
80 // Establish another virtual connection which should keep the underlying 76 // Establish another virtual connection which should keep the underlying
81 // connection open. 77 // connection open.
82 channel2.virtualChannel(channel1.virtualChannel().id); 78 channel2.virtualChannel(channel1.virtualChannel().id);
83 channel1.sink.close(); 79 channel1.sink.close();
84 80
85 // Ensure that there's enough time for the underlying channel to complete 81 // Ensure that there's enough time for the underlying channel to complete
86 // if it's going to. 82 // if it's going to.
87 return pumpEventQueue(); 83 return pumpEventQueue();
88 }); 84 });
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 return pumpEventQueue(); 138 return pumpEventQueue();
143 }); 139 });
144 140
145 test("closes the underlying channel when it closes without any other " 141 test("closes the underlying channel when it closes without any other "
146 "virtual channels", () async { 142 "virtual channels", () async {
147 // First close the default channel so we can test the new channel as the 143 // First close the default channel so we can test the new channel as the
148 // last living virtual channel. 144 // last living virtual channel.
149 channel1.sink.close(); 145 channel1.sink.close();
150 146
151 await channel2.stream.toList(); 147 await channel2.stream.toList();
152 expect(oneToTwo.done, completes); 148 expect(controller.local.sink.done, completes);
153 expect(twoToOne.done, completes); 149 expect(controller.foreign.sink.done, completes);
154 150
155 virtual1.sink.close(); 151 virtual1.sink.close();
156 }); 152 });
157 153
158 test("doesn't close the underlying channel when it closes with other " 154 test("doesn't close the underlying channel when it closes with other "
159 "virtual channels", () { 155 "virtual channels", () {
160 oneToTwo.done.then(expectAsync((_) {}, count: 0)); 156 controller.local.sink.done.then(expectAsync((_) {}, count: 0));
161 twoToOne.done.then(expectAsync((_) {}, count: 0)); 157 controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
162 158
163 virtual1.sink.close(); 159 virtual1.sink.close();
164 160
165 // Ensure that there's enough time for the underlying channel to complete 161 // Ensure that there's enough time for the underlying channel to complete
166 // if it's going to. 162 // if it's going to.
167 return pumpEventQueue(); 163 return pumpEventQueue();
168 }); 164 });
169 165
170 test("doesn't conflict with a remote virtual channel", () { 166 test("doesn't conflict with a remote virtual channel", () {
171 var virtual3 = channel2.virtualChannel(); 167 var virtual3 = channel2.virtualChannel();
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 return pumpEventQueue(); 235 return pumpEventQueue();
240 }); 236 });
241 237
242 test("closes the underlying channel when it closes without any other " 238 test("closes the underlying channel when it closes without any other "
243 "virtual channels", () async { 239 "virtual channels", () async {
244 // First close the default channel so we can test the new channel as the 240 // First close the default channel so we can test the new channel as the
245 // last living virtual channel. 241 // last living virtual channel.
246 channel2.sink.close(); 242 channel2.sink.close();
247 243
248 await channel1.stream.toList(); 244 await channel1.stream.toList();
249 expect(oneToTwo.done, completes); 245 expect(controller.local.sink.done, completes);
250 expect(twoToOne.done, completes); 246 expect(controller.foreign.sink.done, completes);
251 247
252 virtual2.sink.close(); 248 virtual2.sink.close();
253 }); 249 });
254 250
255 test("doesn't close the underlying channel when it closes with other " 251 test("doesn't close the underlying channel when it closes with other "
256 "virtual channels", () { 252 "virtual channels", () {
257 oneToTwo.done.then(expectAsync((_) {}, count: 0)); 253 controller.local.sink.done.then(expectAsync((_) {}, count: 0));
258 twoToOne.done.then(expectAsync((_) {}, count: 0)); 254 controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
259 255
260 virtual2.sink.close(); 256 virtual2.sink.close();
261 257
262 // Ensure that there's enough time for the underlying channel to complete 258 // Ensure that there's enough time for the underlying channel to complete
263 // if it's going to. 259 // if it's going to.
264 return pumpEventQueue(); 260 return pumpEventQueue();
265 }); 261 });
266 262
267 test("doesn't allow another virtual channel with the same id", () { 263 test("doesn't allow another virtual channel with the same id", () {
268 expect(() => channel2.virtualChannel(virtual1.id), 264 expect(() => channel2.virtualChannel(virtual1.id),
(...skipping 12 matching lines...) Expand all
281 test("closes, all virtual channels close", () { 277 test("closes, all virtual channels close", () {
282 expect(channel1.stream.toList(), completion(isEmpty)); 278 expect(channel1.stream.toList(), completion(isEmpty));
283 expect(channel1.sink.done, completes); 279 expect(channel1.sink.done, completes);
284 expect(channel2.stream.toList(), completion(isEmpty)); 280 expect(channel2.stream.toList(), completion(isEmpty));
285 expect(channel2.sink.done, completes); 281 expect(channel2.sink.done, completes);
286 expect(virtual1.stream.toList(), completion(isEmpty)); 282 expect(virtual1.stream.toList(), completion(isEmpty));
287 expect(virtual1.sink.done, completes); 283 expect(virtual1.sink.done, completes);
288 expect(virtual2.stream.toList(), completion(isEmpty)); 284 expect(virtual2.stream.toList(), completion(isEmpty));
289 expect(virtual2.sink.done, completes); 285 expect(virtual2.sink.done, completes);
290 286
291 oneToTwo.close(); 287 controller.local.sink.close();
292 }); 288 });
293 289
294 test("closes, no more virtual channels may be created", () { 290 test("closes, more virtual channels are created closed", () async {
295 expect(channel1.sink.done.then((_) => channel1.virtualChannel()), 291 channel2.sink.close();
296 throwsStateError); 292 virtual2.sink.close();
297 expect(channel2.sink.done.then((_) => channel2.virtualChannel()),
298 throwsStateError);
299 293
300 oneToTwo.close(); 294 // Wait for the existing channels to emit done events.
295 await channel1.stream.toList();
296 await virtual1.stream.toList();
297
298 var virtual = channel1.virtualChannel();
299 expect(virtual.stream.toList(), completion(isEmpty));
300 expect(virtual.sink.done, completes);
301
302 virtual = channel1.virtualChannel();
303 expect(virtual.stream.toList(), completion(isEmpty));
304 expect(virtual.sink.done, completes);
301 }); 305 });
302 306
303 test("emits an error, the error is sent only to the default channel", () { 307 test("emits an error, the error is sent only to the default channel", () {
304 channel1.stream.listen(expectAsync((_) {}, count: 0), 308 channel1.stream.listen(expectAsync((_) {}, count: 0),
305 onError: expectAsync((error) => expect(error, equals("oh no")))); 309 onError: expectAsync((error) => expect(error, equals("oh no"))));
306 virtual1.stream.listen(expectAsync((_) {}, count: 0), 310 virtual1.stream.listen(expectAsync((_) {}, count: 0),
307 onError: expectAsync((_) {}, count: 0)); 311 onError: expectAsync((_) {}, count: 0));
308 312
309 twoToOne.addError("oh no"); 313 controller.foreign.sink.addError("oh no");
314 });
315 });
316
317 group("stream channel rules", () {
318 group("for the main stream:", () {
319 test("closing the sink causes the stream to close before it emits any more "
320 "events", () {
321 channel1.sink.add(1);
322 channel1.sink.add(2);
323 channel1.sink.add(3);
324
325 channel2.stream.listen(expectAsync((message) {
326 expect(message, equals(1));
327 channel2.sink.close();
328 }, count: 1));
329 });
330
331 test("after the stream closes, the sink ignores events", () async {
332 channel1.sink.close();
333
334 // Wait for the done event to be delivered.
335 await channel2.stream.toList();
336 channel2.sink.add(1);
337 channel2.sink.add(2);
338 channel2.sink.add(3);
339 channel2.sink.close();
340
341 // None of our channel.sink additions should make it to the other endpoi nt.
342 channel1.stream.listen(expectAsync((_) {}, count: 0));
343 await pumpEventQueue();
344 });
345
346 test("canceling the stream's subscription has no effect on the sink",
347 () async {
348 channel1.stream.listen(null).cancel();
349 await pumpEventQueue();
350
351 channel1.sink.add(1);
352 channel1.sink.add(2);
353 channel1.sink.add(3);
354 channel1.sink.close();
355 expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
356 });
357
358 test("canceling the stream's subscription doesn't stop a done event",
359 () async {
360 channel1.stream.listen(null).cancel();
361 await pumpEventQueue();
362
363 channel2.sink.close();
364 await pumpEventQueue();
365
366 channel1.sink.add(1);
367 channel1.sink.add(2);
368 channel1.sink.add(3);
369 channel1.sink.close();
370
371 // The sink should be ignoring events because the channel closed.
372 channel2.stream.listen(expectAsync((_) {}, count: 0));
373 await pumpEventQueue();
374 });
375 });
376
377 group("for a virtual channel:", () {
378 var virtual1;
379 var virtual2;
380 setUp(() {
381 virtual1 = channel1.virtualChannel();
382 virtual2 = channel2.virtualChannel(virtual1.id);
383 });
384
385 test("closing the sink causes the stream to close before it emits any more "
386 "events", () {
387 virtual1.sink.add(1);
388 virtual1.sink.add(2);
389 virtual1.sink.add(3);
390
391 virtual2.stream.listen(expectAsync((message) {
392 expect(message, equals(1));
393 virtual2.sink.close();
394 }, count: 1));
395 });
396
397 test("after the stream closes, the sink ignores events", () async {
398 virtual1.sink.close();
399
400 // Wait for the done event to be delivered.
401 await virtual2.stream.toList();
402 virtual2.sink.add(1);
403 virtual2.sink.add(2);
404 virtual2.sink.add(3);
405 virtual2.sink.close();
406
407 // None of our virtual.sink additions should make it to the other endpoi nt.
408 virtual1.stream.listen(expectAsync((_) {}, count: 0));
409 await pumpEventQueue();
410 });
411
412 test("canceling the stream's subscription has no effect on the sink",
413 () async {
414 virtual1.stream.listen(null).cancel();
415 await pumpEventQueue();
416
417 virtual1.sink.add(1);
418 virtual1.sink.add(2);
419 virtual1.sink.add(3);
420 virtual1.sink.close();
421 expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
422 });
423
424 test("canceling the stream's subscription doesn't stop a done event",
425 () async {
426 virtual1.stream.listen(null).cancel();
427 await pumpEventQueue();
428
429 virtual2.sink.close();
430 await pumpEventQueue();
431
432 virtual1.sink.add(1);
433 virtual1.sink.add(2);
434 virtual1.sink.add(3);
435 virtual1.sink.close();
436
437 // The sink should be ignoring events because the stream closed.
438 virtual2.stream.listen(expectAsync((_) {}, count: 0));
439 await pumpEventQueue();
440 });
310 }); 441 });
311 }); 442 });
312 } 443 }
OLDNEW
« no previous file with comments | « pubspec.yaml ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698