Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1806)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove dir stuff. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
69 * its subscription. 69 * its subscription.
70 * 70 *
71 * If the stream is canceled before the controller needs new data the 71 * If the stream is canceled before the controller needs new data the
72 * [onResume] call might not be executed. 72 * [onResume] call might not be executed.
73 */ 73 */
74 factory StreamController({void onListen(), 74 factory StreamController({void onListen(),
75 void onPause(), 75 void onPause(),
76 void onResume(), 76 void onResume(),
77 void onCancel(), 77 onCancel(),
floitsch 2013/10/12 18:53:57 Needs comment.
Lasse Reichstein Nielsen 2013/10/14 11:32:33 Can the return type be Future? It's always ok to
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
78 bool sync: false}) { 78 bool sync: false}) {
79 if (onListen == null && onPause == null && 79 if (onListen == null && onPause == null &&
80 onResume == null && onCancel == null) { 80 onResume == null && onCancel == null) {
81 return sync 81 return sync
82 ? new _NoCallbackSyncStreamController/*<T>*/() 82 ? new _NoCallbackSyncStreamController/*<T>*/()
83 : new _NoCallbackAsyncStreamController/*<T>*/(); 83 : new _NoCallbackAsyncStreamController/*<T>*/();
84 } 84 }
85 return sync 85 return sync
86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
168 } 168 }
169 169
170 170
171 abstract class _StreamControllerLifecycle<T> { 171 abstract class _StreamControllerLifecycle<T> {
172 StreamSubscription<T> _subscribe(void onData(T data), 172 StreamSubscription<T> _subscribe(void onData(T data),
173 void onError(Object error), 173 void onError(Object error),
174 void onDone(), 174 void onDone(),
175 bool cancelOnError); 175 bool cancelOnError);
176 void _recordPause(StreamSubscription<T> subscription) {} 176 void _recordPause(StreamSubscription<T> subscription) {}
177 void _recordResume(StreamSubscription<T> subscription) {} 177 void _recordResume(StreamSubscription<T> subscription) {}
178 void _recordCancel(StreamSubscription<T> subscription) {} 178 Future _recordCancel(StreamSubscription<T> subscription) => null;
179 } 179 }
180 180
181 /** 181 /**
182 * Default implementation of [StreamController]. 182 * Default implementation of [StreamController].
183 * 183 *
184 * Controls a stream that only supports a single controller. 184 * Controls a stream that only supports a single controller.
185 */ 185 */
186 abstract class _StreamController<T> implements StreamController<T>, 186 abstract class _StreamController<T> implements StreamController<T>,
187 _StreamControllerLifecycle<T>, 187 _StreamControllerLifecycle<T>,
188 _EventSink<T>, 188 _EventSink<T>,
(...skipping 273 matching lines...) Expand 10 before | Expand all | Expand 10 after
462 _varData = subscription; 462 _varData = subscription;
463 } 463 }
464 subscription._setPendingEvents(pendingEvents); 464 subscription._setPendingEvents(pendingEvents);
465 subscription._guardCallback(() { 465 subscription._guardCallback(() {
466 _runGuarded(_onListen); 466 _runGuarded(_onListen);
467 }); 467 });
468 468
469 return subscription; 469 return subscription;
470 } 470 }
471 471
472 void _recordCancel(StreamSubscription<T> subscription) { 472 Future _recordCancel(StreamSubscription<T> subscription) {
473 if (_isAddingStream) { 473 if (_isAddingStream) {
474 _StreamControllerAddStreamState addState = _varData; 474 _StreamControllerAddStreamState addState = _varData;
475 addState.cancel(); 475 addState.cancel();
476 } 476 }
477 _varData = null; 477 _varData = null;
478 _state = 478 _state =
479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
480 _runGuarded(_onCancel); 480 var future = _runGuarded(_onCancel);
floitsch 2013/10/12 18:53:57 Use type.
Anders Johnsen 2013/10/16 11:52:21 Done.
481 if (_doneFuture != null && _doneFuture._mayComplete) { 481 if (_doneFuture != null && _doneFuture._mayComplete) {
482 _doneFuture._asyncComplete(null); 482 _doneFuture._asyncComplete(null);
483 } 483 }
484 return future;
484 } 485 }
485 486
486 void _recordPause(StreamSubscription<T> subscription) { 487 void _recordPause(StreamSubscription<T> subscription) {
487 if (_isAddingStream) { 488 if (_isAddingStream) {
488 _StreamControllerAddStreamState addState = _varData; 489 _StreamControllerAddStreamState addState = _varData;
489 addState.pause(); 490 addState.pause();
490 } 491 }
491 _runGuarded(_onPause); 492 _runGuarded(_onPause);
492 } 493 }
493 494
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
536 class _AsyncStreamController<T> extends _StreamController<T> 537 class _AsyncStreamController<T> extends _StreamController<T>
537 with _AsyncStreamControllerDispatch<T> { 538 with _AsyncStreamControllerDispatch<T> {
538 final _NotificationHandler _onListen; 539 final _NotificationHandler _onListen;
539 final _NotificationHandler _onPause; 540 final _NotificationHandler _onPause;
540 final _NotificationHandler _onResume; 541 final _NotificationHandler _onResume;
541 final _NotificationHandler _onCancel; 542 final _NotificationHandler _onCancel;
542 543
543 _AsyncStreamController(void this._onListen(), 544 _AsyncStreamController(void this._onListen(),
544 void this._onPause(), 545 void this._onPause(),
545 void this._onResume(), 546 void this._onResume(),
546 void this._onCancel()); 547 this._onCancel());
547 } 548 }
548 549
549 class _SyncStreamController<T> extends _StreamController<T> 550 class _SyncStreamController<T> extends _StreamController<T>
550 with _SyncStreamControllerDispatch<T> { 551 with _SyncStreamControllerDispatch<T> {
551 final _NotificationHandler _onListen; 552 final _NotificationHandler _onListen;
552 final _NotificationHandler _onPause; 553 final _NotificationHandler _onPause;
553 final _NotificationHandler _onResume; 554 final _NotificationHandler _onResume;
554 final _NotificationHandler _onCancel; 555 final _NotificationHandler _onCancel;
555 556
556 _SyncStreamController(void this._onListen(), 557 _SyncStreamController(void this._onListen(),
557 void this._onPause(), 558 void this._onPause(),
558 void this._onResume(), 559 void this._onResume(),
559 void this._onCancel()); 560 this._onCancel());
560 } 561 }
561 562
562 abstract class _NoCallbacks { 563 abstract class _NoCallbacks {
563 _NotificationHandler get _onListen => null; 564 _NotificationHandler get _onListen => null;
564 _NotificationHandler get _onPause => null; 565 _NotificationHandler get _onPause => null;
565 _NotificationHandler get _onResume => null; 566 _NotificationHandler get _onResume => null;
566 _NotificationHandler get _onCancel => null; 567 _NotificationHandler get _onCancel => null;
567 } 568 }
568 569
569 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ 570 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/
570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 571 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
571 572
572 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ 573 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/
573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 574 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
574 575
575 typedef void _NotificationHandler(); 576 typedef void _NotificationHandler();
576 577
577 void _runGuarded(_NotificationHandler notificationHandler) { 578 _runGuarded(_NotificationHandler notificationHandler) {
floitsch 2013/10/12 18:53:57 "Future" as return type ?
Anders Johnsen 2013/10/16 11:52:21 Done.
578 if (notificationHandler == null) return; 579 if (notificationHandler == null) return null;
579 try { 580 try {
Lasse Reichstein Nielsen 2013/10/14 11:32:33 Is this not just Zone.current.runGuarded(notifica
Anders Johnsen 2013/10/16 11:52:21 Not sure, Florian?
floitsch 2013/10/16 14:43:44 Zone.current.runGuarded(notificationHandler) sound
580 notificationHandler(); 581 return notificationHandler();
581 } catch (e, s) { 582 } catch (e, s) {
582 Zone.current.handleUncaughtError(_asyncError(e, s)); 583 Zone.current.handleUncaughtError(_asyncError(e, s));
583 } 584 }
584 } 585 }
585 586
586 class _ControllerStream<T> extends _StreamImpl<T> { 587 class _ControllerStream<T> extends _StreamImpl<T> {
587 _StreamControllerLifecycle<T> _controller; 588 _StreamControllerLifecycle<T> _controller;
588 589
589 _ControllerStream(this._controller); 590 _ControllerStream(this._controller);
590 591
(...skipping 21 matching lines...) Expand all
612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 613 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
613 final _StreamControllerLifecycle<T> _controller; 614 final _StreamControllerLifecycle<T> _controller;
614 615
615 _ControllerSubscription(this._controller, 616 _ControllerSubscription(this._controller,
616 void onData(T data), 617 void onData(T data),
617 void onError(Object error), 618 void onError(Object error),
618 void onDone(), 619 void onDone(),
619 bool cancelOnError) 620 bool cancelOnError)
620 : super(onData, onError, onDone, cancelOnError); 621 : super(onData, onError, onDone, cancelOnError);
621 622
622 void _onCancel() { 623 Future _onCancel() {
623 _controller._recordCancel(this); 624 return _controller._recordCancel(this);
624 } 625 }
625 626
626 void _onPause() { 627 void _onPause() {
627 _controller._recordPause(this); 628 _controller._recordPause(this);
628 } 629 }
629 630
630 void _onResume() { 631 void _onResume() {
631 _controller._recordResume(this); 632 _controller._recordResume(this);
632 } 633 }
633 } 634 }
634 635
635 636
636 /** A class that exposes only the [StreamSink] interface of an object. */ 637 /** A class that exposes only the [StreamSink] interface of an object. */
637 class _StreamSinkWrapper<T> implements StreamSink<T> { 638 class _StreamSinkWrapper<T> implements StreamSink<T> {
638 final StreamSink _target; 639 final StreamSink _target;
639 _StreamSinkWrapper(this._target); 640 _StreamSinkWrapper(this._target);
640 void add(T data) { _target.add(data); } 641 void add(T data) { _target.add(data); }
641 void addError(Object error) { _target.addError(error); } 642 void addError(Object error) { _target.addError(error); }
642 Future close() => _target.close(); 643 Future close() => _target.close();
643 Future addStream(Stream<T> source) => _target.addStream(source); 644 Future addStream(Stream<T> source) => _target.addStream(source);
644 Future get done => _target.done; 645 Future get done => _target.done;
645 } 646 }
646 647
647 /** 648 /**
648 * Object containing the state used to handle [StreamController.addStream]. 649 * Object containing the state used to handle [StreamController.addStream].
649 */ 650 */
650 class _AddStreamState<T> { 651 class _AddStreamState<T> {
651 // [_FutureImpl] returned by call to addStream. 652 // [_Future] returned by call to addStream.
652 _Future addStreamFuture; 653 _Future addStreamFuture;
653 654
654 // Subscription on stream argument to addStream. 655 // Subscription on stream argument to addStream.
655 StreamSubscription addSubscription; 656 StreamSubscription addSubscription;
656 657
657 _AddStreamState(_EventSink<T> controller, Stream source) 658 _AddStreamState(_EventSink<T> controller, Stream source)
658 : addStreamFuture = new _Future(), 659 : addStreamFuture = new _Future(),
659 addSubscription = source.listen(controller._add, 660 addSubscription = source.listen(controller._add,
660 onError: controller._addError, 661 onError: controller._addError,
661 onDone: controller._close, 662 onDone: controller._close,
(...skipping 24 matching lines...) Expand all
686 var varData; 687 var varData;
687 688
688 _StreamControllerAddStreamState(_StreamController controller, 689 _StreamControllerAddStreamState(_StreamController controller,
689 this.varData, 690 this.varData,
690 Stream source) : super(controller, source) { 691 Stream source) : super(controller, source) {
691 if (controller.isPaused) { 692 if (controller.isPaused) {
692 addSubscription.pause(); 693 addSubscription.pause();
693 } 694 }
694 } 695 }
695 } 696 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698