Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: pkg/sequence_zip/lib/stream_zip.dart

Issue 113883002: Create associated packages for the dart:collection and dart:async libs. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Update SDK dependency to 1.0.0 Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/sequence_zip/lib/sequence_zip.dart ('k') | pkg/sequence_zip/pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /** 5 /** Please use `package:async/stream_zip.dart` instead. */
6 * Help for combining multiple streams into a single stream. 6 @deprecated
7 *
8 * This API is also available as part of the
9 * [sequence_zip](#sequence_zip) library.
10 */
11 library stream_zip; 7 library stream_zip;
12 8
13 import "dart:async"; 9 export "package:async/stream_zip.dart";
14
15 /**
16 * A stream that combines the values of other streams.
17 */
18 class StreamZip extends Stream<List> {
19 final Iterable<Stream> _streams;
20 StreamZip(Iterable<Stream> streams) : _streams = streams;
21
22 StreamSubscription<List> listen(void onData(List data), {
23 Function onError,
24 void onDone(),
25 bool cancelOnError}) {
26 cancelOnError = identical(true, cancelOnError);
27 List<StreamSubscription> subscriptions = <StreamSubscription>[];
28 StreamController controller;
29 List current;
30 int dataCount = 0;
31
32 /// Called for each data from a subscription in [subscriptions].
33 void handleData(int index, data) {
34 current[index] = data;
35 dataCount++;
36 if (dataCount == subscriptions.length) {
37 List data = current;
38 current = new List(subscriptions.length);
39 dataCount = 0;
40 for (int i = 0; i < subscriptions.length; i++) {
41 if (i != index) subscriptions[i].resume();
42 }
43 controller.add(data);
44 } else {
45 subscriptions[index].pause();
46 }
47 }
48
49 /// Called for each error from a subscription in [subscriptions].
50 /// Except if [cancelOnError] is true, in which case the function below
51 /// is used instead.
52 void handleError(Object error, StackTrace stackTrace) {
53 controller.addError(error, stackTrace);
54 }
55
56 /// Called when a subscription has an error and [cancelOnError] is true.
57 ///
58 /// Prematurely cancels all subscriptions since we know that we won't
59 /// be needing any more values.
60 void handleErrorCancel(Object error, StackTrace stackTrace) {
61 for (int i = 0; i < subscriptions.length; i++) {
62 subscriptions[i].cancel();
63 }
64 controller.addError(error, stackTrace);
65 }
66
67 void handleDone() {
68 for (int i = 0; i < subscriptions.length; i++) {
69 subscriptions[i].cancel();
70 }
71 controller.close();
72 }
73
74 try {
75 for (Stream stream in _streams) {
76 int index = subscriptions.length;
77 subscriptions.add(stream.listen(
78 (data) { handleData(index, data); },
79 onError: cancelOnError ? handleError : handleErrorCancel,
80 onDone: handleDone,
81 cancelOnError: cancelOnError));
82 }
83 } catch (e) {
84 for (int i = subscriptions.length - 1; i >= 0; i--) {
85 subscriptions[i].cancel();
86 }
87 rethrow;
88 }
89
90 current = new List(subscriptions.length);
91
92 controller = new StreamController<List>(
93 onPause: () {
94 for (int i = 0; i < subscriptions.length; i++) {
95 // This may pause some subscriptions more than once.
96 // These will not be resumed by onResume below, but must wait for the
97 // next round.
98 subscriptions[i].pause();
99 }
100 },
101 onResume: () {
102 for (int i = 0; i < subscriptions.length; i++) {
103 subscriptions[i].resume();
104 }
105 },
106 onCancel: () {
107 for (int i = 0; i < subscriptions.length; i++) {
108 // Canceling more than once is safe.
109 subscriptions[i].cancel();
110 }
111 }
112 );
113
114 if (subscriptions.isEmpty) {
115 controller.close();
116 }
117 return controller.stream.listen(onData,
118 onError: onError,
119 onDone: onDone,
120 cancelOnError: cancelOnError);
121 }
122 }
OLDNEW
« no previous file with comments | « pkg/sequence_zip/lib/sequence_zip.dart ('k') | pkg/sequence_zip/pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698