Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
84 * 84 *
85 * The [onListen] callback is called when the stream 85 * The [onListen] callback is called when the stream
86 * receives its listener and [onCancel] when the listener ends 86 * receives its listener and [onCancel] when the listener ends
87 * its subscription. If [onCancel] needs to perform an asynchronous operation, 87 * its subscription. If [onCancel] needs to perform an asynchronous operation,
88 * [onCancel] should return a future that completes when the cancel operation 88 * [onCancel] should return a future that completes when the cancel operation
89 * is done. 89 * is done.
90 * 90 *
91 * If the stream is canceled before the controller needs new data the 91 * If the stream is canceled before the controller needs new data the
92 * [onResume] call might not be executed. 92 * [onResume] call might not be executed.
93 */ 93 */
94 factory StreamController({void onListen(), 94 factory StreamController(
95 void onPause(), 95 {void onListen(),
96 void onResume(), 96 void onPause(),
97 onCancel(), 97 void onResume(),
98 bool sync: false}) { 98 onCancel(),
99 bool sync: false}) {
99 return sync 100 return sync
100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 101 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 102 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
102 } 103 }
103 104
104 /** 105 /**
105 * A controller where [stream] can be listened to more than once. 106 * A controller where [stream] can be listened to more than once.
106 * 107 *
107 * The [Stream] returned by [stream] is a broadcast stream. 108 * The [Stream] returned by [stream] is a broadcast stream.
108 * It can be listened to more than once. 109 * It can be listened to more than once.
109 * 110 *
110 * A Stream should be inert until a subscriber starts listening on it (using 111 * A Stream should be inert until a subscriber starts listening on it (using
111 * the [onListen] callback to start producing events). Streams should not 112 * the [onListen] callback to start producing events). Streams should not
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 * A listener must be subscribed both when the event is initiated 146 * A listener must be subscribed both when the event is initiated
146 * (that is, when [add] is called) 147 * (that is, when [add] is called)
147 * and when the event is later delivered, 148 * and when the event is later delivered,
148 * in order to receive the event. 149 * in order to receive the event.
149 * 150 *
150 * The [onListen] callback is called when the first listener is subscribed, 151 * The [onListen] callback is called when the first listener is subscribed,
151 * and the [onCancel] is called when there are no longer any active listeners. 152 * and the [onCancel] is called when there are no longer any active listeners.
152 * If a listener is added again later, after the [onCancel] was called, 153 * If a listener is added again later, after the [onCancel] was called,
153 * the [onListen] will be called again. 154 * the [onListen] will be called again.
154 */ 155 */
155 factory StreamController.broadcast({void onListen(), 156 factory StreamController.broadcast(
156 void onCancel(), 157 {void onListen(), void onCancel(), bool sync: false}) {
157 bool sync: false}) {
158 return sync 158 return sync
159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
161 } 161 }
162 162
163 /** 163 /**
164 * The callback which is called when the stream is listened to. 164 * The callback which is called when the stream is listened to.
165 * 165 *
166 * May be set to `null`, in which case no callback will happen. 166 * May be set to `null`, in which case no callback will happen.
167 */ 167 */
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
255 * returned future. 255 * returned future.
256 * 256 *
257 * If [cancelOnError] is true, only the first error on [source] is 257 * If [cancelOnError] is true, only the first error on [source] is
258 * forwarded to the controller's stream, and the `addStream` ends 258 * forwarded to the controller's stream, and the `addStream` ends
259 * after this. If [cancelOnError] is false, all errors are forwarded 259 * after this. If [cancelOnError] is false, all errors are forwarded
260 * and only a done event will end the `addStream`. 260 * and only a done event will end the `addStream`.
261 */ 261 */
262 Future addStream(Stream<T> source, {bool cancelOnError: true}); 262 Future addStream(Stream<T> source, {bool cancelOnError: true});
263 } 263 }
264 264
265
266 /** 265 /**
267 * A stream controller that delivers its events synchronously. 266 * A stream controller that delivers its events synchronously.
268 * 267 *
269 * A synchronous stream controller is intended for cases where 268 * A synchronous stream controller is intended for cases where
270 * an already asynchronous event triggers an event on a stream. 269 * an already asynchronous event triggers an event on a stream.
271 * 270 *
272 * Instead of adding the event to the stream in a later microtask, 271 * Instead of adding the event to the stream in a later microtask,
273 * causing extra latency, the event is instead fired immediately by the 272 * causing extra latency, the event is instead fired immediately by the
274 * synchronous stream controller, as if the stream event was 273 * synchronous stream controller, as if the stream event was
275 * the current event or microtask. 274 * the current event or microtask.
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 * Closes the controller's stream. 354 * Closes the controller's stream.
356 * 355 *
357 * As [StreamController.close], but must not be called while an event is 356 * As [StreamController.close], but must not be called while an event is
358 * being added by [add], [addError] or [close]. 357 * being added by [add], [addError] or [close].
359 */ 358 */
360 Future close(); 359 Future close();
361 } 360 }
362 361
363 abstract class _StreamControllerLifecycle<T> { 362 abstract class _StreamControllerLifecycle<T> {
364 StreamSubscription<T> _subscribe( 363 StreamSubscription<T> _subscribe(
365 void onData(T data), 364 void onData(T data), Function onError, void onDone(), bool cancelOnError);
366 Function onError,
367 void onDone(),
368 bool cancelOnError);
369 void _recordPause(StreamSubscription<T> subscription) {} 365 void _recordPause(StreamSubscription<T> subscription) {}
370 void _recordResume(StreamSubscription<T> subscription) {} 366 void _recordResume(StreamSubscription<T> subscription) {}
371 Future _recordCancel(StreamSubscription<T> subscription) => null; 367 Future _recordCancel(StreamSubscription<T> subscription) => null;
372 } 368 }
373 369
374 /** 370 /**
375 * Default implementation of [StreamController]. 371 * Default implementation of [StreamController].
376 * 372 *
377 * Controls a stream that only supports a single controller. 373 * Controls a stream that only supports a single controller.
378 */ 374 */
379 abstract class _StreamController<T> implements StreamController<T>, 375 abstract class _StreamController<T>
380 _StreamControllerLifecycle<T>, 376 implements
381 _EventSink<T>, 377 StreamController<T>,
382 _EventDispatch<T> { 378 _StreamControllerLifecycle<T>,
379 _EventSink<T>,
380 _EventDispatch<T> {
383 // The states are bit-flags. More than one can be set at a time. 381 // The states are bit-flags. More than one can be set at a time.
384 // 382 //
385 // The "subscription state" goes through the states: 383 // The "subscription state" goes through the states:
386 // initial -> subscribed -> canceled. 384 // initial -> subscribed -> canceled.
387 // These are mutually exclusive. 385 // These are mutually exclusive.
388 // The "closed" state records whether the [close] method has been called 386 // The "closed" state records whether the [close] method has been called
389 // on the controller. This can be done at any time. If done before 387 // on the controller. This can be done at any time. If done before
390 // subscription, the done event is queued. If done after cancel, the done 388 // subscription, the done event is queued. If done after cancel, the done
391 // event is ignored (just as any other event after a cancel). 389 // event is ignored (just as any other event after a cancel).
392 390
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
443 // TODO(lrn): Could this be stored in the varData field too, if it's not 441 // TODO(lrn): Could this be stored in the varData field too, if it's not
444 // accessed until the call to "close"? Then we need to special case if it's 442 // accessed until the call to "close"? Then we need to special case if it's
445 // accessed earlier, or if close is called before subscribing. 443 // accessed earlier, or if close is called before subscribing.
446 _Future _doneFuture; 444 _Future _doneFuture;
447 445
448 ControllerCallback onListen; 446 ControllerCallback onListen;
449 ControllerCallback onPause; 447 ControllerCallback onPause;
450 ControllerCallback onResume; 448 ControllerCallback onResume;
451 ControllerCancelCallback onCancel; 449 ControllerCancelCallback onCancel;
452 450
453 _StreamController(this.onListen, 451 _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel);
454 this.onPause,
455 this.onResume,
456 this.onCancel);
457 452
458 // Return a new stream every time. The streams are equal, but not identical. 453 // Return a new stream every time. The streams are equal, but not identical.
459 Stream<T> get stream => new _ControllerStream<T>(this); 454 Stream<T> get stream => new _ControllerStream<T>(this);
460 455
461 /** 456 /**
462 * Returns a view of this object that only exposes the [StreamSink] interface. 457 * Returns a view of this object that only exposes the [StreamSink] interface.
463 */ 458 */
464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 459 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
465 460
466 /** 461 /**
467 * Whether a listener has existed and been canceled. 462 * Whether a listener has existed and been canceled.
468 * 463 *
469 * After this, adding more events will be ignored. 464 * After this, adding more events will be ignored.
470 */ 465 */
471 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 466 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
472 467
473 /** Whether there is an active listener. */ 468 /** Whether there is an active listener. */
474 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; 469 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
475 470
476 /** Whether there has not been a listener yet. */ 471 /** Whether there has not been a listener yet. */
477 bool get _isInitialState => 472 bool get _isInitialState =>
478 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; 473 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
479 474
480 bool get isClosed => (_state & _STATE_CLOSED) != 0; 475 bool get isClosed => (_state & _STATE_CLOSED) != 0;
481 476
482 bool get isPaused => hasListener ? _subscription._isInputPaused 477 bool get isPaused =>
483 : !_isCanceled; 478 hasListener ? _subscription._isInputPaused : !_isCanceled;
484 479
485 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; 480 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
486 481
487 /** New events may not be added after close, or during addStream. */ 482 /** New events may not be added after close, or during addStream. */
488 bool get _mayAddEvent => (_state < _STATE_CLOSED); 483 bool get _mayAddEvent => (_state < _STATE_CLOSED);
489 484
490 // Returns the pending events. 485 // Returns the pending events.
491 // Pending events are events added before a subscription exists. 486 // Pending events are events added before a subscription exists.
492 // They are added to the subscription when it is created. 487 // They are added to the subscription when it is created.
493 // Pending events, if any, are kept in the _varData field until the 488 // Pending events, if any, are kept in the _varData field until the
494 // stream is listened to. 489 // stream is listened to.
495 // While adding a stream, pending events are moved into the 490 // While adding a stream, pending events are moved into the
496 // state object to allow the state object to use the _varData field. 491 // state object to allow the state object to use the _varData field.
497 _PendingEvents<T> get _pendingEvents { 492 _PendingEvents<T> get _pendingEvents {
498 assert(_isInitialState); 493 assert(_isInitialState);
499 if (!_isAddingStream) { 494 if (!_isAddingStream) {
500 return _varData as Object /*=_PendingEvents<T>*/; 495 return _varData as Object/*=_PendingEvents<T>*/;
501 } 496 }
502 _StreamControllerAddStreamState<T> state = 497 _StreamControllerAddStreamState<T> state =
503 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 498 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
504 return state.varData as Object /*=_PendingEvents<T>*/; 499 return state.varData as Object/*=_PendingEvents<T>*/;
505 } 500 }
506 501
507 // Returns the pending events, and creates the object if necessary. 502 // Returns the pending events, and creates the object if necessary.
508 _StreamImplEvents<T> _ensurePendingEvents() { 503 _StreamImplEvents<T> _ensurePendingEvents() {
509 assert(_isInitialState); 504 assert(_isInitialState);
510 if (!_isAddingStream) { 505 if (!_isAddingStream) {
511 if (_varData == null) _varData = new _StreamImplEvents<T>(); 506 if (_varData == null) _varData = new _StreamImplEvents<T>();
512 return _varData as Object /*=_StreamImplEvents<T>*/; 507 return _varData as Object/*=_StreamImplEvents<T>*/;
513 } 508 }
514 _StreamControllerAddStreamState<T> state = 509 _StreamControllerAddStreamState<T> state =
515 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 510 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
516 if (state.varData == null) state.varData = new _StreamImplEvents<T>(); 511 if (state.varData == null) state.varData = new _StreamImplEvents<T>();
517 return state.varData as Object /*=_StreamImplEvents<T>*/; 512 return state.varData as Object/*=_StreamImplEvents<T>*/;
518 } 513 }
519 514
520 // Get the current subscription. 515 // Get the current subscription.
521 // If we are adding a stream, the subscription is moved into the state 516 // If we are adding a stream, the subscription is moved into the state
522 // object to allow the state object to use the _varData field. 517 // object to allow the state object to use the _varData field.
523 _ControllerSubscription<T> get _subscription { 518 _ControllerSubscription<T> get _subscription {
524 assert(hasListener); 519 assert(hasListener);
525 if (_isAddingStream) { 520 if (_isAddingStream) {
526 _StreamControllerAddStreamState<T> addState = 521 _StreamControllerAddStreamState<T> addState =
527 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 522 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
528 return addState.varData as Object /*=_ControllerSubscription<T>*/; 523 return addState.varData as Object/*=_ControllerSubscription<T>*/;
529 } 524 }
530 return _varData as Object /*=_ControllerSubscription<T>*/; 525 return _varData as Object/*=_ControllerSubscription<T>*/;
531 } 526 }
532 527
533 /** 528 /**
534 * Creates an error describing why an event cannot be added. 529 * Creates an error describing why an event cannot be added.
535 * 530 *
536 * The reason, and therefore the error message, depends on the current state. 531 * The reason, and therefore the error message, depends on the current state.
537 */ 532 */
538 Error _badEventState() { 533 Error _badEventState() {
539 if (isClosed) { 534 if (isClosed) {
540 return new StateError("Cannot add event after closing"); 535 return new StateError("Cannot add event after closing");
541 } 536 }
542 assert(_isAddingStream); 537 assert(_isAddingStream);
543 return new StateError("Cannot add event while adding a stream"); 538 return new StateError("Cannot add event while adding a stream");
544 } 539 }
545 540
546 // StreamSink interface. 541 // StreamSink interface.
547 Future addStream(Stream<T> source, {bool cancelOnError: true}) { 542 Future addStream(Stream<T> source, {bool cancelOnError: true}) {
548 if (!_mayAddEvent) throw _badEventState(); 543 if (!_mayAddEvent) throw _badEventState();
549 if (_isCanceled) return new _Future.immediate(null); 544 if (_isCanceled) return new _Future.immediate(null);
550 _StreamControllerAddStreamState<T> addState = 545 _StreamControllerAddStreamState<T> addState =
551 new _StreamControllerAddStreamState<T>(this, 546 new _StreamControllerAddStreamState<T>(
552 _varData, 547 this, _varData, source, cancelOnError);
553 source,
554 cancelOnError);
555 _varData = addState; 548 _varData = addState;
556 _state |= _STATE_ADDSTREAM; 549 _state |= _STATE_ADDSTREAM;
557 return addState.addStreamFuture; 550 return addState.addStreamFuture;
558 } 551 }
559 552
560 /** 553 /**
561 * Returns a future that is completed when the stream is done 554 * Returns a future that is completed when the stream is done
562 * processing events. 555 * processing events.
563 * 556 *
564 * This happens either when the done event has been sent, or if the 557 * This happens either when the done event has been sent, or if the
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
643 _sendError(error, stackTrace); 636 _sendError(error, stackTrace);
644 } else if (_isInitialState) { 637 } else if (_isInitialState) {
645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); 638 _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
646 } 639 }
647 } 640 }
648 641
649 void _close() { 642 void _close() {
650 // End of addStream stream. 643 // End of addStream stream.
651 assert(_isAddingStream); 644 assert(_isAddingStream);
652 _StreamControllerAddStreamState<T> addState = 645 _StreamControllerAddStreamState<T> addState =
653 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 646 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
654 _varData = addState.varData; 647 _varData = addState.varData;
655 _state &= ~_STATE_ADDSTREAM; 648 _state &= ~_STATE_ADDSTREAM;
656 addState.complete(); 649 addState.complete();
657 } 650 }
658 651
659 // _StreamControllerLifeCycle interface 652 // _StreamControllerLifeCycle interface
660 653
661 StreamSubscription<T> _subscribe( 654 StreamSubscription<T> _subscribe(void onData(T data), Function onError,
662 void onData(T data), 655 void onDone(), bool cancelOnError) {
663 Function onError,
664 void onDone(),
665 bool cancelOnError) {
666 if (!_isInitialState) { 656 if (!_isInitialState) {
667 throw new StateError("Stream has already been listened to."); 657 throw new StateError("Stream has already been listened to.");
668 } 658 }
669 _ControllerSubscription<T> subscription = 659 _ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
670 new _ControllerSubscription<T>(this, onData, onError, onDone, 660 this, onData, onError, onDone, cancelOnError);
671 cancelOnError);
672 661
673 _PendingEvents<T> pendingEvents = _pendingEvents; 662 _PendingEvents<T> pendingEvents = _pendingEvents;
674 _state |= _STATE_SUBSCRIBED; 663 _state |= _STATE_SUBSCRIBED;
675 if (_isAddingStream) { 664 if (_isAddingStream) {
676 _StreamControllerAddStreamState<T> addState = 665 _StreamControllerAddStreamState<T> addState =
677 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 666 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
678 addState.varData = subscription; 667 addState.varData = subscription;
679 addState.resume(); 668 addState.resume();
680 } else { 669 } else {
681 _varData = subscription; 670 _varData = subscription;
682 } 671 }
683 subscription._setPendingEvents(pendingEvents); 672 subscription._setPendingEvents(pendingEvents);
684 subscription._guardCallback(() { 673 subscription._guardCallback(() {
685 _runGuarded(onListen); 674 _runGuarded(onListen);
686 }); 675 });
687 676
688 return subscription; 677 return subscription;
689 } 678 }
690 679
691 Future _recordCancel(StreamSubscription<T> subscription) { 680 Future _recordCancel(StreamSubscription<T> subscription) {
692 // When we cancel, we first cancel any stream being added, 681 // When we cancel, we first cancel any stream being added,
693 // Then we call `onCancel`, and finally the _doneFuture is completed. 682 // Then we call `onCancel`, and finally the _doneFuture is completed.
694 // If either of addStream's cancel or `onCancel` returns a future, 683 // If either of addStream's cancel or `onCancel` returns a future,
695 // we wait for it before continuing. 684 // we wait for it before continuing.
696 // Any error during this process ends up in the returned future. 685 // Any error during this process ends up in the returned future.
697 // If more errors happen, we act as if it happens inside nested try/finallys 686 // If more errors happen, we act as if it happens inside nested try/finallys
698 // or whenComplete calls, and only the last error ends up in the 687 // or whenComplete calls, and only the last error ends up in the
699 // returned future. 688 // returned future.
700 Future result; 689 Future result;
701 if (_isAddingStream) { 690 if (_isAddingStream) {
702 _StreamControllerAddStreamState<T> addState = 691 _StreamControllerAddStreamState<T> addState =
703 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 692 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
704 result = addState.cancel(); 693 result = addState.cancel();
705 } 694 }
706 _varData = null; 695 _varData = null;
707 _state = 696 _state =
708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 697 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
709 698
710 if (onCancel != null) { 699 if (onCancel != null) {
711 if (result == null) { 700 if (result == null) {
712 // Only introduce a future if one is needed. 701 // Only introduce a future if one is needed.
713 // If _onCancel returns null, no future is needed. 702 // If _onCancel returns null, no future is needed.
(...skipping 22 matching lines...) Expand all
736 } else { 725 } else {
737 complete(); 726 complete();
738 } 727 }
739 728
740 return result; 729 return result;
741 } 730 }
742 731
743 void _recordPause(StreamSubscription<T> subscription) { 732 void _recordPause(StreamSubscription<T> subscription) {
744 if (_isAddingStream) { 733 if (_isAddingStream) {
745 _StreamControllerAddStreamState<T> addState = 734 _StreamControllerAddStreamState<T> addState =
746 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 735 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
747 addState.pause(); 736 addState.pause();
748 } 737 }
749 _runGuarded(onPause); 738 _runGuarded(onPause);
750 } 739 }
751 740
752 void _recordResume(StreamSubscription<T> subscription) { 741 void _recordResume(StreamSubscription<T> subscription) {
753 if (_isAddingStream) { 742 if (_isAddingStream) {
754 _StreamControllerAddStreamState<T> addState = 743 _StreamControllerAddStreamState<T> addState =
755 _varData as Object /*=_StreamControllerAddStreamState<T>*/; 744 _varData as Object/*=_StreamControllerAddStreamState<T>*/;
756 addState.resume(); 745 addState.resume();
757 } 746 }
758 _runGuarded(onResume); 747 _runGuarded(onResume);
759 } 748 }
760 } 749 }
761 750
762 abstract class _SyncStreamControllerDispatch<T> 751 abstract class _SyncStreamControllerDispatch<T>
763 implements _StreamController<T>, SynchronousStreamController<T> { 752 implements _StreamController<T>, SynchronousStreamController<T> {
764 int get _state; 753 int get _state;
765 void set _state(int state); 754 void set _state(int state);
(...skipping 23 matching lines...) Expand all
789 778
790 void _sendDone() { 779 void _sendDone() {
791 _subscription._addPending(const _DelayedDone()); 780 _subscription._addPending(const _DelayedDone());
792 } 781 }
793 } 782 }
794 783
795 // TODO(lrn): Use common superclass for callback-controllers when VM supports 784 // TODO(lrn): Use common superclass for callback-controllers when VM supports
796 // constructors in mixin superclasses. 785 // constructors in mixin superclasses.
797 786
798 class _AsyncStreamController<T> = _StreamController<T> 787 class _AsyncStreamController<T> = _StreamController<T>
799 with _AsyncStreamControllerDispatch<T>; 788 with _AsyncStreamControllerDispatch<T>;
800 789
801 class _SyncStreamController<T> = _StreamController<T> 790 class _SyncStreamController<T> = _StreamController<T>
802 with _SyncStreamControllerDispatch<T>; 791 with _SyncStreamControllerDispatch<T>;
803 792
804 typedef _NotificationHandler(); 793 typedef _NotificationHandler();
805 794
806 Future _runGuarded(_NotificationHandler notificationHandler) { 795 Future _runGuarded(_NotificationHandler notificationHandler) {
807 if (notificationHandler == null) return null; 796 if (notificationHandler == null) return null;
808 try { 797 try {
809 var result = notificationHandler(); 798 var result = notificationHandler();
810 if (result is Future) return result; 799 if (result is Future) return result;
811 return null; 800 return null;
812 } catch (e, s) { 801 } catch (e, s) {
813 Zone.current.handleUncaughtError(e, s); 802 Zone.current.handleUncaughtError(e, s);
814 } 803 }
815 } 804 }
816 805
817 class _ControllerStream<T> extends _StreamImpl<T> { 806 class _ControllerStream<T> extends _StreamImpl<T> {
818 _StreamControllerLifecycle<T> _controller; 807 _StreamControllerLifecycle<T> _controller;
819 808
820 _ControllerStream(this._controller); 809 _ControllerStream(this._controller);
821 810
822 StreamSubscription<T> _createSubscription( 811 StreamSubscription<T> _createSubscription(void onData(T data),
823 void onData(T data), 812 Function onError, void onDone(), bool cancelOnError) =>
824 Function onError, 813 _controller._subscribe(onData, onError, onDone, cancelOnError);
825 void onDone(),
826 bool cancelOnError) =>
827 _controller._subscribe(onData, onError, onDone, cancelOnError);
828 814
829 // Override == and hashCode so that new streams returned by the same 815 // Override == and hashCode so that new streams returned by the same
830 // controller are considered equal. The controller returns a new stream 816 // controller are considered equal. The controller returns a new stream
831 // each time it's queried, but doesn't have to cache the result. 817 // each time it's queried, but doesn't have to cache the result.
832 818
833 int get hashCode => _controller.hashCode ^ 0x35323532; 819 int get hashCode => _controller.hashCode ^ 0x35323532;
834 820
835 bool operator==(Object other) { 821 bool operator ==(Object other) {
836 if (identical(this, other)) return true; 822 if (identical(this, other)) return true;
837 if (other is! _ControllerStream) return false; 823 if (other is! _ControllerStream) return false;
838 _ControllerStream otherStream = other; 824 _ControllerStream otherStream = other;
839 return identical(otherStream._controller, this._controller); 825 return identical(otherStream._controller, this._controller);
840 } 826 }
841 } 827 }
842 828
843 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 829 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
844 final _StreamControllerLifecycle<T> _controller; 830 final _StreamControllerLifecycle<T> _controller;
845 831
846 _ControllerSubscription(this._controller, void onData(T data), 832 _ControllerSubscription(this._controller, void onData(T data),
847 Function onError, void onDone(), bool cancelOnError) 833 Function onError, void onDone(), bool cancelOnError)
848 : super(onData, onError, onDone, cancelOnError); 834 : super(onData, onError, onDone, cancelOnError);
849 835
850 Future _onCancel() { 836 Future _onCancel() {
851 return _controller._recordCancel(this); 837 return _controller._recordCancel(this);
852 } 838 }
853 839
854 void _onPause() { 840 void _onPause() {
855 _controller._recordPause(this); 841 _controller._recordPause(this);
856 } 842 }
857 843
858 void _onResume() { 844 void _onResume() {
859 _controller._recordResume(this); 845 _controller._recordResume(this);
860 } 846 }
861 } 847 }
862 848
863
864 /** A class that exposes only the [StreamSink] interface of an object. */ 849 /** A class that exposes only the [StreamSink] interface of an object. */
865 class _StreamSinkWrapper<T> implements StreamSink<T> { 850 class _StreamSinkWrapper<T> implements StreamSink<T> {
866 final StreamController _target; 851 final StreamController _target;
867 _StreamSinkWrapper(this._target); 852 _StreamSinkWrapper(this._target);
868 void add(T data) { _target.add(data); } 853 void add(T data) {
854 _target.add(data);
855 }
856
869 void addError(Object error, [StackTrace stackTrace]) { 857 void addError(Object error, [StackTrace stackTrace]) {
870 _target.addError(error, stackTrace); 858 _target.addError(error, stackTrace);
871 } 859 }
860
872 Future close() => _target.close(); 861 Future close() => _target.close();
873 Future addStream(Stream<T> source, {bool cancelOnError: true}) => 862 Future addStream(Stream<T> source, {bool cancelOnError: true}) =>
874 _target.addStream(source, cancelOnError: cancelOnError); 863 _target.addStream(source, cancelOnError: cancelOnError);
875 Future get done => _target.done; 864 Future get done => _target.done;
876 } 865 }
877 866
878 /** 867 /**
879 * Object containing the state used to handle [StreamController.addStream]. 868 * Object containing the state used to handle [StreamController.addStream].
880 */ 869 */
881 class _AddStreamState<T> { 870 class _AddStreamState<T> {
882 // [_Future] returned by call to addStream. 871 // [_Future] returned by call to addStream.
883 final _Future addStreamFuture; 872 final _Future addStreamFuture;
884 873
885 // Subscription on stream argument to addStream. 874 // Subscription on stream argument to addStream.
886 final StreamSubscription addSubscription; 875 final StreamSubscription addSubscription;
887 876
888 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) 877 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError)
889 : addStreamFuture = new _Future(), 878 : addStreamFuture = new _Future(),
890 addSubscription = source.listen(controller._add, 879 addSubscription = source.listen(controller._add,
891 onError: cancelOnError 880 onError: cancelOnError
892 ? makeErrorHandler(controller) 881 ? makeErrorHandler(controller)
893 : controller._addError, 882 : controller._addError,
894 onDone: controller._close, 883 onDone: controller._close,
895 cancelOnError: cancelOnError); 884 cancelOnError: cancelOnError);
896 885
897 static makeErrorHandler(_EventSink controller) => 886 static makeErrorHandler(_EventSink controller) => (e, StackTrace s) {
898 (e, StackTrace s) {
899 controller._addError(e, s); 887 controller._addError(e, s);
900 controller._close(); 888 controller._close();
901 }; 889 };
902 890
903 void pause() { 891 void pause() {
904 addSubscription.pause(); 892 addSubscription.pause();
905 } 893 }
906 894
907 void resume() { 895 void resume() {
908 addSubscription.resume(); 896 addSubscription.resume();
909 } 897 }
910 898
911 /** 899 /**
912 * Stop adding the stream. 900 * Stop adding the stream.
913 * 901 *
914 * Complete the future returned by `StreamController.addStream` when 902 * Complete the future returned by `StreamController.addStream` when
915 * the cancel is complete. 903 * the cancel is complete.
916 * 904 *
917 * Return a future if the cancel takes time, otherwise return `null`. 905 * Return a future if the cancel takes time, otherwise return `null`.
918 */ 906 */
919 Future cancel() { 907 Future cancel() {
920 var cancel = addSubscription.cancel(); 908 var cancel = addSubscription.cancel();
921 if (cancel == null) { 909 if (cancel == null) {
922 addStreamFuture._asyncComplete(null); 910 addStreamFuture._asyncComplete(null);
923 return null; 911 return null;
924 } 912 }
925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); 913 return cancel.whenComplete(() {
914 addStreamFuture._asyncComplete(null);
915 });
926 } 916 }
927 917
928 void complete() { 918 void complete() {
929 addStreamFuture._asyncComplete(null); 919 addStreamFuture._asyncComplete(null);
930 } 920 }
931 } 921 }
932 922
933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { 923 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
934 // The subscription or pending data of a _StreamController. 924 // The subscription or pending data of a _StreamController.
935 // Stored here because we reuse the `_varData` field in the _StreamController 925 // Stored here because we reuse the `_varData` field in the _StreamController
936 // to store this state object. 926 // to store this state object.
937 var varData; 927 var varData;
938 928
939 _StreamControllerAddStreamState(_StreamController<T> controller, 929 _StreamControllerAddStreamState(_StreamController<T> controller, this.varData,
940 this.varData, 930 Stream source, bool cancelOnError)
941 Stream source,
942 bool cancelOnError)
943 : super(controller, source, cancelOnError) { 931 : super(controller, source, cancelOnError) {
944 if (controller.isPaused) { 932 if (controller.isPaused) {
945 addSubscription.pause(); 933 addSubscription.pause();
946 } 934 }
947 } 935 }
948 } 936 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698