| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 /** | |
| 6 * Help for combining multiple streams into a single stream. | |
| 7 */ | |
| 8 library dart.pkg.async.stream_zip; | |
| 9 | |
| 10 import "dart:async"; | 5 import "dart:async"; |
| 11 | 6 |
| 12 /** | 7 /// A stream that combines the values of other streams. |
| 13 * A stream that combines the values of other streams. | 8 /// |
| 14 */ | 9 /// This emits lists of collected values from each input stream. The first list |
| 15 class StreamZip extends Stream<List> { | 10 /// contains the first value emitted by each stream, the second contrains the |
| 16 final Iterable<Stream> _streams; | 11 /// second value, and so on. The lists have the same ordering as the iterable |
| 17 StreamZip(Iterable<Stream> streams) : _streams = streams; | 12 /// passed to [new StreamZip]. |
| 13 /// |
| 14 /// Any errors from any of the streams are forwarded directly to this stream. |
| 15 class StreamZip<T> extends Stream<List<T>> { |
| 16 final Iterable<Stream<T>> _streams; |
| 18 | 17 |
| 19 StreamSubscription<List> listen(void onData(List data), { | 18 StreamZip(Iterable<Stream<T>> streams) : _streams = streams; |
| 19 |
| 20 StreamSubscription<List<T>> listen(void onData(List data), { |
| 20 Function onError, | 21 Function onError, |
| 21 void onDone(), | 22 void onDone(), |
| 22 bool cancelOnError}) { | 23 bool cancelOnError}) { |
| 23 cancelOnError = identical(true, cancelOnError); | 24 cancelOnError = identical(true, cancelOnError); |
| 24 List<StreamSubscription> subscriptions = <StreamSubscription>[]; | 25 List<StreamSubscription> subscriptions = <StreamSubscription>[]; |
| 25 StreamController controller; | 26 StreamController controller; |
| 26 List current; | 27 List current; |
| 27 int dataCount = 0; | 28 int dataCount = 0; |
| 28 | 29 |
| 29 /// Called for each data from a subscription in [subscriptions]. | 30 /// Called for each data from a subscription in [subscriptions]. |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 110 | 111 |
| 111 if (subscriptions.isEmpty) { | 112 if (subscriptions.isEmpty) { |
| 112 controller.close(); | 113 controller.close(); |
| 113 } | 114 } |
| 114 return controller.stream.listen(onData, | 115 return controller.stream.listen(onData, |
| 115 onError: onError, | 116 onError: onError, |
| 116 onDone: onDone, | 117 onDone: onDone, |
| 117 cancelOnError: cancelOnError); | 118 cancelOnError: cancelOnError); |
| 118 } | 119 } |
| 119 } | 120 } |
| OLD | NEW |