| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 // Completion state of the stream. | 9 // Completion state of the stream. |
| 10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; | 48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
| 49 | 49 |
| 50 /// Bit that contains the last sent event's "id bit". | 50 /// Bit that contains the last sent event's "id bit". |
| 51 const int _LISTENER_EVENT_ID = 4; | 51 const int _LISTENER_EVENT_ID = 4; |
| 52 const int _LISTENER_EVENT_ID_SHIFT = 2; | 52 const int _LISTENER_EVENT_ID_SHIFT = 2; |
| 53 | 53 |
| 54 /// The count of times a listener has paused is stored in the | 54 /// The count of times a listener has paused is stored in the |
| 55 /// state, shifted by this amount. | 55 /// state, shifted by this amount. |
| 56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; | 56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
| 57 | 57 |
| 58 /** Throws the given error in the next cycle. */ |
| 59 _throwDelayed(var error, [Object stackTrace]) { |
| 60 // We are going to reach the top-level here, but there might be a global |
| 61 // exception handler. This means that we shouldn't print the stack trace. |
| 62 // TODO(floitsch): Find better solution that doesn't print the stack trace |
| 63 // if there is a global exception handler. |
| 64 runAsync(() { |
| 65 if (stackTrace != null) print(stackTrace); |
| 66 var trace = getAttachedStackTrace(error); |
| 67 if (trace != null && trace != stackTrace) print(trace); |
| 68 throw error; |
| 69 }); |
| 70 } |
| 71 |
| 58 | 72 |
| 59 // ------------------------------------------------------------------- | 73 // ------------------------------------------------------------------- |
| 60 // Common base class for single and multi-subscription streams. | 74 // Common base class for single and multi-subscription streams. |
| 61 // ------------------------------------------------------------------- | 75 // ------------------------------------------------------------------- |
| 62 abstract class _StreamImpl<T> extends Stream<T> { | 76 abstract class _StreamImpl<T> extends Stream<T> { |
| 63 /** Current state of the stream. */ | 77 /** Current state of the stream. */ |
| 64 int _state = _STREAM_OPEN; | 78 int _state = _STREAM_OPEN; |
| 65 | 79 |
| 66 /** | 80 /** |
| 67 * List of pending events. | 81 * List of pending events. |
| 68 * | 82 * |
| 69 * If events are added to the stream (using [_add], [_addError] or [_done]) | 83 * If events are added to the stream (using [_add], [_addError] or [_done]) |
| 70 * while the stream is paused, or while another event is firing, events will | 84 * while the stream is paused, or while another event is firing, events will |
| 71 * stored here. | 85 * stored here. |
| 72 * Also supports scheduling the events for later execution. | 86 * Also supports scheduling the events for later execution. |
| 73 */ | 87 */ |
| 74 _PendingEvents _pendingEvents; | 88 _PendingEvents _pendingEvents; |
| 75 | 89 |
| 76 // ------------------------------------------------------------------ | 90 // ------------------------------------------------------------------ |
| 77 // Stream interface. | 91 // Stream interface. |
| 78 | 92 |
| 79 StreamSubscription<T> listen(void onData(T data), | 93 StreamSubscription<T> listen(void onData(T data), |
| 80 { void onError(AsyncError error), | 94 { void onError(error), |
| 81 void onDone(), | 95 void onDone(), |
| 82 bool cancelOnError }) { | 96 bool cancelOnError }) { |
| 83 if (_isComplete) { | 97 if (_isComplete) { |
| 84 return new _DoneSubscription(onDone); | 98 return new _DoneSubscription(onDone); |
| 85 } | 99 } |
| 86 if (onData == null) onData = _nullDataHandler; | 100 if (onData == null) onData = _nullDataHandler; |
| 87 if (onError == null) onError = _nullErrorHandler; | 101 if (onError == null) onError = _nullErrorHandler; |
| 88 if (onDone == null) onDone = _nullDoneHandler; | 102 if (onDone == null) onDone = _nullDoneHandler; |
| 89 cancelOnError = identical(true, cancelOnError); | 103 cancelOnError = identical(true, cancelOnError); |
| 90 _StreamSubscriptionImpl subscription = | 104 _StreamSubscriptionImpl subscription = |
| (...skipping 25 matching lines...) Expand all Loading... |
| 116 } | 130 } |
| 117 _handlePendingEvents(); | 131 _handlePendingEvents(); |
| 118 } | 132 } |
| 119 | 133 |
| 120 /** | 134 /** |
| 121 * Send or enqueue an error event. | 135 * Send or enqueue an error event. |
| 122 * | 136 * |
| 123 * If a subscription has requested to be unsubscribed on errors, | 137 * If a subscription has requested to be unsubscribed on errors, |
| 124 * it will be unsubscribed after receiving this event. | 138 * it will be unsubscribed after receiving this event. |
| 125 */ | 139 */ |
| 126 void _addError(AsyncError error) { | 140 void _addError(error) { |
| 127 if (_isClosed) throw new StateError("Sending on closed stream"); | 141 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 128 if (!_mayFireState) { | 142 if (!_mayFireState) { |
| 129 // Not the time to send events. | 143 // Not the time to send events. |
| 130 _addPendingEvent(new _DelayedError(error)); | 144 _addPendingEvent(new _DelayedError(error)); |
| 131 return; | 145 return; |
| 132 } | 146 } |
| 133 if (_hasPendingEvent) { | 147 if (_hasPendingEvent) { |
| 134 _addPendingEvent(new _DelayedError(error)); | 148 _addPendingEvent(new _DelayedError(error)); |
| 135 } else { | 149 } else { |
| 136 _sendError(error); | 150 _sendError(error); |
| (...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 367 | 381 |
| 368 /** Schedule pending events to be executed. */ | 382 /** Schedule pending events to be executed. */ |
| 369 void _schedulePendingEvents() { | 383 void _schedulePendingEvents() { |
| 370 assert(_hasPendingEvent); | 384 assert(_hasPendingEvent); |
| 371 _pendingEvents.schedule(this); | 385 _pendingEvents.schedule(this); |
| 372 } | 386 } |
| 373 | 387 |
| 374 /** Create a subscription object. Called by [subcribe]. */ | 388 /** Create a subscription object. Called by [subcribe]. */ |
| 375 _StreamSubscriptionImpl<T> _createSubscription( | 389 _StreamSubscriptionImpl<T> _createSubscription( |
| 376 void onData(T data), | 390 void onData(T data), |
| 377 void onError(AsyncError error), | 391 void onError(error), |
| 378 void onDone(), | 392 void onDone(), |
| 379 bool cancelOnError); | 393 bool cancelOnError); |
| 380 | 394 |
| 381 /** | 395 /** |
| 382 * Adds a listener to this stream. | 396 * Adds a listener to this stream. |
| 383 */ | 397 */ |
| 384 void _addListener(_StreamSubscriptionImpl subscription); | 398 void _addListener(_StreamSubscriptionImpl subscription); |
| 385 | 399 |
| 386 /** | 400 /** |
| 387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 401 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 486 /** | 500 /** |
| 487 * Send a data event directly to each subscriber. | 501 * Send a data event directly to each subscriber. |
| 488 */ | 502 */ |
| 489 _sendData(T value) { | 503 _sendData(T value) { |
| 490 assert(!_isPaused); | 504 assert(!_isPaused); |
| 491 assert(!_isComplete); | 505 assert(!_isComplete); |
| 492 if (!_hasListener) return; | 506 if (!_hasListener) return; |
| 493 _forEachSubscriber((subscriber) { | 507 _forEachSubscriber((subscriber) { |
| 494 try { | 508 try { |
| 495 subscriber._sendData(value); | 509 subscriber._sendData(value); |
| 496 } on AsyncError catch (e) { | |
| 497 e.throwDelayed(); | |
| 498 } catch (e, s) { | 510 } catch (e, s) { |
| 499 new AsyncError(e, s).throwDelayed(); | 511 _throwDelayed(e, s); |
| 500 } | 512 } |
| 501 }); | 513 }); |
| 502 } | 514 } |
| 503 | 515 |
| 504 /** | 516 /** |
| 505 * Sends an error event directly to each subscriber. | 517 * Sends an error event directly to each subscriber. |
| 506 */ | 518 */ |
| 507 void _sendError(AsyncError error) { | 519 void _sendError(error) { |
| 508 assert(!_isPaused); | 520 assert(!_isPaused); |
| 509 assert(!_isComplete); | 521 assert(!_isComplete); |
| 510 if (!_hasListener) return; | 522 if (!_hasListener) return; |
| 511 _forEachSubscriber((subscriber) { | 523 _forEachSubscriber((subscriber) { |
| 512 try { | 524 try { |
| 513 subscriber._sendError(error); | 525 subscriber._sendError(error); |
| 514 } on AsyncError catch (e) { | |
| 515 e.throwDelayed(); | |
| 516 } catch (e, s) { | 526 } catch (e, s) { |
| 517 new AsyncError.withCause(e, s, error).throwDelayed(); | 527 _throwDelayed(e, s); |
| 518 } | 528 } |
| 519 }); | 529 }); |
| 520 } | 530 } |
| 521 | 531 |
| 522 /** | 532 /** |
| 523 * Sends the "done" message directly to each subscriber. | 533 * Sends the "done" message directly to each subscriber. |
| 524 * This automatically stops further subscription and | 534 * This automatically stops further subscription and |
| 525 * unsubscribes all subscribers. | 535 * unsubscribes all subscribers. |
| 526 */ | 536 */ |
| 527 void _sendDone() { | 537 void _sendDone() { |
| 528 assert(!_isPaused); | 538 assert(!_isPaused); |
| 529 assert(_isClosed); | 539 assert(_isClosed); |
| 530 _setComplete(); | 540 _setComplete(); |
| 531 if (!_hasListener) return; | 541 if (!_hasListener) return; |
| 532 _forEachSubscriber((subscriber) { | 542 _forEachSubscriber((subscriber) { |
| 533 _cancel(subscriber); | 543 _cancel(subscriber); |
| 534 try { | 544 try { |
| 535 subscriber._sendDone(); | 545 subscriber._sendDone(); |
| 536 } on AsyncError catch (e) { | |
| 537 e.throwDelayed(); | |
| 538 } catch (e, s) { | 546 } catch (e, s) { |
| 539 new AsyncError(e, s).throwDelayed(); | 547 _throwDelayed(e, s); |
| 540 } | 548 } |
| 541 }); | 549 }); |
| 542 assert(!_hasListener); | 550 assert(!_hasListener); |
| 543 } | 551 } |
| 544 } | 552 } |
| 545 | 553 |
| 546 // ------------------------------------------------------------------- | 554 // ------------------------------------------------------------------- |
| 547 // Default implementation of a stream with a single subscriber. | 555 // Default implementation of a stream with a single subscriber. |
| 548 // ------------------------------------------------------------------- | 556 // ------------------------------------------------------------------- |
| 549 /** | 557 /** |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 585 _SingleStreamImpl() { | 593 _SingleStreamImpl() { |
| 586 // Start out paused. | 594 // Start out paused. |
| 587 _updatePauseCount(1); | 595 _updatePauseCount(1); |
| 588 } | 596 } |
| 589 | 597 |
| 590 /** | 598 /** |
| 591 * Create the new subscription object. | 599 * Create the new subscription object. |
| 592 */ | 600 */ |
| 593 _StreamSubscriptionImpl<T> _createSubscription( | 601 _StreamSubscriptionImpl<T> _createSubscription( |
| 594 void onData(T data), | 602 void onData(T data), |
| 595 void onError(AsyncError error), | 603 void onError(error), |
| 596 void onDone(), | 604 void onDone(), |
| 597 bool cancelOnError) { | 605 bool cancelOnError) { |
| 598 return new _StreamSubscriptionImpl<T>( | 606 return new _StreamSubscriptionImpl<T>( |
| 599 this, onData, onError, onDone, cancelOnError); | 607 this, onData, onError, onDone, cancelOnError); |
| 600 } | 608 } |
| 601 | 609 |
| 602 void _addListener(_StreamListener subscription) { | 610 void _addListener(_StreamListener subscription) { |
| 603 assert(!_isComplete); | 611 assert(!_isComplete); |
| 604 if (_hasListener) { | 612 if (_hasListener) { |
| 605 throw new StateError("Stream already has subscriber."); | 613 throw new StateError("Stream already has subscriber."); |
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 705 // Helper functions that can be overridden in subclasses. | 713 // Helper functions that can be overridden in subclasses. |
| 706 | 714 |
| 707 /** Whether there are currently any subscribers on this [Stream]. */ | 715 /** Whether there are currently any subscribers on this [Stream]. */ |
| 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); | 716 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
| 709 | 717 |
| 710 /** | 718 /** |
| 711 * Create the new subscription object. | 719 * Create the new subscription object. |
| 712 */ | 720 */ |
| 713 _StreamListener<T> _createSubscription( | 721 _StreamListener<T> _createSubscription( |
| 714 void onData(T data), | 722 void onData(T data), |
| 715 void onError(AsyncError error), | 723 void onError(error), |
| 716 void onDone(), | 724 void onDone(), |
| 717 bool cancelOnError) { | 725 bool cancelOnError) { |
| 718 return new _StreamSubscriptionImpl<T>( | 726 return new _StreamSubscriptionImpl<T>( |
| 719 this, onData, onError, onDone, cancelOnError); | 727 this, onData, onError, onDone, cancelOnError); |
| 720 } | 728 } |
| 721 | 729 |
| 722 // ------------------------------------------------------------------- | 730 // ------------------------------------------------------------------- |
| 723 // Internal implementation. | 731 // Internal implementation. |
| 724 | 732 |
| 725 /** | 733 /** |
| (...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 834 */ | 842 */ |
| 835 _GeneratedSingleStreamImpl(_PendingEvents events) { | 843 _GeneratedSingleStreamImpl(_PendingEvents events) { |
| 836 _pendingEvents = events; | 844 _pendingEvents = events; |
| 837 _setClosed(); // Closed for input since all events are already pending. | 845 _setClosed(); // Closed for input since all events are already pending. |
| 838 } | 846 } |
| 839 | 847 |
| 840 void _add(T value) { | 848 void _add(T value) { |
| 841 throw new UnsupportedError("Cannot inject events into generated stream"); | 849 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 842 } | 850 } |
| 843 | 851 |
| 844 void _addError(AsyncError value) { | 852 void _addError(value) { |
| 845 throw new UnsupportedError("Cannot inject events into generated stream"); | 853 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 846 } | 854 } |
| 847 | 855 |
| 848 void _close() { | 856 void _close() { |
| 849 throw new UnsupportedError("Cannot inject events into generated stream"); | 857 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 850 } | 858 } |
| 851 } | 859 } |
| 852 | 860 |
| 853 | 861 |
| 854 /** Pending events object that gets its events from an [Iterable]. */ | 862 /** Pending events object that gets its events from an [Iterable]. */ |
| (...skipping 14 matching lines...) Expand all Loading... |
| 869 void handleNext(_StreamImpl<T> stream) { | 877 void handleNext(_StreamImpl<T> stream) { |
| 870 if (_isDone) throw new StateError("No events pending."); | 878 if (_isDone) throw new StateError("No events pending."); |
| 871 try { | 879 try { |
| 872 _isDone = !_iterator.moveNext(); | 880 _isDone = !_iterator.moveNext(); |
| 873 if (!_isDone) { | 881 if (!_isDone) { |
| 874 stream._sendData(_iterator.current); | 882 stream._sendData(_iterator.current); |
| 875 } else { | 883 } else { |
| 876 stream._sendDone(); | 884 stream._sendDone(); |
| 877 } | 885 } |
| 878 } catch (e, s) { | 886 } catch (e, s) { |
| 879 stream._sendError(new AsyncError(e, s)); | 887 stream._sendError(_asyncError(e, s)); |
| 880 stream._sendDone(); | 888 stream._sendDone(); |
| 881 _isDone = true; | 889 _isDone = true; |
| 882 } | 890 } |
| 883 } | 891 } |
| 884 } | 892 } |
| 885 | 893 |
| 886 | 894 |
| 887 /** | 895 /** |
| 888 * The subscription class that the [StreamController] uses. | 896 * The subscription class that the [StreamController] uses. |
| 889 * | 897 * |
| (...skipping 21 matching lines...) Expand all Loading... |
| 911 this._onData, | 919 this._onData, |
| 912 this._onError, | 920 this._onError, |
| 913 this._onDone, | 921 this._onDone, |
| 914 this._cancelOnError) : super(source); | 922 this._cancelOnError) : super(source); |
| 915 | 923 |
| 916 void onData(void handleData(T event)) { | 924 void onData(void handleData(T event)) { |
| 917 if (handleData == null) handleData = _nullDataHandler; | 925 if (handleData == null) handleData = _nullDataHandler; |
| 918 _onData = handleData; | 926 _onData = handleData; |
| 919 } | 927 } |
| 920 | 928 |
| 921 void onError(void handleError(AsyncError error)) { | 929 void onError(void handleError(error)) { |
| 922 if (handleError == null) handleError = _nullErrorHandler; | 930 if (handleError == null) handleError = _nullErrorHandler; |
| 923 _onError = handleError; | 931 _onError = handleError; |
| 924 } | 932 } |
| 925 | 933 |
| 926 void onDone(void handleDone()) { | 934 void onDone(void handleDone()) { |
| 927 if (handleDone == null) handleDone = _nullDoneHandler; | 935 if (handleDone == null) handleDone = _nullDoneHandler; |
| 928 _onDone = handleDone; | 936 _onDone = handleDone; |
| 929 } | 937 } |
| 930 | 938 |
| 931 void _sendData(T data) { | 939 void _sendData(T data) { |
| 932 _onData(data); | 940 _onData(data); |
| 933 } | 941 } |
| 934 | 942 |
| 935 void _sendError(AsyncError error) { | 943 void _sendError(error) { |
| 936 _onError(error); | 944 _onError(error); |
| 937 if (_cancelOnError) _source._cancel(this); | 945 if (_cancelOnError) _source._cancel(this); |
| 938 } | 946 } |
| 939 | 947 |
| 940 void _sendDone() { | 948 void _sendDone() { |
| 941 _onDone(); | 949 _onDone(); |
| 942 } | 950 } |
| 943 | 951 |
| 944 void cancel() { | 952 void cancel() { |
| 945 if (!_isSubscribed) return; | 953 if (!_isSubscribed) return; |
| 946 _source._cancel(this); | 954 _source._cancel(this); |
| 947 } | 955 } |
| 948 | 956 |
| 949 void pause([Future resumeSignal]) { | 957 void pause([Future resumeSignal]) { |
| 950 if (!_isSubscribed) return; | 958 if (!_isSubscribed) return; |
| 951 _source._pause(this, resumeSignal); | 959 _source._pause(this, resumeSignal); |
| 952 } | 960 } |
| 953 | 961 |
| 954 void resume() { | 962 void resume() { |
| 955 if (!_isSubscribed || !isPaused) return; | 963 if (!_isSubscribed || !isPaused) return; |
| 956 _source._resume(this, false); | 964 _source._resume(this, false); |
| 957 } | 965 } |
| 958 | 966 |
| 959 Future asFuture([var futureValue]) { | 967 Future asFuture([var futureValue]) { |
| 960 _FutureImpl<T> result = new _FutureImpl<T>(); | 968 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 961 | 969 |
| 962 // Overwrite the onDone and onError handlers. | 970 // Overwrite the onDone and onError handlers. |
| 963 onDone(() { result._setValue(futureValue); }); | 971 onDone(() { result._setValue(futureValue); }); |
| 964 onError((AsyncError error) { | 972 onError((error) { |
| 965 cancel(); | 973 cancel(); |
| 966 result._setError(error); | 974 result._setError(error); |
| 967 }); | 975 }); |
| 968 | 976 |
| 969 return result; | 977 return result; |
| 970 } | 978 } |
| 971 } | 979 } |
| 972 | 980 |
| 973 // Internal helpers. | 981 // Internal helpers. |
| 974 | 982 |
| 975 // Types of the different handlers on a stream. Types used to type fields. | 983 // Types of the different handlers on a stream. Types used to type fields. |
| 976 typedef void _DataHandler<T>(T value); | 984 typedef void _DataHandler<T>(T value); |
| 977 typedef void _ErrorHandler(AsyncError error); | 985 typedef void _ErrorHandler(error); |
| 978 typedef void _DoneHandler(); | 986 typedef void _DoneHandler(); |
| 979 | 987 |
| 980 | 988 |
| 981 /** Default data handler, does nothing. */ | 989 /** Default data handler, does nothing. */ |
| 982 void _nullDataHandler(var value) {} | 990 void _nullDataHandler(var value) {} |
| 983 | 991 |
| 984 /** Default error handler, reports the error to the global handler. */ | 992 /** Default error handler, reports the error to the global handler. */ |
| 985 void _nullErrorHandler(AsyncError error) { | 993 void _nullErrorHandler(error) { |
| 986 error.throwDelayed(); | 994 _throwDelayed(error); |
| 987 } | 995 } |
| 988 | 996 |
| 989 /** Default done handler, does nothing. */ | 997 /** Default done handler, does nothing. */ |
| 990 void _nullDoneHandler() {} | 998 void _nullDoneHandler() {} |
| 991 | 999 |
| 992 | 1000 |
| 993 /** A delayed event on a stream implementation. */ | 1001 /** A delayed event on a stream implementation. */ |
| 994 abstract class _DelayedEvent { | 1002 abstract class _DelayedEvent { |
| 995 /** Added as a linked list on the [StreamController]. */ | 1003 /** Added as a linked list on the [StreamController]. */ |
| 996 _DelayedEvent next; | 1004 _DelayedEvent next; |
| 997 /** Execute the delayed event on the [StreamController]. */ | 1005 /** Execute the delayed event on the [StreamController]. */ |
| 998 void perform(_StreamImpl stream); | 1006 void perform(_StreamImpl stream); |
| 999 } | 1007 } |
| 1000 | 1008 |
| 1001 /** A delayed data event. */ | 1009 /** A delayed data event. */ |
| 1002 class _DelayedData<T> extends _DelayedEvent{ | 1010 class _DelayedData<T> extends _DelayedEvent{ |
| 1003 T value; | 1011 final T value; |
| 1004 _DelayedData(this.value); | 1012 _DelayedData(this.value); |
| 1005 void perform(_StreamImpl<T> stream) { | 1013 void perform(_StreamImpl<T> stream) { |
| 1006 stream._sendData(value); | 1014 stream._sendData(value); |
| 1007 } | 1015 } |
| 1008 } | 1016 } |
| 1009 | 1017 |
| 1010 /** A delayed error event. */ | 1018 /** A delayed error event. */ |
| 1011 class _DelayedError extends _DelayedEvent { | 1019 class _DelayedError extends _DelayedEvent { |
| 1012 AsyncError error; | 1020 final error; |
| 1013 _DelayedError(this.error); | 1021 _DelayedError(this.error); |
| 1014 void perform(_StreamImpl stream) { | 1022 void perform(_StreamImpl stream) { |
| 1015 stream._sendError(error); | 1023 stream._sendError(error); |
| 1016 } | 1024 } |
| 1017 } | 1025 } |
| 1018 | 1026 |
| 1019 /** A delayed done event. */ | 1027 /** A delayed done event. */ |
| 1020 class _DelayedDone implements _DelayedEvent { | 1028 class _DelayedDone implements _DelayedEvent { |
| 1021 const _DelayedDone(); | 1029 const _DelayedDone(); |
| 1022 void perform(_StreamImpl stream) { | 1030 void perform(_StreamImpl stream) { |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1106 list._previousLink = otherLast; | 1114 list._previousLink = otherLast; |
| 1107 otherLast._nextLink = list; | 1115 otherLast._nextLink = list; |
| 1108 // Clean up [other]. | 1116 // Clean up [other]. |
| 1109 other._nextLink = other._previousLink = other; | 1117 other._nextLink = other._previousLink = other; |
| 1110 } | 1118 } |
| 1111 } | 1119 } |
| 1112 | 1120 |
| 1113 /** Abstract type for an internal interface for sending events. */ | 1121 /** Abstract type for an internal interface for sending events. */ |
| 1114 abstract class _EventOutputSink<T> { | 1122 abstract class _EventOutputSink<T> { |
| 1115 _sendData(T data); | 1123 _sendData(T data); |
| 1116 _sendError(AsyncError error); | 1124 _sendError(error); |
| 1117 _sendDone(); | 1125 _sendDone(); |
| 1118 } | 1126 } |
| 1119 | 1127 |
| 1120 abstract class _StreamListener<T> extends _InternalLink | 1128 abstract class _StreamListener<T> extends _InternalLink |
| 1121 implements _EventOutputSink<T> { | 1129 implements _EventOutputSink<T> { |
| 1122 final _StreamImpl _source; | 1130 final _StreamImpl _source; |
| 1123 int _state = _LISTENER_UNSUBSCRIBED; | 1131 int _state = _LISTENER_UNSUBSCRIBED; |
| 1124 | 1132 |
| 1125 _StreamListener(this._source); | 1133 _StreamListener(this._source); |
| 1126 | 1134 |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1181 void _incrementPauseCount() { | 1189 void _incrementPauseCount() { |
| 1182 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; | 1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 1183 } | 1191 } |
| 1184 | 1192 |
| 1185 void _decrementPauseCount() { | 1193 void _decrementPauseCount() { |
| 1186 assert(isPaused); | 1194 assert(isPaused); |
| 1187 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; | 1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 1188 } | 1196 } |
| 1189 | 1197 |
| 1190 _sendData(T data); | 1198 _sendData(T data); |
| 1191 _sendError(AsyncError error); | 1199 _sendError(error); |
| 1192 _sendDone(); | 1200 _sendDone(); |
| 1193 } | 1201 } |
| 1194 | 1202 |
| 1195 /** Superclass for provider of pending events. */ | 1203 /** Superclass for provider of pending events. */ |
| 1196 abstract class _PendingEvents { | 1204 abstract class _PendingEvents { |
| 1197 /** | 1205 /** |
| 1198 * Timer set when pending events are scheduled for execution. | 1206 * Timer set when pending events are scheduled for execution. |
| 1199 * | 1207 * |
| 1200 * When scheduling pending events for execution in a later cycle, the timer | 1208 * When scheduling pending events for execution in a later cycle, the timer |
| 1201 * is stored here. If pending events are executed earlier than that, e.g., | 1209 * is stored here. If pending events are executed earlier than that, e.g., |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1269 assert(_timer == null && _pauseCount == 0); | 1277 assert(_timer == null && _pauseCount == 0); |
| 1270 _timer = new Timer(Duration.ZERO, () { | 1278 _timer = new Timer(Duration.ZERO, () { |
| 1271 if (_handler != null) _handler(); | 1279 if (_handler != null) _handler(); |
| 1272 }); | 1280 }); |
| 1273 } | 1281 } |
| 1274 | 1282 |
| 1275 bool get _isComplete => _timer == null && _pauseCount == 0; | 1283 bool get _isComplete => _timer == null && _pauseCount == 0; |
| 1276 | 1284 |
| 1277 void onData(void handleAction(T value)) {} | 1285 void onData(void handleAction(T value)) {} |
| 1278 | 1286 |
| 1279 void onError(void handleError(AsyncError error)) {} | 1287 void onError(void handleError(error)) {} |
| 1280 | 1288 |
| 1281 void onDone(void handleDone()) { | 1289 void onDone(void handleDone()) { |
| 1282 _handler = handleDone; | 1290 _handler = handleDone; |
| 1283 } | 1291 } |
| 1284 | 1292 |
| 1285 void pause([Future signal]) { | 1293 void pause([Future signal]) { |
| 1286 if (_isComplete) return; | 1294 if (_isComplete) return; |
| 1287 if (_timer != null) { | 1295 if (_timer != null) { |
| 1288 _timer.cancel(); | 1296 _timer.cancel(); |
| 1289 _timer = null; | 1297 _timer = null; |
| (...skipping 21 matching lines...) Expand all Loading... |
| 1311 } | 1319 } |
| 1312 _pauseCount = 0; | 1320 _pauseCount = 0; |
| 1313 } | 1321 } |
| 1314 | 1322 |
| 1315 Future asFuture([var futureValue]) { | 1323 Future asFuture([var futureValue]) { |
| 1316 // TODO(floitsch): share more code. | 1324 // TODO(floitsch): share more code. |
| 1317 _FutureImpl<T> result = new _FutureImpl<T>(); | 1325 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 1318 | 1326 |
| 1319 // Overwrite the onDone and onError handlers. | 1327 // Overwrite the onDone and onError handlers. |
| 1320 onDone(() { result._setValue(futureValue); }); | 1328 onDone(() { result._setValue(futureValue); }); |
| 1321 onError((AsyncError error) { | 1329 onError((error) { |
| 1322 cancel(); | 1330 cancel(); |
| 1323 result._setError(error); | 1331 result._setError(error); |
| 1324 }); | 1332 }); |
| 1325 | 1333 |
| 1326 return result; | 1334 return result; |
| 1327 } | 1335 } |
| 1328 } | 1336 } |
| 1329 | 1337 |
| 1330 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { | 1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
| 1331 final Stream<T> _source; | 1339 final Stream<T> _source; |
| (...skipping 24 matching lines...) Expand all Loading... |
| 1356 onError: this._addError, | 1364 onError: this._addError, |
| 1357 onDone: this._close); | 1365 onDone: this._close); |
| 1358 } else { | 1366 } else { |
| 1359 // TODO(lrn): Check why this can happen. | 1367 // TODO(lrn): Check why this can happen. |
| 1360 if (_subscription == null) return; | 1368 if (_subscription == null) return; |
| 1361 _subscription.cancel(); | 1369 _subscription.cancel(); |
| 1362 _subscription = null; | 1370 _subscription = null; |
| 1363 } | 1371 } |
| 1364 } | 1372 } |
| 1365 } | 1373 } |
| OLD | NEW |