Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: lib/stream_zip.dart

Issue 1777453002: Modernize the package's style. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /** 5 /// Import `async.dart` instead.
6 * Help for combining multiple streams into a single stream. 6 @Deprecated("Will be removed in async 2.0.0.")
7 */
8 library dart.pkg.async.stream_zip; 7 library dart.pkg.async.stream_zip;
9 8
10 import "dart:async"; 9 export "src/stream_zip.dart";
11
12 /**
13 * A stream that combines the values of other streams.
14 */
15 class StreamZip extends Stream<List> {
16 final Iterable<Stream> _streams;
17 StreamZip(Iterable<Stream> streams) : _streams = streams;
18
19 StreamSubscription<List> listen(void onData(List data), {
20 Function onError,
21 void onDone(),
22 bool cancelOnError}) {
23 cancelOnError = identical(true, cancelOnError);
24 List<StreamSubscription> subscriptions = <StreamSubscription>[];
25 StreamController controller;
26 List current;
27 int dataCount = 0;
28
29 /// Called for each data from a subscription in [subscriptions].
30 void handleData(int index, data) {
31 current[index] = data;
32 dataCount++;
33 if (dataCount == subscriptions.length) {
34 List data = current;
35 current = new List(subscriptions.length);
36 dataCount = 0;
37 for (int i = 0; i < subscriptions.length; i++) {
38 if (i != index) subscriptions[i].resume();
39 }
40 controller.add(data);
41 } else {
42 subscriptions[index].pause();
43 }
44 }
45
46 /// Called for each error from a subscription in [subscriptions].
47 /// Except if [cancelOnError] is true, in which case the function below
48 /// is used instead.
49 void handleError(Object error, StackTrace stackTrace) {
50 controller.addError(error, stackTrace);
51 }
52
53 /// Called when a subscription has an error and [cancelOnError] is true.
54 ///
55 /// Prematurely cancels all subscriptions since we know that we won't
56 /// be needing any more values.
57 void handleErrorCancel(Object error, StackTrace stackTrace) {
58 for (int i = 0; i < subscriptions.length; i++) {
59 subscriptions[i].cancel();
60 }
61 controller.addError(error, stackTrace);
62 }
63
64 void handleDone() {
65 for (int i = 0; i < subscriptions.length; i++) {
66 subscriptions[i].cancel();
67 }
68 controller.close();
69 }
70
71 try {
72 for (Stream stream in _streams) {
73 int index = subscriptions.length;
74 subscriptions.add(stream.listen(
75 (data) { handleData(index, data); },
76 onError: cancelOnError ? handleError : handleErrorCancel,
77 onDone: handleDone,
78 cancelOnError: cancelOnError));
79 }
80 } catch (e) {
81 for (int i = subscriptions.length - 1; i >= 0; i--) {
82 subscriptions[i].cancel();
83 }
84 rethrow;
85 }
86
87 current = new List(subscriptions.length);
88
89 controller = new StreamController<List>(
90 onPause: () {
91 for (int i = 0; i < subscriptions.length; i++) {
92 // This may pause some subscriptions more than once.
93 // These will not be resumed by onResume below, but must wait for the
94 // next round.
95 subscriptions[i].pause();
96 }
97 },
98 onResume: () {
99 for (int i = 0; i < subscriptions.length; i++) {
100 subscriptions[i].resume();
101 }
102 },
103 onCancel: () {
104 for (int i = 0; i < subscriptions.length; i++) {
105 // Canceling more than once is safe.
106 subscriptions[i].cancel();
107 }
108 }
109 );
110
111 if (subscriptions.isEmpty) {
112 controller.close();
113 }
114 return controller.stream.listen(onData,
115 onError: onError,
116 onDone: onDone,
117 cancelOnError: cancelOnError);
118 }
119 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698