Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(619)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
59 * the event has returned. 59 * the event has returned.
60 * 60 *
61 * The controller will buffer all incoming events until the subscriber is 61 * The controller will buffer all incoming events until the subscriber is
62 * registered. 62 * registered.
63 * 63 *
64 * The [onPause] function is called when the stream becomes 64 * The [onPause] function is called when the stream becomes
65 * paused. [onResume] is called when the stream resumed. 65 * paused. [onResume] is called when the stream resumed.
66 * 66 *
67 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
69 * its subscription. 69 * its subscription.
Lasse Reichstein Nielsen 2013/07/17 07:28:46 The [onCancel] callback may return a value or [Fut
70 * 70 *
71 * If the stream is canceled before the controller needs new data the 71 * If the stream is canceled before the controller needs new data the
72 * [onResume] call might not be executed. 72 * [onResume] call might not be executed.
73 */ 73 */
74 factory StreamController({void onListen(), 74 factory StreamController({void onListen(),
75 void onPause(), 75 void onPause(),
76 void onResume(), 76 void onResume(),
77 void onCancel(), 77 onCancel(),
78 bool sync: false}) { 78 bool sync: false}) {
79 if (onListen == null && onPause == null && 79 if (onListen == null && onPause == null &&
80 onResume == null && onCancel == null) { 80 onResume == null && onCancel == null) {
81 return sync 81 return sync
82 ? new _NoCallbackSyncStreamController/*<T>*/() 82 ? new _NoCallbackSyncStreamController/*<T>*/()
83 : new _NoCallbackAsyncStreamController/*<T>*/(); 83 : new _NoCallbackAsyncStreamController/*<T>*/();
84 } 84 }
85 return sync 85 return sync
86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
168 } 168 }
169 169
170 170
171 abstract class _StreamControllerLifecycle<T> { 171 abstract class _StreamControllerLifecycle<T> {
172 StreamSubscription<T> _subscribe(void onData(T data), 172 StreamSubscription<T> _subscribe(void onData(T data),
173 void onError(Object error), 173 void onError(Object error),
174 void onDone(), 174 void onDone(),
175 bool cancelOnError); 175 bool cancelOnError);
176 void _recordPause(StreamSubscription<T> subscription) {} 176 void _recordPause(StreamSubscription<T> subscription) {}
177 void _recordResume(StreamSubscription<T> subscription) {} 177 void _recordResume(StreamSubscription<T> subscription) {}
178 void _recordCancel(StreamSubscription<T> subscription) {} 178 _FutureImpl _recordCancel(StreamSubscription<T> subscription) => null;
179 } 179 }
180 180
181 /** 181 /**
182 * Default implementation of [StreamController]. 182 * Default implementation of [StreamController].
183 * 183 *
184 * Controls a stream that only supports a single controller. 184 * Controls a stream that only supports a single controller.
185 */ 185 */
186 abstract class _StreamController<T> implements StreamController<T>, 186 abstract class _StreamController<T> implements StreamController<T>,
187 _StreamControllerLifecycle<T>, 187 _StreamControllerLifecycle<T>,
188 _EventSink<T>, 188 _EventSink<T>,
(...skipping 273 matching lines...) Expand 10 before | Expand all | Expand 10 after
462 _varData = subscription; 462 _varData = subscription;
463 } 463 }
464 subscription._setPendingEvents(pendingEvents); 464 subscription._setPendingEvents(pendingEvents);
465 subscription._guardCallback(() { 465 subscription._guardCallback(() {
466 _runGuarded(_onListen); 466 _runGuarded(_onListen);
467 }); 467 });
468 468
469 return subscription; 469 return subscription;
470 } 470 }
471 471
472 void _recordCancel(StreamSubscription<T> subscription) { 472 _FutureImpl _recordCancel(StreamSubscription<T> subscription) {
Lasse Reichstein Nielsen 2013/07/17 07:28:46 Why _FutureImpl if the user can provide a future t
473 if (_isAddingStream) { 473 if (_isAddingStream) {
474 _StreamControllerAddStreamState addState = _varData; 474 _StreamControllerAddStreamState addState = _varData;
475 addState.cancel(); 475 addState.cancel();
476 } 476 }
477 _varData = null; 477 _varData = null;
478 _state = 478 _state =
479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
480 _runGuarded(_onCancel); 480 var future = _runGuardedFutureResult(_onCancel);
floitsch 2013/07/12 16:42:34 _runGuardedFutureResult may return `null`. Documen
Lasse Reichstein Nielsen 2013/07/17 07:28:46 I think we should not guarantee that there is no e
481 if (_doneFuture != null && _doneFuture._mayComplete) { 481 if (_doneFuture != null && _doneFuture._mayComplete) {
482 _doneFuture._asyncSetValue(null); 482 _doneFuture._asyncSetValue(null);
483 } 483 }
484 return future;
484 } 485 }
485 486
486 void _recordPause(StreamSubscription<T> subscription) { 487 void _recordPause(StreamSubscription<T> subscription) {
487 if (_isAddingStream) { 488 if (_isAddingStream) {
488 _StreamControllerAddStreamState addState = _varData; 489 _StreamControllerAddStreamState addState = _varData;
489 addState.pause(); 490 addState.pause();
490 } 491 }
491 _runGuarded(_onPause); 492 _runGuarded(_onPause);
492 } 493 }
493 494
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
536 class _AsyncStreamController<T> extends _StreamController<T> 537 class _AsyncStreamController<T> extends _StreamController<T>
537 with _AsyncStreamControllerDispatch<T> { 538 with _AsyncStreamControllerDispatch<T> {
538 final _NotificationHandler _onListen; 539 final _NotificationHandler _onListen;
539 final _NotificationHandler _onPause; 540 final _NotificationHandler _onPause;
540 final _NotificationHandler _onResume; 541 final _NotificationHandler _onResume;
541 final _NotificationHandler _onCancel; 542 final _NotificationHandler _onCancel;
542 543
543 _AsyncStreamController(void this._onListen(), 544 _AsyncStreamController(void this._onListen(),
544 void this._onPause(), 545 void this._onPause(),
545 void this._onResume(), 546 void this._onResume(),
546 void this._onCancel()); 547 this._onCancel());
547 } 548 }
548 549
549 class _SyncStreamController<T> extends _StreamController<T> 550 class _SyncStreamController<T> extends _StreamController<T>
550 with _SyncStreamControllerDispatch<T> { 551 with _SyncStreamControllerDispatch<T> {
551 final _NotificationHandler _onListen; 552 final _NotificationHandler _onListen;
552 final _NotificationHandler _onPause; 553 final _NotificationHandler _onPause;
553 final _NotificationHandler _onResume; 554 final _NotificationHandler _onResume;
554 final _NotificationHandler _onCancel; 555 final _NotificationHandler _onCancel;
555 556
556 _SyncStreamController(void this._onListen(), 557 _SyncStreamController(void this._onListen(),
557 void this._onPause(), 558 void this._onPause(),
558 void this._onResume(), 559 void this._onResume(),
559 void this._onCancel()); 560 this._onCancel());
560 } 561 }
561 562
562 abstract class _NoCallbacks { 563 abstract class _NoCallbacks {
563 _NotificationHandler get _onListen => null; 564 _NotificationHandler get _onListen => null;
564 _NotificationHandler get _onPause => null; 565 _NotificationHandler get _onPause => null;
565 _NotificationHandler get _onResume => null; 566 _NotificationHandler get _onResume => null;
566 _NotificationHandler get _onCancel => null; 567 _NotificationHandler get _onCancel => null;
567 } 568 }
568 569
569 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ 570 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/
570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 571 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
571 572
572 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ 573 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/
573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 574 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
574 575
575 typedef void _NotificationHandler(); 576 typedef void _NotificationHandler();
576 577
577 void _runGuarded(_NotificationHandler notificationHandler) { 578 void _runGuarded(_NotificationHandler notificationHandler) {
578 if (notificationHandler == null) return; 579 if (notificationHandler == null) return;
579 try { 580 try {
580 notificationHandler(); 581 notificationHandler();
581 } catch (e, s) { 582 } catch (e, s) {
582 _Zone.current.handleUncaughtError(_asyncError(e, s)); 583 _Zone.current.handleUncaughtError(_asyncError(e, s));
583 } 584 }
584 } 585 }
585 586
587 _FutureImpl _runGuardedFutureResult(_NotificationHandler notificationHandler) {
588 if (notificationHandler == null) return null;
589 try {
590 var value = notificationHandler();
591 if (value is _FutureImpl) {
592 return value;
593 } else {
Lasse Reichstein Nielsen 2013/07/17 07:28:46 else if (value is Future) { // You can't assume t
594 return new Future(() => value);
Lasse Reichstein Nielsen 2013/07/17 07:28:46 return new _FutureImpl.immediate(value);
595 }
596 } catch (e, s) {
597 _Zone.current.handleUncaughtError(_asyncError(e, s));
598 }
599 }
600
586 class _ControllerStream<T> extends _StreamImpl<T> { 601 class _ControllerStream<T> extends _StreamImpl<T> {
587 _StreamControllerLifecycle<T> _controller; 602 _StreamControllerLifecycle<T> _controller;
588 603
589 _ControllerStream(this._controller); 604 _ControllerStream(this._controller);
590 605
591 StreamSubscription<T> _createSubscription( 606 StreamSubscription<T> _createSubscription(
592 void onData(T data), 607 void onData(T data),
593 void onError(Object error), 608 void onError(Object error),
594 void onDone(), 609 void onDone(),
595 bool cancelOnError) => 610 bool cancelOnError) =>
(...skipping 15 matching lines...) Expand all
611 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 626 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
612 final _StreamControllerLifecycle<T> _controller; 627 final _StreamControllerLifecycle<T> _controller;
613 628
614 _ControllerSubscription(this._controller, 629 _ControllerSubscription(this._controller,
615 void onData(T data), 630 void onData(T data),
616 void onError(Object error), 631 void onError(Object error),
617 void onDone(), 632 void onDone(),
618 bool cancelOnError) 633 bool cancelOnError)
619 : super(onData, onError, onDone, cancelOnError); 634 : super(onData, onError, onDone, cancelOnError);
620 635
621 void _onCancel() { 636 _FutureImpl _onCancel() {
622 _controller._recordCancel(this); 637 return _controller._recordCancel(this);
623 } 638 }
624 639
625 void _onPause() { 640 void _onPause() {
626 _controller._recordPause(this); 641 _controller._recordPause(this);
627 } 642 }
628 643
629 void _onResume() { 644 void _onResume() {
630 _controller._recordResume(this); 645 _controller._recordResume(this);
631 } 646 }
632 } 647 }
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
685 var varData; 700 var varData;
686 701
687 _StreamControllerAddStreamState(_StreamController controller, 702 _StreamControllerAddStreamState(_StreamController controller,
688 this.varData, 703 this.varData,
689 Stream source) : super(controller, source) { 704 Stream source) : super(controller, source) {
690 if (controller.isPaused) { 705 if (controller.isPaused) {
691 addSubscription.pause(); 706 addSubscription.pause();
692 } 707 }
693 } 708 }
694 } 709 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698