| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 366 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 377 * If [convert] throws, the returned stream reports the exception as an error | 377 * If [convert] throws, the returned stream reports the exception as an error |
| 378 * event instead. | 378 * event instead. |
| 379 * | 379 * |
| 380 * Error and done events are passed through unchanged to the returned stream. | 380 * Error and done events are passed through unchanged to the returned stream. |
| 381 * | 381 * |
| 382 * The returned stream is a broadcast stream if this stream is. | 382 * The returned stream is a broadcast stream if this stream is. |
| 383 * The [convert] function is called once per data event per listener. | 383 * The [convert] function is called once per data event per listener. |
| 384 * If a broadcast stream is listened to more than once, each subscription | 384 * If a broadcast stream is listened to more than once, each subscription |
| 385 * will individually call [convert] on each data event. | 385 * will individually call [convert] on each data event. |
| 386 */ | 386 */ |
| 387 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { | 387 Stream<S> map<S>(S convert(T event)) { |
| 388 return new _MapStream<T, dynamic/*=S*/>(this, convert); | 388 return new _MapStream<T, S>(this, convert); |
| 389 } | 389 } |
| 390 | 390 |
| 391 /** | 391 /** |
| 392 * Creates a new stream with each data event of this stream asynchronously | 392 * Creates a new stream with each data event of this stream asynchronously |
| 393 * mapped to a new event. | 393 * mapped to a new event. |
| 394 * | 394 * |
| 395 * This acts like [map], except that [convert] may return a [Future], | 395 * This acts like [map], except that [convert] may return a [Future], |
| 396 * and in that case, the stream waits for that future to complete before | 396 * and in that case, the stream waits for that future to complete before |
| 397 * continuing with its result. | 397 * continuing with its result. |
| 398 * | 398 * |
| 399 * The returned stream is a broadcast stream if this stream is. | 399 * The returned stream is a broadcast stream if this stream is. |
| 400 */ | 400 */ |
| 401 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { | 401 Stream<E> asyncMap<E>(convert(T event)) { |
| 402 StreamController/*<E>*/ controller; | 402 StreamController<E> controller; |
| 403 StreamSubscription/*<T>*/ subscription; | 403 StreamSubscription<T> subscription; |
| 404 | 404 |
| 405 void onListen() { | 405 void onListen() { |
| 406 final add = controller.add; | 406 final add = controller.add; |
| 407 assert(controller is _StreamController || | 407 assert(controller is _StreamController || |
| 408 controller is _BroadcastStreamController); | 408 controller is _BroadcastStreamController); |
| 409 final _EventSink/*<E>*/ eventSink = | 409 final _EventSink<E> eventSink = |
| 410 controller as Object /*=_EventSink<E>*/; | 410 controller as Object /*=_EventSink<E>*/; |
| 411 final addError = eventSink._addError; | 411 final addError = eventSink._addError; |
| 412 subscription = this.listen( | 412 subscription = this.listen( |
| 413 (T event) { | 413 (T event) { |
| 414 dynamic newValue; | 414 dynamic newValue; |
| 415 try { | 415 try { |
| 416 newValue = convert(event); | 416 newValue = convert(event); |
| 417 } catch (e, s) { | 417 } catch (e, s) { |
| 418 controller.addError(e, s); | 418 controller.addError(e, s); |
| 419 return; | 419 return; |
| 420 } | 420 } |
| 421 if (newValue is Future) { | 421 if (newValue is Future) { |
| 422 subscription.pause(); | 422 subscription.pause(); |
| 423 newValue.then(add, onError: addError) | 423 newValue.then(add, onError: addError) |
| 424 .whenComplete(subscription.resume); | 424 .whenComplete(subscription.resume); |
| 425 } else { | 425 } else { |
| 426 controller.add(newValue as Object/*=E*/); | 426 controller.add(newValue as Object/*=E*/); |
| 427 } | 427 } |
| 428 }, | 428 }, |
| 429 onError: addError, | 429 onError: addError, |
| 430 onDone: controller.close | 430 onDone: controller.close |
| 431 ); | 431 ); |
| 432 } | 432 } |
| 433 | 433 |
| 434 if (this.isBroadcast) { | 434 if (this.isBroadcast) { |
| 435 controller = new StreamController/*<E>*/.broadcast( | 435 controller = new StreamController<E>.broadcast( |
| 436 onListen: onListen, | 436 onListen: onListen, |
| 437 onCancel: () { subscription.cancel(); }, | 437 onCancel: () { subscription.cancel(); }, |
| 438 sync: true | 438 sync: true |
| 439 ); | 439 ); |
| 440 } else { | 440 } else { |
| 441 controller = new StreamController/*<E>*/( | 441 controller = new StreamController<E>( |
| 442 onListen: onListen, | 442 onListen: onListen, |
| 443 onPause: () { subscription.pause(); }, | 443 onPause: () { subscription.pause(); }, |
| 444 onResume: () { subscription.resume(); }, | 444 onResume: () { subscription.resume(); }, |
| 445 onCancel: () => subscription.cancel(), | 445 onCancel: () => subscription.cancel(), |
| 446 sync: true | 446 sync: true |
| 447 ); | 447 ); |
| 448 } | 448 } |
| 449 return controller.stream; | 449 return controller.stream; |
| 450 } | 450 } |
| 451 | 451 |
| 452 /** | 452 /** |
| 453 * Creates a new stream with the events of a stream per original event. | 453 * Creates a new stream with the events of a stream per original event. |
| 454 * | 454 * |
| 455 * This acts like [expand], except that [convert] returns a [Stream] | 455 * This acts like [expand], except that [convert] returns a [Stream] |
| 456 * instead of an [Iterable]. | 456 * instead of an [Iterable]. |
| 457 * The events of the returned stream becomes the events of the returned | 457 * The events of the returned stream becomes the events of the returned |
| 458 * stream, in the order they are produced. | 458 * stream, in the order they are produced. |
| 459 * | 459 * |
| 460 * If [convert] returns `null`, no value is put on the output stream, | 460 * If [convert] returns `null`, no value is put on the output stream, |
| 461 * just as if it returned an empty stream. | 461 * just as if it returned an empty stream. |
| 462 * | 462 * |
| 463 * The returned stream is a broadcast stream if this stream is. | 463 * The returned stream is a broadcast stream if this stream is. |
| 464 */ | 464 */ |
| 465 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { | 465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
| 466 StreamController/*<E>*/ controller; | 466 StreamController<E> controller; |
| 467 StreamSubscription<T> subscription; | 467 StreamSubscription<T> subscription; |
| 468 void onListen() { | 468 void onListen() { |
| 469 assert(controller is _StreamController || | 469 assert(controller is _StreamController || |
| 470 controller is _BroadcastStreamController); | 470 controller is _BroadcastStreamController); |
| 471 final _EventSink/*<E>*/ eventSink = | 471 final _EventSink<E> eventSink = |
| 472 controller as Object /*=_EventSink<E>*/; | 472 controller as Object /*=_EventSink<E>*/; |
| 473 subscription = this.listen( | 473 subscription = this.listen( |
| 474 (T event) { | 474 (T event) { |
| 475 Stream/*<E>*/ newStream; | 475 Stream<E> newStream; |
| 476 try { | 476 try { |
| 477 newStream = convert(event); | 477 newStream = convert(event); |
| 478 } catch (e, s) { | 478 } catch (e, s) { |
| 479 controller.addError(e, s); | 479 controller.addError(e, s); |
| 480 return; | 480 return; |
| 481 } | 481 } |
| 482 if (newStream != null) { | 482 if (newStream != null) { |
| 483 subscription.pause(); | 483 subscription.pause(); |
| 484 controller.addStream(newStream) | 484 controller.addStream(newStream) |
| 485 .whenComplete(subscription.resume); | 485 .whenComplete(subscription.resume); |
| 486 } | 486 } |
| 487 }, | 487 }, |
| 488 onError: eventSink._addError, // Avoid Zone error replacement. | 488 onError: eventSink._addError, // Avoid Zone error replacement. |
| 489 onDone: controller.close | 489 onDone: controller.close |
| 490 ); | 490 ); |
| 491 } | 491 } |
| 492 if (this.isBroadcast) { | 492 if (this.isBroadcast) { |
| 493 controller = new StreamController/*<E>*/.broadcast( | 493 controller = new StreamController<E>.broadcast( |
| 494 onListen: onListen, | 494 onListen: onListen, |
| 495 onCancel: () { subscription.cancel(); }, | 495 onCancel: () { subscription.cancel(); }, |
| 496 sync: true | 496 sync: true |
| 497 ); | 497 ); |
| 498 } else { | 498 } else { |
| 499 controller = new StreamController/*<E>*/( | 499 controller = new StreamController<E>( |
| 500 onListen: onListen, | 500 onListen: onListen, |
| 501 onPause: () { subscription.pause(); }, | 501 onPause: () { subscription.pause(); }, |
| 502 onResume: () { subscription.resume(); }, | 502 onResume: () { subscription.resume(); }, |
| 503 onCancel: () => subscription.cancel(), | 503 onCancel: () => subscription.cancel(), |
| 504 sync: true | 504 sync: true |
| 505 ); | 505 ); |
| 506 } | 506 } |
| 507 return controller.stream; | 507 return controller.stream; |
| 508 } | 508 } |
| 509 | 509 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 544 * into zero or more events. | 544 * into zero or more events. |
| 545 * | 545 * |
| 546 * Each incoming event is converted to an [Iterable] of new events, | 546 * Each incoming event is converted to an [Iterable] of new events, |
| 547 * and each of these new events are then sent by the returned stream | 547 * and each of these new events are then sent by the returned stream |
| 548 * in order. | 548 * in order. |
| 549 * | 549 * |
| 550 * The returned stream is a broadcast stream if this stream is. | 550 * The returned stream is a broadcast stream if this stream is. |
| 551 * If a broadcast stream is listened to more than once, each subscription | 551 * If a broadcast stream is listened to more than once, each subscription |
| 552 * will individually call `convert` and expand the events. | 552 * will individually call `convert` and expand the events. |
| 553 */ | 553 */ |
| 554 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { | 554 Stream<S> expand<S>(Iterable<S> convert(T value)) { |
| 555 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | 555 return new _ExpandStream<T, S>(this, convert); |
| 556 } | 556 } |
| 557 | 557 |
| 558 /** | 558 /** |
| 559 * Pipe the events of this stream into [streamConsumer]. | 559 * Pipe the events of this stream into [streamConsumer]. |
| 560 * | 560 * |
| 561 * The events of this stream are added to `streamConsumer` using | 561 * The events of this stream are added to `streamConsumer` using |
| 562 * [StreamConsumer.addStream]. | 562 * [StreamConsumer.addStream]. |
| 563 * The `streamConsumer` is closed when this stream has been successfully added | 563 * The `streamConsumer` is closed when this stream has been successfully added |
| 564 * to it - when the future returned by `addStream` completes without an error. | 564 * to it - when the future returned by `addStream` completes without an error. |
| 565 * | 565 * |
| (...skipping 12 matching lines...) Expand all Loading... |
| 578 } | 578 } |
| 579 | 579 |
| 580 /** | 580 /** |
| 581 * Chains this stream as the input of the provided [StreamTransformer]. | 581 * Chains this stream as the input of the provided [StreamTransformer]. |
| 582 * | 582 * |
| 583 * Returns the result of [:streamTransformer.bind:] itself. | 583 * Returns the result of [:streamTransformer.bind:] itself. |
| 584 * | 584 * |
| 585 * The `streamTransformer` can decide whether it wants to return a | 585 * The `streamTransformer` can decide whether it wants to return a |
| 586 * broadcast stream or not. | 586 * broadcast stream or not. |
| 587 */ | 587 */ |
| 588 Stream/*<S>*/ transform/*<S>*/( | 588 Stream<S> transform<S>( |
| 589 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { | 589 StreamTransformer<T, S > streamTransformer) { |
| 590 return streamTransformer.bind(this); | 590 return streamTransformer.bind(this); |
| 591 } | 591 } |
| 592 | 592 |
| 593 /** | 593 /** |
| 594 * Reduces a sequence of values by repeatedly applying [combine]. | 594 * Reduces a sequence of values by repeatedly applying [combine]. |
| 595 */ | 595 */ |
| 596 Future<T> reduce(T combine(T previous, T element)) { | 596 Future<T> reduce(T combine(T previous, T element)) { |
| 597 _Future<T> result = new _Future<T>(); | 597 _Future<T> result = new _Future<T>(); |
| 598 bool seenFirst = false; | 598 bool seenFirst = false; |
| 599 T value; | 599 T value; |
| (...skipping 20 matching lines...) Expand all Loading... |
| 620 } else { | 620 } else { |
| 621 result._complete(value); | 621 result._complete(value); |
| 622 } | 622 } |
| 623 }, | 623 }, |
| 624 cancelOnError: true | 624 cancelOnError: true |
| 625 ); | 625 ); |
| 626 return result; | 626 return result; |
| 627 } | 627 } |
| 628 | 628 |
| 629 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 629 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 630 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, | 630 Future<S> fold<S>(S initialValue, |
| 631 /*=S*/ combine(var/*=S*/ previous, T element)) { | 631 S combine(S previous, T element)) { |
| 632 | 632 |
| 633 _Future/*<S>*/ result = new _Future/*<S>*/(); | 633 _Future<S> result = new _Future<S>(); |
| 634 var/*=S*/ value = initialValue; | 634 S value = initialValue; |
| 635 StreamSubscription subscription; | 635 StreamSubscription subscription; |
| 636 subscription = this.listen( | 636 subscription = this.listen( |
| 637 (T element) { | 637 (T element) { |
| 638 _runUserCode( | 638 _runUserCode( |
| 639 () => combine(value, element), | 639 () => combine(value, element), |
| 640 (/*=S*/ newValue) { value = newValue; }, | 640 (S newValue) { value = newValue; }, |
| 641 _cancelAndErrorClosure(subscription, result) | 641 _cancelAndErrorClosure(subscription, result) |
| 642 ); | 642 ); |
| 643 }, | 643 }, |
| 644 onError: (e, st) { | 644 onError: (e, st) { |
| 645 result._completeError(e, st); | 645 result._completeError(e, st); |
| 646 }, | 646 }, |
| 647 onDone: () { | 647 onDone: () { |
| 648 result._complete(value); | 648 result._complete(value); |
| 649 }, | 649 }, |
| 650 cancelOnError: true); | 650 cancelOnError: true); |
| (...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 892 * Discards all data on the stream, but signals when it's done or an error | 892 * Discards all data on the stream, but signals when it's done or an error |
| 893 * occured. | 893 * occured. |
| 894 * | 894 * |
| 895 * When subscribing using [drain], cancelOnError will be true. This means | 895 * When subscribing using [drain], cancelOnError will be true. This means |
| 896 * that the future will complete with the first error on the stream and then | 896 * that the future will complete with the first error on the stream and then |
| 897 * cancel the subscription. | 897 * cancel the subscription. |
| 898 * | 898 * |
| 899 * In case of a `done` event the future completes with the given | 899 * In case of a `done` event the future completes with the given |
| 900 * [futureValue]. | 900 * [futureValue]. |
| 901 */ | 901 */ |
| 902 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) | 902 Future<E> drain<E>([E futureValue]) |
| 903 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); | 903 => listen(null, cancelOnError: true).asFuture<E>(futureValue); |
| 904 | 904 |
| 905 /** | 905 /** |
| 906 * Provides at most the first [count] data events of this stream. | 906 * Provides at most the first [count] data events of this stream. |
| 907 * | 907 * |
| 908 * Forwards all events of this stream to the returned stream | 908 * Forwards all events of this stream to the returned stream |
| 909 * until [count] data events have been forwarded or this stream ends, | 909 * until [count] data events have been forwarded or this stream ends, |
| 910 * then ends the returned stream with a done event. | 910 * then ends the returned stream with a done event. |
| 911 * | 911 * |
| 912 * If this stream produces fewer than [count] data events before it's done, | 912 * If this stream produces fewer than [count] data events before it's done, |
| 913 * so will the returned stream. | 913 * so will the returned stream. |
| (...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1344 zone = Zone.current; | 1344 zone = Zone.current; |
| 1345 if (onTimeout == null) { | 1345 if (onTimeout == null) { |
| 1346 timeout = () { | 1346 timeout = () { |
| 1347 controller.addError(new TimeoutException("No stream event", | 1347 controller.addError(new TimeoutException("No stream event", |
| 1348 timeLimit), null); | 1348 timeLimit), null); |
| 1349 }; | 1349 }; |
| 1350 } else { | 1350 } else { |
| 1351 // TODO(floitsch): the return type should be 'void', and the type | 1351 // TODO(floitsch): the return type should be 'void', and the type |
| 1352 // should be inferred. | 1352 // should be inferred. |
| 1353 var registeredOnTimeout = | 1353 var registeredOnTimeout = |
| 1354 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); | 1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); |
| 1355 _ControllerEventSinkWrapper wrapper = | 1355 _ControllerEventSinkWrapper wrapper = |
| 1356 new _ControllerEventSinkWrapper(null); | 1356 new _ControllerEventSinkWrapper(null); |
| 1357 timeout = () { | 1357 timeout = () { |
| 1358 wrapper._sink = controller; // Only valid during call. | 1358 wrapper._sink = controller; // Only valid during call. |
| 1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | 1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
| 1360 wrapper._sink = null; | 1360 wrapper._sink = null; |
| 1361 }; | 1361 }; |
| 1362 } | 1362 } |
| 1363 | 1363 |
| 1364 subscription = this.listen(onData, onError: onError, onDone: onDone); | 1364 subscription = this.listen(onData, onError: onError, onDone: onDone); |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1491 * | 1491 * |
| 1492 * This method *overwrites* the existing [onDone] and [onError] callbacks | 1492 * This method *overwrites* the existing [onDone] and [onError] callbacks |
| 1493 * with new ones that complete the returned future. | 1493 * with new ones that complete the returned future. |
| 1494 * | 1494 * |
| 1495 * In case of an error the subscription will automatically cancel (even | 1495 * In case of an error the subscription will automatically cancel (even |
| 1496 * when it was listening with `cancelOnError` set to `false`). | 1496 * when it was listening with `cancelOnError` set to `false`). |
| 1497 * | 1497 * |
| 1498 * In case of a `done` event the future completes with the given | 1498 * In case of a `done` event the future completes with the given |
| 1499 * [futureValue]. | 1499 * [futureValue]. |
| 1500 */ | 1500 */ |
| 1501 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); | 1501 Future<E> asFuture<E>([E futureValue]); |
| 1502 } | 1502 } |
| 1503 | 1503 |
| 1504 | 1504 |
| 1505 /** | 1505 /** |
| 1506 * An interface that abstracts creation or handling of [Stream] events. | 1506 * An interface that abstracts creation or handling of [Stream] events. |
| 1507 */ | 1507 */ |
| 1508 abstract class EventSink<T> implements Sink<T> { | 1508 abstract class EventSink<T> implements Sink<T> { |
| 1509 /** Send a data event to a stream. */ | 1509 /** Send a data event to a stream. */ |
| 1510 void add(T event); | 1510 void add(T event); |
| 1511 | 1511 |
| (...skipping 315 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| 1828 EventSink _sink; | 1828 EventSink _sink; |
| 1829 _ControllerEventSinkWrapper(this._sink); | 1829 _ControllerEventSinkWrapper(this._sink); |
| 1830 | 1830 |
| 1831 void add(T data) { _sink.add(data); } | 1831 void add(T data) { _sink.add(data); } |
| 1832 void addError(error, [StackTrace stackTrace]) { | 1832 void addError(error, [StackTrace stackTrace]) { |
| 1833 _sink.addError(error, stackTrace); | 1833 _sink.addError(error, stackTrace); |
| 1834 } | 1834 } |
| 1835 void close() { _sink.close(); } | 1835 void close() { _sink.close(); } |
| 1836 } | 1836 } |
| OLD | NEW |