Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 * 163 *
164 * Also allows an objection stack trace object, on top of what [EventSink] 164 * Also allows an objection stack trace object, on top of what [EventSink]
165 * allows. 165 * allows.
166 */ 166 */
167 void addError(Object error, [Object stackTrace]); 167 void addError(Object error, [Object stackTrace]);
168 } 168 }
169 169
170 170
171 abstract class _StreamControllerLifecycle<T> { 171 abstract class _StreamControllerLifecycle<T> {
172 StreamSubscription<T> _subscribe(void onData(T data), 172 StreamSubscription<T> _subscribe(void onData(T data),
173 void onError(Object error), 173 Function onError,
174 void onDone(), 174 void onDone(),
175 bool cancelOnError); 175 bool cancelOnError);
176 void _recordPause(StreamSubscription<T> subscription) {} 176 void _recordPause(StreamSubscription<T> subscription) {}
177 void _recordResume(StreamSubscription<T> subscription) {} 177 void _recordResume(StreamSubscription<T> subscription) {}
178 void _recordCancel(StreamSubscription<T> subscription) {} 178 void _recordCancel(StreamSubscription<T> subscription) {}
179 } 179 }
180 180
181 /** 181 /**
182 * Default implementation of [StreamController]. 182 * Default implementation of [StreamController].
183 * 183 *
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after
376 /** 376 /**
377 * Send or enqueue an error event. 377 * Send or enqueue an error event.
378 */ 378 */
379 void addError(Object error, [Object stackTrace]) { 379 void addError(Object error, [Object stackTrace]) {
380 if (!_mayAddEvent) throw _badEventState(); 380 if (!_mayAddEvent) throw _badEventState();
381 if (stackTrace != null) { 381 if (stackTrace != null) {
382 // Force stack trace overwrite. Even if the error already contained 382 // Force stack trace overwrite. Even if the error already contained
383 // a stack trace. 383 // a stack trace.
384 _attachStackTrace(error, stackTrace); 384 _attachStackTrace(error, stackTrace);
385 } 385 }
386 _addError(error); 386 _addError(error, stackTrace);
387 } 387 }
388 388
389 /** 389 /**
390 * Closes this controller. 390 * Closes this controller.
391 * 391 *
392 * After closing, no further events may be added using [add] or [addError]. 392 * After closing, no further events may be added using [add] or [addError].
393 * 393 *
394 * You are allowed to close the controller more than once, but only the first 394 * You are allowed to close the controller more than once, but only the first
395 * call has any effect. 395 * call has any effect.
396 * 396 *
(...skipping 20 matching lines...) Expand all
417 417
418 // Add data event, used both by the [addStream] events and by [add]. 418 // Add data event, used both by the [addStream] events and by [add].
419 void _add(T value) { 419 void _add(T value) {
420 if (hasListener) { 420 if (hasListener) {
421 _sendData(value); 421 _sendData(value);
422 } else if (_isInitialState) { 422 } else if (_isInitialState) {
423 _ensurePendingEvents().add(new _DelayedData<T>(value)); 423 _ensurePendingEvents().add(new _DelayedData<T>(value));
424 } 424 }
425 } 425 }
426 426
427 void _addError(Object error) { 427 void _addError(Object error, StackTrace stackTrace) {
428 if (hasListener) { 428 if (hasListener) {
429 _sendError(error); 429 _sendError(error, stackTrace);
430 } else if (_isInitialState) { 430 } else if (_isInitialState) {
431 _ensurePendingEvents().add(new _DelayedError(error)); 431 _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
432 } 432 }
433 } 433 }
434 434
435 void _close() { 435 void _close() {
436 // End of addStream stream. 436 // End of addStream stream.
437 assert(_isAddingStream); 437 assert(_isAddingStream);
438 _StreamControllerAddStreamState addState = _varData; 438 _StreamControllerAddStreamState addState = _varData;
439 _varData = addState.varData; 439 _varData = addState.varData;
440 _state &= ~_STATE_ADDSTREAM; 440 _state &= ~_STATE_ADDSTREAM;
441 addState.complete(); 441 addState.complete();
442 } 442 }
443 443
444 // _StreamControllerLifeCycle interface 444 // _StreamControllerLifeCycle interface
445 445
446 StreamSubscription<T> _subscribe(void onData(T data), 446 StreamSubscription<T> _subscribe(void onData(T data),
447 void onError(Object error), 447 Function onError,
448 void onDone(), 448 void onDone(),
449 bool cancelOnError) { 449 bool cancelOnError) {
450 if (!_isInitialState) { 450 if (!_isInitialState) {
451 throw new StateError("Stream has already been listened to."); 451 throw new StateError("Stream has already been listened to.");
452 } 452 }
453 _ControllerSubscription subscription = new _ControllerSubscription( 453 _ControllerSubscription subscription = new _ControllerSubscription(
454 this, onData, onError, onDone, cancelOnError); 454 this, onData, onError, onDone, cancelOnError);
455 455
456 _PendingEvents pendingEvents = _pendingEvents; 456 _PendingEvents pendingEvents = _pendingEvents;
457 _state |= _STATE_SUBSCRIBED; 457 _state |= _STATE_SUBSCRIBED;
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
499 _runGuarded(_onResume); 499 _runGuarded(_onResume);
500 } 500 }
501 } 501 }
502 502
503 abstract class _SyncStreamControllerDispatch<T> 503 abstract class _SyncStreamControllerDispatch<T>
504 implements _StreamController<T> { 504 implements _StreamController<T> {
505 void _sendData(T data) { 505 void _sendData(T data) {
506 _subscription._add(data); 506 _subscription._add(data);
507 } 507 }
508 508
509 void _sendError(Object error) { 509 void _sendError(Object error, StackTrace stackTrace) {
510 _subscription._addError(error); 510 _subscription._addError(error, stackTrace);
511 } 511 }
512 512
513 void _sendDone() { 513 void _sendDone() {
514 _subscription._close(); 514 _subscription._close();
515 } 515 }
516 } 516 }
517 517
518 abstract class _AsyncStreamControllerDispatch<T> 518 abstract class _AsyncStreamControllerDispatch<T>
519 implements _StreamController<T> { 519 implements _StreamController<T> {
520 void _sendData(T data) { 520 void _sendData(T data) {
521 _subscription._addPending(new _DelayedData(data)); 521 _subscription._addPending(new _DelayedData(data));
522 } 522 }
523 523
524 void _sendError(Object error) { 524 void _sendError(Object error, StackTrace stackTrace) {
525 _subscription._addPending(new _DelayedError(error)); 525 _subscription._addPending(new _DelayedError(error, stackTrace));
526 } 526 }
527 527
528 void _sendDone() { 528 void _sendDone() {
529 _subscription._addPending(const _DelayedDone()); 529 _subscription._addPending(const _DelayedDone());
530 } 530 }
531 } 531 }
532 532
533 // TODO(lrn): Use common superclass for callback-controllers when VM supports 533 // TODO(lrn): Use common superclass for callback-controllers when VM supports
534 // constructors in mixin superclasses. 534 // constructors in mixin superclasses.
535 535
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
583 } 583 }
584 } 584 }
585 585
586 class _ControllerStream<T> extends _StreamImpl<T> { 586 class _ControllerStream<T> extends _StreamImpl<T> {
587 _StreamControllerLifecycle<T> _controller; 587 _StreamControllerLifecycle<T> _controller;
588 588
589 _ControllerStream(this._controller); 589 _ControllerStream(this._controller);
590 590
591 StreamSubscription<T> _createSubscription( 591 StreamSubscription<T> _createSubscription(
592 void onData(T data), 592 void onData(T data),
593 void onError(Object error), 593 Function onError,
594 void onDone(), 594 void onDone(),
595 bool cancelOnError) => 595 bool cancelOnError) =>
596 _controller._subscribe(onData, onError, onDone, cancelOnError); 596 _controller._subscribe(onData, onError, onDone, cancelOnError);
597 597
598 // Override == and hashCode so that new streams returned by the same 598 // Override == and hashCode so that new streams returned by the same
599 // controller are considered equal. The controller returns a new stream 599 // controller are considered equal. The controller returns a new stream
600 // each time it's queried, but doesn't have to cache the result. 600 // each time it's queried, but doesn't have to cache the result.
601 601
602 int get hashCode => _controller.hashCode ^ 0x35323532; 602 int get hashCode => _controller.hashCode ^ 0x35323532;
603 603
604 bool operator==(Object other) { 604 bool operator==(Object other) {
605 if (identical(this, other)) return true; 605 if (identical(this, other)) return true;
606 if (other is! _ControllerStream) return false; 606 if (other is! _ControllerStream) return false;
607 _ControllerStream otherStream = other; 607 _ControllerStream otherStream = other;
608 return identical(otherStream._controller, this._controller); 608 return identical(otherStream._controller, this._controller);
609 } 609 }
610 } 610 }
611 611
612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
613 final _StreamControllerLifecycle<T> _controller; 613 final _StreamControllerLifecycle<T> _controller;
614 614
615 _ControllerSubscription(this._controller, 615 _ControllerSubscription(this._controller,
616 void onData(T data), 616 void onData(T data),
617 void onError(Object error), 617 Function onError,
618 void onDone(), 618 void onDone(),
619 bool cancelOnError) 619 bool cancelOnError)
620 : super(onData, onError, onDone, cancelOnError); 620 : super(onData, onError, onDone, cancelOnError);
621 621
622 void _onCancel() { 622 void _onCancel() {
623 _controller._recordCancel(this); 623 _controller._recordCancel(this);
624 } 624 }
625 625
626 void _onPause() { 626 void _onPause() {
627 _controller._recordPause(this); 627 _controller._recordPause(this);
628 } 628 }
629 629
630 void _onResume() { 630 void _onResume() {
631 _controller._recordResume(this); 631 _controller._recordResume(this);
632 } 632 }
633 } 633 }
634 634
635 635
636 /** A class that exposes only the [StreamSink] interface of an object. */ 636 /** A class that exposes only the [StreamSink] interface of an object. */
637 class _StreamSinkWrapper<T> implements StreamSink<T> { 637 class _StreamSinkWrapper<T> implements StreamSink<T> {
638 final StreamSink _target; 638 final StreamSink _target;
639 _StreamSinkWrapper(this._target); 639 _StreamSinkWrapper(this._target);
640 void add(T data) { _target.add(data); } 640 void add(T data) { _target.add(data); }
641 void addError(Object error) { _target.addError(error); } 641 void addError(Object error, [StackTrace stackTrace]) {
642 _target.addError(error);
643 }
642 Future close() => _target.close(); 644 Future close() => _target.close();
643 Future addStream(Stream<T> source) => _target.addStream(source); 645 Future addStream(Stream<T> source) => _target.addStream(source);
644 Future get done => _target.done; 646 Future get done => _target.done;
645 } 647 }
646 648
647 /** 649 /**
648 * Object containing the state used to handle [StreamController.addStream]. 650 * Object containing the state used to handle [StreamController.addStream].
649 */ 651 */
650 class _AddStreamState<T> { 652 class _AddStreamState<T> {
651 // [_FutureImpl] returned by call to addStream. 653 // [_FutureImpl] returned by call to addStream.
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
686 var varData; 688 var varData;
687 689
688 _StreamControllerAddStreamState(_StreamController controller, 690 _StreamControllerAddStreamState(_StreamController controller,
689 this.varData, 691 this.varData,
690 Stream source) : super(controller, source) { 692 Stream source) : super(controller, source) {
691 if (controller.isPaused) { 693 if (controller.isPaused) {
692 addSubscription.pause(); 694 addSubscription.pause();
693 } 695 }
694 } 696 }
695 } 697 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698