OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /** | |
6 * Help for combining multiple streams into a single stream. | |
7 */ | |
8 library dart.pkg.async.stream_zip; | |
9 | |
10 import "dart:async"; | 5 import "dart:async"; |
11 | 6 |
12 /** | 7 /// A stream that combines the values of other streams. |
13 * A stream that combines the values of other streams. | 8 /// |
14 */ | 9 /// This emits lists of collected values from each input stream. The first list |
15 class StreamZip extends Stream<List> { | 10 /// contains the first value emitted by each stream, the second contrains the |
16 final Iterable<Stream> _streams; | 11 /// second value, and so on. The lists have the same ordering as the iterable |
17 StreamZip(Iterable<Stream> streams) : _streams = streams; | 12 /// passed to [new StreamZip]. |
| 13 /// |
| 14 /// Any errors from any of the streams are forwarded directly to this stream. |
| 15 class StreamZip<T> extends Stream<List<T>> { |
| 16 final Iterable<Stream<T>> _streams; |
18 | 17 |
19 StreamSubscription<List> listen(void onData(List data), { | 18 StreamZip(Iterable<Stream<T>> streams) : _streams = streams; |
| 19 |
| 20 StreamSubscription<List<T>> listen(void onData(List data), { |
20 Function onError, | 21 Function onError, |
21 void onDone(), | 22 void onDone(), |
22 bool cancelOnError}) { | 23 bool cancelOnError}) { |
23 cancelOnError = identical(true, cancelOnError); | 24 cancelOnError = identical(true, cancelOnError); |
24 List<StreamSubscription> subscriptions = <StreamSubscription>[]; | 25 List<StreamSubscription> subscriptions = <StreamSubscription>[]; |
25 StreamController controller; | 26 StreamController controller; |
26 List current; | 27 List current; |
27 int dataCount = 0; | 28 int dataCount = 0; |
28 | 29 |
29 /// Called for each data from a subscription in [subscriptions]. | 30 /// Called for each data from a subscription in [subscriptions]. |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 | 111 |
111 if (subscriptions.isEmpty) { | 112 if (subscriptions.isEmpty) { |
112 controller.close(); | 113 controller.close(); |
113 } | 114 } |
114 return controller.stream.listen(onData, | 115 return controller.stream.listen(onData, |
115 onError: onError, | 116 onError: onError, |
116 onDone: onDone, | 117 onDone: onDone, |
117 cancelOnError: cancelOnError); | 118 cancelOnError: cancelOnError); |
118 } | 119 } |
119 } | 120 } |
OLD | NEW |