Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 * * [_close]: Request to close the stream. 42 * * [_close]: Request to close the stream.
43 * * [_onCancel]: Called when the subscription will provide no more events, 43 * * [_onCancel]: Called when the subscription will provide no more events,
44 * either due to being actively canceled, or after sending a done event. 44 * either due to being actively canceled, or after sending a done event.
45 * * [_onPause]: Called when the subscription wants the event source to pause. 45 * * [_onPause]: Called when the subscription wants the event source to pause.
46 * * [_onResume]: Called when allowing new events after a pause. 46 * * [_onResume]: Called when allowing new events after a pause.
47 * 47 *
48 * The user should not add new events when the subscription requests a paused, 48 * The user should not add new events when the subscription requests a paused,
49 * but if it happens anyway, the subscription will enqueue the events just as 49 * but if it happens anyway, the subscription will enqueue the events just as
50 * when new events arrive while still firing an old event. 50 * when new events arrive while still firing an old event.
51 */ 51 */
52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, 52 class _BufferingStreamSubscription<T>
53 _EventSink<T>, 53 implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
54 _EventDispatch<T> {
55 /** The `cancelOnError` flag from the `listen` call. */ 54 /** The `cancelOnError` flag from the `listen` call. */
56 static const int _STATE_CANCEL_ON_ERROR = 1; 55 static const int _STATE_CANCEL_ON_ERROR = 1;
57 /** 56 /**
58 * Whether the "done" event has been received. 57 * Whether the "done" event has been received.
59 * No further events are accepted after this. 58 * No further events are accepted after this.
60 */ 59 */
61 static const int _STATE_CLOSED = 2; 60 static const int _STATE_CLOSED = 2;
62 /** 61 /**
63 * Set if the input has been asked not to send events. 62 * Set if the input has been asked not to send events.
64 * 63 *
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 /** The future [_onCancel] may return. */ 99 /** The future [_onCancel] may return. */
101 Future _cancelFuture; 100 Future _cancelFuture;
102 101
103 /** 102 /**
104 * Queue of pending events. 103 * Queue of pending events.
105 * 104 *
106 * Is created when necessary, or set in constructor for preconfigured events. 105 * Is created when necessary, or set in constructor for preconfigured events.
107 */ 106 */
108 _PendingEvents<T> _pending; 107 _PendingEvents<T> _pending;
109 108
110 _BufferingStreamSubscription(void onData(T data), 109 _BufferingStreamSubscription(
111 Function onError, 110 void onData(T data), Function onError, void onDone(), bool cancelOnError)
112 void onDone(),
113 bool cancelOnError)
114 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { 111 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
115 this.onData(onData); 112 this.onData(onData);
116 this.onError(onError); 113 this.onError(onError);
117 this.onDone(onDone); 114 this.onDone(onDone);
118 } 115 }
119 116
120 /** 117 /**
121 * Sets the subscription's pending events object. 118 * Sets the subscription's pending events object.
122 * 119 *
123 * This can only be done once. The pending events object is used for the 120 * This can only be done once. The pending events object is used for the
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
190 if (!_isCanceled) { 187 if (!_isCanceled) {
191 _cancel(); 188 _cancel();
192 } 189 }
193 return _cancelFuture ?? Future._nullFuture; 190 return _cancelFuture ?? Future._nullFuture;
194 } 191 }
195 192
196 Future<E> asFuture<E>([E futureValue]) { 193 Future<E> asFuture<E>([E futureValue]) {
197 _Future<E> result = new _Future<E>(); 194 _Future<E> result = new _Future<E>();
198 195
199 // Overwrite the onDone and onError handlers. 196 // Overwrite the onDone and onError handlers.
200 _onDone = () { result._complete(futureValue); }; 197 _onDone = () {
198 result._complete(futureValue);
199 };
201 _onError = (error, stackTrace) { 200 _onError = (error, stackTrace) {
202 Future cancelFuture = cancel(); 201 Future cancelFuture = cancel();
203 if (!identical(cancelFuture, Future._nullFuture)) { 202 if (!identical(cancelFuture, Future._nullFuture)) {
204 cancelFuture.whenComplete(() { 203 cancelFuture.whenComplete(() {
205 result._completeError(error, stackTrace); 204 result._completeError(error, stackTrace);
206 }); 205 });
207 } else { 206 } else {
208 result._completeError(error, stackTrace); 207 result._completeError(error, stackTrace);
209 } 208 }
210 }; 209 };
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
257 if (_canFire) { 256 if (_canFire) {
258 _sendData(data); 257 _sendData(data);
259 } else { 258 } else {
260 _addPending(new _DelayedData<T>(data)); 259 _addPending(new _DelayedData<T>(data));
261 } 260 }
262 } 261 }
263 262
264 void _addError(Object error, StackTrace stackTrace) { 263 void _addError(Object error, StackTrace stackTrace) {
265 if (_isCanceled) return; 264 if (_isCanceled) return;
266 if (_canFire) { 265 if (_canFire) {
267 _sendError(error, stackTrace); // Reports cancel after sending. 266 _sendError(error, stackTrace); // Reports cancel after sending.
268 } else { 267 } else {
269 _addPending(new _DelayedError(error, stackTrace)); 268 _addPending(new _DelayedError(error, stackTrace));
270 } 269 }
271 } 270 }
272 271
273 void _close() { 272 void _close() {
274 assert(!_isClosed); 273 assert(!_isClosed);
275 if (_isCanceled) return; 274 if (_isCanceled) return;
276 _state |= _STATE_CLOSED; 275 _state |= _STATE_CLOSED;
277 if (_canFire) { 276 if (_canFire) {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
339 assert(!_inCallback); 338 assert(!_inCallback);
340 bool wasInputPaused = _isInputPaused; 339 bool wasInputPaused = _isInputPaused;
341 340
342 void sendError() { 341 void sendError() {
343 // If the subscription has been canceled while waiting for the cancel 342 // If the subscription has been canceled while waiting for the cancel
344 // future to finish we must not report the error. 343 // future to finish we must not report the error.
345 if (_isCanceled && !_waitsForCancel) return; 344 if (_isCanceled && !_waitsForCancel) return;
346 _state |= _STATE_IN_CALLBACK; 345 _state |= _STATE_IN_CALLBACK;
347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { 346 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError 347 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
349 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; 348 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); 349 _zone.runBinaryGuarded(errorCallback, error, stackTrace);
351 } else { 350 } else {
352 _zone.runUnaryGuarded<dynamic, dynamic>( 351 _zone.runUnaryGuarded<dynamic, dynamic>(
353 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); 352 _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
354 } 353 }
355 _state &= ~_STATE_IN_CALLBACK; 354 _state &= ~_STATE_IN_CALLBACK;
356 } 355 }
357 356
358 if (_cancelOnError) { 357 if (_cancelOnError) {
359 _state |= _STATE_WAIT_FOR_CANCEL; 358 _state |= _STATE_WAIT_FOR_CANCEL;
360 _cancel(); 359 _cancel();
361 if (_cancelFuture is Future && 360 if (_cancelFuture is Future &&
362 !identical(_cancelFuture, Future._nullFuture)) { 361 !identical(_cancelFuture, Future._nullFuture)) {
363 _cancelFuture.whenComplete(sendError); 362 _cancelFuture.whenComplete(sendError);
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
454 } 453 }
455 454
456 // ------------------------------------------------------------------- 455 // -------------------------------------------------------------------
457 // Common base class for single and multi-subscription streams. 456 // Common base class for single and multi-subscription streams.
458 // ------------------------------------------------------------------- 457 // -------------------------------------------------------------------
459 abstract class _StreamImpl<T> extends Stream<T> { 458 abstract class _StreamImpl<T> extends Stream<T> {
460 // ------------------------------------------------------------------ 459 // ------------------------------------------------------------------
461 // Stream interface. 460 // Stream interface.
462 461
463 StreamSubscription<T> listen(void onData(T data), 462 StreamSubscription<T> listen(void onData(T data),
464 { Function onError, 463 {Function onError, void onDone(), bool cancelOnError}) {
465 void onDone(),
466 bool cancelOnError }) {
467 cancelOnError = identical(true, cancelOnError); 464 cancelOnError = identical(true, cancelOnError);
468 StreamSubscription<T> subscription = 465 StreamSubscription<T> subscription =
469 _createSubscription(onData, onError, onDone, cancelOnError); 466 _createSubscription(onData, onError, onDone, cancelOnError);
470 _onListen(subscription); 467 _onListen(subscription);
471 return subscription; 468 return subscription;
472 } 469 }
473 470
474 // ------------------------------------------------------------------- 471 // -------------------------------------------------------------------
475 /** Create a subscription object. Called by [subcribe]. */ 472 /** Create a subscription object. Called by [subcribe]. */
476 StreamSubscription<T> _createSubscription( 473 StreamSubscription<T> _createSubscription(void onData(T data),
477 void onData(T data), 474 Function onError, void onDone(), bool cancelOnError) {
478 Function onError, 475 return new _BufferingStreamSubscription<T>(
479 void onDone(), 476 onData, onError, onDone, cancelOnError);
480 bool cancelOnError) {
481 return new _BufferingStreamSubscription<T>(onData, onError, onDone,
482 cancelOnError);
483 } 477 }
484 478
485 /** Hook called when the subscription has been created. */ 479 /** Hook called when the subscription has been created. */
486 void _onListen(StreamSubscription subscription) {} 480 void _onListen(StreamSubscription subscription) {}
487 } 481 }
488 482
489 typedef _PendingEvents<T> _EventGenerator<T>(); 483 typedef _PendingEvents<T> _EventGenerator<T>();
490 484
491 /** Stream that generates its own events. */ 485 /** Stream that generates its own events. */
492 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { 486 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
493 final _EventGenerator<T> _pending; 487 final _EventGenerator<T> _pending;
494 bool _isUsed = false; 488 bool _isUsed = false;
495 /** 489 /**
496 * Initializes the stream to have only the events provided by a 490 * Initializes the stream to have only the events provided by a
497 * [_PendingEvents]. 491 * [_PendingEvents].
498 * 492 *
499 * A new [_PendingEvents] must be generated for each listen. 493 * A new [_PendingEvents] must be generated for each listen.
500 */ 494 */
501 _GeneratedStreamImpl(this._pending); 495 _GeneratedStreamImpl(this._pending);
502 496
503 StreamSubscription<T> _createSubscription( 497 StreamSubscription<T> _createSubscription(void onData(T data),
504 void onData(T data), 498 Function onError, void onDone(), bool cancelOnError) {
505 Function onError,
506 void onDone(),
507 bool cancelOnError) {
508 if (_isUsed) throw new StateError("Stream has already been listened to."); 499 if (_isUsed) throw new StateError("Stream has already been listened to.");
509 _isUsed = true; 500 _isUsed = true;
510 return new _BufferingStreamSubscription<T>( 501 return new _BufferingStreamSubscription<T>(
511 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); 502 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
512 } 503 }
513 } 504 }
514 505
515
516 /** Pending events object that gets its events from an [Iterable]. */ 506 /** Pending events object that gets its events from an [Iterable]. */
517 class _IterablePendingEvents<T> extends _PendingEvents<T> { 507 class _IterablePendingEvents<T> extends _PendingEvents<T> {
518 // The iterator providing data for data events. 508 // The iterator providing data for data events.
519 // Set to null when iteration has completed. 509 // Set to null when iteration has completed.
520 Iterator<T> _iterator; 510 Iterator<T> _iterator;
521 511
522 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; 512 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
523 513
524 bool get isEmpty => _iterator == null; 514 bool get isEmpty => _iterator == null;
525 515
(...skipping 21 matching lines...) Expand all
547 dispatch._sendDone(); 537 dispatch._sendDone();
548 } 538 }
549 } 539 }
550 540
551 void clear() { 541 void clear() {
552 if (isScheduled) cancelSchedule(); 542 if (isScheduled) cancelSchedule();
553 _iterator = null; 543 _iterator = null;
554 } 544 }
555 } 545 }
556 546
557
558 // Internal helpers. 547 // Internal helpers.
559 548
560 // Types of the different handlers on a stream. Types used to type fields. 549 // Types of the different handlers on a stream. Types used to type fields.
561 typedef void _DataHandler<T>(T value); 550 typedef void _DataHandler<T>(T value);
562 typedef void _DoneHandler(); 551 typedef void _DoneHandler();
563 552
564
565 /** Default data handler, does nothing. */ 553 /** Default data handler, does nothing. */
566 void _nullDataHandler(var value) {} 554 void _nullDataHandler(var value) {}
567 555
568 /** Default error handler, reports the error to the current zone's handler. */ 556 /** Default error handler, reports the error to the current zone's handler. */
569 void _nullErrorHandler(error, [StackTrace stackTrace]) { 557 void _nullErrorHandler(error, [StackTrace stackTrace]) {
570 Zone.current.handleUncaughtError(error, stackTrace); 558 Zone.current.handleUncaughtError(error, stackTrace);
571 } 559 }
572 560
573 /** Default done handler, does nothing. */ 561 /** Default done handler, does nothing. */
574 void _nullDoneHandler() {} 562 void _nullDoneHandler() {}
575 563
576
577 /** A delayed event on a buffering stream subscription. */ 564 /** A delayed event on a buffering stream subscription. */
578 abstract class _DelayedEvent<T> { 565 abstract class _DelayedEvent<T> {
579 /** Added as a linked list on the [StreamController]. */ 566 /** Added as a linked list on the [StreamController]. */
580 _DelayedEvent next; 567 _DelayedEvent next;
581 /** Execute the delayed event on the [StreamController]. */ 568 /** Execute the delayed event on the [StreamController]. */
582 void perform(_EventDispatch<T> dispatch); 569 void perform(_EventDispatch<T> dispatch);
583 } 570 }
584 571
585 /** A delayed data event. */ 572 /** A delayed data event. */
586 class _DelayedData<T> extends _DelayedEvent<T> { 573 class _DelayedData<T> extends _DelayedEvent<T> {
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
671 void cancelSchedule() { 658 void cancelSchedule() {
672 if (isScheduled) _state = _STATE_CANCELED; 659 if (isScheduled) _state = _STATE_CANCELED;
673 } 660 }
674 661
675 void handleNext(_EventDispatch<T> dispatch); 662 void handleNext(_EventDispatch<T> dispatch);
676 663
677 /** Throw away any pending events and cancel scheduled events. */ 664 /** Throw away any pending events and cancel scheduled events. */
678 void clear(); 665 void clear();
679 } 666 }
680 667
681
682 /** Class holding pending events for a [_StreamImpl]. */ 668 /** Class holding pending events for a [_StreamImpl]. */
683 class _StreamImplEvents<T> extends _PendingEvents<T> { 669 class _StreamImplEvents<T> extends _PendingEvents<T> {
684 /// Single linked list of [_DelayedEvent] objects. 670 /// Single linked list of [_DelayedEvent] objects.
685 _DelayedEvent firstPendingEvent = null; 671 _DelayedEvent firstPendingEvent = null;
672
686 /// Last element in the list of pending events. New events are added after it. 673 /// Last element in the list of pending events. New events are added after it.
687 _DelayedEvent lastPendingEvent = null; 674 _DelayedEvent lastPendingEvent = null;
688 675
689 bool get isEmpty => lastPendingEvent == null; 676 bool get isEmpty => lastPendingEvent == null;
690 677
691 void add(_DelayedEvent event) { 678 void add(_DelayedEvent event) {
692 if (lastPendingEvent == null) { 679 if (lastPendingEvent == null) {
693 firstPendingEvent = lastPendingEvent = event; 680 firstPendingEvent = lastPendingEvent = event;
694 } else { 681 } else {
695 lastPendingEvent = lastPendingEvent.next = event; 682 lastPendingEvent = lastPendingEvent.next = event;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
735 bool get isPaused => _state >= _PAUSED; 722 bool get isPaused => _state >= _PAUSED;
736 723
737 void _schedule() { 724 void _schedule() {
738 if (_isScheduled) return; 725 if (_isScheduled) return;
739 _zone.scheduleMicrotask(_sendDone); 726 _zone.scheduleMicrotask(_sendDone);
740 _state |= _SCHEDULED; 727 _state |= _SCHEDULED;
741 } 728 }
742 729
743 void onData(void handleData(T data)) {} 730 void onData(void handleData(T data)) {}
744 void onError(Function handleError) {} 731 void onError(Function handleError) {}
745 void onDone(void handleDone()) { _onDone = handleDone; } 732 void onDone(void handleDone()) {
733 _onDone = handleDone;
734 }
746 735
747 void pause([Future resumeSignal]) { 736 void pause([Future resumeSignal]) {
748 _state += _PAUSED; 737 _state += _PAUSED;
749 if (resumeSignal != null) resumeSignal.whenComplete(resume); 738 if (resumeSignal != null) resumeSignal.whenComplete(resume);
750 } 739 }
751 740
752 void resume() { 741 void resume() {
753 if (isPaused) { 742 if (isPaused) {
754 _state -= _PAUSED; 743 _state -= _PAUSED;
755 if (!isPaused && !_isSent) { 744 if (!isPaused && !_isSent) {
756 _schedule(); 745 _schedule();
757 } 746 }
758 } 747 }
759 } 748 }
760 749
761 Future cancel() => Future._nullFuture; 750 Future cancel() => Future._nullFuture;
762 751
763 Future<E> asFuture<E>([E futureValue]) { 752 Future<E> asFuture<E>([E futureValue]) {
764 _Future<E> result = new _Future<E>(); 753 _Future<E> result = new _Future<E>();
765 _onDone = () { result._completeWithValue(null); }; 754 _onDone = () {
755 result._completeWithValue(null);
756 };
766 return result; 757 return result;
767 } 758 }
768 759
769 void _sendDone() { 760 void _sendDone() {
770 _state &= ~_SCHEDULED; 761 _state &= ~_SCHEDULED;
771 if (isPaused) return; 762 if (isPaused) return;
772 _state |= _DONE_SENT; 763 _state |= _DONE_SENT;
773 if (_onDone != null) _zone.runGuarded(_onDone); 764 if (_onDone != null) _zone.runGuarded(_onDone);
774 } 765 }
775 } 766 }
776 767
777 class _AsBroadcastStream<T> extends Stream<T> { 768 class _AsBroadcastStream<T> extends Stream<T> {
778 final Stream<T> _source; 769 final Stream<T> _source;
779 final _BroadcastCallback<T> _onListenHandler; 770 final _BroadcastCallback<T> _onListenHandler;
780 final _BroadcastCallback<T> _onCancelHandler; 771 final _BroadcastCallback<T> _onCancelHandler;
781 final Zone _zone; 772 final Zone _zone;
782 773
783 _AsBroadcastStreamController<T> _controller; 774 _AsBroadcastStreamController<T> _controller;
784 StreamSubscription<T> _subscription; 775 StreamSubscription<T> _subscription;
785 776
786 _AsBroadcastStream(this._source, 777 _AsBroadcastStream(
787 void onListenHandler(StreamSubscription<T> subscription), 778 this._source,
788 void onCancelHandler(StreamSubscription<T> subscription)) 779 void onListenHandler(StreamSubscription<T> subscription),
780 void onCancelHandler(StreamSubscription<T> subscription))
789 // TODO(floitsch): the return type should be void and should be 781 // TODO(floitsch): the return type should be void and should be
790 // inferred. 782 // inferred.
791 : _onListenHandler = Zone.current.registerUnaryCallback 783 : _onListenHandler = Zone.current
792 <dynamic, StreamSubscription<T>>(onListenHandler), 784 .registerUnaryCallback<dynamic, StreamSubscription<T>>(
793 _onCancelHandler = Zone.current.registerUnaryCallback 785 onListenHandler),
794 <dynamic, StreamSubscription<T>>(onCancelHandler), 786 _onCancelHandler = Zone.current
787 .registerUnaryCallback<dynamic, StreamSubscription<T>>(
788 onCancelHandler),
795 _zone = Zone.current { 789 _zone = Zone.current {
796 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); 790 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
797 } 791 }
798 792
799 bool get isBroadcast => true; 793 bool get isBroadcast => true;
800 794
801 StreamSubscription<T> listen(void onData(T data), 795 StreamSubscription<T> listen(void onData(T data),
802 { Function onError, 796 {Function onError, void onDone(), bool cancelOnError}) {
803 void onDone(),
804 bool cancelOnError}) {
805 if (_controller == null || _controller.isClosed) { 797 if (_controller == null || _controller.isClosed) {
806 // Return a dummy subscription backed by nothing, since 798 // Return a dummy subscription backed by nothing, since
807 // it will only ever send one done event. 799 // it will only ever send one done event.
808 return new _DoneStreamSubscription<T>(onDone); 800 return new _DoneStreamSubscription<T>(onDone);
809 } 801 }
810 if (_subscription == null) { 802 if (_subscription == null) {
811 _subscription = _source.listen(_controller.add, 803 _subscription = _source.listen(_controller.add,
812 onError: _controller.addError, 804 onError: _controller.addError, onDone: _controller.close);
813 onDone: _controller.close);
814 } 805 }
815 cancelOnError = identical(true, cancelOnError); 806 cancelOnError = identical(true, cancelOnError);
816 return _controller._subscribe(onData, onError, onDone, cancelOnError); 807 return _controller._subscribe(onData, onError, onDone, cancelOnError);
817 } 808 }
818 809
819 void _onCancel() { 810 void _onCancel() {
820 bool shutdown = (_controller == null) || _controller.isClosed; 811 bool shutdown = (_controller == null) || _controller.isClosed;
821 if (_onCancelHandler != null) { 812 if (_onCancelHandler != null) {
822 _zone.runUnary( 813 _zone.runUnary(
823 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); 814 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
(...skipping 12 matching lines...) Expand all
836 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); 827 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
837 } 828 }
838 } 829 }
839 830
840 // Methods called from _BroadcastSubscriptionWrapper. 831 // Methods called from _BroadcastSubscriptionWrapper.
841 void _cancelSubscription() { 832 void _cancelSubscription() {
842 if (_subscription == null) return; 833 if (_subscription == null) return;
843 // Called by [_controller] when it has no subscribers left. 834 // Called by [_controller] when it has no subscribers left.
844 StreamSubscription subscription = _subscription; 835 StreamSubscription subscription = _subscription;
845 _subscription = null; 836 _subscription = null;
846 _controller = null; // Marks the stream as no longer listenable. 837 _controller = null; // Marks the stream as no longer listenable.
847 subscription.cancel(); 838 subscription.cancel();
848 } 839 }
849 840
850 void _pauseSubscription(Future resumeSignal) { 841 void _pauseSubscription(Future resumeSignal) {
851 if (_subscription == null) return; 842 if (_subscription == null) return;
852 _subscription.pause(resumeSignal); 843 _subscription.pause(resumeSignal);
853 } 844 }
854 845
855 void _resumeSubscription() { 846 void _resumeSubscription() {
856 if (_subscription == null) return; 847 if (_subscription == null) return;
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
902 bool get isPaused { 893 bool get isPaused {
903 return _stream._isSubscriptionPaused; 894 return _stream._isSubscriptionPaused;
904 } 895 }
905 896
906 Future<E> asFuture<E>([E futureValue]) { 897 Future<E> asFuture<E>([E futureValue]) {
907 throw new UnsupportedError( 898 throw new UnsupportedError(
908 "Cannot change handlers of asBroadcastStream source subscription."); 899 "Cannot change handlers of asBroadcastStream source subscription.");
909 } 900 }
910 } 901 }
911 902
912
913 /** 903 /**
914 * Simple implementation of [StreamIterator]. 904 * Simple implementation of [StreamIterator].
915 * 905 *
916 * Pauses the stream between calls to [moveNext]. 906 * Pauses the stream between calls to [moveNext].
917 */ 907 */
918 class _StreamIterator<T> implements StreamIterator<T> { 908 class _StreamIterator<T> implements StreamIterator<T> {
919 // The stream iterator is always in one of four states. 909 // The stream iterator is always in one of four states.
920 // The value of the [_stateData] field depends on the state. 910 // The value of the [_stateData] field depends on the state.
921 // 911 //
922 // When `_subscription == null` and `_stateData != null`: 912 // When `_subscription == null` and `_stateData != null`:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
962 /// Whether the iterator is between calls to `moveNext`. 952 /// Whether the iterator is between calls to `moveNext`.
963 /// This will usually cause the [_subscription] to be paused, but as an 953 /// This will usually cause the [_subscription] to be paused, but as an
964 /// optimization, we only pause after the [moveNext] future has been 954 /// optimization, we only pause after the [moveNext] future has been
965 /// completed. 955 /// completed.
966 bool _isPaused = false; 956 bool _isPaused = false;
967 957
968 _StreamIterator(final Stream<T> stream) : _stateData = stream; 958 _StreamIterator(final Stream<T> stream) : _stateData = stream;
969 959
970 T get current { 960 T get current {
971 if (_subscription != null && _isPaused) { 961 if (_subscription != null && _isPaused) {
972 return _stateData as Object /*=T*/; 962 return _stateData as Object/*=T*/;
973 } 963 }
974 return null; 964 return null;
975 } 965 }
976 966
977 Future<bool> moveNext() { 967 Future<bool> moveNext() {
978 if (_subscription != null) { 968 if (_subscription != null) {
979 if (_isPaused) { 969 if (_isPaused) {
980 var future = new _Future<bool>(); 970 var future = new _Future<bool>();
981 _stateData = future; 971 _stateData = future;
982 _isPaused = false; 972 _isPaused = false;
983 _subscription.resume(); 973 _subscription.resume();
984 return future; 974 return future;
985 } 975 }
986 throw new StateError("Already waiting for next."); 976 throw new StateError("Already waiting for next.");
987 } 977 }
988 return _initializeOrDone(); 978 return _initializeOrDone();
989 } 979 }
990 980
991 /// Called if there is no active subscription when [moveNext] is called. 981 /// Called if there is no active subscription when [moveNext] is called.
992 /// 982 ///
993 /// Either starts listening on the stream if this is the first call to 983 /// Either starts listening on the stream if this is the first call to
994 /// [moveNext], or returns a `false` future because the stream has already 984 /// [moveNext], or returns a `false` future because the stream has already
995 /// ended. 985 /// ended.
996 Future<bool> _initializeOrDone() { 986 Future<bool> _initializeOrDone() {
997 assert(_subscription == null); 987 assert(_subscription == null);
998 var stateData = _stateData; 988 var stateData = _stateData;
999 if (stateData != null) { 989 if (stateData != null) {
1000 Stream<T> stream = stateData as Object /*=Stream<T>*/; 990 Stream<T> stream = stateData as Object/*=Stream<T>*/;
1001 _subscription = stream.listen( 991 _subscription = stream.listen(_onData,
1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); 992 onError: _onError, onDone: _onDone, cancelOnError: true);
1003 var future = new _Future<bool>(); 993 var future = new _Future<bool>();
1004 _stateData = future; 994 _stateData = future;
1005 return future; 995 return future;
1006 } 996 }
1007 return new _Future<bool>.immediate(false); 997 return new _Future<bool>.immediate(false);
1008 } 998 }
1009 999
1010 Future cancel() { 1000 Future cancel() {
1011 StreamSubscription<T> subscription = _subscription; 1001 StreamSubscription<T> subscription = _subscription;
1012 Object stateData = _stateData; 1002 Object stateData = _stateData;
1013 _stateData = null; 1003 _stateData = null;
1014 if (subscription != null) { 1004 if (subscription != null) {
1015 _subscription = null; 1005 _subscription = null;
1016 if (!_isPaused) { 1006 if (!_isPaused) {
1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/; 1007 _Future<bool> future = stateData as Object/*=_Future<bool>*/;
1018 future._asyncComplete(false); 1008 future._asyncComplete(false);
1019 } 1009 }
1020 return subscription.cancel(); 1010 return subscription.cancel();
1021 } 1011 }
1022 return Future._nullFuture; 1012 return Future._nullFuture;
1023 } 1013 }
1024 1014
1025 void _onData(T data) { 1015 void _onData(T data) {
1026 assert(_subscription != null && !_isPaused); 1016 assert(_subscription != null && !_isPaused);
1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; 1017 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
1028 _stateData = data; 1018 _stateData = data;
1029 _isPaused = true; 1019 _isPaused = true;
1030 moveNextFuture._complete(true); 1020 moveNextFuture._complete(true);
1031 if (_subscription != null && _isPaused) _subscription.pause(); 1021 if (_subscription != null && _isPaused) _subscription.pause();
1032 } 1022 }
1033 1023
1034 void _onError(Object error, [StackTrace stackTrace]) { 1024 void _onError(Object error, [StackTrace stackTrace]) {
1035 assert(_subscription != null && !_isPaused); 1025 assert(_subscription != null && !_isPaused);
1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; 1026 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
1037 _subscription = null; 1027 _subscription = null;
1038 _stateData = null; 1028 _stateData = null;
1039 moveNextFuture._completeError(error, stackTrace); 1029 moveNextFuture._completeError(error, stackTrace);
1040 } 1030 }
1041 1031
1042 void _onDone() { 1032 void _onDone() {
1043 assert(_subscription != null && !_isPaused); 1033 assert(_subscription != null && !_isPaused);
1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; 1034 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
1045 _subscription = null; 1035 _subscription = null;
1046 _stateData = null; 1036 _stateData = null;
1047 moveNextFuture._complete(false); 1037 moveNextFuture._complete(false);
1048 } 1038 }
1049 } 1039 }
1050 1040
1051 /** An empty broadcast stream, sending a done event as soon as possible. */ 1041 /** An empty broadcast stream, sending a done event as soon as possible. */
1052 class _EmptyStream<T> extends Stream<T> { 1042 class _EmptyStream<T> extends Stream<T> {
1053 const _EmptyStream() : super._internal(); 1043 const _EmptyStream() : super._internal();
1054 bool get isBroadcast => true; 1044 bool get isBroadcast => true;
1055 StreamSubscription<T> listen(void onData(T data), 1045 StreamSubscription<T> listen(void onData(T data),
1056 {Function onError, 1046 {Function onError, void onDone(), bool cancelOnError}) {
1057 void onDone(),
1058 bool cancelOnError}) {
1059 return new _DoneStreamSubscription<T>(onDone); 1047 return new _DoneStreamSubscription<T>(onDone);
1060 } 1048 }
1061 } 1049 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698