Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: sdk/lib/_internal/pub/lib/src/utils.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /// Generic utility functions. Stuff that should possibly be in core. 5 /// Generic utility functions. Stuff that should possibly be in core.
6 library pub.utils; 6 library pub.utils;
7 7
8 import 'dart:async'; 8 import 'dart:async';
9 import "dart:collection"; 9 import "dart:collection";
10 import "dart:convert"; 10 import "dart:convert";
(...skipping 285 matching lines...) Expand 10 before | Expand all | Expand 10 after
296 /// emitted, the returned Future completes to that error. 296 /// emitted, the returned Future completes to that error.
297 Future<Stream> validateStream(Stream stream) { 297 Future<Stream> validateStream(Stream stream) {
298 var completer = new Completer<Stream>(); 298 var completer = new Completer<Stream>();
299 var controller = new StreamController(sync: true); 299 var controller = new StreamController(sync: true);
300 300
301 StreamSubscription subscription; 301 StreamSubscription subscription;
302 subscription = stream.listen((value) { 302 subscription = stream.listen((value) {
303 // We got a value, so the stream is valid. 303 // We got a value, so the stream is valid.
304 if (!completer.isCompleted) completer.complete(controller.stream); 304 if (!completer.isCompleted) completer.complete(controller.stream);
305 controller.add(value); 305 controller.add(value);
306 }, onError: (error) { 306 }, onError: (error, [stackTrace]) {
307 // If the error came after values, it's OK. 307 // If the error came after values, it's OK.
308 if (completer.isCompleted) { 308 if (completer.isCompleted) {
309 controller.addError(error); 309 controller.addError(error, stackTrace);
310 return; 310 return;
311 } 311 }
312 312
313 // Otherwise, the error came first and the stream is invalid. 313 // Otherwise, the error came first and the stream is invalid.
314 completer.completeError(error); 314 completer.completeError(error, stackTrace);
315 315
316 // We don't be returning the stream at all in this case, so unsubscribe 316 // We don't be returning the stream at all in this case, so unsubscribe
317 // and swallow the error. 317 // and swallow the error.
318 subscription.cancel(); 318 subscription.cancel();
319 }, onDone: () { 319 }, onDone: () {
320 // It closed with no errors, so the stream is valid. 320 // It closed with no errors, so the stream is valid.
321 if (!completer.isCompleted) completer.complete(controller.stream); 321 if (!completer.isCompleted) completer.complete(controller.stream);
322 controller.close(); 322 controller.close();
323 }); 323 });
324 324
325 return completer.future; 325 return completer.future;
326 } 326 }
327 327
328 // TODO(nweiz): remove this when issue 7964 is fixed. 328 // TODO(nweiz): remove this when issue 7964 is fixed.
329 /// Returns a [Future] that will complete to the first element of [stream]. 329 /// Returns a [Future] that will complete to the first element of [stream].
330 /// Unlike [Stream.first], this is safe to use with single-subscription streams. 330 /// Unlike [Stream.first], this is safe to use with single-subscription streams.
331 Future streamFirst(Stream stream) { 331 Future streamFirst(Stream stream) {
332 var completer = new Completer(); 332 var completer = new Completer();
333 var subscription; 333 var subscription;
334 subscription = stream.listen((value) { 334 subscription = stream.listen((value) {
335 subscription.cancel(); 335 subscription.cancel();
336 completer.complete(value); 336 completer.complete(value);
337 }, onError: (e) { 337 }, onError: (e, [stackTrace]) {
338 completer.completeError(e); 338 completer.completeError(e, stackTrace);
339 }, onDone: () { 339 }, onDone: () {
340 completer.completeError(new StateError("No elements")); 340 completer.completeError(new StateError("No elements"));
341 }, cancelOnError: true); 341 }, cancelOnError: true);
342 return completer.future; 342 return completer.future;
343 } 343 }
344 344
345 /// Returns a wrapped version of [stream] along with a [StreamSubscription] that 345 /// Returns a wrapped version of [stream] along with a [StreamSubscription] that
346 /// can be used to control the wrapped stream. 346 /// can be used to control the wrapped stream.
347 Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { 347 Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) {
348 var controller = 348 var controller =
349 stream.isBroadcast ? new StreamController.broadcast(sync: true) 349 stream.isBroadcast ? new StreamController.broadcast(sync: true)
350 : new StreamController(sync: true); 350 : new StreamController(sync: true);
351 var subscription = stream.listen(controller.add, 351 var subscription = stream.listen(controller.add,
352 onError: controller.addError, 352 onError: controller.addError,
353 onDone: controller.close); 353 onDone: controller.close);
354 return new Pair<Stream, StreamSubscription>(controller.stream, subscription); 354 return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
355 } 355 }
356 356
357 // TODO(nweiz): remove this when issue 7787 is fixed. 357 // TODO(nweiz): remove this when issue 7787 is fixed.
358 /// Creates two single-subscription [Stream]s that each emit all values and 358 /// Creates two single-subscription [Stream]s that each emit all values and
359 /// errors from [stream]. This is useful if [stream] is single-subscription but 359 /// errors from [stream]. This is useful if [stream] is single-subscription but
360 /// multiple subscribers are necessary. 360 /// multiple subscribers are necessary.
361 Pair<Stream, Stream> tee(Stream stream) { 361 Pair<Stream, Stream> tee(Stream stream) {
362 var controller1 = new StreamController(sync: true); 362 var controller1 = new StreamController(sync: true);
363 var controller2 = new StreamController(sync: true); 363 var controller2 = new StreamController(sync: true);
364 stream.listen((value) { 364 stream.listen((value) {
365 controller1.add(value); 365 controller1.add(value);
366 controller2.add(value); 366 controller2.add(value);
367 }, onError: (error) { 367 }, onError: (error, [stackTrace]) {
368 controller1.addError(error); 368 controller1.addError(error, stackTrace);
369 controller2.addError(error); 369 controller2.addError(error, stackTrace);
370 }, onDone: () { 370 }, onDone: () {
371 controller1.close(); 371 controller1.close();
372 controller2.close(); 372 controller2.close();
373 }); 373 });
374 return new Pair<Stream, Stream>(controller1.stream, controller2.stream); 374 return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
375 } 375 }
376 376
377 /// Merges [stream1] and [stream2] into a single stream that emits events from 377 /// Merges [stream1] and [stream2] into a single stream that emits events from
378 /// both sources. 378 /// both sources.
379 Stream mergeStreams(Stream stream1, Stream stream2) { 379 Stream mergeStreams(Stream stream1, Stream stream2) {
380 var doneCount = 0; 380 var doneCount = 0;
381 var controller = new StreamController(sync: true); 381 var controller = new StreamController(sync: true);
382 382
383 for (var stream in [stream1, stream2]) { 383 for (var stream in [stream1, stream2]) {
384 stream.listen((value) { 384 stream.listen(
385 controller.add(value); 385 controller.add,
386 }, onError: (error) { 386 onError: controller.addError,
387 controller.addError(error); 387 onDone: () {
388 }, onDone: () {
389 doneCount++; 388 doneCount++;
390 if (doneCount == 2) controller.close(); 389 if (doneCount == 2) controller.close();
391 }); 390 });
392 } 391 }
393 392
394 return controller.stream; 393 return controller.stream;
395 } 394 }
396 395
397 /// A regular expression matching a trailing CR character. 396 /// A regular expression matching a trailing CR character.
398 final _trailingCR = new RegExp(r"\r$"); 397 final _trailingCR = new RegExp(r"\r$");
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after
735 error is DirectoryException || 734 error is DirectoryException ||
736 error is FileException || 735 error is FileException ||
737 error is HttpException || 736 error is HttpException ||
738 error is HttpException || 737 error is HttpException ||
739 error is LinkException || 738 error is LinkException ||
740 error is OSError || 739 error is OSError ||
741 error is ProcessException || 740 error is ProcessException ||
742 error is SocketException || 741 error is SocketException ||
743 error is WebSocketException; 742 error is WebSocketException;
744 } 743 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub/lib/src/safe_http_server.dart ('k') | sdk/lib/async/broadcast_stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698