Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 14251006: Remove AsyncError with Expando. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 // Completion state of the stream. 9 // Completion state of the stream.
10 /// Initial and default state where the stream can receive and send events. 10 /// Initial and default state where the stream can receive and send events.
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; 48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
49 49
50 /// Bit that contains the last sent event's "id bit". 50 /// Bit that contains the last sent event's "id bit".
51 const int _LISTENER_EVENT_ID = 4; 51 const int _LISTENER_EVENT_ID = 4;
52 const int _LISTENER_EVENT_ID_SHIFT = 2; 52 const int _LISTENER_EVENT_ID_SHIFT = 2;
53 53
54 /// The count of times a listener has paused is stored in the 54 /// The count of times a listener has paused is stored in the
55 /// state, shifted by this amount. 55 /// state, shifted by this amount.
56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; 56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
57 57
58 /** Throws the given error in the next cycle. */
59 _throwDelayed(var error, [Object stackTrace]) {
60 // We are going to reach the top-level here, but there might be a global
61 // exception handler. This means that we shouldn't print the stack trace.
62 // TODO(floitsch): Find better solution that doesn't print the stack trace
63 // if there is a global exception handler.
64 runAsync(() {
65 if (stackTrace != null) print(stackTrace);
66 var trace = getAttachedStackTrace(error);
67 if (trace != null && trace != stackTrace) print(trace);
68 throw error;
69 });
70 }
71
58 72
59 // ------------------------------------------------------------------- 73 // -------------------------------------------------------------------
60 // Common base class for single and multi-subscription streams. 74 // Common base class for single and multi-subscription streams.
61 // ------------------------------------------------------------------- 75 // -------------------------------------------------------------------
62 abstract class _StreamImpl<T> extends Stream<T> { 76 abstract class _StreamImpl<T> extends Stream<T> {
63 /** Current state of the stream. */ 77 /** Current state of the stream. */
64 int _state = _STREAM_OPEN; 78 int _state = _STREAM_OPEN;
65 79
66 /** 80 /**
67 * List of pending events. 81 * List of pending events.
68 * 82 *
69 * If events are added to the stream (using [_add], [_addError] or [_done]) 83 * If events are added to the stream (using [_add], [_addError] or [_done])
70 * while the stream is paused, or while another event is firing, events will 84 * while the stream is paused, or while another event is firing, events will
71 * stored here. 85 * stored here.
72 * Also supports scheduling the events for later execution. 86 * Also supports scheduling the events for later execution.
73 */ 87 */
74 _PendingEvents _pendingEvents; 88 _PendingEvents _pendingEvents;
75 89
76 // ------------------------------------------------------------------ 90 // ------------------------------------------------------------------
77 // Stream interface. 91 // Stream interface.
78 92
79 StreamSubscription<T> listen(void onData(T data), 93 StreamSubscription<T> listen(void onData(T data),
80 { void onError(AsyncError error), 94 { void onError(error),
81 void onDone(), 95 void onDone(),
82 bool cancelOnError }) { 96 bool cancelOnError }) {
83 if (_isComplete) { 97 if (_isComplete) {
84 return new _DoneSubscription(onDone); 98 return new _DoneSubscription(onDone);
85 } 99 }
86 if (onData == null) onData = _nullDataHandler; 100 if (onData == null) onData = _nullDataHandler;
87 if (onError == null) onError = _nullErrorHandler; 101 if (onError == null) onError = _nullErrorHandler;
88 if (onDone == null) onDone = _nullDoneHandler; 102 if (onDone == null) onDone = _nullDoneHandler;
89 cancelOnError = identical(true, cancelOnError); 103 cancelOnError = identical(true, cancelOnError);
90 _StreamSubscriptionImpl subscription = 104 _StreamSubscriptionImpl subscription =
(...skipping 25 matching lines...) Expand all
116 } 130 }
117 _handlePendingEvents(); 131 _handlePendingEvents();
118 } 132 }
119 133
120 /** 134 /**
121 * Send or enqueue an error event. 135 * Send or enqueue an error event.
122 * 136 *
123 * If a subscription has requested to be unsubscribed on errors, 137 * If a subscription has requested to be unsubscribed on errors,
124 * it will be unsubscribed after receiving this event. 138 * it will be unsubscribed after receiving this event.
125 */ 139 */
126 void _addError(AsyncError error) { 140 void _addError(error) {
127 if (_isClosed) throw new StateError("Sending on closed stream"); 141 if (_isClosed) throw new StateError("Sending on closed stream");
128 if (!_mayFireState) { 142 if (!_mayFireState) {
129 // Not the time to send events. 143 // Not the time to send events.
130 _addPendingEvent(new _DelayedError(error)); 144 _addPendingEvent(new _DelayedError(error));
131 return; 145 return;
132 } 146 }
133 if (_hasPendingEvent) { 147 if (_hasPendingEvent) {
134 _addPendingEvent(new _DelayedError(error)); 148 _addPendingEvent(new _DelayedError(error));
135 } else { 149 } else {
136 _sendError(error); 150 _sendError(error);
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after
367 381
368 /** Schedule pending events to be executed. */ 382 /** Schedule pending events to be executed. */
369 void _schedulePendingEvents() { 383 void _schedulePendingEvents() {
370 assert(_hasPendingEvent); 384 assert(_hasPendingEvent);
371 _pendingEvents.schedule(this); 385 _pendingEvents.schedule(this);
372 } 386 }
373 387
374 /** Create a subscription object. Called by [subcribe]. */ 388 /** Create a subscription object. Called by [subcribe]. */
375 _StreamSubscriptionImpl<T> _createSubscription( 389 _StreamSubscriptionImpl<T> _createSubscription(
376 void onData(T data), 390 void onData(T data),
377 void onError(AsyncError error), 391 void onError(error),
378 void onDone(), 392 void onDone(),
379 bool cancelOnError); 393 bool cancelOnError);
380 394
381 /** 395 /**
382 * Adds a listener to this stream. 396 * Adds a listener to this stream.
383 */ 397 */
384 void _addListener(_StreamSubscriptionImpl subscription); 398 void _addListener(_StreamSubscriptionImpl subscription);
385 399
386 /** 400 /**
387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. 401 * Handle a cancel requested from a [_StreamSubscriptionImpl].
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
486 /** 500 /**
487 * Send a data event directly to each subscriber. 501 * Send a data event directly to each subscriber.
488 */ 502 */
489 _sendData(T value) { 503 _sendData(T value) {
490 assert(!_isPaused); 504 assert(!_isPaused);
491 assert(!_isComplete); 505 assert(!_isComplete);
492 if (!_hasListener) return; 506 if (!_hasListener) return;
493 _forEachSubscriber((subscriber) { 507 _forEachSubscriber((subscriber) {
494 try { 508 try {
495 subscriber._sendData(value); 509 subscriber._sendData(value);
496 } on AsyncError catch (e) {
497 e.throwDelayed();
498 } catch (e, s) { 510 } catch (e, s) {
499 new AsyncError(e, s).throwDelayed(); 511 _throwDelayed(e, s);
500 } 512 }
501 }); 513 });
502 } 514 }
503 515
504 /** 516 /**
505 * Sends an error event directly to each subscriber. 517 * Sends an error event directly to each subscriber.
506 */ 518 */
507 void _sendError(AsyncError error) { 519 void _sendError(error) {
508 assert(!_isPaused); 520 assert(!_isPaused);
509 assert(!_isComplete); 521 assert(!_isComplete);
510 if (!_hasListener) return; 522 if (!_hasListener) return;
511 _forEachSubscriber((subscriber) { 523 _forEachSubscriber((subscriber) {
512 try { 524 try {
513 subscriber._sendError(error); 525 subscriber._sendError(error);
514 } on AsyncError catch (e) {
515 e.throwDelayed();
516 } catch (e, s) { 526 } catch (e, s) {
517 new AsyncError.withCause(e, s, error).throwDelayed(); 527 _throwDelayed(e, s);
518 } 528 }
519 }); 529 });
520 } 530 }
521 531
522 /** 532 /**
523 * Sends the "done" message directly to each subscriber. 533 * Sends the "done" message directly to each subscriber.
524 * This automatically stops further subscription and 534 * This automatically stops further subscription and
525 * unsubscribes all subscribers. 535 * unsubscribes all subscribers.
526 */ 536 */
527 void _sendDone() { 537 void _sendDone() {
528 assert(!_isPaused); 538 assert(!_isPaused);
529 assert(_isClosed); 539 assert(_isClosed);
530 _setComplete(); 540 _setComplete();
531 if (!_hasListener) return; 541 if (!_hasListener) return;
532 _forEachSubscriber((subscriber) { 542 _forEachSubscriber((subscriber) {
533 _cancel(subscriber); 543 _cancel(subscriber);
534 try { 544 try {
535 subscriber._sendDone(); 545 subscriber._sendDone();
536 } on AsyncError catch (e) {
537 e.throwDelayed();
538 } catch (e, s) { 546 } catch (e, s) {
539 new AsyncError(e, s).throwDelayed(); 547 _throwDelayed(e, s);
540 } 548 }
541 }); 549 });
542 assert(!_hasListener); 550 assert(!_hasListener);
543 } 551 }
544 } 552 }
545 553
546 // ------------------------------------------------------------------- 554 // -------------------------------------------------------------------
547 // Default implementation of a stream with a single subscriber. 555 // Default implementation of a stream with a single subscriber.
548 // ------------------------------------------------------------------- 556 // -------------------------------------------------------------------
549 /** 557 /**
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
585 _SingleStreamImpl() { 593 _SingleStreamImpl() {
586 // Start out paused. 594 // Start out paused.
587 _updatePauseCount(1); 595 _updatePauseCount(1);
588 } 596 }
589 597
590 /** 598 /**
591 * Create the new subscription object. 599 * Create the new subscription object.
592 */ 600 */
593 _StreamSubscriptionImpl<T> _createSubscription( 601 _StreamSubscriptionImpl<T> _createSubscription(
594 void onData(T data), 602 void onData(T data),
595 void onError(AsyncError error), 603 void onError(error),
596 void onDone(), 604 void onDone(),
597 bool cancelOnError) { 605 bool cancelOnError) {
598 return new _StreamSubscriptionImpl<T>( 606 return new _StreamSubscriptionImpl<T>(
599 this, onData, onError, onDone, cancelOnError); 607 this, onData, onError, onDone, cancelOnError);
600 } 608 }
601 609
602 void _addListener(_StreamListener subscription) { 610 void _addListener(_StreamListener subscription) {
603 assert(!_isComplete); 611 assert(!_isComplete);
604 if (_hasListener) { 612 if (_hasListener) {
605 throw new StateError("Stream already has subscriber."); 613 throw new StateError("Stream already has subscriber.");
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
705 // Helper functions that can be overridden in subclasses. 713 // Helper functions that can be overridden in subclasses.
706 714
707 /** Whether there are currently any subscribers on this [Stream]. */ 715 /** Whether there are currently any subscribers on this [Stream]. */
708 bool get _hasListener => !_InternalLinkList.isEmpty(this); 716 bool get _hasListener => !_InternalLinkList.isEmpty(this);
709 717
710 /** 718 /**
711 * Create the new subscription object. 719 * Create the new subscription object.
712 */ 720 */
713 _StreamListener<T> _createSubscription( 721 _StreamListener<T> _createSubscription(
714 void onData(T data), 722 void onData(T data),
715 void onError(AsyncError error), 723 void onError(error),
716 void onDone(), 724 void onDone(),
717 bool cancelOnError) { 725 bool cancelOnError) {
718 return new _StreamSubscriptionImpl<T>( 726 return new _StreamSubscriptionImpl<T>(
719 this, onData, onError, onDone, cancelOnError); 727 this, onData, onError, onDone, cancelOnError);
720 } 728 }
721 729
722 // ------------------------------------------------------------------- 730 // -------------------------------------------------------------------
723 // Internal implementation. 731 // Internal implementation.
724 732
725 /** 733 /**
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
834 */ 842 */
835 _GeneratedSingleStreamImpl(_PendingEvents events) { 843 _GeneratedSingleStreamImpl(_PendingEvents events) {
836 _pendingEvents = events; 844 _pendingEvents = events;
837 _setClosed(); // Closed for input since all events are already pending. 845 _setClosed(); // Closed for input since all events are already pending.
838 } 846 }
839 847
840 void _add(T value) { 848 void _add(T value) {
841 throw new UnsupportedError("Cannot inject events into generated stream"); 849 throw new UnsupportedError("Cannot inject events into generated stream");
842 } 850 }
843 851
844 void _addError(AsyncError value) { 852 void _addError(value) {
845 throw new UnsupportedError("Cannot inject events into generated stream"); 853 throw new UnsupportedError("Cannot inject events into generated stream");
846 } 854 }
847 855
848 void _close() { 856 void _close() {
849 throw new UnsupportedError("Cannot inject events into generated stream"); 857 throw new UnsupportedError("Cannot inject events into generated stream");
850 } 858 }
851 } 859 }
852 860
853 861
854 /** Pending events object that gets its events from an [Iterable]. */ 862 /** Pending events object that gets its events from an [Iterable]. */
(...skipping 14 matching lines...) Expand all
869 void handleNext(_StreamImpl<T> stream) { 877 void handleNext(_StreamImpl<T> stream) {
870 if (_isDone) throw new StateError("No events pending."); 878 if (_isDone) throw new StateError("No events pending.");
871 try { 879 try {
872 _isDone = !_iterator.moveNext(); 880 _isDone = !_iterator.moveNext();
873 if (!_isDone) { 881 if (!_isDone) {
874 stream._sendData(_iterator.current); 882 stream._sendData(_iterator.current);
875 } else { 883 } else {
876 stream._sendDone(); 884 stream._sendDone();
877 } 885 }
878 } catch (e, s) { 886 } catch (e, s) {
879 stream._sendError(new AsyncError(e, s)); 887 stream._sendError(_asyncError(e, s));
880 stream._sendDone(); 888 stream._sendDone();
881 _isDone = true; 889 _isDone = true;
882 } 890 }
883 } 891 }
884 } 892 }
885 893
886 894
887 /** 895 /**
888 * The subscription class that the [StreamController] uses. 896 * The subscription class that the [StreamController] uses.
889 * 897 *
(...skipping 21 matching lines...) Expand all
911 this._onData, 919 this._onData,
912 this._onError, 920 this._onError,
913 this._onDone, 921 this._onDone,
914 this._cancelOnError) : super(source); 922 this._cancelOnError) : super(source);
915 923
916 void onData(void handleData(T event)) { 924 void onData(void handleData(T event)) {
917 if (handleData == null) handleData = _nullDataHandler; 925 if (handleData == null) handleData = _nullDataHandler;
918 _onData = handleData; 926 _onData = handleData;
919 } 927 }
920 928
921 void onError(void handleError(AsyncError error)) { 929 void onError(void handleError(error)) {
922 if (handleError == null) handleError = _nullErrorHandler; 930 if (handleError == null) handleError = _nullErrorHandler;
923 _onError = handleError; 931 _onError = handleError;
924 } 932 }
925 933
926 void onDone(void handleDone()) { 934 void onDone(void handleDone()) {
927 if (handleDone == null) handleDone = _nullDoneHandler; 935 if (handleDone == null) handleDone = _nullDoneHandler;
928 _onDone = handleDone; 936 _onDone = handleDone;
929 } 937 }
930 938
931 void _sendData(T data) { 939 void _sendData(T data) {
932 _onData(data); 940 _onData(data);
933 } 941 }
934 942
935 void _sendError(AsyncError error) { 943 void _sendError(error) {
936 _onError(error); 944 _onError(error);
937 if (_cancelOnError) _source._cancel(this); 945 if (_cancelOnError) _source._cancel(this);
938 } 946 }
939 947
940 void _sendDone() { 948 void _sendDone() {
941 _onDone(); 949 _onDone();
942 } 950 }
943 951
944 void cancel() { 952 void cancel() {
945 if (!_isSubscribed) return; 953 if (!_isSubscribed) return;
946 _source._cancel(this); 954 _source._cancel(this);
947 } 955 }
948 956
949 void pause([Future resumeSignal]) { 957 void pause([Future resumeSignal]) {
950 if (!_isSubscribed) return; 958 if (!_isSubscribed) return;
951 _source._pause(this, resumeSignal); 959 _source._pause(this, resumeSignal);
952 } 960 }
953 961
954 void resume() { 962 void resume() {
955 if (!_isSubscribed || !isPaused) return; 963 if (!_isSubscribed || !isPaused) return;
956 _source._resume(this, false); 964 _source._resume(this, false);
957 } 965 }
958 966
959 Future asFuture([var futureValue]) { 967 Future asFuture([var futureValue]) {
960 _FutureImpl<T> result = new _FutureImpl<T>(); 968 _FutureImpl<T> result = new _FutureImpl<T>();
961 969
962 // Overwrite the onDone and onError handlers. 970 // Overwrite the onDone and onError handlers.
963 onDone(() { result._setValue(futureValue); }); 971 onDone(() { result._setValue(futureValue); });
964 onError((AsyncError error) { 972 onError((error) {
965 cancel(); 973 cancel();
966 result._setError(error); 974 result._setError(error);
967 }); 975 });
968 976
969 return result; 977 return result;
970 } 978 }
971 } 979 }
972 980
973 // Internal helpers. 981 // Internal helpers.
974 982
975 // Types of the different handlers on a stream. Types used to type fields. 983 // Types of the different handlers on a stream. Types used to type fields.
976 typedef void _DataHandler<T>(T value); 984 typedef void _DataHandler<T>(T value);
977 typedef void _ErrorHandler(AsyncError error); 985 typedef void _ErrorHandler(error);
978 typedef void _DoneHandler(); 986 typedef void _DoneHandler();
979 987
980 988
981 /** Default data handler, does nothing. */ 989 /** Default data handler, does nothing. */
982 void _nullDataHandler(var value) {} 990 void _nullDataHandler(var value) {}
983 991
984 /** Default error handler, reports the error to the global handler. */ 992 /** Default error handler, reports the error to the global handler. */
985 void _nullErrorHandler(AsyncError error) { 993 void _nullErrorHandler(error) {
986 error.throwDelayed(); 994 _throwDelayed(error);
987 } 995 }
988 996
989 /** Default done handler, does nothing. */ 997 /** Default done handler, does nothing. */
990 void _nullDoneHandler() {} 998 void _nullDoneHandler() {}
991 999
992 1000
993 /** A delayed event on a stream implementation. */ 1001 /** A delayed event on a stream implementation. */
994 abstract class _DelayedEvent { 1002 abstract class _DelayedEvent {
995 /** Added as a linked list on the [StreamController]. */ 1003 /** Added as a linked list on the [StreamController]. */
996 _DelayedEvent next; 1004 _DelayedEvent next;
997 /** Execute the delayed event on the [StreamController]. */ 1005 /** Execute the delayed event on the [StreamController]. */
998 void perform(_StreamImpl stream); 1006 void perform(_StreamImpl stream);
999 } 1007 }
1000 1008
1001 /** A delayed data event. */ 1009 /** A delayed data event. */
1002 class _DelayedData<T> extends _DelayedEvent{ 1010 class _DelayedData<T> extends _DelayedEvent{
1003 T value; 1011 final T value;
1004 _DelayedData(this.value); 1012 _DelayedData(this.value);
1005 void perform(_StreamImpl<T> stream) { 1013 void perform(_StreamImpl<T> stream) {
1006 stream._sendData(value); 1014 stream._sendData(value);
1007 } 1015 }
1008 } 1016 }
1009 1017
1010 /** A delayed error event. */ 1018 /** A delayed error event. */
1011 class _DelayedError extends _DelayedEvent { 1019 class _DelayedError extends _DelayedEvent {
1012 AsyncError error; 1020 final error;
1013 _DelayedError(this.error); 1021 _DelayedError(this.error);
1014 void perform(_StreamImpl stream) { 1022 void perform(_StreamImpl stream) {
1015 stream._sendError(error); 1023 stream._sendError(error);
1016 } 1024 }
1017 } 1025 }
1018 1026
1019 /** A delayed done event. */ 1027 /** A delayed done event. */
1020 class _DelayedDone implements _DelayedEvent { 1028 class _DelayedDone implements _DelayedEvent {
1021 const _DelayedDone(); 1029 const _DelayedDone();
1022 void perform(_StreamImpl stream) { 1030 void perform(_StreamImpl stream) {
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1106 list._previousLink = otherLast; 1114 list._previousLink = otherLast;
1107 otherLast._nextLink = list; 1115 otherLast._nextLink = list;
1108 // Clean up [other]. 1116 // Clean up [other].
1109 other._nextLink = other._previousLink = other; 1117 other._nextLink = other._previousLink = other;
1110 } 1118 }
1111 } 1119 }
1112 1120
1113 /** Abstract type for an internal interface for sending events. */ 1121 /** Abstract type for an internal interface for sending events. */
1114 abstract class _EventOutputSink<T> { 1122 abstract class _EventOutputSink<T> {
1115 _sendData(T data); 1123 _sendData(T data);
1116 _sendError(AsyncError error); 1124 _sendError(error);
1117 _sendDone(); 1125 _sendDone();
1118 } 1126 }
1119 1127
1120 abstract class _StreamListener<T> extends _InternalLink 1128 abstract class _StreamListener<T> extends _InternalLink
1121 implements _EventOutputSink<T> { 1129 implements _EventOutputSink<T> {
1122 final _StreamImpl _source; 1130 final _StreamImpl _source;
1123 int _state = _LISTENER_UNSUBSCRIBED; 1131 int _state = _LISTENER_UNSUBSCRIBED;
1124 1132
1125 _StreamListener(this._source); 1133 _StreamListener(this._source);
1126 1134
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
1181 void _incrementPauseCount() { 1189 void _incrementPauseCount() {
1182 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; 1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1183 } 1191 }
1184 1192
1185 void _decrementPauseCount() { 1193 void _decrementPauseCount() {
1186 assert(isPaused); 1194 assert(isPaused);
1187 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; 1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1188 } 1196 }
1189 1197
1190 _sendData(T data); 1198 _sendData(T data);
1191 _sendError(AsyncError error); 1199 _sendError(error);
1192 _sendDone(); 1200 _sendDone();
1193 } 1201 }
1194 1202
1195 /** Superclass for provider of pending events. */ 1203 /** Superclass for provider of pending events. */
1196 abstract class _PendingEvents { 1204 abstract class _PendingEvents {
1197 /** 1205 /**
1198 * Timer set when pending events are scheduled for execution. 1206 * Timer set when pending events are scheduled for execution.
1199 * 1207 *
1200 * When scheduling pending events for execution in a later cycle, the timer 1208 * When scheduling pending events for execution in a later cycle, the timer
1201 * is stored here. If pending events are executed earlier than that, e.g., 1209 * is stored here. If pending events are executed earlier than that, e.g.,
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
1269 assert(_timer == null && _pauseCount == 0); 1277 assert(_timer == null && _pauseCount == 0);
1270 _timer = new Timer(Duration.ZERO, () { 1278 _timer = new Timer(Duration.ZERO, () {
1271 if (_handler != null) _handler(); 1279 if (_handler != null) _handler();
1272 }); 1280 });
1273 } 1281 }
1274 1282
1275 bool get _isComplete => _timer == null && _pauseCount == 0; 1283 bool get _isComplete => _timer == null && _pauseCount == 0;
1276 1284
1277 void onData(void handleAction(T value)) {} 1285 void onData(void handleAction(T value)) {}
1278 1286
1279 void onError(void handleError(AsyncError error)) {} 1287 void onError(void handleError(error)) {}
1280 1288
1281 void onDone(void handleDone()) { 1289 void onDone(void handleDone()) {
1282 _handler = handleDone; 1290 _handler = handleDone;
1283 } 1291 }
1284 1292
1285 void pause([Future signal]) { 1293 void pause([Future signal]) {
1286 if (_isComplete) return; 1294 if (_isComplete) return;
1287 if (_timer != null) { 1295 if (_timer != null) {
1288 _timer.cancel(); 1296 _timer.cancel();
1289 _timer = null; 1297 _timer = null;
(...skipping 21 matching lines...) Expand all
1311 } 1319 }
1312 _pauseCount = 0; 1320 _pauseCount = 0;
1313 } 1321 }
1314 1322
1315 Future asFuture([var futureValue]) { 1323 Future asFuture([var futureValue]) {
1316 // TODO(floitsch): share more code. 1324 // TODO(floitsch): share more code.
1317 _FutureImpl<T> result = new _FutureImpl<T>(); 1325 _FutureImpl<T> result = new _FutureImpl<T>();
1318 1326
1319 // Overwrite the onDone and onError handlers. 1327 // Overwrite the onDone and onError handlers.
1320 onDone(() { result._setValue(futureValue); }); 1328 onDone(() { result._setValue(futureValue); });
1321 onError((AsyncError error) { 1329 onError((error) {
1322 cancel(); 1330 cancel();
1323 result._setError(error); 1331 result._setError(error);
1324 }); 1332 });
1325 1333
1326 return result; 1334 return result;
1327 } 1335 }
1328 } 1336 }
1329 1337
1330 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { 1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
1331 final Stream<T> _source; 1339 final Stream<T> _source;
(...skipping 24 matching lines...) Expand all
1356 onError: this._addError, 1364 onError: this._addError,
1357 onDone: this._close); 1365 onDone: this._close);
1358 } else { 1366 } else {
1359 // TODO(lrn): Check why this can happen. 1367 // TODO(lrn): Check why this can happen.
1360 if (_subscription == null) return; 1368 if (_subscription == null) return;
1361 _subscription.cancel(); 1369 _subscription.cancel();
1362 _subscription = null; 1370 _subscription = null;
1363 } 1371 }
1364 } 1372 }
1365 } 1373 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698