Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(184)

Side by Side Diff: packages/async/lib/src/stream_splitter.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_splitter; 5 import 'dart:async';
6 6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import '../result.dart';
11 import 'future_group.dart'; 7 import 'future_group.dart';
8 import 'result.dart';
12 9
13 /// A class that splits a single source stream into an arbitrary number of 10 /// A class that splits a single source stream into an arbitrary number of
14 /// (single-subscription) streams (called "branch") that emit the same events. 11 /// (single-subscription) streams (called "branch") that emit the same events.
15 /// 12 ///
16 /// Each branch will emit all the same values and errors as the source stream, 13 /// Each branch will emit all the same values and errors as the source stream,
17 /// regardless of which values have been emitted on other branches. This means 14 /// regardless of which values have been emitted on other branches. This means
18 /// that the splitter stores every event that has been emitted so far, which may 15 /// that the splitter stores every event that has been emitted so far, which may
19 /// consume a lot of memory. The user can call [close] to indicate that no more 16 /// consume a lot of memory. The user can call [close] to indicate that no more
20 /// branches will be created, and this memory will be released. 17 /// branches will be created, and this memory will be released.
21 /// 18 ///
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
53 /// Whether [_stream] is done emitting events. 50 /// Whether [_stream] is done emitting events.
54 var _isDone = false; 51 var _isDone = false;
55 52
56 /// Whether [close] has been called. 53 /// Whether [close] has been called.
57 var _isClosed = false; 54 var _isClosed = false;
58 55
59 /// Splits [stream] into [count] identical streams. 56 /// Splits [stream] into [count] identical streams.
60 /// 57 ///
61 /// [count] defaults to 2. This is the same as creating [count] branches and 58 /// [count] defaults to 2. This is the same as creating [count] branches and
62 /// then closing the [StreamSplitter]. 59 /// then closing the [StreamSplitter].
63 static List<Stream> splitFrom(Stream stream, [int count]) { 60 static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
64 if (count == null) count = 2; 61 if (count == null) count = 2;
65 var splitter = new StreamSplitter(stream); 62 var splitter = new StreamSplitter<T>(stream);
66 var streams = new List.generate(count, (_) => splitter.split()); 63 var streams = new List<Stream>.generate(count, (_) => splitter.split());
67 splitter.close(); 64 splitter.close();
68 return streams; 65 return streams;
69 } 66 }
70 67
71 StreamSplitter(this._stream); 68 StreamSplitter(this._stream);
72 69
73 /// Returns a single-subscription stream that's a copy of the input stream. 70 /// Returns a single-subscription stream that's a copy of the input stream.
74 /// 71 ///
75 /// This will throw a [StateError] if [close] has been called. 72 /// This will throw a [StateError] if [close] has been called.
76 Stream<T> split() { 73 Stream<T> split() {
77 if (_isClosed) { 74 if (_isClosed) {
78 throw new StateError("Can't call split() on a closed StreamSplitter."); 75 throw new StateError("Can't call split() on a closed StreamSplitter.");
79 } 76 }
80 77
81 var controller; 78 var controller = new StreamController<T>(
82 controller = new StreamController<T>( 79 onListen: _onListen, onPause: _onPause, onResume: _onResume);
83 onListen: _onListen, 80 controller.onCancel = () => _onCancel(controller);
84 onPause: _onPause,
85 onResume: _onResume,
86 onCancel: () => _onCancel(controller));
87 81
88 for (var result in _buffer) { 82 for (var result in _buffer) {
89 result.addTo(controller); 83 result.addTo(controller);
90 } 84 }
91 85
92 if (_isDone) { 86 if (_isDone) {
93 _closeGroup.add(controller.close()); 87 _closeGroup.add(controller.close());
94 } else { 88 } else {
95 _controllers.add(controller); 89 _controllers.add(controller);
96 } 90 }
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 /// subscription if we have. 137 /// subscription if we have.
144 void _onListen() { 138 void _onListen() {
145 if (_isDone) return; 139 if (_isDone) return;
146 140
147 if (_subscription != null) { 141 if (_subscription != null) {
148 // Resume the subscription in case it was paused, either because all the 142 // Resume the subscription in case it was paused, either because all the
149 // controllers were paused or because the last one was canceled. If it 143 // controllers were paused or because the last one was canceled. If it
150 // wasn't paused, this will be a no-op. 144 // wasn't paused, this will be a no-op.
151 _subscription.resume(); 145 _subscription.resume();
152 } else { 146 } else {
153 _subscription = _stream.listen( 147 _subscription =
154 _onData, onError: _onError, onDone: _onDone); 148 _stream.listen(_onData, onError: _onError, onDone: _onDone);
155 } 149 }
156 } 150 }
157 151
158 /// Pauses [_subscription] if every controller is paused. 152 /// Pauses [_subscription] if every controller is paused.
159 void _onPause() { 153 void _onPause() {
160 if (!_controllers.every((controller) => controller.isPaused)) return; 154 if (!_controllers.every((controller) => controller.isPaused)) return;
161 _subscription.pause(); 155 _subscription.pause();
162 } 156 }
163 157
164 /// Resumes [_subscription]. 158 /// Resumes [_subscription].
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
204 } 198 }
205 199
206 /// Marks [_controllers] as done. 200 /// Marks [_controllers] as done.
207 void _onDone() { 201 void _onDone() {
208 _isDone = true; 202 _isDone = true;
209 for (var controller in _controllers) { 203 for (var controller in _controllers) {
210 _closeGroup.add(controller.close()); 204 _closeGroup.add(controller.close());
211 } 205 }
212 } 206 }
213 } 207 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_sink_transformer/typed.dart ('k') | packages/async/lib/src/stream_subscription_transformer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698