OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 // Completion state of the stream. | 9 // Completion state of the stream. |
10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; | 48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
49 | 49 |
50 /// Bit that contains the last sent event's "id bit". | 50 /// Bit that contains the last sent event's "id bit". |
51 const int _LISTENER_EVENT_ID = 4; | 51 const int _LISTENER_EVENT_ID = 4; |
52 const int _LISTENER_EVENT_ID_SHIFT = 2; | 52 const int _LISTENER_EVENT_ID_SHIFT = 2; |
53 | 53 |
54 /// The count of times a listener has paused is stored in the | 54 /// The count of times a listener has paused is stored in the |
55 /// state, shifted by this amount. | 55 /// state, shifted by this amount. |
56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; | 56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
57 | 57 |
| 58 /** Throws the given error in the next cycle. */ |
| 59 _throwDelayed(var error, [Object stackTrace]) { |
| 60 // We are going to reach the top-level here, but there might be a global |
| 61 // exception handler. This means that we shouldn't print the stack trace. |
| 62 // TODO(floitsch): Find better solution that doesn't print the stack trace |
| 63 // if there is a global exception handler. |
| 64 runAsync(() { |
| 65 if (stackTrace != null) print(stackTrace); |
| 66 var trace = getAttachedStackTrace(error); |
| 67 if (trace != null && trace != stackTrace) print(trace); |
| 68 throw error; |
| 69 }); |
| 70 } |
| 71 |
58 | 72 |
59 // ------------------------------------------------------------------- | 73 // ------------------------------------------------------------------- |
60 // Common base class for single and multi-subscription streams. | 74 // Common base class for single and multi-subscription streams. |
61 // ------------------------------------------------------------------- | 75 // ------------------------------------------------------------------- |
62 abstract class _StreamImpl<T> extends Stream<T> { | 76 abstract class _StreamImpl<T> extends Stream<T> { |
63 /** Current state of the stream. */ | 77 /** Current state of the stream. */ |
64 int _state = _STREAM_OPEN; | 78 int _state = _STREAM_OPEN; |
65 | 79 |
66 /** | 80 /** |
67 * List of pending events. | 81 * List of pending events. |
68 * | 82 * |
69 * If events are added to the stream (using [_add], [_addError] or [_done]) | 83 * If events are added to the stream (using [_add], [_addError] or [_done]) |
70 * while the stream is paused, or while another event is firing, events will | 84 * while the stream is paused, or while another event is firing, events will |
71 * stored here. | 85 * stored here. |
72 * Also supports scheduling the events for later execution. | 86 * Also supports scheduling the events for later execution. |
73 */ | 87 */ |
74 _PendingEvents _pendingEvents; | 88 _PendingEvents _pendingEvents; |
75 | 89 |
76 // ------------------------------------------------------------------ | 90 // ------------------------------------------------------------------ |
77 // Stream interface. | 91 // Stream interface. |
78 | 92 |
79 StreamSubscription<T> listen(void onData(T data), | 93 StreamSubscription<T> listen(void onData(T data), |
80 { void onError(AsyncError error), | 94 { void onError(error), |
81 void onDone(), | 95 void onDone(), |
82 bool cancelOnError }) { | 96 bool cancelOnError }) { |
83 if (_isComplete) { | 97 if (_isComplete) { |
84 return new _DoneSubscription(onDone); | 98 return new _DoneSubscription(onDone); |
85 } | 99 } |
86 if (onData == null) onData = _nullDataHandler; | 100 if (onData == null) onData = _nullDataHandler; |
87 if (onError == null) onError = _nullErrorHandler; | 101 if (onError == null) onError = _nullErrorHandler; |
88 if (onDone == null) onDone = _nullDoneHandler; | 102 if (onDone == null) onDone = _nullDoneHandler; |
89 cancelOnError = identical(true, cancelOnError); | 103 cancelOnError = identical(true, cancelOnError); |
90 _StreamSubscriptionImpl subscription = | 104 _StreamSubscriptionImpl subscription = |
(...skipping 25 matching lines...) Expand all Loading... |
116 } | 130 } |
117 _handlePendingEvents(); | 131 _handlePendingEvents(); |
118 } | 132 } |
119 | 133 |
120 /** | 134 /** |
121 * Send or enqueue an error event. | 135 * Send or enqueue an error event. |
122 * | 136 * |
123 * If a subscription has requested to be unsubscribed on errors, | 137 * If a subscription has requested to be unsubscribed on errors, |
124 * it will be unsubscribed after receiving this event. | 138 * it will be unsubscribed after receiving this event. |
125 */ | 139 */ |
126 void _addError(AsyncError error) { | 140 void _addError(error) { |
127 if (_isClosed) throw new StateError("Sending on closed stream"); | 141 if (_isClosed) throw new StateError("Sending on closed stream"); |
128 if (!_mayFireState) { | 142 if (!_mayFireState) { |
129 // Not the time to send events. | 143 // Not the time to send events. |
130 _addPendingEvent(new _DelayedError(error)); | 144 _addPendingEvent(new _DelayedError(error)); |
131 return; | 145 return; |
132 } | 146 } |
133 if (_hasPendingEvent) { | 147 if (_hasPendingEvent) { |
134 _addPendingEvent(new _DelayedError(error)); | 148 _addPendingEvent(new _DelayedError(error)); |
135 } else { | 149 } else { |
136 _sendError(error); | 150 _sendError(error); |
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
367 | 381 |
368 /** Schedule pending events to be executed. */ | 382 /** Schedule pending events to be executed. */ |
369 void _schedulePendingEvents() { | 383 void _schedulePendingEvents() { |
370 assert(_hasPendingEvent); | 384 assert(_hasPendingEvent); |
371 _pendingEvents.schedule(this); | 385 _pendingEvents.schedule(this); |
372 } | 386 } |
373 | 387 |
374 /** Create a subscription object. Called by [subcribe]. */ | 388 /** Create a subscription object. Called by [subcribe]. */ |
375 _StreamSubscriptionImpl<T> _createSubscription( | 389 _StreamSubscriptionImpl<T> _createSubscription( |
376 void onData(T data), | 390 void onData(T data), |
377 void onError(AsyncError error), | 391 void onError(error), |
378 void onDone(), | 392 void onDone(), |
379 bool cancelOnError); | 393 bool cancelOnError); |
380 | 394 |
381 /** | 395 /** |
382 * Adds a listener to this stream. | 396 * Adds a listener to this stream. |
383 */ | 397 */ |
384 void _addListener(_StreamSubscriptionImpl subscription); | 398 void _addListener(_StreamSubscriptionImpl subscription); |
385 | 399 |
386 /** | 400 /** |
387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 401 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
486 /** | 500 /** |
487 * Send a data event directly to each subscriber. | 501 * Send a data event directly to each subscriber. |
488 */ | 502 */ |
489 _sendData(T value) { | 503 _sendData(T value) { |
490 assert(!_isPaused); | 504 assert(!_isPaused); |
491 assert(!_isComplete); | 505 assert(!_isComplete); |
492 if (!_hasListener) return; | 506 if (!_hasListener) return; |
493 _forEachSubscriber((subscriber) { | 507 _forEachSubscriber((subscriber) { |
494 try { | 508 try { |
495 subscriber._sendData(value); | 509 subscriber._sendData(value); |
496 } on AsyncError catch (e) { | |
497 e.throwDelayed(); | |
498 } catch (e, s) { | 510 } catch (e, s) { |
499 new AsyncError(e, s).throwDelayed(); | 511 _throwDelayed(e, s); |
500 } | 512 } |
501 }); | 513 }); |
502 } | 514 } |
503 | 515 |
504 /** | 516 /** |
505 * Sends an error event directly to each subscriber. | 517 * Sends an error event directly to each subscriber. |
506 */ | 518 */ |
507 void _sendError(AsyncError error) { | 519 void _sendError(error) { |
508 assert(!_isPaused); | 520 assert(!_isPaused); |
509 assert(!_isComplete); | 521 assert(!_isComplete); |
510 if (!_hasListener) return; | 522 if (!_hasListener) return; |
511 _forEachSubscriber((subscriber) { | 523 _forEachSubscriber((subscriber) { |
512 try { | 524 try { |
513 subscriber._sendError(error); | 525 subscriber._sendError(error); |
514 } on AsyncError catch (e) { | |
515 e.throwDelayed(); | |
516 } catch (e, s) { | 526 } catch (e, s) { |
517 new AsyncError.withCause(e, s, error).throwDelayed(); | 527 _throwDelayed(e, s); |
518 } | 528 } |
519 }); | 529 }); |
520 } | 530 } |
521 | 531 |
522 /** | 532 /** |
523 * Sends the "done" message directly to each subscriber. | 533 * Sends the "done" message directly to each subscriber. |
524 * This automatically stops further subscription and | 534 * This automatically stops further subscription and |
525 * unsubscribes all subscribers. | 535 * unsubscribes all subscribers. |
526 */ | 536 */ |
527 void _sendDone() { | 537 void _sendDone() { |
528 assert(!_isPaused); | 538 assert(!_isPaused); |
529 assert(_isClosed); | 539 assert(_isClosed); |
530 _setComplete(); | 540 _setComplete(); |
531 if (!_hasListener) return; | 541 if (!_hasListener) return; |
532 _forEachSubscriber((subscriber) { | 542 _forEachSubscriber((subscriber) { |
533 _cancel(subscriber); | 543 _cancel(subscriber); |
534 try { | 544 try { |
535 subscriber._sendDone(); | 545 subscriber._sendDone(); |
536 } on AsyncError catch (e) { | |
537 e.throwDelayed(); | |
538 } catch (e, s) { | 546 } catch (e, s) { |
539 new AsyncError(e, s).throwDelayed(); | 547 _throwDelayed(e, s); |
540 } | 548 } |
541 }); | 549 }); |
542 assert(!_hasListener); | 550 assert(!_hasListener); |
543 } | 551 } |
544 } | 552 } |
545 | 553 |
546 // ------------------------------------------------------------------- | 554 // ------------------------------------------------------------------- |
547 // Default implementation of a stream with a single subscriber. | 555 // Default implementation of a stream with a single subscriber. |
548 // ------------------------------------------------------------------- | 556 // ------------------------------------------------------------------- |
549 /** | 557 /** |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
585 _SingleStreamImpl() { | 593 _SingleStreamImpl() { |
586 // Start out paused. | 594 // Start out paused. |
587 _updatePauseCount(1); | 595 _updatePauseCount(1); |
588 } | 596 } |
589 | 597 |
590 /** | 598 /** |
591 * Create the new subscription object. | 599 * Create the new subscription object. |
592 */ | 600 */ |
593 _StreamSubscriptionImpl<T> _createSubscription( | 601 _StreamSubscriptionImpl<T> _createSubscription( |
594 void onData(T data), | 602 void onData(T data), |
595 void onError(AsyncError error), | 603 void onError(error), |
596 void onDone(), | 604 void onDone(), |
597 bool cancelOnError) { | 605 bool cancelOnError) { |
598 return new _StreamSubscriptionImpl<T>( | 606 return new _StreamSubscriptionImpl<T>( |
599 this, onData, onError, onDone, cancelOnError); | 607 this, onData, onError, onDone, cancelOnError); |
600 } | 608 } |
601 | 609 |
602 void _addListener(_StreamListener subscription) { | 610 void _addListener(_StreamListener subscription) { |
603 assert(!_isComplete); | 611 assert(!_isComplete); |
604 if (_hasListener) { | 612 if (_hasListener) { |
605 throw new StateError("Stream already has subscriber."); | 613 throw new StateError("Stream already has subscriber."); |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
705 // Helper functions that can be overridden in subclasses. | 713 // Helper functions that can be overridden in subclasses. |
706 | 714 |
707 /** Whether there are currently any subscribers on this [Stream]. */ | 715 /** Whether there are currently any subscribers on this [Stream]. */ |
708 bool get _hasListener => !_InternalLinkList.isEmpty(this); | 716 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
709 | 717 |
710 /** | 718 /** |
711 * Create the new subscription object. | 719 * Create the new subscription object. |
712 */ | 720 */ |
713 _StreamListener<T> _createSubscription( | 721 _StreamListener<T> _createSubscription( |
714 void onData(T data), | 722 void onData(T data), |
715 void onError(AsyncError error), | 723 void onError(error), |
716 void onDone(), | 724 void onDone(), |
717 bool cancelOnError) { | 725 bool cancelOnError) { |
718 return new _StreamSubscriptionImpl<T>( | 726 return new _StreamSubscriptionImpl<T>( |
719 this, onData, onError, onDone, cancelOnError); | 727 this, onData, onError, onDone, cancelOnError); |
720 } | 728 } |
721 | 729 |
722 // ------------------------------------------------------------------- | 730 // ------------------------------------------------------------------- |
723 // Internal implementation. | 731 // Internal implementation. |
724 | 732 |
725 /** | 733 /** |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
834 */ | 842 */ |
835 _GeneratedSingleStreamImpl(_PendingEvents events) { | 843 _GeneratedSingleStreamImpl(_PendingEvents events) { |
836 _pendingEvents = events; | 844 _pendingEvents = events; |
837 _setClosed(); // Closed for input since all events are already pending. | 845 _setClosed(); // Closed for input since all events are already pending. |
838 } | 846 } |
839 | 847 |
840 void _add(T value) { | 848 void _add(T value) { |
841 throw new UnsupportedError("Cannot inject events into generated stream"); | 849 throw new UnsupportedError("Cannot inject events into generated stream"); |
842 } | 850 } |
843 | 851 |
844 void _addError(AsyncError value) { | 852 void _addError(value) { |
845 throw new UnsupportedError("Cannot inject events into generated stream"); | 853 throw new UnsupportedError("Cannot inject events into generated stream"); |
846 } | 854 } |
847 | 855 |
848 void _close() { | 856 void _close() { |
849 throw new UnsupportedError("Cannot inject events into generated stream"); | 857 throw new UnsupportedError("Cannot inject events into generated stream"); |
850 } | 858 } |
851 } | 859 } |
852 | 860 |
853 | 861 |
854 /** Pending events object that gets its events from an [Iterable]. */ | 862 /** Pending events object that gets its events from an [Iterable]. */ |
(...skipping 14 matching lines...) Expand all Loading... |
869 void handleNext(_StreamImpl<T> stream) { | 877 void handleNext(_StreamImpl<T> stream) { |
870 if (_isDone) throw new StateError("No events pending."); | 878 if (_isDone) throw new StateError("No events pending."); |
871 try { | 879 try { |
872 _isDone = !_iterator.moveNext(); | 880 _isDone = !_iterator.moveNext(); |
873 if (!_isDone) { | 881 if (!_isDone) { |
874 stream._sendData(_iterator.current); | 882 stream._sendData(_iterator.current); |
875 } else { | 883 } else { |
876 stream._sendDone(); | 884 stream._sendDone(); |
877 } | 885 } |
878 } catch (e, s) { | 886 } catch (e, s) { |
879 stream._sendError(new AsyncError(e, s)); | 887 stream._sendError(_asyncError(e, s)); |
880 stream._sendDone(); | 888 stream._sendDone(); |
881 _isDone = true; | 889 _isDone = true; |
882 } | 890 } |
883 } | 891 } |
884 } | 892 } |
885 | 893 |
886 | 894 |
887 /** | 895 /** |
888 * The subscription class that the [StreamController] uses. | 896 * The subscription class that the [StreamController] uses. |
889 * | 897 * |
(...skipping 21 matching lines...) Expand all Loading... |
911 this._onData, | 919 this._onData, |
912 this._onError, | 920 this._onError, |
913 this._onDone, | 921 this._onDone, |
914 this._cancelOnError) : super(source); | 922 this._cancelOnError) : super(source); |
915 | 923 |
916 void onData(void handleData(T event)) { | 924 void onData(void handleData(T event)) { |
917 if (handleData == null) handleData = _nullDataHandler; | 925 if (handleData == null) handleData = _nullDataHandler; |
918 _onData = handleData; | 926 _onData = handleData; |
919 } | 927 } |
920 | 928 |
921 void onError(void handleError(AsyncError error)) { | 929 void onError(void handleError(error)) { |
922 if (handleError == null) handleError = _nullErrorHandler; | 930 if (handleError == null) handleError = _nullErrorHandler; |
923 _onError = handleError; | 931 _onError = handleError; |
924 } | 932 } |
925 | 933 |
926 void onDone(void handleDone()) { | 934 void onDone(void handleDone()) { |
927 if (handleDone == null) handleDone = _nullDoneHandler; | 935 if (handleDone == null) handleDone = _nullDoneHandler; |
928 _onDone = handleDone; | 936 _onDone = handleDone; |
929 } | 937 } |
930 | 938 |
931 void _sendData(T data) { | 939 void _sendData(T data) { |
932 _onData(data); | 940 _onData(data); |
933 } | 941 } |
934 | 942 |
935 void _sendError(AsyncError error) { | 943 void _sendError(error) { |
936 _onError(error); | 944 _onError(error); |
937 if (_cancelOnError) _source._cancel(this); | 945 if (_cancelOnError) _source._cancel(this); |
938 } | 946 } |
939 | 947 |
940 void _sendDone() { | 948 void _sendDone() { |
941 _onDone(); | 949 _onDone(); |
942 } | 950 } |
943 | 951 |
944 void cancel() { | 952 void cancel() { |
945 if (!_isSubscribed) return; | 953 if (!_isSubscribed) return; |
946 _source._cancel(this); | 954 _source._cancel(this); |
947 } | 955 } |
948 | 956 |
949 void pause([Future resumeSignal]) { | 957 void pause([Future resumeSignal]) { |
950 if (!_isSubscribed) return; | 958 if (!_isSubscribed) return; |
951 _source._pause(this, resumeSignal); | 959 _source._pause(this, resumeSignal); |
952 } | 960 } |
953 | 961 |
954 void resume() { | 962 void resume() { |
955 if (!_isSubscribed || !isPaused) return; | 963 if (!_isSubscribed || !isPaused) return; |
956 _source._resume(this, false); | 964 _source._resume(this, false); |
957 } | 965 } |
958 | 966 |
959 Future asFuture([var futureValue]) { | 967 Future asFuture([var futureValue]) { |
960 _FutureImpl<T> result = new _FutureImpl<T>(); | 968 _FutureImpl<T> result = new _FutureImpl<T>(); |
961 | 969 |
962 // Overwrite the onDone and onError handlers. | 970 // Overwrite the onDone and onError handlers. |
963 onDone(() { result._setValue(futureValue); }); | 971 onDone(() { result._setValue(futureValue); }); |
964 onError((AsyncError error) { | 972 onError((error) { |
965 cancel(); | 973 cancel(); |
966 result._setError(error); | 974 result._setError(error); |
967 }); | 975 }); |
968 | 976 |
969 return result; | 977 return result; |
970 } | 978 } |
971 } | 979 } |
972 | 980 |
973 // Internal helpers. | 981 // Internal helpers. |
974 | 982 |
975 // Types of the different handlers on a stream. Types used to type fields. | 983 // Types of the different handlers on a stream. Types used to type fields. |
976 typedef void _DataHandler<T>(T value); | 984 typedef void _DataHandler<T>(T value); |
977 typedef void _ErrorHandler(AsyncError error); | 985 typedef void _ErrorHandler(error); |
978 typedef void _DoneHandler(); | 986 typedef void _DoneHandler(); |
979 | 987 |
980 | 988 |
981 /** Default data handler, does nothing. */ | 989 /** Default data handler, does nothing. */ |
982 void _nullDataHandler(var value) {} | 990 void _nullDataHandler(var value) {} |
983 | 991 |
984 /** Default error handler, reports the error to the global handler. */ | 992 /** Default error handler, reports the error to the global handler. */ |
985 void _nullErrorHandler(AsyncError error) { | 993 void _nullErrorHandler(error) { |
986 error.throwDelayed(); | 994 _throwDelayed(error); |
987 } | 995 } |
988 | 996 |
989 /** Default done handler, does nothing. */ | 997 /** Default done handler, does nothing. */ |
990 void _nullDoneHandler() {} | 998 void _nullDoneHandler() {} |
991 | 999 |
992 | 1000 |
993 /** A delayed event on a stream implementation. */ | 1001 /** A delayed event on a stream implementation. */ |
994 abstract class _DelayedEvent { | 1002 abstract class _DelayedEvent { |
995 /** Added as a linked list on the [StreamController]. */ | 1003 /** Added as a linked list on the [StreamController]. */ |
996 _DelayedEvent next; | 1004 _DelayedEvent next; |
997 /** Execute the delayed event on the [StreamController]. */ | 1005 /** Execute the delayed event on the [StreamController]. */ |
998 void perform(_StreamImpl stream); | 1006 void perform(_StreamImpl stream); |
999 } | 1007 } |
1000 | 1008 |
1001 /** A delayed data event. */ | 1009 /** A delayed data event. */ |
1002 class _DelayedData<T> extends _DelayedEvent{ | 1010 class _DelayedData<T> extends _DelayedEvent{ |
1003 T value; | 1011 final T value; |
1004 _DelayedData(this.value); | 1012 _DelayedData(this.value); |
1005 void perform(_StreamImpl<T> stream) { | 1013 void perform(_StreamImpl<T> stream) { |
1006 stream._sendData(value); | 1014 stream._sendData(value); |
1007 } | 1015 } |
1008 } | 1016 } |
1009 | 1017 |
1010 /** A delayed error event. */ | 1018 /** A delayed error event. */ |
1011 class _DelayedError extends _DelayedEvent { | 1019 class _DelayedError extends _DelayedEvent { |
1012 AsyncError error; | 1020 final error; |
1013 _DelayedError(this.error); | 1021 _DelayedError(this.error); |
1014 void perform(_StreamImpl stream) { | 1022 void perform(_StreamImpl stream) { |
1015 stream._sendError(error); | 1023 stream._sendError(error); |
1016 } | 1024 } |
1017 } | 1025 } |
1018 | 1026 |
1019 /** A delayed done event. */ | 1027 /** A delayed done event. */ |
1020 class _DelayedDone implements _DelayedEvent { | 1028 class _DelayedDone implements _DelayedEvent { |
1021 const _DelayedDone(); | 1029 const _DelayedDone(); |
1022 void perform(_StreamImpl stream) { | 1030 void perform(_StreamImpl stream) { |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1106 list._previousLink = otherLast; | 1114 list._previousLink = otherLast; |
1107 otherLast._nextLink = list; | 1115 otherLast._nextLink = list; |
1108 // Clean up [other]. | 1116 // Clean up [other]. |
1109 other._nextLink = other._previousLink = other; | 1117 other._nextLink = other._previousLink = other; |
1110 } | 1118 } |
1111 } | 1119 } |
1112 | 1120 |
1113 /** Abstract type for an internal interface for sending events. */ | 1121 /** Abstract type for an internal interface for sending events. */ |
1114 abstract class _EventOutputSink<T> { | 1122 abstract class _EventOutputSink<T> { |
1115 _sendData(T data); | 1123 _sendData(T data); |
1116 _sendError(AsyncError error); | 1124 _sendError(error); |
1117 _sendDone(); | 1125 _sendDone(); |
1118 } | 1126 } |
1119 | 1127 |
1120 abstract class _StreamListener<T> extends _InternalLink | 1128 abstract class _StreamListener<T> extends _InternalLink |
1121 implements _EventOutputSink<T> { | 1129 implements _EventOutputSink<T> { |
1122 final _StreamImpl _source; | 1130 final _StreamImpl _source; |
1123 int _state = _LISTENER_UNSUBSCRIBED; | 1131 int _state = _LISTENER_UNSUBSCRIBED; |
1124 | 1132 |
1125 _StreamListener(this._source); | 1133 _StreamListener(this._source); |
1126 | 1134 |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1181 void _incrementPauseCount() { | 1189 void _incrementPauseCount() { |
1182 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; | 1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
1183 } | 1191 } |
1184 | 1192 |
1185 void _decrementPauseCount() { | 1193 void _decrementPauseCount() { |
1186 assert(isPaused); | 1194 assert(isPaused); |
1187 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; | 1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
1188 } | 1196 } |
1189 | 1197 |
1190 _sendData(T data); | 1198 _sendData(T data); |
1191 _sendError(AsyncError error); | 1199 _sendError(error); |
1192 _sendDone(); | 1200 _sendDone(); |
1193 } | 1201 } |
1194 | 1202 |
1195 /** Superclass for provider of pending events. */ | 1203 /** Superclass for provider of pending events. */ |
1196 abstract class _PendingEvents { | 1204 abstract class _PendingEvents { |
1197 /** | 1205 /** |
1198 * Timer set when pending events are scheduled for execution. | 1206 * Timer set when pending events are scheduled for execution. |
1199 * | 1207 * |
1200 * When scheduling pending events for execution in a later cycle, the timer | 1208 * When scheduling pending events for execution in a later cycle, the timer |
1201 * is stored here. If pending events are executed earlier than that, e.g., | 1209 * is stored here. If pending events are executed earlier than that, e.g., |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1269 assert(_timer == null && _pauseCount == 0); | 1277 assert(_timer == null && _pauseCount == 0); |
1270 _timer = new Timer(Duration.ZERO, () { | 1278 _timer = new Timer(Duration.ZERO, () { |
1271 if (_handler != null) _handler(); | 1279 if (_handler != null) _handler(); |
1272 }); | 1280 }); |
1273 } | 1281 } |
1274 | 1282 |
1275 bool get _isComplete => _timer == null && _pauseCount == 0; | 1283 bool get _isComplete => _timer == null && _pauseCount == 0; |
1276 | 1284 |
1277 void onData(void handleAction(T value)) {} | 1285 void onData(void handleAction(T value)) {} |
1278 | 1286 |
1279 void onError(void handleError(AsyncError error)) {} | 1287 void onError(void handleError(error)) {} |
1280 | 1288 |
1281 void onDone(void handleDone()) { | 1289 void onDone(void handleDone()) { |
1282 _handler = handleDone; | 1290 _handler = handleDone; |
1283 } | 1291 } |
1284 | 1292 |
1285 void pause([Future signal]) { | 1293 void pause([Future signal]) { |
1286 if (_isComplete) return; | 1294 if (_isComplete) return; |
1287 if (_timer != null) { | 1295 if (_timer != null) { |
1288 _timer.cancel(); | 1296 _timer.cancel(); |
1289 _timer = null; | 1297 _timer = null; |
(...skipping 21 matching lines...) Expand all Loading... |
1311 } | 1319 } |
1312 _pauseCount = 0; | 1320 _pauseCount = 0; |
1313 } | 1321 } |
1314 | 1322 |
1315 Future asFuture([var futureValue]) { | 1323 Future asFuture([var futureValue]) { |
1316 // TODO(floitsch): share more code. | 1324 // TODO(floitsch): share more code. |
1317 _FutureImpl<T> result = new _FutureImpl<T>(); | 1325 _FutureImpl<T> result = new _FutureImpl<T>(); |
1318 | 1326 |
1319 // Overwrite the onDone and onError handlers. | 1327 // Overwrite the onDone and onError handlers. |
1320 onDone(() { result._setValue(futureValue); }); | 1328 onDone(() { result._setValue(futureValue); }); |
1321 onError((AsyncError error) { | 1329 onError((error) { |
1322 cancel(); | 1330 cancel(); |
1323 result._setError(error); | 1331 result._setError(error); |
1324 }); | 1332 }); |
1325 | 1333 |
1326 return result; | 1334 return result; |
1327 } | 1335 } |
1328 } | 1336 } |
1329 | 1337 |
1330 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { | 1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
1331 final Stream<T> _source; | 1339 final Stream<T> _source; |
(...skipping 24 matching lines...) Expand all Loading... |
1356 onError: this._addError, | 1364 onError: this._addError, |
1357 onDone: this._close); | 1365 onDone: this._close); |
1358 } else { | 1366 } else { |
1359 // TODO(lrn): Check why this can happen. | 1367 // TODO(lrn): Check why this can happen. |
1360 if (_subscription == null) return; | 1368 if (_subscription == null) return; |
1361 _subscription.cancel(); | 1369 _subscription.cancel(); |
1362 _subscription = null; | 1370 _subscription = null; |
1363 } | 1371 } |
1364 } | 1372 } |
1365 } | 1373 } |
OLD | NEW |