| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 163 * | 163 * |
| 164 * Also allows an objection stack trace object, on top of what [EventSink] | 164 * Also allows an objection stack trace object, on top of what [EventSink] |
| 165 * allows. | 165 * allows. |
| 166 */ | 166 */ |
| 167 void addError(Object error, [Object stackTrace]); | 167 void addError(Object error, [Object stackTrace]); |
| 168 } | 168 } |
| 169 | 169 |
| 170 | 170 |
| 171 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
| 172 StreamSubscription<T> _subscribe(void onData(T data), | 172 StreamSubscription<T> _subscribe(void onData(T data), |
| 173 void onError(Object error), | 173 Function onError, |
| 174 void onDone(), | 174 void onDone(), |
| 175 bool cancelOnError); | 175 bool cancelOnError); |
| 176 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
| 177 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
| 178 void _recordCancel(StreamSubscription<T> subscription) {} | 178 void _recordCancel(StreamSubscription<T> subscription) {} |
| 179 } | 179 } |
| 180 | 180 |
| 181 /** | 181 /** |
| 182 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
| 183 * | 183 * |
| (...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 376 /** | 376 /** |
| 377 * Send or enqueue an error event. | 377 * Send or enqueue an error event. |
| 378 */ | 378 */ |
| 379 void addError(Object error, [Object stackTrace]) { | 379 void addError(Object error, [Object stackTrace]) { |
| 380 if (!_mayAddEvent) throw _badEventState(); | 380 if (!_mayAddEvent) throw _badEventState(); |
| 381 if (stackTrace != null) { | 381 if (stackTrace != null) { |
| 382 // Force stack trace overwrite. Even if the error already contained | 382 // Force stack trace overwrite. Even if the error already contained |
| 383 // a stack trace. | 383 // a stack trace. |
| 384 _attachStackTrace(error, stackTrace); | 384 _attachStackTrace(error, stackTrace); |
| 385 } | 385 } |
| 386 _addError(error); | 386 _addError(error, stackTrace); |
| 387 } | 387 } |
| 388 | 388 |
| 389 /** | 389 /** |
| 390 * Closes this controller. | 390 * Closes this controller. |
| 391 * | 391 * |
| 392 * After closing, no further events may be added using [add] or [addError]. | 392 * After closing, no further events may be added using [add] or [addError]. |
| 393 * | 393 * |
| 394 * You are allowed to close the controller more than once, but only the first | 394 * You are allowed to close the controller more than once, but only the first |
| 395 * call has any effect. | 395 * call has any effect. |
| 396 * | 396 * |
| (...skipping 20 matching lines...) Expand all Loading... |
| 417 | 417 |
| 418 // Add data event, used both by the [addStream] events and by [add]. | 418 // Add data event, used both by the [addStream] events and by [add]. |
| 419 void _add(T value) { | 419 void _add(T value) { |
| 420 if (hasListener) { | 420 if (hasListener) { |
| 421 _sendData(value); | 421 _sendData(value); |
| 422 } else if (_isInitialState) { | 422 } else if (_isInitialState) { |
| 423 _ensurePendingEvents().add(new _DelayedData<T>(value)); | 423 _ensurePendingEvents().add(new _DelayedData<T>(value)); |
| 424 } | 424 } |
| 425 } | 425 } |
| 426 | 426 |
| 427 void _addError(Object error) { | 427 void _addError(Object error, StackTrace stackTrace) { |
| 428 if (hasListener) { | 428 if (hasListener) { |
| 429 _sendError(error); | 429 _sendError(error, stackTrace); |
| 430 } else if (_isInitialState) { | 430 } else if (_isInitialState) { |
| 431 _ensurePendingEvents().add(new _DelayedError(error)); | 431 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); |
| 432 } | 432 } |
| 433 } | 433 } |
| 434 | 434 |
| 435 void _close() { | 435 void _close() { |
| 436 // End of addStream stream. | 436 // End of addStream stream. |
| 437 assert(_isAddingStream); | 437 assert(_isAddingStream); |
| 438 _StreamControllerAddStreamState addState = _varData; | 438 _StreamControllerAddStreamState addState = _varData; |
| 439 _varData = addState.varData; | 439 _varData = addState.varData; |
| 440 _state &= ~_STATE_ADDSTREAM; | 440 _state &= ~_STATE_ADDSTREAM; |
| 441 addState.complete(); | 441 addState.complete(); |
| 442 } | 442 } |
| 443 | 443 |
| 444 // _StreamControllerLifeCycle interface | 444 // _StreamControllerLifeCycle interface |
| 445 | 445 |
| 446 StreamSubscription<T> _subscribe(void onData(T data), | 446 StreamSubscription<T> _subscribe(void onData(T data), |
| 447 void onError(Object error), | 447 Function onError, |
| 448 void onDone(), | 448 void onDone(), |
| 449 bool cancelOnError) { | 449 bool cancelOnError) { |
| 450 if (!_isInitialState) { | 450 if (!_isInitialState) { |
| 451 throw new StateError("Stream has already been listened to."); | 451 throw new StateError("Stream has already been listened to."); |
| 452 } | 452 } |
| 453 _ControllerSubscription subscription = new _ControllerSubscription( | 453 _ControllerSubscription subscription = new _ControllerSubscription( |
| 454 this, onData, onError, onDone, cancelOnError); | 454 this, onData, onError, onDone, cancelOnError); |
| 455 | 455 |
| 456 _PendingEvents pendingEvents = _pendingEvents; | 456 _PendingEvents pendingEvents = _pendingEvents; |
| 457 _state |= _STATE_SUBSCRIBED; | 457 _state |= _STATE_SUBSCRIBED; |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 499 _runGuarded(_onResume); | 499 _runGuarded(_onResume); |
| 500 } | 500 } |
| 501 } | 501 } |
| 502 | 502 |
| 503 abstract class _SyncStreamControllerDispatch<T> | 503 abstract class _SyncStreamControllerDispatch<T> |
| 504 implements _StreamController<T> { | 504 implements _StreamController<T> { |
| 505 void _sendData(T data) { | 505 void _sendData(T data) { |
| 506 _subscription._add(data); | 506 _subscription._add(data); |
| 507 } | 507 } |
| 508 | 508 |
| 509 void _sendError(Object error) { | 509 void _sendError(Object error, StackTrace stackTrace) { |
| 510 _subscription._addError(error); | 510 _subscription._addError(error, stackTrace); |
| 511 } | 511 } |
| 512 | 512 |
| 513 void _sendDone() { | 513 void _sendDone() { |
| 514 _subscription._close(); | 514 _subscription._close(); |
| 515 } | 515 } |
| 516 } | 516 } |
| 517 | 517 |
| 518 abstract class _AsyncStreamControllerDispatch<T> | 518 abstract class _AsyncStreamControllerDispatch<T> |
| 519 implements _StreamController<T> { | 519 implements _StreamController<T> { |
| 520 void _sendData(T data) { | 520 void _sendData(T data) { |
| 521 _subscription._addPending(new _DelayedData(data)); | 521 _subscription._addPending(new _DelayedData(data)); |
| 522 } | 522 } |
| 523 | 523 |
| 524 void _sendError(Object error) { | 524 void _sendError(Object error, StackTrace stackTrace) { |
| 525 _subscription._addPending(new _DelayedError(error)); | 525 _subscription._addPending(new _DelayedError(error, stackTrace)); |
| 526 } | 526 } |
| 527 | 527 |
| 528 void _sendDone() { | 528 void _sendDone() { |
| 529 _subscription._addPending(const _DelayedDone()); | 529 _subscription._addPending(const _DelayedDone()); |
| 530 } | 530 } |
| 531 } | 531 } |
| 532 | 532 |
| 533 // TODO(lrn): Use common superclass for callback-controllers when VM supports | 533 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
| 534 // constructors in mixin superclasses. | 534 // constructors in mixin superclasses. |
| 535 | 535 |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 583 } | 583 } |
| 584 } | 584 } |
| 585 | 585 |
| 586 class _ControllerStream<T> extends _StreamImpl<T> { | 586 class _ControllerStream<T> extends _StreamImpl<T> { |
| 587 _StreamControllerLifecycle<T> _controller; | 587 _StreamControllerLifecycle<T> _controller; |
| 588 | 588 |
| 589 _ControllerStream(this._controller); | 589 _ControllerStream(this._controller); |
| 590 | 590 |
| 591 StreamSubscription<T> _createSubscription( | 591 StreamSubscription<T> _createSubscription( |
| 592 void onData(T data), | 592 void onData(T data), |
| 593 void onError(Object error), | 593 Function onError, |
| 594 void onDone(), | 594 void onDone(), |
| 595 bool cancelOnError) => | 595 bool cancelOnError) => |
| 596 _controller._subscribe(onData, onError, onDone, cancelOnError); | 596 _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 597 | 597 |
| 598 // Override == and hashCode so that new streams returned by the same | 598 // Override == and hashCode so that new streams returned by the same |
| 599 // controller are considered equal. The controller returns a new stream | 599 // controller are considered equal. The controller returns a new stream |
| 600 // each time it's queried, but doesn't have to cache the result. | 600 // each time it's queried, but doesn't have to cache the result. |
| 601 | 601 |
| 602 int get hashCode => _controller.hashCode ^ 0x35323532; | 602 int get hashCode => _controller.hashCode ^ 0x35323532; |
| 603 | 603 |
| 604 bool operator==(Object other) { | 604 bool operator==(Object other) { |
| 605 if (identical(this, other)) return true; | 605 if (identical(this, other)) return true; |
| 606 if (other is! _ControllerStream) return false; | 606 if (other is! _ControllerStream) return false; |
| 607 _ControllerStream otherStream = other; | 607 _ControllerStream otherStream = other; |
| 608 return identical(otherStream._controller, this._controller); | 608 return identical(otherStream._controller, this._controller); |
| 609 } | 609 } |
| 610 } | 610 } |
| 611 | 611 |
| 612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 613 final _StreamControllerLifecycle<T> _controller; | 613 final _StreamControllerLifecycle<T> _controller; |
| 614 | 614 |
| 615 _ControllerSubscription(this._controller, | 615 _ControllerSubscription(this._controller, |
| 616 void onData(T data), | 616 void onData(T data), |
| 617 void onError(Object error), | 617 Function onError, |
| 618 void onDone(), | 618 void onDone(), |
| 619 bool cancelOnError) | 619 bool cancelOnError) |
| 620 : super(onData, onError, onDone, cancelOnError); | 620 : super(onData, onError, onDone, cancelOnError); |
| 621 | 621 |
| 622 void _onCancel() { | 622 void _onCancel() { |
| 623 _controller._recordCancel(this); | 623 _controller._recordCancel(this); |
| 624 } | 624 } |
| 625 | 625 |
| 626 void _onPause() { | 626 void _onPause() { |
| 627 _controller._recordPause(this); | 627 _controller._recordPause(this); |
| 628 } | 628 } |
| 629 | 629 |
| 630 void _onResume() { | 630 void _onResume() { |
| 631 _controller._recordResume(this); | 631 _controller._recordResume(this); |
| 632 } | 632 } |
| 633 } | 633 } |
| 634 | 634 |
| 635 | 635 |
| 636 /** A class that exposes only the [StreamSink] interface of an object. */ | 636 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 637 class _StreamSinkWrapper<T> implements StreamSink<T> { | 637 class _StreamSinkWrapper<T> implements StreamSink<T> { |
| 638 final StreamSink _target; | 638 final StreamSink _target; |
| 639 _StreamSinkWrapper(this._target); | 639 _StreamSinkWrapper(this._target); |
| 640 void add(T data) { _target.add(data); } | 640 void add(T data) { _target.add(data); } |
| 641 void addError(Object error) { _target.addError(error); } | 641 void addError(Object error, [StackTrace stackTrace]) { |
| 642 _target.addError(error); |
| 643 } |
| 642 Future close() => _target.close(); | 644 Future close() => _target.close(); |
| 643 Future addStream(Stream<T> source) => _target.addStream(source); | 645 Future addStream(Stream<T> source) => _target.addStream(source); |
| 644 Future get done => _target.done; | 646 Future get done => _target.done; |
| 645 } | 647 } |
| 646 | 648 |
| 647 /** | 649 /** |
| 648 * Object containing the state used to handle [StreamController.addStream]. | 650 * Object containing the state used to handle [StreamController.addStream]. |
| 649 */ | 651 */ |
| 650 class _AddStreamState<T> { | 652 class _AddStreamState<T> { |
| 651 // [_FutureImpl] returned by call to addStream. | 653 // [_FutureImpl] returned by call to addStream. |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 686 var varData; | 688 var varData; |
| 687 | 689 |
| 688 _StreamControllerAddStreamState(_StreamController controller, | 690 _StreamControllerAddStreamState(_StreamController controller, |
| 689 this.varData, | 691 this.varData, |
| 690 Stream source) : super(controller, source) { | 692 Stream source) : super(controller, source) { |
| 691 if (controller.isPaused) { | 693 if (controller.isPaused) { |
| 692 addSubscription.pause(); | 694 addSubscription.pause(); |
| 693 } | 695 } |
| 694 } | 696 } |
| 695 } | 697 } |
| OLD | NEW |