OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.test.stream_pool_test; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import 'package:barback/src/utils.dart'; | |
10 import 'package:barback/src/utils/stream_pool.dart'; | |
11 import 'package:scheduled_test/scheduled_test.dart'; | |
12 | |
13 import 'utils.dart'; | |
14 | |
15 main() { | |
16 initConfig(); | |
17 | |
18 group("buffered", () { | |
19 test("buffers events from multiple inputs", () { | |
20 var pool = new StreamPool<String>(); | |
21 | |
22 var controller1 = new StreamController<String>(); | |
23 pool.add(controller1.stream); | |
24 controller1.add("first"); | |
25 | |
26 var controller2 = new StreamController<String>(); | |
27 pool.add(controller2.stream); | |
28 controller2.add("second"); | |
29 | |
30 // Call [toList] asynchronously to be sure that the events have been | |
31 // buffered beforehand and aren't just being received unbuffered. | |
32 expect(newFuture(() => pool.stream.toList()), | |
33 completion(equals(["first", "second"]))); | |
34 | |
35 pumpEventQueue().then((_) => pool.close()); | |
36 }); | |
37 | |
38 test("buffers errors from multiple inputs", () { | |
39 var pool = new StreamPool<String>(); | |
40 | |
41 var controller1 = new StreamController<String>(); | |
42 pool.add(controller1.stream); | |
43 controller1.add("first"); | |
44 | |
45 var controller2 = new StreamController<String>(); | |
46 pool.add(controller2.stream); | |
47 controller2.add("second"); | |
48 controller1.addError("third"); | |
49 controller2.addError("fourth"); | |
50 controller1.add("fifth"); | |
51 | |
52 expect(newFuture(() { | |
53 return pool.stream.transform(new StreamTransformer.fromHandlers( | |
54 handleData: (data, sink) => sink.add(["data", data]), | |
55 handleError: (error, stackTrace, sink) { | |
56 sink.add(["error", error]); | |
57 })).toList(); | |
58 }), completion(equals([ | |
59 ["data", "first"], | |
60 ["data", "second"], | |
61 ["error", "third"], | |
62 ["error", "fourth"], | |
63 ["data", "fifth"] | |
64 ]))); | |
65 | |
66 pumpEventQueue().then((_) => pool.close()); | |
67 }); | |
68 | |
69 test("buffers inputs from a broadcast stream", () { | |
70 var pool = new StreamPool<String>(); | |
71 var controller = new StreamController<String>.broadcast(); | |
72 pool.add(controller.stream); | |
73 controller.add("first"); | |
74 controller.add("second"); | |
75 | |
76 // Call [toList] asynchronously to be sure that the events have been | |
77 // buffered beforehand and aren't just being received unbuffered. | |
78 expect(newFuture(() => pool.stream.toList()), | |
79 completion(equals(["first", "second"]))); | |
80 | |
81 pumpEventQueue().then((_) => pool.close()); | |
82 }); | |
83 }); | |
84 | |
85 group("broadcast", () { | |
86 test("doesn't buffer inputs", () { | |
87 var pool = new StreamPool<String>.broadcast(); | |
88 | |
89 var controller1 = new StreamController<String>.broadcast(); | |
90 pool.add(controller1.stream); | |
91 controller1.add("first"); | |
92 | |
93 var controller2 = new StreamController<String>.broadcast(); | |
94 pool.add(controller2.stream); | |
95 controller2.add("second"); | |
96 | |
97 // Call [toList] asynchronously to be sure that the events have been | |
98 // buffered beforehand and aren't just being received unbuffered. | |
99 expect(newFuture(() => pool.stream.toList()), completion(isEmpty)); | |
100 | |
101 pumpEventQueue().then((_) => pool.close()); | |
102 }); | |
103 | |
104 test("doesn't buffer errors", () { | |
105 var pool = new StreamPool<String>.broadcast(); | |
106 | |
107 var controller1 = new StreamController<String>.broadcast(); | |
108 pool.add(controller1.stream); | |
109 controller1.addError("first"); | |
110 | |
111 var controller2 = new StreamController<String>.broadcast(); | |
112 pool.add(controller2.stream); | |
113 controller2.addError("second"); | |
114 | |
115 expect(newFuture(() { | |
116 return pool.stream.transform(new StreamTransformer.fromHandlers( | |
117 handleData: (data, sink) => sink.add(data), | |
118 handleError: (error, stackTrace, sink) { sink.add(error); })) | |
119 .toList(); | |
120 }), completion(isEmpty)); | |
121 | |
122 pumpEventQueue().then((_) => pool.close()); | |
123 }); | |
124 | |
125 test("doesn't buffer inputs from a buffered stream", () { | |
126 var pool = new StreamPool<String>.broadcast(); | |
127 var controller = new StreamController<String>(); | |
128 pool.add(controller.stream); | |
129 controller.add("first"); | |
130 controller.add("second"); | |
131 | |
132 expect(pumpEventQueue().then((_) => pool.stream.toList()), | |
133 completion(isEmpty)); | |
134 | |
135 pumpEventQueue().then((_) => pool.close()); | |
136 }); | |
137 }); | |
138 | |
139 for (var type in ["buffered", "broadcast"]) { | |
140 group(type, () { | |
141 var pool; | |
142 var bufferedController; | |
143 var bufferedStream; | |
144 var bufferedSyncController; | |
145 var broadcastController; | |
146 var broadcastStream; | |
147 var broadcastSyncController; | |
148 | |
149 setUp(() { | |
150 if (type == "buffered") { | |
151 pool = new StreamPool<String>(); | |
152 } else { | |
153 pool = new StreamPool<String>.broadcast(); | |
154 } | |
155 | |
156 bufferedController = new StreamController<String>(); | |
157 pool.add(bufferedController.stream); | |
158 | |
159 bufferedSyncController = new StreamController<String>(sync: true); | |
160 pool.add(bufferedSyncController.stream); | |
161 | |
162 broadcastController = new StreamController<String>.broadcast(); | |
163 pool.add(broadcastController.stream); | |
164 | |
165 broadcastSyncController = | |
166 new StreamController<String>.broadcast(sync: true); | |
167 pool.add(broadcastSyncController.stream); | |
168 }); | |
169 | |
170 test("emits events to a listener", () { | |
171 expect(pool.stream.toList(), completion(equals(["first", "second"]))); | |
172 | |
173 bufferedController.add("first"); | |
174 broadcastController.add("second"); | |
175 pumpEventQueue().then((_) => pool.close()); | |
176 }); | |
177 | |
178 test("emits sync events synchronously", () { | |
179 var events = []; | |
180 pool.stream.listen(events.add); | |
181 | |
182 bufferedSyncController.add("first"); | |
183 expect(events, equals(["first"])); | |
184 | |
185 broadcastSyncController.add("second"); | |
186 expect(events, equals(["first", "second"])); | |
187 }); | |
188 | |
189 test("emits async events asynchronously", () { | |
190 var events = []; | |
191 pool.stream.listen(events.add); | |
192 | |
193 bufferedController.add("first"); | |
194 broadcastController.add("second"); | |
195 expect(events, isEmpty); | |
196 | |
197 expect(pumpEventQueue().then((_) => events), | |
198 completion(equals(["first", "second"]))); | |
199 }); | |
200 | |
201 test("doesn't emit events from removed streams", () { | |
202 expect(pool.stream.toList(), completion(equals(["first", "third"]))); | |
203 | |
204 bufferedController.add("first"); | |
205 expect(pumpEventQueue().then((_) { | |
206 pool.remove(bufferedController.stream); | |
207 bufferedController.add("second"); | |
208 }).then((_) { | |
209 broadcastController.add("third"); | |
210 return pumpEventQueue(); | |
211 }).then((_) { | |
212 pool.remove(broadcastController.stream); | |
213 broadcastController.add("fourth"); | |
214 pool.close(); | |
215 }), completes); | |
216 }); | |
217 }); | |
218 } | |
219 } | |
OLD | NEW |