Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Side by Side Diff: lib/src/stream_splitter.dart

Issue 2660333005: Change generic comment syntax to real generic syntax. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'future_group.dart'; 7 import 'future_group.dart';
8 import 'result.dart'; 8 import 'result.dart';
9 9
10 /// A class that splits a single source stream into an arbitrary number of 10 /// A class that splits a single source stream into an arbitrary number of
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 /// Whether [_stream] is done emitting events. 50 /// Whether [_stream] is done emitting events.
51 var _isDone = false; 51 var _isDone = false;
52 52
53 /// Whether [close] has been called. 53 /// Whether [close] has been called.
54 var _isClosed = false; 54 var _isClosed = false;
55 55
56 /// Splits [stream] into [count] identical streams. 56 /// Splits [stream] into [count] identical streams.
57 /// 57 ///
58 /// [count] defaults to 2. This is the same as creating [count] branches and 58 /// [count] defaults to 2. This is the same as creating [count] branches and
59 /// then closing the [StreamSplitter]. 59 /// then closing the [StreamSplitter].
60 static List<Stream/*<T>*/> splitFrom/*<T>*/(Stream/*<T>*/ stream, 60 static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
61 [int count]) {
62 if (count == null) count = 2; 61 if (count == null) count = 2;
63 var splitter = new StreamSplitter/*<T>*/(stream); 62 var splitter = new StreamSplitter<T>(stream);
64 var streams = new List<Stream>.generate(count, (_) => splitter.split()); 63 var streams = new List<Stream>.generate(count, (_) => splitter.split());
65 splitter.close(); 64 splitter.close();
66 return streams; 65 return streams;
67 } 66 }
68 67
69 StreamSplitter(this._stream); 68 StreamSplitter(this._stream);
70 69
71 /// Returns a single-subscription stream that's a copy of the input stream. 70 /// Returns a single-subscription stream that's a copy of the input stream.
72 /// 71 ///
73 /// This will throw a [StateError] if [close] has been called. 72 /// This will throw a [StateError] if [close] has been called.
74 Stream<T> split() { 73 Stream<T> split() {
75 if (_isClosed) { 74 if (_isClosed) {
76 throw new StateError("Can't call split() on a closed StreamSplitter."); 75 throw new StateError("Can't call split() on a closed StreamSplitter.");
77 } 76 }
78 77
79 var controller = new StreamController<T>( 78 var controller = new StreamController<T>(
80 onListen: _onListen, 79 onListen: _onListen, onPause: _onPause, onResume: _onResume);
81 onPause: _onPause,
82 onResume: _onResume);
83 controller.onCancel = () => _onCancel(controller); 80 controller.onCancel = () => _onCancel(controller);
84 81
85 for (var result in _buffer) { 82 for (var result in _buffer) {
86 result.addTo(controller); 83 result.addTo(controller);
87 } 84 }
88 85
89 if (_isDone) { 86 if (_isDone) {
90 _closeGroup.add(controller.close()); 87 _closeGroup.add(controller.close());
91 } else { 88 } else {
92 _controllers.add(controller); 89 _controllers.add(controller);
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 /// subscription if we have. 137 /// subscription if we have.
141 void _onListen() { 138 void _onListen() {
142 if (_isDone) return; 139 if (_isDone) return;
143 140
144 if (_subscription != null) { 141 if (_subscription != null) {
145 // Resume the subscription in case it was paused, either because all the 142 // Resume the subscription in case it was paused, either because all the
146 // controllers were paused or because the last one was canceled. If it 143 // controllers were paused or because the last one was canceled. If it
147 // wasn't paused, this will be a no-op. 144 // wasn't paused, this will be a no-op.
148 _subscription.resume(); 145 _subscription.resume();
149 } else { 146 } else {
150 _subscription = _stream.listen( 147 _subscription =
151 _onData, onError: _onError, onDone: _onDone); 148 _stream.listen(_onData, onError: _onError, onDone: _onDone);
152 } 149 }
153 } 150 }
154 151
155 /// Pauses [_subscription] if every controller is paused. 152 /// Pauses [_subscription] if every controller is paused.
156 void _onPause() { 153 void _onPause() {
157 if (!_controllers.every((controller) => controller.isPaused)) return; 154 if (!_controllers.every((controller) => controller.isPaused)) return;
158 _subscription.pause(); 155 _subscription.pause();
159 } 156 }
160 157
161 /// Resumes [_subscription]. 158 /// Resumes [_subscription].
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
201 } 198 }
202 199
203 /// Marks [_controllers] as done. 200 /// Marks [_controllers] as done.
204 void _onDone() { 201 void _onDone() {
205 _isDone = true; 202 _isDone = true;
206 for (var controller in _controllers) { 203 for (var controller in _controllers) {
207 _closeGroup.add(controller.close()); 204 _closeGroup.add(controller.close());
208 } 205 }
209 } 206 }
210 } 207 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698