Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: tests/lib/async/stream_controller_async_test.dart

Issue 16131003: Reapply "Active stream subscriptions". (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Patch from sgjesse fixing file descriptor error. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/utf/utf_stream.dart ('k') | tests/lib/async/stream_state_helper.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // Test the basic StreamController and StreamController.singleSubscription. 5 // Test the basic StreamController and StreamController.singleSubscription.
6 library stream_controller_async_test; 6 library stream_controller_async_test;
7 7
8 import "package:expect/expect.dart"; 8 import "package:expect/expect.dart";
9 import 'dart:async'; 9 import 'dart:async';
10 import 'dart:isolate'; 10 import 'dart:isolate';
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
46 }); 46 });
47 47
48 test("Single-subscription StreamController.fold throws", () { 48 test("Single-subscription StreamController.fold throws", () {
49 StreamController c = new StreamController(); 49 StreamController c = new StreamController();
50 Stream stream = c.stream; 50 Stream stream = c.stream;
51 stream.fold(0, (a,b) { throw "Fnyf!"; }) 51 stream.fold(0, (a,b) { throw "Fnyf!"; })
52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); 52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); }));
53 c.add(42); 53 c.add(42);
54 }); 54 });
55 55
56 test("Single-subscription StreamController subscription changes", () {
57 StreamController c = new StreamController();
58 EventSink sink = c.sink;
59 Stream stream = c.stream;
60 int counter = 0;
61 var subscription;
62 subscription = stream.listen((data) {
63 counter += data;
64 Expect.throws(() => stream.listen(null), (e) => e is StateError);
65 subscription.cancel();
66 stream.listen((data) {
67 counter += data * 10;
68 },
69 onDone: expectAsync0(() {
70 Expect.equals(1 + 20, counter);
71 }));
72 });
73 sink.add(1);
74 sink.add(2);
75 sink.close();
76 });
77
78 test("Single-subscription StreamController events are buffered when" 56 test("Single-subscription StreamController events are buffered when"
79 " there is no subscriber", 57 " there is no subscriber",
80 () { 58 () {
81 StreamController c = new StreamController(); 59 StreamController c = new StreamController();
82 EventSink sink = c.sink; 60 EventSink sink = c.sink;
83 Stream stream = c.stream; 61 Stream stream = c.stream;
84 int counter = 0; 62 int counter = 0;
85 sink.add(1); 63 sink.add(1);
86 sink.add(2); 64 sink.add(2);
87 sink.close(); 65 sink.close();
88 stream.listen( 66 stream.listen(
89 (data) { 67 (data) {
90 counter += data; 68 counter += data;
91 }, 69 },
92 onDone: expectAsync0(() { 70 onDone: expectAsync0(() {
93 Expect.equals(3, counter); 71 Expect.equals(3, counter);
94 })); 72 }));
95 }); 73 });
96
97 // Test subscription changes while firing.
98 test("Single-subscription StreamController subscription changes while firing",
99 () {
100 StreamController c = new StreamController();
101 EventSink sink = c.sink;
102 Stream stream = c.stream;
103 int counter = 0;
104 var subscription = stream.listen(null);
105 subscription.onData(expectAsync1((data) {
106 counter += data;
107 subscription.cancel();
108 stream.listen((data) {
109 counter += 10 * data;
110 },
111 onDone: expectAsync0(() {
112 Expect.equals(1 + 20 + 30 + 40 + 50, counter);
113 }));
114 Expect.throws(() => stream.listen(null), (e) => e is StateError);
115 }));
116 sink.add(1); // seen by stream 1
117 sink.add(2); // seen by stream 10 and 100
118 sink.add(3); // -"-
119 sink.add(4); // -"-
120 sink.add(5); // seen by stream 10
121 sink.close();
122 });
123 } 74 }
124 75
125 testExtraMethods() { 76 testExtraMethods() {
126 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); 77 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close();
127 78
128 test("forEach", () { 79 test("forEach", () {
129 StreamController c = new StreamController(); 80 StreamController c = new StreamController();
130 Events actualEvents = new Events(); 81 Events actualEvents = new Events();
131 Future f = c.stream.forEach(actualEvents.add); 82 Future f = c.stream.forEach(actualEvents.add);
132 f.then(expectAsync1((_) { 83 f.then(expectAsync1((_) {
(...skipping 338 matching lines...) Expand 10 before | Expand all | Expand 10 after
471 testStreamError("handleError", (s, act) => s.handleError(act)); 422 testStreamError("handleError", (s, act) => s.handleError(act));
472 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); 423 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act));
473 testFuture("forEach", (s, act) => s.forEach(act)); 424 testFuture("forEach", (s, act) => s.forEach(act));
474 testFuture("every", (s, act) => s.every(act)); 425 testFuture("every", (s, act) => s.every(act));
475 testFuture("any", (s, act) => s.any(act)); 426 testFuture("any", (s, act) => s.any(act));
476 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); 427 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b)));
477 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); 428 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b)));
478 testFuture("drain", (s, act) => s.drain().then(act)); 429 testFuture("drain", (s, act) => s.drain().then(act));
479 } 430 }
480 431
432 void testMultiplex() {
433 test("multiplex-basic", () {
434 StreamController<int> c = new StreamController.multiplex(
435 onListen: expectAsync0(() {}),
436 onCancel: expectAsync0(() {})
437 );
438 Stream<int> s = c.stream;
439 s.listen(expectAsync1((x) { expect(x, equals(42)); }));
440 c.add(42);
441 c.close();
442 });
443
444 test("multiplex-listen-twice", () {
445 StreamController<int> c = new StreamController.multiplex(
446 onListen: expectAsync0(() {}),
447 onCancel: expectAsync0(() {})
448 );
449 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, count: 2));
450 c.add(42);
451 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }));
452 c.add(42);
453 c.close();
454 });
455
456 test("multiplex-listen-twice-non-overlap", () {
457 StreamController<int> c = new StreamController.multiplex(
458 onListen: expectAsync0(() {}, count: 2),
459 onCancel: expectAsync0(() {}, count: 2)
460 );
461 var sub = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }));
462 c.add(42);
463 sub.cancel();
464 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }));
465 c.add(42);
466 c.close();
467 });
468
469 test("multiplex-individual-pause", () {
470 StreamController<int> c = new StreamController.multiplex(
471 onListen: expectAsync0(() {}),
472 onCancel: expectAsync0(() {})
473 );
474 var sub1 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }));
475 var sub2 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); },
476 count: 3));
477 c.add(42);
478 sub1.pause();
479 c.add(42);
480 sub1.cancel();
481 var sub3 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }));
482 c.add(42);
483 c.close();
484 });
485 }
486
481 main() { 487 main() {
482 testController(); 488 testController();
483 testSingleController(); 489 testSingleController();
484 testExtraMethods(); 490 testExtraMethods();
485 testPause(); 491 testPause();
486 testRethrow(); 492 testRethrow();
493 testMultiplex();
487 } 494 }
OLDNEW
« no previous file with comments | « sdk/lib/utf/utf_stream.dart ('k') | tests/lib/async/stream_state_helper.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698