Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2529393002: Make core libraries use generic method syntax. (Closed)
Patch Set: Update status files. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 366 matching lines...) Expand 10 before | Expand all | Expand 10 after
377 * If [convert] throws, the returned stream reports the exception as an error 377 * If [convert] throws, the returned stream reports the exception as an error
378 * event instead. 378 * event instead.
379 * 379 *
380 * Error and done events are passed through unchanged to the returned stream. 380 * Error and done events are passed through unchanged to the returned stream.
381 * 381 *
382 * The returned stream is a broadcast stream if this stream is. 382 * The returned stream is a broadcast stream if this stream is.
383 * The [convert] function is called once per data event per listener. 383 * The [convert] function is called once per data event per listener.
384 * If a broadcast stream is listened to more than once, each subscription 384 * If a broadcast stream is listened to more than once, each subscription
385 * will individually call [convert] on each data event. 385 * will individually call [convert] on each data event.
386 */ 386 */
387 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { 387 Stream<S> map<S>(S convert(T event)) {
388 return new _MapStream<T, dynamic/*=S*/>(this, convert); 388 return new _MapStream<T, S>(this, convert);
389 } 389 }
390 390
391 /** 391 /**
392 * Creates a new stream with each data event of this stream asynchronously 392 * Creates a new stream with each data event of this stream asynchronously
393 * mapped to a new event. 393 * mapped to a new event.
394 * 394 *
395 * This acts like [map], except that [convert] may return a [Future], 395 * This acts like [map], except that [convert] may return a [Future],
396 * and in that case, the stream waits for that future to complete before 396 * and in that case, the stream waits for that future to complete before
397 * continuing with its result. 397 * continuing with its result.
398 * 398 *
399 * The returned stream is a broadcast stream if this stream is. 399 * The returned stream is a broadcast stream if this stream is.
400 */ 400 */
401 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { 401 Stream<E> asyncMap<E>(convert(T event)) {
402 StreamController/*<E>*/ controller; 402 StreamController<E> controller;
403 StreamSubscription/*<T>*/ subscription; 403 StreamSubscription<T> subscription;
404 404
405 void onListen() { 405 void onListen() {
406 final add = controller.add; 406 final add = controller.add;
407 assert(controller is _StreamController || 407 assert(controller is _StreamController ||
408 controller is _BroadcastStreamController); 408 controller is _BroadcastStreamController);
409 final _EventSink/*<E>*/ eventSink = 409 final _EventSink<E> eventSink =
410 controller as Object /*=_EventSink<E>*/; 410 controller as Object /*=_EventSink<E>*/;
411 final addError = eventSink._addError; 411 final addError = eventSink._addError;
412 subscription = this.listen( 412 subscription = this.listen(
413 (T event) { 413 (T event) {
414 dynamic newValue; 414 dynamic newValue;
415 try { 415 try {
416 newValue = convert(event); 416 newValue = convert(event);
417 } catch (e, s) { 417 } catch (e, s) {
418 controller.addError(e, s); 418 controller.addError(e, s);
419 return; 419 return;
420 } 420 }
421 if (newValue is Future) { 421 if (newValue is Future) {
422 subscription.pause(); 422 subscription.pause();
423 newValue.then(add, onError: addError) 423 newValue.then(add, onError: addError)
424 .whenComplete(subscription.resume); 424 .whenComplete(subscription.resume);
425 } else { 425 } else {
426 controller.add(newValue as Object/*=E*/); 426 controller.add(newValue as Object/*=E*/);
427 } 427 }
428 }, 428 },
429 onError: addError, 429 onError: addError,
430 onDone: controller.close 430 onDone: controller.close
431 ); 431 );
432 } 432 }
433 433
434 if (this.isBroadcast) { 434 if (this.isBroadcast) {
435 controller = new StreamController/*<E>*/.broadcast( 435 controller = new StreamController<E>.broadcast(
436 onListen: onListen, 436 onListen: onListen,
437 onCancel: () { subscription.cancel(); }, 437 onCancel: () { subscription.cancel(); },
438 sync: true 438 sync: true
439 ); 439 );
440 } else { 440 } else {
441 controller = new StreamController/*<E>*/( 441 controller = new StreamController<E>(
442 onListen: onListen, 442 onListen: onListen,
443 onPause: () { subscription.pause(); }, 443 onPause: () { subscription.pause(); },
444 onResume: () { subscription.resume(); }, 444 onResume: () { subscription.resume(); },
445 onCancel: () => subscription.cancel(), 445 onCancel: () => subscription.cancel(),
446 sync: true 446 sync: true
447 ); 447 );
448 } 448 }
449 return controller.stream; 449 return controller.stream;
450 } 450 }
451 451
452 /** 452 /**
453 * Creates a new stream with the events of a stream per original event. 453 * Creates a new stream with the events of a stream per original event.
454 * 454 *
455 * This acts like [expand], except that [convert] returns a [Stream] 455 * This acts like [expand], except that [convert] returns a [Stream]
456 * instead of an [Iterable]. 456 * instead of an [Iterable].
457 * The events of the returned stream becomes the events of the returned 457 * The events of the returned stream becomes the events of the returned
458 * stream, in the order they are produced. 458 * stream, in the order they are produced.
459 * 459 *
460 * If [convert] returns `null`, no value is put on the output stream, 460 * If [convert] returns `null`, no value is put on the output stream,
461 * just as if it returned an empty stream. 461 * just as if it returned an empty stream.
462 * 462 *
463 * The returned stream is a broadcast stream if this stream is. 463 * The returned stream is a broadcast stream if this stream is.
464 */ 464 */
465 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { 465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
466 StreamController/*<E>*/ controller; 466 StreamController<E> controller;
467 StreamSubscription<T> subscription; 467 StreamSubscription<T> subscription;
468 void onListen() { 468 void onListen() {
469 assert(controller is _StreamController || 469 assert(controller is _StreamController ||
470 controller is _BroadcastStreamController); 470 controller is _BroadcastStreamController);
471 final _EventSink/*<E>*/ eventSink = 471 final _EventSink<E> eventSink =
472 controller as Object /*=_EventSink<E>*/; 472 controller as Object /*=_EventSink<E>*/;
473 subscription = this.listen( 473 subscription = this.listen(
474 (T event) { 474 (T event) {
475 Stream/*<E>*/ newStream; 475 Stream<E> newStream;
476 try { 476 try {
477 newStream = convert(event); 477 newStream = convert(event);
478 } catch (e, s) { 478 } catch (e, s) {
479 controller.addError(e, s); 479 controller.addError(e, s);
480 return; 480 return;
481 } 481 }
482 if (newStream != null) { 482 if (newStream != null) {
483 subscription.pause(); 483 subscription.pause();
484 controller.addStream(newStream) 484 controller.addStream(newStream)
485 .whenComplete(subscription.resume); 485 .whenComplete(subscription.resume);
486 } 486 }
487 }, 487 },
488 onError: eventSink._addError, // Avoid Zone error replacement. 488 onError: eventSink._addError, // Avoid Zone error replacement.
489 onDone: controller.close 489 onDone: controller.close
490 ); 490 );
491 } 491 }
492 if (this.isBroadcast) { 492 if (this.isBroadcast) {
493 controller = new StreamController/*<E>*/.broadcast( 493 controller = new StreamController<E>.broadcast(
494 onListen: onListen, 494 onListen: onListen,
495 onCancel: () { subscription.cancel(); }, 495 onCancel: () { subscription.cancel(); },
496 sync: true 496 sync: true
497 ); 497 );
498 } else { 498 } else {
499 controller = new StreamController/*<E>*/( 499 controller = new StreamController<E>(
500 onListen: onListen, 500 onListen: onListen,
501 onPause: () { subscription.pause(); }, 501 onPause: () { subscription.pause(); },
502 onResume: () { subscription.resume(); }, 502 onResume: () { subscription.resume(); },
503 onCancel: () => subscription.cancel(), 503 onCancel: () => subscription.cancel(),
504 sync: true 504 sync: true
505 ); 505 );
506 } 506 }
507 return controller.stream; 507 return controller.stream;
508 } 508 }
509 509
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
544 * into zero or more events. 544 * into zero or more events.
545 * 545 *
546 * Each incoming event is converted to an [Iterable] of new events, 546 * Each incoming event is converted to an [Iterable] of new events,
547 * and each of these new events are then sent by the returned stream 547 * and each of these new events are then sent by the returned stream
548 * in order. 548 * in order.
549 * 549 *
550 * The returned stream is a broadcast stream if this stream is. 550 * The returned stream is a broadcast stream if this stream is.
551 * If a broadcast stream is listened to more than once, each subscription 551 * If a broadcast stream is listened to more than once, each subscription
552 * will individually call `convert` and expand the events. 552 * will individually call `convert` and expand the events.
553 */ 553 */
554 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { 554 Stream<S> expand<S>(Iterable<S> convert(T value)) {
555 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); 555 return new _ExpandStream<T, S>(this, convert);
556 } 556 }
557 557
558 /** 558 /**
559 * Pipe the events of this stream into [streamConsumer]. 559 * Pipe the events of this stream into [streamConsumer].
560 * 560 *
561 * The events of this stream are added to `streamConsumer` using 561 * The events of this stream are added to `streamConsumer` using
562 * [StreamConsumer.addStream]. 562 * [StreamConsumer.addStream].
563 * The `streamConsumer` is closed when this stream has been successfully added 563 * The `streamConsumer` is closed when this stream has been successfully added
564 * to it - when the future returned by `addStream` completes without an error. 564 * to it - when the future returned by `addStream` completes without an error.
565 * 565 *
(...skipping 12 matching lines...) Expand all
578 } 578 }
579 579
580 /** 580 /**
581 * Chains this stream as the input of the provided [StreamTransformer]. 581 * Chains this stream as the input of the provided [StreamTransformer].
582 * 582 *
583 * Returns the result of [:streamTransformer.bind:] itself. 583 * Returns the result of [:streamTransformer.bind:] itself.
584 * 584 *
585 * The `streamTransformer` can decide whether it wants to return a 585 * The `streamTransformer` can decide whether it wants to return a
586 * broadcast stream or not. 586 * broadcast stream or not.
587 */ 587 */
588 Stream/*<S>*/ transform/*<S>*/( 588 Stream<S> transform<S>(
589 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { 589 StreamTransformer<T, S > streamTransformer) {
590 return streamTransformer.bind(this); 590 return streamTransformer.bind(this);
591 } 591 }
592 592
593 /** 593 /**
594 * Reduces a sequence of values by repeatedly applying [combine]. 594 * Reduces a sequence of values by repeatedly applying [combine].
595 */ 595 */
596 Future<T> reduce(T combine(T previous, T element)) { 596 Future<T> reduce(T combine(T previous, T element)) {
597 _Future<T> result = new _Future<T>(); 597 _Future<T> result = new _Future<T>();
598 bool seenFirst = false; 598 bool seenFirst = false;
599 T value; 599 T value;
(...skipping 20 matching lines...) Expand all
620 } else { 620 } else {
621 result._complete(value); 621 result._complete(value);
622 } 622 }
623 }, 623 },
624 cancelOnError: true 624 cancelOnError: true
625 ); 625 );
626 return result; 626 return result;
627 } 627 }
628 628
629 /** Reduces a sequence of values by repeatedly applying [combine]. */ 629 /** Reduces a sequence of values by repeatedly applying [combine]. */
630 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, 630 Future<S> fold<S>(S initialValue,
631 /*=S*/ combine(var/*=S*/ previous, T element)) { 631 S combine(S previous, T element)) {
632 632
633 _Future/*<S>*/ result = new _Future/*<S>*/(); 633 _Future<S> result = new _Future<S>();
634 var/*=S*/ value = initialValue; 634 S value = initialValue;
635 StreamSubscription subscription; 635 StreamSubscription subscription;
636 subscription = this.listen( 636 subscription = this.listen(
637 (T element) { 637 (T element) {
638 _runUserCode( 638 _runUserCode(
639 () => combine(value, element), 639 () => combine(value, element),
640 (/*=S*/ newValue) { value = newValue; }, 640 (S newValue) { value = newValue; },
641 _cancelAndErrorClosure(subscription, result) 641 _cancelAndErrorClosure(subscription, result)
642 ); 642 );
643 }, 643 },
644 onError: (e, st) { 644 onError: (e, st) {
645 result._completeError(e, st); 645 result._completeError(e, st);
646 }, 646 },
647 onDone: () { 647 onDone: () {
648 result._complete(value); 648 result._complete(value);
649 }, 649 },
650 cancelOnError: true); 650 cancelOnError: true);
(...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after
892 * Discards all data on the stream, but signals when it's done or an error 892 * Discards all data on the stream, but signals when it's done or an error
893 * occured. 893 * occured.
894 * 894 *
895 * When subscribing using [drain], cancelOnError will be true. This means 895 * When subscribing using [drain], cancelOnError will be true. This means
896 * that the future will complete with the first error on the stream and then 896 * that the future will complete with the first error on the stream and then
897 * cancel the subscription. 897 * cancel the subscription.
898 * 898 *
899 * In case of a `done` event the future completes with the given 899 * In case of a `done` event the future completes with the given
900 * [futureValue]. 900 * [futureValue].
901 */ 901 */
902 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) 902 Future<E> drain<E>([E futureValue])
903 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); 903 => listen(null, cancelOnError: true).asFuture<E>(futureValue);
904 904
905 /** 905 /**
906 * Provides at most the first [count] data events of this stream. 906 * Provides at most the first [count] data events of this stream.
907 * 907 *
908 * Forwards all events of this stream to the returned stream 908 * Forwards all events of this stream to the returned stream
909 * until [count] data events have been forwarded or this stream ends, 909 * until [count] data events have been forwarded or this stream ends,
910 * then ends the returned stream with a done event. 910 * then ends the returned stream with a done event.
911 * 911 *
912 * If this stream produces fewer than [count] data events before it's done, 912 * If this stream produces fewer than [count] data events before it's done,
913 * so will the returned stream. 913 * so will the returned stream.
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
1344 zone = Zone.current; 1344 zone = Zone.current;
1345 if (onTimeout == null) { 1345 if (onTimeout == null) {
1346 timeout = () { 1346 timeout = () {
1347 controller.addError(new TimeoutException("No stream event", 1347 controller.addError(new TimeoutException("No stream event",
1348 timeLimit), null); 1348 timeLimit), null);
1349 }; 1349 };
1350 } else { 1350 } else {
1351 // TODO(floitsch): the return type should be 'void', and the type 1351 // TODO(floitsch): the return type should be 'void', and the type
1352 // should be inferred. 1352 // should be inferred.
1353 var registeredOnTimeout = 1353 var registeredOnTimeout =
1354 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); 1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
1355 _ControllerEventSinkWrapper wrapper = 1355 _ControllerEventSinkWrapper wrapper =
1356 new _ControllerEventSinkWrapper(null); 1356 new _ControllerEventSinkWrapper(null);
1357 timeout = () { 1357 timeout = () {
1358 wrapper._sink = controller; // Only valid during call. 1358 wrapper._sink = controller; // Only valid during call.
1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); 1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper);
1360 wrapper._sink = null; 1360 wrapper._sink = null;
1361 }; 1361 };
1362 } 1362 }
1363 1363
1364 subscription = this.listen(onData, onError: onError, onDone: onDone); 1364 subscription = this.listen(onData, onError: onError, onDone: onDone);
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
1491 * 1491 *
1492 * This method *overwrites* the existing [onDone] and [onError] callbacks 1492 * This method *overwrites* the existing [onDone] and [onError] callbacks
1493 * with new ones that complete the returned future. 1493 * with new ones that complete the returned future.
1494 * 1494 *
1495 * In case of an error the subscription will automatically cancel (even 1495 * In case of an error the subscription will automatically cancel (even
1496 * when it was listening with `cancelOnError` set to `false`). 1496 * when it was listening with `cancelOnError` set to `false`).
1497 * 1497 *
1498 * In case of a `done` event the future completes with the given 1498 * In case of a `done` event the future completes with the given
1499 * [futureValue]. 1499 * [futureValue].
1500 */ 1500 */
1501 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); 1501 Future<E> asFuture<E>([E futureValue]);
1502 } 1502 }
1503 1503
1504 1504
1505 /** 1505 /**
1506 * An interface that abstracts creation or handling of [Stream] events. 1506 * An interface that abstracts creation or handling of [Stream] events.
1507 */ 1507 */
1508 abstract class EventSink<T> implements Sink<T> { 1508 abstract class EventSink<T> implements Sink<T> {
1509 /** Send a data event to a stream. */ 1509 /** Send a data event to a stream. */
1510 void add(T event); 1510 void add(T event);
1511 1511
(...skipping 315 matching lines...) Expand 10 before | Expand all | Expand 10 after
1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1827 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1828 EventSink _sink; 1828 EventSink _sink;
1829 _ControllerEventSinkWrapper(this._sink); 1829 _ControllerEventSinkWrapper(this._sink);
1830 1830
1831 void add(T data) { _sink.add(data); } 1831 void add(T data) { _sink.add(data); }
1832 void addError(error, [StackTrace stackTrace]) { 1832 void addError(error, [StackTrace stackTrace]) {
1833 _sink.addError(error, stackTrace); 1833 _sink.addError(error, stackTrace);
1834 } 1834 }
1835 void close() { _sink.close(); } 1835 void close() { _sink.close(); }
1836 } 1836 }
OLDNEW
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698