OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /// Generic utility functions. Stuff that should possibly be in core. | 5 /// Generic utility functions. Stuff that should possibly be in core. |
6 library pub.utils; | 6 library pub.utils; |
7 | 7 |
8 import 'dart:async'; | 8 import 'dart:async'; |
9 import "dart:collection"; | 9 import "dart:collection"; |
10 import "dart:convert"; | 10 import "dart:convert"; |
(...skipping 285 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
296 /// emitted, the returned Future completes to that error. | 296 /// emitted, the returned Future completes to that error. |
297 Future<Stream> validateStream(Stream stream) { | 297 Future<Stream> validateStream(Stream stream) { |
298 var completer = new Completer<Stream>(); | 298 var completer = new Completer<Stream>(); |
299 var controller = new StreamController(sync: true); | 299 var controller = new StreamController(sync: true); |
300 | 300 |
301 StreamSubscription subscription; | 301 StreamSubscription subscription; |
302 subscription = stream.listen((value) { | 302 subscription = stream.listen((value) { |
303 // We got a value, so the stream is valid. | 303 // We got a value, so the stream is valid. |
304 if (!completer.isCompleted) completer.complete(controller.stream); | 304 if (!completer.isCompleted) completer.complete(controller.stream); |
305 controller.add(value); | 305 controller.add(value); |
306 }, onError: (error) { | 306 }, onError: (error, [stackTrace]) { |
307 // If the error came after values, it's OK. | 307 // If the error came after values, it's OK. |
308 if (completer.isCompleted) { | 308 if (completer.isCompleted) { |
309 controller.addError(error); | 309 controller.addError(error, stackTrace); |
310 return; | 310 return; |
311 } | 311 } |
312 | 312 |
313 // Otherwise, the error came first and the stream is invalid. | 313 // Otherwise, the error came first and the stream is invalid. |
314 completer.completeError(error); | 314 completer.completeError(error, stackTrace); |
315 | 315 |
316 // We don't be returning the stream at all in this case, so unsubscribe | 316 // We don't be returning the stream at all in this case, so unsubscribe |
317 // and swallow the error. | 317 // and swallow the error. |
318 subscription.cancel(); | 318 subscription.cancel(); |
319 }, onDone: () { | 319 }, onDone: () { |
320 // It closed with no errors, so the stream is valid. | 320 // It closed with no errors, so the stream is valid. |
321 if (!completer.isCompleted) completer.complete(controller.stream); | 321 if (!completer.isCompleted) completer.complete(controller.stream); |
322 controller.close(); | 322 controller.close(); |
323 }); | 323 }); |
324 | 324 |
325 return completer.future; | 325 return completer.future; |
326 } | 326 } |
327 | 327 |
328 // TODO(nweiz): remove this when issue 7964 is fixed. | 328 // TODO(nweiz): remove this when issue 7964 is fixed. |
329 /// Returns a [Future] that will complete to the first element of [stream]. | 329 /// Returns a [Future] that will complete to the first element of [stream]. |
330 /// Unlike [Stream.first], this is safe to use with single-subscription streams. | 330 /// Unlike [Stream.first], this is safe to use with single-subscription streams. |
331 Future streamFirst(Stream stream) { | 331 Future streamFirst(Stream stream) { |
332 var completer = new Completer(); | 332 var completer = new Completer(); |
333 var subscription; | 333 var subscription; |
334 subscription = stream.listen((value) { | 334 subscription = stream.listen((value) { |
335 subscription.cancel(); | 335 subscription.cancel(); |
336 completer.complete(value); | 336 completer.complete(value); |
337 }, onError: (e) { | 337 }, onError: (e, [stackTrace]) { |
338 completer.completeError(e); | 338 completer.completeError(e, stackTrace); |
339 }, onDone: () { | 339 }, onDone: () { |
340 completer.completeError(new StateError("No elements")); | 340 completer.completeError(new StateError("No elements")); |
341 }, cancelOnError: true); | 341 }, cancelOnError: true); |
342 return completer.future; | 342 return completer.future; |
343 } | 343 } |
344 | 344 |
345 /// Returns a wrapped version of [stream] along with a [StreamSubscription] that | 345 /// Returns a wrapped version of [stream] along with a [StreamSubscription] that |
346 /// can be used to control the wrapped stream. | 346 /// can be used to control the wrapped stream. |
347 Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { | 347 Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { |
348 var controller = | 348 var controller = |
349 stream.isBroadcast ? new StreamController.broadcast(sync: true) | 349 stream.isBroadcast ? new StreamController.broadcast(sync: true) |
350 : new StreamController(sync: true); | 350 : new StreamController(sync: true); |
351 var subscription = stream.listen(controller.add, | 351 var subscription = stream.listen(controller.add, |
352 onError: controller.addError, | 352 onError: controller.addError, |
353 onDone: controller.close); | 353 onDone: controller.close); |
354 return new Pair<Stream, StreamSubscription>(controller.stream, subscription); | 354 return new Pair<Stream, StreamSubscription>(controller.stream, subscription); |
355 } | 355 } |
356 | 356 |
357 // TODO(nweiz): remove this when issue 7787 is fixed. | 357 // TODO(nweiz): remove this when issue 7787 is fixed. |
358 /// Creates two single-subscription [Stream]s that each emit all values and | 358 /// Creates two single-subscription [Stream]s that each emit all values and |
359 /// errors from [stream]. This is useful if [stream] is single-subscription but | 359 /// errors from [stream]. This is useful if [stream] is single-subscription but |
360 /// multiple subscribers are necessary. | 360 /// multiple subscribers are necessary. |
361 Pair<Stream, Stream> tee(Stream stream) { | 361 Pair<Stream, Stream> tee(Stream stream) { |
362 var controller1 = new StreamController(sync: true); | 362 var controller1 = new StreamController(sync: true); |
363 var controller2 = new StreamController(sync: true); | 363 var controller2 = new StreamController(sync: true); |
364 stream.listen((value) { | 364 stream.listen((value) { |
365 controller1.add(value); | 365 controller1.add(value); |
366 controller2.add(value); | 366 controller2.add(value); |
367 }, onError: (error) { | 367 }, onError: (error, [stackTrace]) { |
368 controller1.addError(error); | 368 controller1.addError(error, stackTrace); |
369 controller2.addError(error); | 369 controller2.addError(error, stackTrace); |
370 }, onDone: () { | 370 }, onDone: () { |
371 controller1.close(); | 371 controller1.close(); |
372 controller2.close(); | 372 controller2.close(); |
373 }); | 373 }); |
374 return new Pair<Stream, Stream>(controller1.stream, controller2.stream); | 374 return new Pair<Stream, Stream>(controller1.stream, controller2.stream); |
375 } | 375 } |
376 | 376 |
377 /// Merges [stream1] and [stream2] into a single stream that emits events from | 377 /// Merges [stream1] and [stream2] into a single stream that emits events from |
378 /// both sources. | 378 /// both sources. |
379 Stream mergeStreams(Stream stream1, Stream stream2) { | 379 Stream mergeStreams(Stream stream1, Stream stream2) { |
380 var doneCount = 0; | 380 var doneCount = 0; |
381 var controller = new StreamController(sync: true); | 381 var controller = new StreamController(sync: true); |
382 | 382 |
383 for (var stream in [stream1, stream2]) { | 383 for (var stream in [stream1, stream2]) { |
384 stream.listen((value) { | 384 stream.listen( |
385 controller.add(value); | 385 controller.add, |
386 }, onError: (error) { | 386 onError: controller.addError, |
387 controller.addError(error); | 387 onDone: () { |
388 }, onDone: () { | |
389 doneCount++; | 388 doneCount++; |
390 if (doneCount == 2) controller.close(); | 389 if (doneCount == 2) controller.close(); |
391 }); | 390 }); |
392 } | 391 } |
393 | 392 |
394 return controller.stream; | 393 return controller.stream; |
395 } | 394 } |
396 | 395 |
397 /// A regular expression matching a trailing CR character. | 396 /// A regular expression matching a trailing CR character. |
398 final _trailingCR = new RegExp(r"\r$"); | 397 final _trailingCR = new RegExp(r"\r$"); |
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
735 error is DirectoryException || | 734 error is DirectoryException || |
736 error is FileException || | 735 error is FileException || |
737 error is HttpException || | 736 error is HttpException || |
738 error is HttpException || | 737 error is HttpException || |
739 error is LinkException || | 738 error is LinkException || |
740 error is OSError || | 739 error is OSError || |
741 error is ProcessException || | 740 error is ProcessException || |
742 error is SocketException || | 741 error is SocketException || |
743 error is WebSocketException; | 742 error is WebSocketException; |
744 } | 743 } |
OLD | NEW |