OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /** | |
6 * Help for combining multiple streams into a single stream. | |
7 */ | |
8 library dart.pkg.async.stream_zip; | |
9 | |
10 import "dart:async"; | 5 import "dart:async"; |
11 | 6 |
12 /** | 7 /// A stream that combines the values of other streams. |
13 * A stream that combines the values of other streams. | 8 /// |
14 */ | 9 /// This emits lists of collected values from each input stream. The first list |
15 class StreamZip extends Stream<List> { | 10 /// contains the first value emitted by each stream, the second contains the |
16 final Iterable<Stream> _streams; | 11 /// second value, and so on. The lists have the same ordering as the iterable |
17 StreamZip(Iterable<Stream> streams) : _streams = streams; | 12 /// passed to [new StreamZip]. |
| 13 /// |
| 14 /// Any errors from any of the streams are forwarded directly to this stream. |
| 15 class StreamZip<T> extends Stream<List<T>> { |
| 16 final Iterable<Stream<T>> _streams; |
18 | 17 |
19 StreamSubscription<List> listen(void onData(List data), { | 18 StreamZip(Iterable<Stream<T>> streams) : _streams = streams; |
20 Function onError, | 19 |
21 void onDone(), | 20 StreamSubscription<List<T>> listen(void onData(List<T> data), |
22 bool cancelOnError}) { | 21 {Function onError, void onDone(), bool cancelOnError}) { |
23 cancelOnError = identical(true, cancelOnError); | 22 cancelOnError = identical(true, cancelOnError); |
24 List<StreamSubscription> subscriptions = <StreamSubscription>[]; | 23 var subscriptions = <StreamSubscription<T>>[]; |
25 StreamController controller; | 24 StreamController<List<T>> controller; |
26 List current; | 25 List<T> current; |
27 int dataCount = 0; | 26 int dataCount = 0; |
28 | 27 |
29 /// Called for each data from a subscription in [subscriptions]. | 28 /// Called for each data from a subscription in [subscriptions]. |
30 void handleData(int index, data) { | 29 void handleData(int index, T data) { |
31 current[index] = data; | 30 current[index] = data; |
32 dataCount++; | 31 dataCount++; |
33 if (dataCount == subscriptions.length) { | 32 if (dataCount == subscriptions.length) { |
34 List data = current; | 33 var data = current; |
35 current = new List(subscriptions.length); | 34 current = new List(subscriptions.length); |
36 dataCount = 0; | 35 dataCount = 0; |
37 for (int i = 0; i < subscriptions.length; i++) { | 36 for (int i = 0; i < subscriptions.length; i++) { |
38 if (i != index) subscriptions[i].resume(); | 37 if (i != index) subscriptions[i].resume(); |
39 } | 38 } |
40 controller.add(data); | 39 controller.add(data); |
41 } else { | 40 } else { |
42 subscriptions[index].pause(); | 41 subscriptions[index].pause(); |
43 } | 42 } |
44 } | 43 } |
(...skipping 17 matching lines...) Expand all Loading... |
62 } | 61 } |
63 | 62 |
64 void handleDone() { | 63 void handleDone() { |
65 for (int i = 0; i < subscriptions.length; i++) { | 64 for (int i = 0; i < subscriptions.length; i++) { |
66 subscriptions[i].cancel(); | 65 subscriptions[i].cancel(); |
67 } | 66 } |
68 controller.close(); | 67 controller.close(); |
69 } | 68 } |
70 | 69 |
71 try { | 70 try { |
72 for (Stream stream in _streams) { | 71 for (var stream in _streams) { |
73 int index = subscriptions.length; | 72 int index = subscriptions.length; |
74 subscriptions.add(stream.listen( | 73 subscriptions.add(stream.listen((data) { |
75 (data) { handleData(index, data); }, | 74 handleData(index, data); |
| 75 }, |
76 onError: cancelOnError ? handleError : handleErrorCancel, | 76 onError: cancelOnError ? handleError : handleErrorCancel, |
77 onDone: handleDone, | 77 onDone: handleDone, |
78 cancelOnError: cancelOnError)); | 78 cancelOnError: cancelOnError)); |
79 } | 79 } |
80 } catch (e) { | 80 } catch (e) { |
81 for (int i = subscriptions.length - 1; i >= 0; i--) { | 81 for (int i = subscriptions.length - 1; i >= 0; i--) { |
82 subscriptions[i].cancel(); | 82 subscriptions[i].cancel(); |
83 } | 83 } |
84 rethrow; | 84 rethrow; |
85 } | 85 } |
86 | 86 |
87 current = new List(subscriptions.length); | 87 current = new List(subscriptions.length); |
88 | 88 |
89 controller = new StreamController<List>( | 89 controller = new StreamController<List<T>>(onPause: () { |
90 onPause: () { | 90 for (int i = 0; i < subscriptions.length; i++) { |
91 for (int i = 0; i < subscriptions.length; i++) { | 91 // This may pause some subscriptions more than once. |
92 // This may pause some subscriptions more than once. | 92 // These will not be resumed by onResume below, but must wait for the |
93 // These will not be resumed by onResume below, but must wait for the | 93 // next round. |
94 // next round. | 94 subscriptions[i].pause(); |
95 subscriptions[i].pause(); | |
96 } | |
97 }, | |
98 onResume: () { | |
99 for (int i = 0; i < subscriptions.length; i++) { | |
100 subscriptions[i].resume(); | |
101 } | |
102 }, | |
103 onCancel: () { | |
104 for (int i = 0; i < subscriptions.length; i++) { | |
105 // Canceling more than once is safe. | |
106 subscriptions[i].cancel(); | |
107 } | |
108 } | 95 } |
109 ); | 96 }, onResume: () { |
| 97 for (int i = 0; i < subscriptions.length; i++) { |
| 98 subscriptions[i].resume(); |
| 99 } |
| 100 }, onCancel: () { |
| 101 for (int i = 0; i < subscriptions.length; i++) { |
| 102 // Canceling more than once is safe. |
| 103 subscriptions[i].cancel(); |
| 104 } |
| 105 }); |
110 | 106 |
111 if (subscriptions.isEmpty) { | 107 if (subscriptions.isEmpty) { |
112 controller.close(); | 108 controller.close(); |
113 } | 109 } |
114 return controller.stream.listen(onData, | 110 return controller.stream.listen(onData, |
115 onError: onError, | 111 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
116 onDone: onDone, | |
117 cancelOnError: cancelOnError); | |
118 } | 112 } |
119 } | 113 } |
OLD | NEW |