| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 /** | |
| 6 * Help for combining multiple streams into a single stream. | |
| 7 */ | |
| 8 library dart.pkg.async.stream_zip; | |
| 9 | |
| 10 import "dart:async"; | 5 import "dart:async"; |
| 11 | 6 |
| 12 /** | 7 /// A stream that combines the values of other streams. |
| 13 * A stream that combines the values of other streams. | 8 /// |
| 14 */ | 9 /// This emits lists of collected values from each input stream. The first list |
| 15 class StreamZip extends Stream<List> { | 10 /// contains the first value emitted by each stream, the second contains the |
| 16 final Iterable<Stream> _streams; | 11 /// second value, and so on. The lists have the same ordering as the iterable |
| 17 StreamZip(Iterable<Stream> streams) : _streams = streams; | 12 /// passed to [new StreamZip]. |
| 13 /// |
| 14 /// Any errors from any of the streams are forwarded directly to this stream. |
| 15 class StreamZip<T> extends Stream<List<T>> { |
| 16 final Iterable<Stream<T>> _streams; |
| 18 | 17 |
| 19 StreamSubscription<List> listen(void onData(List data), { | 18 StreamZip(Iterable<Stream<T>> streams) : _streams = streams; |
| 20 Function onError, | 19 |
| 21 void onDone(), | 20 StreamSubscription<List<T>> listen(void onData(List<T> data), |
| 22 bool cancelOnError}) { | 21 {Function onError, void onDone(), bool cancelOnError}) { |
| 23 cancelOnError = identical(true, cancelOnError); | 22 cancelOnError = identical(true, cancelOnError); |
| 24 List<StreamSubscription> subscriptions = <StreamSubscription>[]; | 23 var subscriptions = <StreamSubscription<T>>[]; |
| 25 StreamController controller; | 24 StreamController<List<T>> controller; |
| 26 List current; | 25 List<T> current; |
| 27 int dataCount = 0; | 26 int dataCount = 0; |
| 28 | 27 |
| 29 /// Called for each data from a subscription in [subscriptions]. | 28 /// Called for each data from a subscription in [subscriptions]. |
| 30 void handleData(int index, data) { | 29 void handleData(int index, T data) { |
| 31 current[index] = data; | 30 current[index] = data; |
| 32 dataCount++; | 31 dataCount++; |
| 33 if (dataCount == subscriptions.length) { | 32 if (dataCount == subscriptions.length) { |
| 34 List data = current; | 33 var data = current; |
| 35 current = new List(subscriptions.length); | 34 current = new List(subscriptions.length); |
| 36 dataCount = 0; | 35 dataCount = 0; |
| 37 for (int i = 0; i < subscriptions.length; i++) { | 36 for (int i = 0; i < subscriptions.length; i++) { |
| 38 if (i != index) subscriptions[i].resume(); | 37 if (i != index) subscriptions[i].resume(); |
| 39 } | 38 } |
| 40 controller.add(data); | 39 controller.add(data); |
| 41 } else { | 40 } else { |
| 42 subscriptions[index].pause(); | 41 subscriptions[index].pause(); |
| 43 } | 42 } |
| 44 } | 43 } |
| (...skipping 17 matching lines...) Expand all Loading... |
| 62 } | 61 } |
| 63 | 62 |
| 64 void handleDone() { | 63 void handleDone() { |
| 65 for (int i = 0; i < subscriptions.length; i++) { | 64 for (int i = 0; i < subscriptions.length; i++) { |
| 66 subscriptions[i].cancel(); | 65 subscriptions[i].cancel(); |
| 67 } | 66 } |
| 68 controller.close(); | 67 controller.close(); |
| 69 } | 68 } |
| 70 | 69 |
| 71 try { | 70 try { |
| 72 for (Stream stream in _streams) { | 71 for (var stream in _streams) { |
| 73 int index = subscriptions.length; | 72 int index = subscriptions.length; |
| 74 subscriptions.add(stream.listen( | 73 subscriptions.add(stream.listen((data) { |
| 75 (data) { handleData(index, data); }, | 74 handleData(index, data); |
| 75 }, |
| 76 onError: cancelOnError ? handleError : handleErrorCancel, | 76 onError: cancelOnError ? handleError : handleErrorCancel, |
| 77 onDone: handleDone, | 77 onDone: handleDone, |
| 78 cancelOnError: cancelOnError)); | 78 cancelOnError: cancelOnError)); |
| 79 } | 79 } |
| 80 } catch (e) { | 80 } catch (e) { |
| 81 for (int i = subscriptions.length - 1; i >= 0; i--) { | 81 for (int i = subscriptions.length - 1; i >= 0; i--) { |
| 82 subscriptions[i].cancel(); | 82 subscriptions[i].cancel(); |
| 83 } | 83 } |
| 84 rethrow; | 84 rethrow; |
| 85 } | 85 } |
| 86 | 86 |
| 87 current = new List(subscriptions.length); | 87 current = new List(subscriptions.length); |
| 88 | 88 |
| 89 controller = new StreamController<List>( | 89 controller = new StreamController<List<T>>(onPause: () { |
| 90 onPause: () { | 90 for (int i = 0; i < subscriptions.length; i++) { |
| 91 for (int i = 0; i < subscriptions.length; i++) { | 91 // This may pause some subscriptions more than once. |
| 92 // This may pause some subscriptions more than once. | 92 // These will not be resumed by onResume below, but must wait for the |
| 93 // These will not be resumed by onResume below, but must wait for the | 93 // next round. |
| 94 // next round. | 94 subscriptions[i].pause(); |
| 95 subscriptions[i].pause(); | |
| 96 } | |
| 97 }, | |
| 98 onResume: () { | |
| 99 for (int i = 0; i < subscriptions.length; i++) { | |
| 100 subscriptions[i].resume(); | |
| 101 } | |
| 102 }, | |
| 103 onCancel: () { | |
| 104 for (int i = 0; i < subscriptions.length; i++) { | |
| 105 // Canceling more than once is safe. | |
| 106 subscriptions[i].cancel(); | |
| 107 } | |
| 108 } | 95 } |
| 109 ); | 96 }, onResume: () { |
| 97 for (int i = 0; i < subscriptions.length; i++) { |
| 98 subscriptions[i].resume(); |
| 99 } |
| 100 }, onCancel: () { |
| 101 for (int i = 0; i < subscriptions.length; i++) { |
| 102 // Canceling more than once is safe. |
| 103 subscriptions[i].cancel(); |
| 104 } |
| 105 }); |
| 110 | 106 |
| 111 if (subscriptions.isEmpty) { | 107 if (subscriptions.isEmpty) { |
| 112 controller.close(); | 108 controller.close(); |
| 113 } | 109 } |
| 114 return controller.stream.listen(onData, | 110 return controller.stream.listen(onData, |
| 115 onError: onError, | 111 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 116 onDone: onDone, | |
| 117 cancelOnError: cancelOnError); | |
| 118 } | 112 } |
| 119 } | 113 } |
| OLD | NEW |