Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(618)

Side by Side Diff: packages/quiver/lib/src/streams/concat.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 Google Inc. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 part of quiver.streams;
16
17 /**
18 * Returns the concatentation of the input streams.
19 *
20 * When the returned stream is listened to, the [streams] are iterated through
21 * asynchronously, forwarding all events (both data and error) for the current
22 * stream to the returned stream before advancing the iterator and listening to
23 * the next stream. If advancing the iterator throws an error, the returned
24 * stream ends immediately with that error.
25 *
26 * Pausing and resuming the returned stream's subscriptions will pause and
27 * resume the subscription of the current stream being listened to.
28 *
29 * Note: Events from pre-existing broadcast streams which occur before
30 * the stream is reached by the iteration will be dropped.
31 *
32 * Example:
33 *
34 * concat(files.map((file) =>
35 * file.openRead().transform(const LineSplitter())))
36 *
37 */
38 Stream concat(Iterable<Stream> streams) => new _ConcatStream(streams);
39
40 class _ConcatStream extends Stream {
41 final Iterable<Stream> _streams;
42
43 _ConcatStream(Iterable<Stream> streams) : _streams = streams;
44
45 StreamSubscription listen(void onData(var data),
46 {Function onError, void onDone(), bool cancelOnError}) {
47 cancelOnError = true == cancelOnError;
48 StreamSubscription currentSubscription;
49 StreamController controller;
50 Iterator iterator = _streams.iterator;
51
52 void nextStream() {
53 bool hasNext;
54 try {
55 hasNext = iterator.moveNext();
56 } catch (e, s) {
57 controller
58 ..addError(e, s)
59 ..close();
60 return;
61 }
62 if (hasNext) {
63 currentSubscription = iterator.current.listen(controller.add,
64 onError: controller.addError,
65 onDone: nextStream,
66 cancelOnError: cancelOnError);
67 } else {
68 controller.close();
69 }
70 }
71
72 controller = new StreamController(onPause: () {
73 if (currentSubscription != null) currentSubscription.pause();
74 }, onResume: () {
75 if (currentSubscription != null) currentSubscription.resume();
76 }, onCancel: () {
77 if (currentSubscription != null) return currentSubscription.cancel();
78 });
79
80 nextStream();
81
82 return controller.stream.listen(onData,
83 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
84 }
85 }
OLDNEW
« no previous file with comments | « packages/quiver/lib/src/streams/collect.dart ('k') | packages/quiver/lib/src/streams/enumerate.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698