OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /** | 5 /** |
6 * Help for combining multiple streams into a single stream. | 6 * Help for combining multiple streams into a single stream. |
7 * | 7 * |
8 * This API is also available as part of the | 8 * This API is also available as part of the |
9 * [sequence_zip](#sequence_zip) library. | 9 * [sequence_zip](#sequence_zip) library. |
10 */ | 10 */ |
11 library stream_zip; | 11 library stream_zip; |
12 | 12 |
13 import "dart:async"; | 13 import "dart:async"; |
14 | 14 |
15 /** | 15 /** |
16 * A stream that combines the values of other streams. | 16 * A stream that combines the values of other streams. |
17 */ | 17 */ |
18 class StreamZip extends Stream<List> { | 18 class StreamZip extends Stream<List> { |
19 final Iterable<Stream> _streams; | 19 final Iterable<Stream> _streams; |
20 StreamZip(Iterable<Stream> streams) : _streams = streams; | 20 StreamZip(Iterable<Stream> streams) : _streams = streams; |
21 | 21 |
22 StreamSubscription<List> listen(void onData(List data), { | 22 StreamSubscription<List> listen(void onData(List data), { |
23 void onError(Object error), | 23 Function onError, |
24 void onDone(), | 24 void onDone(), |
25 bool cancelOnError}) { | 25 bool cancelOnError}) { |
26 cancelOnError = identical(true, cancelOnError); | 26 cancelOnError = identical(true, cancelOnError); |
27 List<StreamSubscription> subscriptions = <StreamSubscription>[]; | 27 List<StreamSubscription> subscriptions = <StreamSubscription>[]; |
28 StreamController controller; | 28 StreamController controller; |
29 List current; | 29 List current; |
30 int dataCount = 0; | 30 int dataCount = 0; |
31 | 31 |
32 /// Called for each data from a subscription in [subscriptions]. | 32 /// Called for each data from a subscription in [subscriptions]. |
33 void handleData(int index, data) { | 33 void handleData(int index, data) { |
34 current[index] = data; | 34 current[index] = data; |
35 dataCount++; | 35 dataCount++; |
36 if (dataCount == subscriptions.length) { | 36 if (dataCount == subscriptions.length) { |
37 List data = current; | 37 List data = current; |
38 current = new List(subscriptions.length); | 38 current = new List(subscriptions.length); |
39 dataCount = 0; | 39 dataCount = 0; |
40 for (int i = 0; i < subscriptions.length; i++) { | 40 for (int i = 0; i < subscriptions.length; i++) { |
41 if (i != index) subscriptions[i].resume(); | 41 if (i != index) subscriptions[i].resume(); |
42 } | 42 } |
43 controller.add(data); | 43 controller.add(data); |
44 } else { | 44 } else { |
45 subscriptions[index].pause(); | 45 subscriptions[index].pause(); |
46 } | 46 } |
47 } | 47 } |
48 | 48 |
49 /// Called for each error from a subscription in [subscriptons]. | 49 /// Called for each error from a subscription in [subscriptons]. |
50 /// Except if [cancelOnError] is true, in which case the function below | 50 /// Except if [cancelOnError] is true, in which case the function below |
51 /// is used instead. | 51 /// is used instead. |
52 void handleError(Object error) { | 52 void handleError(Object error, [StackTrace stackTrace]) { |
Lasse Reichstein Nielsen
2013/10/04 08:45:17
Make not optional.
floitsch
2013/10/05 18:11:48
Done.
Are you sure? The closure will be given to t
Lasse Reichstein Nielsen
2013/10/07 11:55:48
Ack. Not optional yet then.
| |
53 controller.addError(error); | 53 controller.addError(error, stackTrace); |
54 } | 54 } |
55 | 55 |
56 /// Called when a subscription has an error and [cancelOnError] is true. | 56 /// Called when a subscription has an error and [cancelOnError] is true. |
57 /// | 57 /// |
58 /// Prematurely cancels all subscriptions since we know that we won't | 58 /// Prematurely cancels all subscriptions since we know that we won't |
59 /// be needing any more values. | 59 /// be needing any more values. |
60 void handleErrorCancel(Object error) { | 60 void handleErrorCancel(Object error, [StackTrace stackTrace]) { |
Lasse Reichstein Nielsen
2013/10/04 08:45:17
Make not optional.
floitsch
2013/10/05 18:11:48
Done.
ditto.
Lasse Reichstein Nielsen
2013/10/07 11:55:48
Not here either.
| |
61 for (int i = 0; i < subscriptions.length; i++) { | 61 for (int i = 0; i < subscriptions.length; i++) { |
62 subscriptions[i].cancel(); | 62 subscriptions[i].cancel(); |
63 } | 63 } |
64 controller.addError(error); | 64 controller.addError(error, stackTrace); |
65 } | 65 } |
66 | 66 |
67 void handleDone() { | 67 void handleDone() { |
68 for (int i = 0; i < subscriptions.length; i++) { | 68 for (int i = 0; i < subscriptions.length; i++) { |
69 subscriptions[i].cancel(); | 69 subscriptions[i].cancel(); |
70 } | 70 } |
71 controller.close(); | 71 controller.close(); |
72 } | 72 } |
73 | 73 |
74 try { | 74 try { |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
113 | 113 |
114 if (subscriptions.isEmpty) { | 114 if (subscriptions.isEmpty) { |
115 controller.close(); | 115 controller.close(); |
116 } | 116 } |
117 return controller.stream.listen(onData, | 117 return controller.stream.listen(onData, |
118 onError: onError, | 118 onError: onError, |
119 onDone: onDone, | 119 onDone: onDone, |
120 cancelOnError: cancelOnError); | 120 cancelOnError: cancelOnError); |
121 } | 121 } |
122 } | 122 } |
OLD | NEW |