OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 152 matching lines...) Loading... |
163 * | 163 * |
164 * Also allows an objection stack trace object, on top of what [EventSink] | 164 * Also allows an objection stack trace object, on top of what [EventSink] |
165 * allows. | 165 * allows. |
166 */ | 166 */ |
167 void addError(Object error, [Object stackTrace]); | 167 void addError(Object error, [Object stackTrace]); |
168 } | 168 } |
169 | 169 |
170 | 170 |
171 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
172 StreamSubscription<T> _subscribe(void onData(T data), | 172 StreamSubscription<T> _subscribe(void onData(T data), |
173 void onError(Object error), | 173 Function onError, |
174 void onDone(), | 174 void onDone(), |
175 bool cancelOnError); | 175 bool cancelOnError); |
176 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
177 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
178 void _recordCancel(StreamSubscription<T> subscription) {} | 178 void _recordCancel(StreamSubscription<T> subscription) {} |
179 } | 179 } |
180 | 180 |
181 /** | 181 /** |
182 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
183 * | 183 * |
(...skipping 192 matching lines...) Loading... |
376 /** | 376 /** |
377 * Send or enqueue an error event. | 377 * Send or enqueue an error event. |
378 */ | 378 */ |
379 void addError(Object error, [Object stackTrace]) { | 379 void addError(Object error, [Object stackTrace]) { |
380 if (!_mayAddEvent) throw _badEventState(); | 380 if (!_mayAddEvent) throw _badEventState(); |
381 if (stackTrace != null) { | 381 if (stackTrace != null) { |
382 // Force stack trace overwrite. Even if the error already contained | 382 // Force stack trace overwrite. Even if the error already contained |
383 // a stack trace. | 383 // a stack trace. |
384 _attachStackTrace(error, stackTrace); | 384 _attachStackTrace(error, stackTrace); |
385 } | 385 } |
386 _addError(error); | 386 _addError(error, stackTrace); |
387 } | 387 } |
388 | 388 |
389 /** | 389 /** |
390 * Closes this controller. | 390 * Closes this controller. |
391 * | 391 * |
392 * After closing, no further events may be added using [add] or [addError]. | 392 * After closing, no further events may be added using [add] or [addError]. |
393 * | 393 * |
394 * You are allowed to close the controller more than once, but only the first | 394 * You are allowed to close the controller more than once, but only the first |
395 * call has any effect. | 395 * call has any effect. |
396 * | 396 * |
(...skipping 20 matching lines...) Loading... |
417 | 417 |
418 // Add data event, used both by the [addStream] events and by [add]. | 418 // Add data event, used both by the [addStream] events and by [add]. |
419 void _add(T value) { | 419 void _add(T value) { |
420 if (hasListener) { | 420 if (hasListener) { |
421 _sendData(value); | 421 _sendData(value); |
422 } else if (_isInitialState) { | 422 } else if (_isInitialState) { |
423 _ensurePendingEvents().add(new _DelayedData<T>(value)); | 423 _ensurePendingEvents().add(new _DelayedData<T>(value)); |
424 } | 424 } |
425 } | 425 } |
426 | 426 |
427 void _addError(Object error) { | 427 void _addError(Object error, StackTrace stackTrace) { |
428 if (hasListener) { | 428 if (hasListener) { |
429 _sendError(error); | 429 _sendError(error, stackTrace); |
430 } else if (_isInitialState) { | 430 } else if (_isInitialState) { |
431 _ensurePendingEvents().add(new _DelayedError(error)); | 431 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); |
432 } | 432 } |
433 } | 433 } |
434 | 434 |
435 void _close() { | 435 void _close() { |
436 // End of addStream stream. | 436 // End of addStream stream. |
437 assert(_isAddingStream); | 437 assert(_isAddingStream); |
438 _StreamControllerAddStreamState addState = _varData; | 438 _StreamControllerAddStreamState addState = _varData; |
439 _varData = addState.varData; | 439 _varData = addState.varData; |
440 _state &= ~_STATE_ADDSTREAM; | 440 _state &= ~_STATE_ADDSTREAM; |
441 addState.complete(); | 441 addState.complete(); |
442 } | 442 } |
443 | 443 |
444 // _StreamControllerLifeCycle interface | 444 // _StreamControllerLifeCycle interface |
445 | 445 |
446 StreamSubscription<T> _subscribe(void onData(T data), | 446 StreamSubscription<T> _subscribe(void onData(T data), |
447 void onError(Object error), | 447 Function onError, |
448 void onDone(), | 448 void onDone(), |
449 bool cancelOnError) { | 449 bool cancelOnError) { |
450 if (!_isInitialState) { | 450 if (!_isInitialState) { |
451 throw new StateError("Stream has already been listened to."); | 451 throw new StateError("Stream has already been listened to."); |
452 } | 452 } |
453 _ControllerSubscription subscription = new _ControllerSubscription( | 453 _ControllerSubscription subscription = new _ControllerSubscription( |
454 this, onData, onError, onDone, cancelOnError); | 454 this, onData, onError, onDone, cancelOnError); |
455 | 455 |
456 _PendingEvents pendingEvents = _pendingEvents; | 456 _PendingEvents pendingEvents = _pendingEvents; |
457 _state |= _STATE_SUBSCRIBED; | 457 _state |= _STATE_SUBSCRIBED; |
(...skipping 41 matching lines...) Loading... |
499 _runGuarded(_onResume); | 499 _runGuarded(_onResume); |
500 } | 500 } |
501 } | 501 } |
502 | 502 |
503 abstract class _SyncStreamControllerDispatch<T> | 503 abstract class _SyncStreamControllerDispatch<T> |
504 implements _StreamController<T> { | 504 implements _StreamController<T> { |
505 void _sendData(T data) { | 505 void _sendData(T data) { |
506 _subscription._add(data); | 506 _subscription._add(data); |
507 } | 507 } |
508 | 508 |
509 void _sendError(Object error) { | 509 void _sendError(Object error, StackTrace stackTrace) { |
510 _subscription._addError(error); | 510 _subscription._addError(error, stackTrace); |
511 } | 511 } |
512 | 512 |
513 void _sendDone() { | 513 void _sendDone() { |
514 _subscription._close(); | 514 _subscription._close(); |
515 } | 515 } |
516 } | 516 } |
517 | 517 |
518 abstract class _AsyncStreamControllerDispatch<T> | 518 abstract class _AsyncStreamControllerDispatch<T> |
519 implements _StreamController<T> { | 519 implements _StreamController<T> { |
520 void _sendData(T data) { | 520 void _sendData(T data) { |
521 _subscription._addPending(new _DelayedData(data)); | 521 _subscription._addPending(new _DelayedData(data)); |
522 } | 522 } |
523 | 523 |
524 void _sendError(Object error) { | 524 void _sendError(Object error, StackTrace stackTrace) { |
525 _subscription._addPending(new _DelayedError(error)); | 525 _subscription._addPending(new _DelayedError(error, stackTrace)); |
526 } | 526 } |
527 | 527 |
528 void _sendDone() { | 528 void _sendDone() { |
529 _subscription._addPending(const _DelayedDone()); | 529 _subscription._addPending(const _DelayedDone()); |
530 } | 530 } |
531 } | 531 } |
532 | 532 |
533 // TODO(lrn): Use common superclass for callback-controllers when VM supports | 533 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
534 // constructors in mixin superclasses. | 534 // constructors in mixin superclasses. |
535 | 535 |
(...skipping 47 matching lines...) Loading... |
583 } | 583 } |
584 } | 584 } |
585 | 585 |
586 class _ControllerStream<T> extends _StreamImpl<T> { | 586 class _ControllerStream<T> extends _StreamImpl<T> { |
587 _StreamControllerLifecycle<T> _controller; | 587 _StreamControllerLifecycle<T> _controller; |
588 | 588 |
589 _ControllerStream(this._controller); | 589 _ControllerStream(this._controller); |
590 | 590 |
591 StreamSubscription<T> _createSubscription( | 591 StreamSubscription<T> _createSubscription( |
592 void onData(T data), | 592 void onData(T data), |
593 void onError(Object error), | 593 Function onError, |
594 void onDone(), | 594 void onDone(), |
595 bool cancelOnError) => | 595 bool cancelOnError) => |
596 _controller._subscribe(onData, onError, onDone, cancelOnError); | 596 _controller._subscribe(onData, onError, onDone, cancelOnError); |
597 | 597 |
598 // Override == and hashCode so that new streams returned by the same | 598 // Override == and hashCode so that new streams returned by the same |
599 // controller are considered equal. The controller returns a new stream | 599 // controller are considered equal. The controller returns a new stream |
600 // each time it's queried, but doesn't have to cache the result. | 600 // each time it's queried, but doesn't have to cache the result. |
601 | 601 |
602 int get hashCode => _controller.hashCode ^ 0x35323532; | 602 int get hashCode => _controller.hashCode ^ 0x35323532; |
603 | 603 |
604 bool operator==(Object other) { | 604 bool operator==(Object other) { |
605 if (identical(this, other)) return true; | 605 if (identical(this, other)) return true; |
606 if (other is! _ControllerStream) return false; | 606 if (other is! _ControllerStream) return false; |
607 _ControllerStream otherStream = other; | 607 _ControllerStream otherStream = other; |
608 return identical(otherStream._controller, this._controller); | 608 return identical(otherStream._controller, this._controller); |
609 } | 609 } |
610 } | 610 } |
611 | 611 |
612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
613 final _StreamControllerLifecycle<T> _controller; | 613 final _StreamControllerLifecycle<T> _controller; |
614 | 614 |
615 _ControllerSubscription(this._controller, | 615 _ControllerSubscription(this._controller, |
616 void onData(T data), | 616 void onData(T data), |
617 void onError(Object error), | 617 Function onError, |
618 void onDone(), | 618 void onDone(), |
619 bool cancelOnError) | 619 bool cancelOnError) |
620 : super(onData, onError, onDone, cancelOnError); | 620 : super(onData, onError, onDone, cancelOnError); |
621 | 621 |
622 void _onCancel() { | 622 void _onCancel() { |
623 _controller._recordCancel(this); | 623 _controller._recordCancel(this); |
624 } | 624 } |
625 | 625 |
626 void _onPause() { | 626 void _onPause() { |
627 _controller._recordPause(this); | 627 _controller._recordPause(this); |
628 } | 628 } |
629 | 629 |
630 void _onResume() { | 630 void _onResume() { |
631 _controller._recordResume(this); | 631 _controller._recordResume(this); |
632 } | 632 } |
633 } | 633 } |
634 | 634 |
635 | 635 |
636 /** A class that exposes only the [StreamSink] interface of an object. */ | 636 /** A class that exposes only the [StreamSink] interface of an object. */ |
637 class _StreamSinkWrapper<T> implements StreamSink<T> { | 637 class _StreamSinkWrapper<T> implements StreamSink<T> { |
638 final StreamSink _target; | 638 final StreamSink _target; |
639 _StreamSinkWrapper(this._target); | 639 _StreamSinkWrapper(this._target); |
640 void add(T data) { _target.add(data); } | 640 void add(T data) { _target.add(data); } |
641 void addError(Object error) { _target.addError(error); } | 641 void addError(Object error, [StackTrace stackTrace]) { |
| 642 _target.addError(error); |
| 643 } |
642 Future close() => _target.close(); | 644 Future close() => _target.close(); |
643 Future addStream(Stream<T> source) => _target.addStream(source); | 645 Future addStream(Stream<T> source) => _target.addStream(source); |
644 Future get done => _target.done; | 646 Future get done => _target.done; |
645 } | 647 } |
646 | 648 |
647 /** | 649 /** |
648 * Object containing the state used to handle [StreamController.addStream]. | 650 * Object containing the state used to handle [StreamController.addStream]. |
649 */ | 651 */ |
650 class _AddStreamState<T> { | 652 class _AddStreamState<T> { |
651 // [_FutureImpl] returned by call to addStream. | 653 // [_FutureImpl] returned by call to addStream. |
(...skipping 34 matching lines...) Loading... |
686 var varData; | 688 var varData; |
687 | 689 |
688 _StreamControllerAddStreamState(_StreamController controller, | 690 _StreamControllerAddStreamState(_StreamController controller, |
689 this.varData, | 691 this.varData, |
690 Stream source) : super(controller, source) { | 692 Stream source) : super(controller, source) { |
691 if (controller.isPaused) { | 693 if (controller.isPaused) { |
692 addSubscription.pause(); | 694 addSubscription.pause(); |
693 } | 695 } |
694 } | 696 } |
695 } | 697 } |
OLD | NEW |