Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(466)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Mark failing tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
59 * the event has returned. 59 * the event has returned.
60 * 60 *
61 * The controller will buffer all incoming events until the subscriber is 61 * The controller will buffer all incoming events until the subscriber is
62 * registered. 62 * registered.
63 * 63 *
64 * The [onPause] function is called when the stream becomes 64 * The [onPause] function is called when the stream becomes
65 * paused. [onResume] is called when the stream resumed. 65 * paused. [onResume] is called when the stream resumed.
66 * 66 *
67 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
69 * its subscription. 69 * its subscription. If [onCancel] needs to perform an asynchronous operation,
70 * [onCancel] should return a future that completes when the cancel operation
71 * is done.
70 * 72 *
71 * If the stream is canceled before the controller needs new data the 73 * If the stream is canceled before the controller needs new data the
72 * [onResume] call might not be executed. 74 * [onResume] call might not be executed.
73 */ 75 */
74 factory StreamController({void onListen(), 76 factory StreamController({void onListen(),
75 void onPause(), 77 void onPause(),
76 void onResume(), 78 void onResume(),
77 void onCancel(), 79 onCancel(),
78 bool sync: false}) { 80 bool sync: false}) {
79 if (onListen == null && onPause == null && 81 if (onListen == null && onPause == null &&
80 onResume == null && onCancel == null) { 82 onResume == null && onCancel == null) {
81 return sync 83 return sync
82 ? new _NoCallbackSyncStreamController/*<T>*/() 84 ? new _NoCallbackSyncStreamController/*<T>*/()
83 : new _NoCallbackAsyncStreamController/*<T>*/(); 85 : new _NoCallbackAsyncStreamController/*<T>*/();
84 } 86 }
85 return sync 87 return sync
86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 88 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 89 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
165 * allows. 167 * allows.
166 */ 168 */
167 void addError(Object error, [Object stackTrace]); 169 void addError(Object error, [Object stackTrace]);
168 } 170 }
169 171
170 172
171 abstract class _StreamControllerLifecycle<T> { 173 abstract class _StreamControllerLifecycle<T> {
172 StreamSubscription<T> _subscribe(bool cancelOnError); 174 StreamSubscription<T> _subscribe(bool cancelOnError);
173 void _recordPause(StreamSubscription<T> subscription) {} 175 void _recordPause(StreamSubscription<T> subscription) {}
174 void _recordResume(StreamSubscription<T> subscription) {} 176 void _recordResume(StreamSubscription<T> subscription) {}
175 void _recordCancel(StreamSubscription<T> subscription) {} 177 Future _recordCancel(StreamSubscription<T> subscription) => null;
176 } 178 }
177 179
178 /** 180 /**
179 * Default implementation of [StreamController]. 181 * Default implementation of [StreamController].
180 * 182 *
181 * Controls a stream that only supports a single controller. 183 * Controls a stream that only supports a single controller.
182 */ 184 */
183 abstract class _StreamController<T> implements StreamController<T>, 185 abstract class _StreamController<T> implements StreamController<T>,
184 _StreamControllerLifecycle<T>, 186 _StreamControllerLifecycle<T>,
185 _EventSink<T>, 187 _EventSink<T>,
(...skipping 270 matching lines...) Expand 10 before | Expand all | Expand 10 after
456 _varData = subscription; 458 _varData = subscription;
457 } 459 }
458 subscription._setPendingEvents(pendingEvents); 460 subscription._setPendingEvents(pendingEvents);
459 subscription._guardCallback(() { 461 subscription._guardCallback(() {
460 _runGuarded(_onListen); 462 _runGuarded(_onListen);
461 }); 463 });
462 464
463 return subscription; 465 return subscription;
464 } 466 }
465 467
466 void _recordCancel(StreamSubscription<T> subscription) { 468 Future _recordCancel(StreamSubscription<T> subscription) {
467 if (_isAddingStream) { 469 if (_isAddingStream) {
468 _StreamControllerAddStreamState addState = _varData; 470 _StreamControllerAddStreamState addState = _varData;
469 addState.cancel(); 471 addState.cancel();
470 } 472 }
471 _varData = null; 473 _varData = null;
472 _state = 474 _state =
473 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 475 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
474 _runGuarded(_onCancel); 476 void complete() {
475 if (_doneFuture != null && _doneFuture._mayComplete) { 477 if (_doneFuture != null && _doneFuture._mayComplete) {
476 _doneFuture._asyncComplete(null); 478 _doneFuture._asyncComplete(null);
479 }
477 } 480 }
481 Future future = _runGuarded(_onCancel);
482 if (future != null) {
483 future = future.whenComplete(complete);
484 } else {
485 complete();
486 }
487 return future;
478 } 488 }
479 489
480 void _recordPause(StreamSubscription<T> subscription) { 490 void _recordPause(StreamSubscription<T> subscription) {
481 if (_isAddingStream) { 491 if (_isAddingStream) {
482 _StreamControllerAddStreamState addState = _varData; 492 _StreamControllerAddStreamState addState = _varData;
483 addState.pause(); 493 addState.pause();
484 } 494 }
485 _runGuarded(_onPause); 495 _runGuarded(_onPause);
486 } 496 }
487 497
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
530 class _AsyncStreamController<T> extends _StreamController<T> 540 class _AsyncStreamController<T> extends _StreamController<T>
531 with _AsyncStreamControllerDispatch<T> { 541 with _AsyncStreamControllerDispatch<T> {
532 final _NotificationHandler _onListen; 542 final _NotificationHandler _onListen;
533 final _NotificationHandler _onPause; 543 final _NotificationHandler _onPause;
534 final _NotificationHandler _onResume; 544 final _NotificationHandler _onResume;
535 final _NotificationHandler _onCancel; 545 final _NotificationHandler _onCancel;
536 546
537 _AsyncStreamController(void this._onListen(), 547 _AsyncStreamController(void this._onListen(),
538 void this._onPause(), 548 void this._onPause(),
539 void this._onResume(), 549 void this._onResume(),
540 void this._onCancel()); 550 this._onCancel());
541 } 551 }
542 552
543 class _SyncStreamController<T> extends _StreamController<T> 553 class _SyncStreamController<T> extends _StreamController<T>
544 with _SyncStreamControllerDispatch<T> { 554 with _SyncStreamControllerDispatch<T> {
545 final _NotificationHandler _onListen; 555 final _NotificationHandler _onListen;
546 final _NotificationHandler _onPause; 556 final _NotificationHandler _onPause;
547 final _NotificationHandler _onResume; 557 final _NotificationHandler _onResume;
548 final _NotificationHandler _onCancel; 558 final _NotificationHandler _onCancel;
549 559
550 _SyncStreamController(void this._onListen(), 560 _SyncStreamController(void this._onListen(),
551 void this._onPause(), 561 void this._onPause(),
552 void this._onResume(), 562 void this._onResume(),
553 void this._onCancel()); 563 this._onCancel());
554 } 564 }
555 565
556 abstract class _NoCallbacks { 566 abstract class _NoCallbacks {
557 _NotificationHandler get _onListen => null; 567 _NotificationHandler get _onListen => null;
558 _NotificationHandler get _onPause => null; 568 _NotificationHandler get _onPause => null;
559 _NotificationHandler get _onResume => null; 569 _NotificationHandler get _onResume => null;
560 _NotificationHandler get _onCancel => null; 570 _NotificationHandler get _onCancel => null;
561 } 571 }
562 572
563 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ 573 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/
564 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 574 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
565 575
566 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ 576 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/
567 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; 577 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
568 578
569 typedef void _NotificationHandler(); 579 typedef _NotificationHandler();
570 580
571 void _runGuarded(_NotificationHandler notificationHandler) { 581 Future _runGuarded(_NotificationHandler notificationHandler) {
572 if (notificationHandler == null) return; 582 if (notificationHandler == null) return null;
573 try { 583 try {
574 notificationHandler(); 584 var result = notificationHandler();
585 if (result is Future) return result;
586 return null;
575 } catch (e, s) { 587 } catch (e, s) {
576 Zone.current.handleUncaughtError(_asyncError(e, s), s); 588 Zone.current.handleUncaughtError(_asyncError(e, s), s);
577 } 589 }
578 } 590 }
579 591
580 class _ControllerStream<T> extends _StreamImpl<T> { 592 class _ControllerStream<T> extends _StreamImpl<T> {
581 _StreamControllerLifecycle<T> _controller; 593 _StreamControllerLifecycle<T> _controller;
582 594
583 _ControllerStream(this._controller); 595 _ControllerStream(this._controller);
584 596
(...skipping 13 matching lines...) Expand all
598 return identical(otherStream._controller, this._controller); 610 return identical(otherStream._controller, this._controller);
599 } 611 }
600 } 612 }
601 613
602 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 614 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
603 final _StreamControllerLifecycle<T> _controller; 615 final _StreamControllerLifecycle<T> _controller;
604 616
605 _ControllerSubscription(this._controller, bool cancelOnError) 617 _ControllerSubscription(this._controller, bool cancelOnError)
606 : super(cancelOnError); 618 : super(cancelOnError);
607 619
608 void _onCancel() { 620 Future _onCancel() {
609 _controller._recordCancel(this); 621 return _controller._recordCancel(this);
610 } 622 }
611 623
612 void _onPause() { 624 void _onPause() {
613 _controller._recordPause(this); 625 _controller._recordPause(this);
614 } 626 }
615 627
616 void _onResume() { 628 void _onResume() {
617 _controller._recordResume(this); 629 _controller._recordResume(this);
618 } 630 }
619 } 631 }
620 632
621 633
622 /** A class that exposes only the [StreamSink] interface of an object. */ 634 /** A class that exposes only the [StreamSink] interface of an object. */
623 class _StreamSinkWrapper<T> implements StreamSink<T> { 635 class _StreamSinkWrapper<T> implements StreamSink<T> {
624 final StreamSink _target; 636 final StreamSink _target;
625 _StreamSinkWrapper(this._target); 637 _StreamSinkWrapper(this._target);
626 void add(T data) { _target.add(data); } 638 void add(T data) { _target.add(data); }
627 void addError(Object error, [StackTrace stackTrace]) { 639 void addError(Object error, [StackTrace stackTrace]) {
628 _target.addError(error); 640 _target.addError(error);
629 } 641 }
630 Future close() => _target.close(); 642 Future close() => _target.close();
631 Future addStream(Stream<T> source) => _target.addStream(source); 643 Future addStream(Stream<T> source) => _target.addStream(source);
632 Future get done => _target.done; 644 Future get done => _target.done;
633 } 645 }
634 646
635 /** 647 /**
636 * Object containing the state used to handle [StreamController.addStream]. 648 * Object containing the state used to handle [StreamController.addStream].
637 */ 649 */
638 class _AddStreamState<T> { 650 class _AddStreamState<T> {
639 // [_FutureImpl] returned by call to addStream. 651 // [_Future] returned by call to addStream.
640 _Future addStreamFuture; 652 _Future addStreamFuture;
641 653
642 // Subscription on stream argument to addStream. 654 // Subscription on stream argument to addStream.
643 StreamSubscription addSubscription; 655 StreamSubscription addSubscription;
644 656
645 _AddStreamState(_EventSink<T> controller, Stream source) 657 _AddStreamState(_EventSink<T> controller, Stream source)
646 : addStreamFuture = new _Future(), 658 : addStreamFuture = new _Future(),
647 addSubscription = source.listen(controller._add, 659 addSubscription = source.listen(controller._add,
648 onError: controller._addError, 660 onError: controller._addError,
649 onDone: controller._close, 661 onDone: controller._close,
(...skipping 24 matching lines...) Expand all
674 var varData; 686 var varData;
675 687
676 _StreamControllerAddStreamState(_StreamController controller, 688 _StreamControllerAddStreamState(_StreamController controller,
677 this.varData, 689 this.varData,
678 Stream source) : super(controller, source) { 690 Stream source) : super(controller, source) {
679 if (controller.isPaused) { 691 if (controller.isPaused) {
680 addSubscription.pause(); 692 addSubscription.pause();
681 } 693 }
682 } 694 }
683 } 695 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698