| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 Google Inc. All Rights Reserved. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 part of quiver.streams; | |
| 16 | |
| 17 /** | |
| 18 * Returns the concatentation of the input streams. | |
| 19 * | |
| 20 * When the returned stream is listened to, the [streams] are iterated through | |
| 21 * asynchronously, forwarding all events (both data and error) for the current | |
| 22 * stream to the returned stream before advancing the iterator and listening to | |
| 23 * the next stream. If advancing the iterator throws an error, the returned | |
| 24 * stream ends immediately with that error. | |
| 25 * | |
| 26 * Pausing and resuming the returned stream's subscriptions will pause and | |
| 27 * resume the subscription of the current stream being listened to. | |
| 28 * | |
| 29 * Note: Events from pre-existing broadcast streams which occur before | |
| 30 * the stream is reached by the iteration will be dropped. | |
| 31 * | |
| 32 * Example: | |
| 33 * | |
| 34 * concat(files.map((file) => | |
| 35 * file.openRead().transform(const LineSplitter()))) | |
| 36 * | |
| 37 */ | |
| 38 Stream concat(Iterable<Stream> streams) => new _ConcatStream(streams); | |
| 39 | |
| 40 class _ConcatStream extends Stream { | |
| 41 final Iterable<Stream> _streams; | |
| 42 | |
| 43 _ConcatStream(Iterable<Stream> streams) : _streams = streams; | |
| 44 | |
| 45 StreamSubscription listen(void onData(var data), | |
| 46 {Function onError, void onDone(), bool cancelOnError}) { | |
| 47 cancelOnError = true == cancelOnError; | |
| 48 StreamSubscription currentSubscription; | |
| 49 StreamController controller; | |
| 50 Iterator iterator = _streams.iterator; | |
| 51 | |
| 52 void nextStream() { | |
| 53 bool hasNext; | |
| 54 try { | |
| 55 hasNext = iterator.moveNext(); | |
| 56 } catch (e, s) { | |
| 57 controller | |
| 58 ..addError(e, s) | |
| 59 ..close(); | |
| 60 return; | |
| 61 } | |
| 62 if (hasNext) { | |
| 63 currentSubscription = iterator.current.listen(controller.add, | |
| 64 onError: controller.addError, | |
| 65 onDone: nextStream, | |
| 66 cancelOnError: cancelOnError); | |
| 67 } else { | |
| 68 controller.close(); | |
| 69 } | |
| 70 } | |
| 71 | |
| 72 controller = new StreamController(onPause: () { | |
| 73 if (currentSubscription != null) currentSubscription.pause(); | |
| 74 }, onResume: () { | |
| 75 if (currentSubscription != null) currentSubscription.resume(); | |
| 76 }, onCancel: () { | |
| 77 if (currentSubscription != null) return currentSubscription.cancel(); | |
| 78 }); | |
| 79 | |
| 80 nextStream(); | |
| 81 | |
| 82 return controller.stream.listen(onData, | |
| 83 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
| 84 } | |
| 85 } | |
| OLD | NEW |