OLD | NEW |
| (Empty) |
1 // Copyright 2014 Google Inc. All Rights Reserved. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 part of quiver.streams; | |
16 | |
17 /** | |
18 * Returns the concatentation of the input streams. | |
19 * | |
20 * When the returned stream is listened to, the [streams] are iterated through | |
21 * asynchronously, forwarding all events (both data and error) for the current | |
22 * stream to the returned stream before advancing the iterator and listening to | |
23 * the next stream. If advancing the iterator throws an error, the returned | |
24 * stream ends immediately with that error. | |
25 * | |
26 * Pausing and resuming the returned stream's subscriptions will pause and | |
27 * resume the subscription of the current stream being listened to. | |
28 * | |
29 * Note: Events from pre-existing broadcast streams which occur before | |
30 * the stream is reached by the iteration will be dropped. | |
31 * | |
32 * Example: | |
33 * | |
34 * concat(files.map((file) => | |
35 * file.openRead().transform(const LineSplitter()))) | |
36 * | |
37 */ | |
38 Stream concat(Iterable<Stream> streams) => new _ConcatStream(streams); | |
39 | |
40 class _ConcatStream extends Stream { | |
41 final Iterable<Stream> _streams; | |
42 | |
43 _ConcatStream(Iterable<Stream> streams) : _streams = streams; | |
44 | |
45 StreamSubscription listen(void onData(var data), | |
46 {Function onError, void onDone(), bool cancelOnError}) { | |
47 cancelOnError = true == cancelOnError; | |
48 StreamSubscription currentSubscription; | |
49 StreamController controller; | |
50 Iterator iterator = _streams.iterator; | |
51 | |
52 void nextStream() { | |
53 bool hasNext; | |
54 try { | |
55 hasNext = iterator.moveNext(); | |
56 } catch (e, s) { | |
57 controller | |
58 ..addError(e, s) | |
59 ..close(); | |
60 return; | |
61 } | |
62 if (hasNext) { | |
63 currentSubscription = iterator.current.listen(controller.add, | |
64 onError: controller.addError, | |
65 onDone: nextStream, | |
66 cancelOnError: cancelOnError); | |
67 } else { | |
68 controller.close(); | |
69 } | |
70 } | |
71 | |
72 controller = new StreamController(onPause: () { | |
73 if (currentSubscription != null) currentSubscription.pause(); | |
74 }, onResume: () { | |
75 if (currentSubscription != null) currentSubscription.resume(); | |
76 }, onCancel: () { | |
77 if (currentSubscription != null) return currentSubscription.cancel(); | |
78 }); | |
79 | |
80 nextStream(); | |
81 | |
82 return controller.stream.listen(onData, | |
83 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
84 } | |
85 } | |
OLD | NEW |