Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: pkg/http/lib/src/utils.dart

Issue 11887016: Make StreamController's unnamed constructor create a single-sub stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/http/lib/src/streamed_request.dart ('k') | pkg/http/test/mock_client_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library utils; 5 library utils;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:crypto'; 8 import 'dart:crypto';
9 import 'dart:io'; 9 import 'dart:io';
10 import 'dart:scalarlist'; 10 import 'dart:scalarlist';
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
148 pair.first.listen((_) {}, onError: (_) {}, onDone: onDone); 148 pair.first.listen((_) {}, onError: (_) {}, onDone: onDone);
149 return pair.last; 149 return pair.last;
150 } 150 }
151 151
152 // TODO(nweiz): remove this once issue 7785 is fixed. 152 // TODO(nweiz): remove this once issue 7785 is fixed.
153 /// Wraps [stream] in a single-subscription [ByteStream] that emits the same 153 /// Wraps [stream] in a single-subscription [ByteStream] that emits the same
154 /// data. 154 /// data.
155 ByteStream wrapInputStream(InputStream stream) { 155 ByteStream wrapInputStream(InputStream stream) {
156 if (stream.closed) return emptyStream; 156 if (stream.closed) return emptyStream;
157 157
158 var controller = new StreamController.singleSubscription(); 158 var controller = new StreamController();
159 stream.onClosed = controller.close; 159 stream.onClosed = controller.close;
160 stream.onData = () => controller.add(stream.read()); 160 stream.onData = () => controller.add(stream.read());
161 stream.onError = (e) => controller.signalError(new AsyncError(e)); 161 stream.onError = (e) => controller.signalError(new AsyncError(e));
162 return new ByteStream(controller); 162 return new ByteStream(controller);
163 } 163 }
164 164
165 // TODO(nweiz): remove this once issue 7785 is fixed. 165 // TODO(nweiz): remove this once issue 7785 is fixed.
166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it 166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
167 /// using [Stream.pipe]. 167 /// using [Stream.pipe].
168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => 168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
235 235
236 /// Returns a [Future] that asynchronously completes to `null`. 236 /// Returns a [Future] that asynchronously completes to `null`.
237 Future get async => new Future.immediate(null); 237 Future get async => new Future.immediate(null);
238 238
239 /// Returns a closed [Stream] with no elements. 239 /// Returns a closed [Stream] with no elements.
240 Stream get emptyStream => streamFromIterable([]); 240 Stream get emptyStream => streamFromIterable([]);
241 241
242 /// Creates a single-subscription stream that emits the items in [iter] and then 242 /// Creates a single-subscription stream that emits the items in [iter] and then
243 /// ends. 243 /// ends.
244 Stream streamFromIterable(Iterable iter) { 244 Stream streamFromIterable(Iterable iter) {
245 var stream = new StreamController.singleSubscription(); 245 var stream = new StreamController();
246 iter.forEach(stream.add); 246 iter.forEach(stream.add);
247 stream.close(); 247 stream.close();
248 return stream.stream; 248 return stream.stream;
249 } 249 }
250 250
251 // TODO(nweiz): remove this when issue 7787 is fixed. 251 // TODO(nweiz): remove this when issue 7787 is fixed.
252 /// Creates two single-subscription [Stream]s that each emit all values and 252 /// Creates two single-subscription [Stream]s that each emit all values and
253 /// errors from [stream]. This is useful if [stream] is single-subscription but 253 /// errors from [stream]. This is useful if [stream] is single-subscription but
254 /// multiple subscribers are necessary. 254 /// multiple subscribers are necessary.
255 Pair<Stream, Stream> tee(Stream stream) { 255 Pair<Stream, Stream> tee(Stream stream) {
256 var controller1 = new StreamController.singleSubscription(); 256 var controller1 = new StreamController();
257 var controller2 = new StreamController.singleSubscription(); 257 var controller2 = new StreamController();
258 stream.listen((value) { 258 stream.listen((value) {
259 controller1.add(value); 259 controller1.add(value);
260 controller2.add(value); 260 controller2.add(value);
261 }, onError: (error) { 261 }, onError: (error) {
262 controller1.signalError(error); 262 controller1.signalError(error);
263 controller2.signalError(error); 263 controller2.signalError(error);
264 }, onDone: () { 264 }, onDone: () {
265 controller1.close(); 265 controller1.close();
266 controller2.close(); 266 controller2.close();
267 }); 267 });
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
302 /// The return values of all [Future]s are discarded. Any errors will cause the 302 /// The return values of all [Future]s are discarded. Any errors will cause the
303 /// iteration to stop and will be piped through the return value. 303 /// iteration to stop and will be piped through the return value.
304 Future forEachFuture(Iterable input, Future fn(element)) { 304 Future forEachFuture(Iterable input, Future fn(element)) {
305 var iterator = input.iterator; 305 var iterator = input.iterator;
306 Future nextElement(_) { 306 Future nextElement(_) {
307 if (!iterator.moveNext()) return new Future.immediate(null); 307 if (!iterator.moveNext()) return new Future.immediate(null);
308 return fn(iterator.current).then(nextElement); 308 return fn(iterator.current).then(nextElement);
309 } 309 }
310 return nextElement(null); 310 return nextElement(null);
311 } 311 }
OLDNEW
« no previous file with comments | « pkg/http/lib/src/streamed_request.dart ('k') | pkg/http/test/mock_client_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698