Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(590)

Side by Side Diff: tests/lib/async/merge_stream_test.dart

Issue 12049013: Change singleSubscription/multiSubscription to normal/broadcast. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed comments, renamed .multiSubscription to .broadcast. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // Test merging streams. 5 // Test merging streams.
6 library merge_stream_test; 6 library merge_stream_test;
7 7
8 import "dart:async"; 8 import "dart:async";
9 import '../../../pkg/unittest/lib/unittest.dart'; 9 import '../../../pkg/unittest/lib/unittest.dart';
10 import 'event_helper.dart'; 10 import 'event_helper.dart';
11 11
12 testSupercedeStream() { 12 testSupercedeStream() {
13 { // Simple case of superceding lower priority streams. 13 { // Simple case of superceding lower priority streams.
14 StreamController s1 = new StreamController.multiSubscription(); 14 StreamController s1 = new StreamController.broadcast();
15 StreamController s2 = new StreamController.multiSubscription(); 15 StreamController s2 = new StreamController.broadcast();
16 StreamController s3 = new StreamController.multiSubscription(); 16 StreamController s3 = new StreamController.broadcast();
17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); 17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close(); 18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close();
19 Events actual = new Events.capture(merge); 19 Events actual = new Events.capture(merge);
20 s1.add(1); 20 s1.add(1);
21 s2.add(2); 21 s2.add(2);
22 s1.add(1); // Ignored. 22 s1.add(1); // Ignored.
23 s2.add(3); 23 s2.add(3);
24 s3.add(4); 24 s3.add(4);
25 s2.add(3); // Ignored. 25 s2.add(3); // Ignored.
26 s3.close(); 26 s3.close();
27 Expect.listEquals(expected.events, actual.events); 27 Expect.listEquals(expected.events, actual.events);
28 } 28 }
29 29
30 { // Superceding more than one stream at a time. 30 { // Superceding more than one stream at a time.
31 StreamController s1 = new StreamController.multiSubscription(); 31 StreamController s1 = new StreamController.broadcast();
32 StreamController s2 = new StreamController.multiSubscription(); 32 StreamController s2 = new StreamController.broadcast();
33 StreamController s3 = new StreamController.multiSubscription(); 33 StreamController s3 = new StreamController.broadcast();
34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); 34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
35 Events expected = new Events()..add(1)..add(2)..close(); 35 Events expected = new Events()..add(1)..add(2)..close();
36 Events actual = new Events.capture(merge); 36 Events actual = new Events.capture(merge);
37 s1.add(1); 37 s1.add(1);
38 s3.add(2); 38 s3.add(2);
39 s1.add(1); // Ignored. 39 s1.add(1); // Ignored.
40 s2.add(1); // Ignored. 40 s2.add(1); // Ignored.
41 s3.close(); 41 s3.close();
42 Expect.listEquals(expected.events, actual.events); 42 Expect.listEquals(expected.events, actual.events);
43 } 43 }
44 44
45 { // Closing a stream before superceding it. 45 { // Closing a stream before superceding it.
46 StreamController s1 = new StreamController.multiSubscription(); 46 StreamController s1 = new StreamController.broadcast();
47 StreamController s2 = new StreamController.multiSubscription(); 47 StreamController s2 = new StreamController.broadcast();
48 StreamController s3 = new StreamController.multiSubscription(); 48 StreamController s3 = new StreamController.broadcast();
49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); 49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
50 Events expected = new Events()..add(1)..add(2)..add(3)..close(); 50 Events expected = new Events()..add(1)..add(2)..add(3)..close();
51 Events actual = new Events.capture(merge); 51 Events actual = new Events.capture(merge);
52 s1.add(1); 52 s1.add(1);
53 s1.close(); 53 s1.close();
54 s3.close(); 54 s3.close();
55 s2.add(2); 55 s2.add(2);
56 s2.add(3); 56 s2.add(3);
57 s2.close(); 57 s2.close();
58 Expect.listEquals(expected.events, actual.events); 58 Expect.listEquals(expected.events, actual.events);
59 } 59 }
60 60
61 { // Errors from all non-superceded streams are forwarded. 61 { // Errors from all non-superceded streams are forwarded.
62 StreamController s1 = new StreamController.multiSubscription(); 62 StreamController s1 = new StreamController.broadcast();
63 StreamController s2 = new StreamController.multiSubscription(); 63 StreamController s2 = new StreamController.broadcast();
64 StreamController s3 = new StreamController.multiSubscription(); 64 StreamController s3 = new StreamController.broadcast();
65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); 65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
66 Events expected = 66 Events expected =
67 new Events()..add(1)..error("1")..error("2")..error("3") 67 new Events()..add(1)..error("1")..error("2")..error("3")
68 ..add(3)..error("6")..add(4)..close(); 68 ..add(3)..error("6")..add(4)..close();
69 Events actual = new Events.capture(merge); 69 Events actual = new Events.capture(merge);
70 s1.add(1); 70 s1.add(1);
71 s1.signalError(new AsyncError("1")); 71 s1.signalError(new AsyncError("1"));
72 s2.signalError(new AsyncError("2")); 72 s2.signalError(new AsyncError("2"));
73 s3.signalError(new AsyncError("3")); 73 s3.signalError(new AsyncError("3"));
74 s3.add(3); 74 s3.add(3);
75 s1.signalError(new AsyncError("4")); 75 s1.signalError(new AsyncError("4"));
76 s2.signalError(new AsyncError("5")); 76 s2.signalError(new AsyncError("5"));
77 s3.signalError(new AsyncError("6")); 77 s3.signalError(new AsyncError("6"));
78 s1.close(); 78 s1.close();
79 s2.close(); 79 s2.close();
80 s3.add(4); 80 s3.add(4);
81 s3.close(); 81 s3.close();
82 Expect.listEquals(expected.events, actual.events); 82 Expect.listEquals(expected.events, actual.events);
83 } 83 }
84 84
85 test("Pausing on a superceding stream", () { 85 test("Pausing on a superceding stream", () {
86 StreamController s1 = new StreamController.multiSubscription(); 86 StreamController s1 = new StreamController.broadcast();
87 StreamController s2 = new StreamController.multiSubscription(); 87 StreamController s2 = new StreamController.broadcast();
88 StreamController s3 = new StreamController.multiSubscription(); 88 StreamController s3 = new StreamController.broadcast();
89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); 89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
90 Events expected = new Events()..add(1)..add(2)..add(3); 90 Events expected = new Events()..add(1)..add(2)..add(3);
91 Events actual = new Events.capture(merge); 91 Events actual = new Events.capture(merge);
92 s1.add(1); 92 s1.add(1);
93 s2.add(2); 93 s2.add(2);
94 s2.add(3); 94 s2.add(3);
95 Expect.listEquals(expected.events, actual.events); 95 Expect.listEquals(expected.events, actual.events);
96 actual.pause(); // Pauses the stream that feeds the actual Events. 96 actual.pause(); // Pauses the stream that feeds the actual Events.
97 Events expected2 = expected.copy(); 97 Events expected2 = expected.copy();
98 expected..add(5)..add(6)..close(); 98 expected..add(5)..add(6)..close();
99 expected2..add(6)..close(); 99 expected2..add(6)..close();
100 s1.add(4); 100 s1.add(4);
101 s2.add(5); // May or may not arrive before '6' when resuming. 101 s2.add(5); // May or may not arrive before '6' when resuming.
102 s3.add(6); 102 s3.add(6);
103 s3.close(); 103 s3.close();
104 actual.onDone(expectAsync0(() { 104 actual.onDone(expectAsync0(() {
105 if (expected.events.length == actual.events.length) { 105 if (expected.events.length == actual.events.length) {
106 Expect.listEquals(expected.events, actual.events); 106 Expect.listEquals(expected.events, actual.events);
107 } else { 107 } else {
108 Expect.listEquals(expected2.events, actual.events); 108 Expect.listEquals(expected2.events, actual.events);
109 } 109 }
110 })); 110 }));
111 actual.resume(); 111 actual.resume();
112 }); 112 });
113 } 113 }
114 114
115 void testCyclicStream() { 115 void testCyclicStream() {
116 test("Simple case of superceding lower priority streams", () { 116 test("Simple case of superceding lower priority streams", () {
117 StreamController s1 = new StreamController.multiSubscription(); 117 StreamController s1 = new StreamController.broadcast();
118 StreamController s2 = new StreamController.multiSubscription(); 118 StreamController s2 = new StreamController.broadcast();
119 StreamController s3 = new StreamController.multiSubscription(); 119 StreamController s3 = new StreamController.broadcast();
120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); 120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
121 Events expected = 121 Events expected =
122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close(); 122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close();
123 Events actual = new Events.capture(merge); 123 Events actual = new Events.capture(merge);
124 Expect.isFalse(s1.isPaused); 124 Expect.isFalse(s1.isPaused);
125 Expect.isTrue(s2.isPaused); 125 Expect.isTrue(s2.isPaused);
126 Expect.isTrue(s3.isPaused); 126 Expect.isTrue(s3.isPaused);
127 s3.add(3); 127 s3.add(3);
128 s1.add(1); 128 s1.add(1);
129 s1.add(4); 129 s1.add(4);
130 s1.add(6); 130 s1.add(6);
131 s1.close(); 131 s1.close();
132 s2.add(2); 132 s2.add(2);
133 s2.add(5); 133 s2.add(5);
134 s2.close(); 134 s2.close();
135 s3.close(); 135 s3.close();
136 actual.onDone(expectAsync0(() { 136 actual.onDone(expectAsync0(() {
137 Expect.listEquals(expected.events, actual.events); 137 Expect.listEquals(expected.events, actual.events);
138 })); 138 }));
139 }); 139 });
140 140
141 test("Cyclic merge with errors", () { 141 test("Cyclic merge with errors", () {
142 StreamController s1 = new StreamController.multiSubscription(); 142 StreamController s1 = new StreamController.broadcast();
143 StreamController s2 = new StreamController.multiSubscription(); 143 StreamController s2 = new StreamController.broadcast();
144 StreamController s3 = new StreamController.multiSubscription(); 144 StreamController s3 = new StreamController.broadcast();
145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); 145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
146 Events expected = 146 Events expected =
147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2") 147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2")
148 ..add(4)..add(5)..error("3")..add(6)..close(); 148 ..add(4)..add(5)..error("3")..add(6)..close();
149 Events actual = new Events.capture(merge); 149 Events actual = new Events.capture(merge);
150 Expect.isFalse(s1.isPaused); 150 Expect.isFalse(s1.isPaused);
151 Expect.isTrue(s2.isPaused); 151 Expect.isTrue(s2.isPaused);
152 Expect.isTrue(s3.isPaused); 152 Expect.isTrue(s3.isPaused);
153 s3.add(3); 153 s3.add(3);
154 s3.signalError(new AsyncError("3")); // Error just before a "done". 154 s3.signalError(new AsyncError("3")); // Error just before a "done".
(...skipping 10 matching lines...) Expand all
165 actual.onDone(expectAsync0(() { 165 actual.onDone(expectAsync0(() {
166 Expect.listEquals(expected.events, actual.events); 166 Expect.listEquals(expected.events, actual.events);
167 })); 167 }));
168 }); 168 });
169 } 169 }
170 170
171 main() { 171 main() {
172 testSupercedeStream(); 172 testSupercedeStream();
173 testCyclicStream(); 173 testCyclicStream();
174 } 174 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698