| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 61 * When the future completes, the stream will fire one event, either | 61 * When the future completes, the stream will fire one event, either |
| 62 * data or error, and then close with a done-event. | 62 * data or error, and then close with a done-event. |
| 63 */ | 63 */ |
| 64 factory Stream.fromFuture(Future<T> future) { | 64 factory Stream.fromFuture(Future<T> future) { |
| 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); | 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
| 66 future.then((value) { | 66 future.then((value) { |
| 67 stream._add(value); | 67 stream._add(value); |
| 68 stream._close(); | 68 stream._close(); |
| 69 }, | 69 }, |
| 70 onError: (error) { | 70 onError: (error) { |
| 71 stream._signalError(error); | 71 stream._addError(error); |
| 72 stream._close(); | 72 stream._close(); |
| 73 }); | 73 }); |
| 74 return stream; | 74 return stream; |
| 75 } | 75 } |
| 76 | 76 |
| 77 /** | 77 /** |
| 78 * Creates a single-subscription stream that gets its data from [data]. | 78 * Creates a single-subscription stream that gets its data from [data]. |
| 79 */ | 79 */ |
| 80 factory Stream.fromIterable(Iterable<T> data) { | 80 factory Stream.fromIterable(Iterable<T> data) { |
| 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 207 result._setError(e); | 207 result._setError(e); |
| 208 }, | 208 }, |
| 209 onDone: () { | 209 onDone: () { |
| 210 result._setValue(value); | 210 result._setValue(value); |
| 211 }, | 211 }, |
| 212 unsubscribeOnError: true); | 212 unsubscribeOnError: true); |
| 213 return result; | 213 return result; |
| 214 } | 214 } |
| 215 | 215 |
| 216 // Deprecated method, previously called 'pipe', retained for compatibility. | 216 // Deprecated method, previously called 'pipe', retained for compatibility. |
| 217 Future pipeInto(StreamSink<T> sink, | 217 Future pipeInto(EventSink<T> sink, |
| 218 {void onError(AsyncError error), | 218 {void onError(AsyncError error), |
| 219 bool unsubscribeOnError}) { | 219 bool unsubscribeOnError}) { |
| 220 _FutureImpl<T> result = new _FutureImpl<T>(); | 220 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 221 this.listen( | 221 this.listen( |
| 222 sink.add, | 222 sink.add, |
| 223 onError: sink.signalError, | 223 onError: sink.addError, |
| 224 onDone: () { | 224 onDone: () { |
| 225 sink.close(); | 225 sink.close(); |
| 226 result._setValue(null); | 226 result._setValue(null); |
| 227 }, | 227 }, |
| 228 unsubscribeOnError: unsubscribeOnError); | 228 unsubscribeOnError: unsubscribeOnError); |
| 229 return result; | 229 return result; |
| 230 } | 230 } |
| 231 | 231 |
| 232 | 232 |
| 233 /** | 233 /** |
| (...skipping 603 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 837 void pause([Future resumeSignal]); | 837 void pause([Future resumeSignal]); |
| 838 | 838 |
| 839 /** | 839 /** |
| 840 * Resume after a pause. | 840 * Resume after a pause. |
| 841 */ | 841 */ |
| 842 void resume(); | 842 void resume(); |
| 843 } | 843 } |
| 844 | 844 |
| 845 | 845 |
| 846 /** | 846 /** |
| 847 * An interface that abstracts sending events into a [Stream]. | 847 * *Deprecated*. Use [EventSink] instead. |
| 848 */ | 848 */ |
| 849 abstract class StreamSink<T> { | 849 abstract class StreamSink<T> extends EventSink<T>{ |
| 850 /* TODO(8997): Remove class.*/ |
| 851 /** *Deprecated*. Use [EventSink.addError] instead.*/ |
| 852 void signalError(AsyncError errorEvent) { addError(errorEvent); } |
| 853 } |
| 854 |
| 855 |
| 856 /** |
| 857 * An interface that abstracts creation or handling of [Stream] events. |
| 858 */ |
| 859 abstract class EventSink<T> { |
| 860 /** Create a data event */ |
| 850 void add(T event); | 861 void add(T event); |
| 851 /** Signal an async error to the receivers of this sink's values. */ | 862 /** Create an async error. */ |
| 852 void signalError(AsyncError errorEvent); | 863 void addError(AsyncError errorEvent); |
| 864 /** Request a stream to close. */ |
| 853 void close(); | 865 void close(); |
| 854 } | 866 } |
| 855 | 867 |
| 868 |
| 856 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 869 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 857 class StreamView<T> extends Stream<T> { | 870 class StreamView<T> extends Stream<T> { |
| 858 Stream<T> _stream; | 871 Stream<T> _stream; |
| 859 | 872 |
| 860 StreamView(this._stream); | 873 StreamView(this._stream); |
| 861 | 874 |
| 862 bool get isBroadcast => _stream.isBroadcast; | 875 bool get isBroadcast => _stream.isBroadcast; |
| 863 | 876 |
| 864 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 877 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
| 865 | 878 |
| 866 StreamSubscription<T> listen(void onData(T value), | 879 StreamSubscription<T> listen(void onData(T value), |
| 867 { void onError(AsyncError error), | 880 { void onError(AsyncError error), |
| 868 void onDone(), | 881 void onDone(), |
| 869 bool unsubscribeOnError }) { | 882 bool unsubscribeOnError }) { |
| 870 return _stream.listen(onData, onError: onError, onDone: onDone, | 883 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 871 unsubscribeOnError: unsubscribeOnError); | 884 unsubscribeOnError: unsubscribeOnError); |
| 872 } | 885 } |
| 873 } | 886 } |
| 874 | 887 |
| 875 /** | 888 /** |
| 876 * [StreamSink] wrapper that only exposes the [StreamSink] interface. | 889 * [EventSink] wrapper that only exposes the [EventSink] interface. |
| 877 */ | 890 */ |
| 878 class StreamSinkView<T> implements StreamSink<T> { | 891 class EventSinkView<T> extends StreamSink<T> { |
| 879 final StreamSink<T> _sink; | 892 // TODO(8997): Implment EventSink instead. |
| 893 final EventSink<T> _sink; |
| 880 | 894 |
| 881 StreamSinkView(this._sink); | 895 EventSinkView(this._sink); |
| 882 | 896 |
| 883 void add(T value) { _sink.add(value); } | 897 void add(T value) { _sink.add(value); } |
| 884 void signalError(AsyncError error) { _sink.signalError(error); } | 898 void addError(AsyncError error) { _sink.addError(error); } |
| 885 void close() { _sink.close(); } | 899 void close() { _sink.close(); } |
| 886 } | 900 } |
| 887 | 901 |
| 888 | 902 |
| 889 /** | 903 /** |
| 890 * The target of a [Stream.pipe] call. | 904 * The target of a [Stream.pipe] call. |
| 891 * | 905 * |
| 892 * The [Stream.pipe] call will pass itself to this object, and then return | 906 * The [Stream.pipe] call will pass itself to this object, and then return |
| 893 * the resulting [Future]. The pipe should complete the future when it's | 907 * the resulting [Future]. The pipe should complete the future when it's |
| 894 * done. | 908 * done. |
| (...skipping 14 matching lines...) Expand all Loading... |
| 909 * Create a [StreamTransformer] that delegates events to the given functions. | 923 * Create a [StreamTransformer] that delegates events to the given functions. |
| 910 * | 924 * |
| 911 * This is actually a [StreamEventTransformer] where the event handling is | 925 * This is actually a [StreamEventTransformer] where the event handling is |
| 912 * performed by the function arguments. | 926 * performed by the function arguments. |
| 913 * If an argument is omitted, it acts as the corresponding default method from | 927 * If an argument is omitted, it acts as the corresponding default method from |
| 914 * [StreamEventTransformer]. | 928 * [StreamEventTransformer]. |
| 915 * | 929 * |
| 916 * Example use: | 930 * Example use: |
| 917 * | 931 * |
| 918 * stringStream.transform(new StreamTransformer<String, String>( | 932 * stringStream.transform(new StreamTransformer<String, String>( |
| 919 * handleData: (Strung value, StreamSink<String> sink) { | 933 * handleData: (Strung value, EventSink<String> sink) { |
| 920 * sink.add(value); | 934 * sink.add(value); |
| 921 * sink.add(value); // Duplicate the incoming events. | 935 * sink.add(value); // Duplicate the incoming events. |
| 922 * })); | 936 * })); |
| 923 * | 937 * |
| 924 */ | 938 */ |
| 925 factory StreamTransformer({ | 939 factory StreamTransformer({ |
| 926 void handleData(S data, StreamSink<T> sink), | 940 void handleData(S data, EventSink<T> sink), |
| 927 void handleError(AsyncError error, StreamSink<T> sink), | 941 void handleError(AsyncError error, EventSink<T> sink), |
| 928 void handleDone(StreamSink<T> sink)}) { | 942 void handleDone(EventSink<T> sink)}) { |
| 929 return new _StreamTransformerImpl<S, T>(handleData, | 943 return new _StreamTransformerImpl<S, T>(handleData, |
| 930 handleError, | 944 handleError, |
| 931 handleDone); | 945 handleDone); |
| 932 } | 946 } |
| 933 | 947 |
| 934 Stream<T> bind(Stream<S> stream); | 948 Stream<T> bind(Stream<S> stream); |
| 935 } | 949 } |
| 936 | 950 |
| 937 | 951 |
| 938 /** | 952 /** |
| 939 * Base class for transformers that modifies stream events. | 953 * Base class for transformers that modifies stream events. |
| 940 * | 954 * |
| 941 * A [StreamEventTransformer] transforms incoming Stream | 955 * A [StreamEventTransformer] transforms incoming Stream |
| 942 * events of one kind into outgoing events of (possibly) another kind. | 956 * events of one kind into outgoing events of (possibly) another kind. |
| 943 * | 957 * |
| 944 * Subscribing on the stream returned by [bind] is the same as subscribing on | 958 * Subscribing on the stream returned by [bind] is the same as subscribing on |
| 945 * the source stream, except that events are passed through the [transformer] | 959 * the source stream, except that events are passed through the [transformer] |
| 946 * before being emitted. The transformer may generate any number and | 960 * before being emitted. The transformer may generate any number and |
| 947 * types of events for each incoming event. Pauses on the returned | 961 * types of events for each incoming event. Pauses on the returned |
| 948 * subscription are forwarded to this stream. | 962 * subscription are forwarded to this stream. |
| 949 * | 963 * |
| 950 * An example that duplicates all data events: | 964 * An example that duplicates all data events: |
| 951 * | 965 * |
| 952 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { | 966 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { |
| 953 * void handleData(T data, StreamSink<T> sink) { | 967 * void handleData(T data, EventSink<T> sink) { |
| 954 * sink.add(value); | 968 * sink.add(value); |
| 955 * sink.add(value); | 969 * sink.add(value); |
| 956 * } | 970 * } |
| 957 * } | 971 * } |
| 958 * someTypeStream.transform(new DoubleTransformer<Type>()); | 972 * someTypeStream.transform(new DoubleTransformer<Type>()); |
| 959 * | 973 * |
| 960 * The default implementations of the "handle" methods forward | 974 * The default implementations of the "handle" methods forward |
| 961 * the events unmodified. If using the default [handleData] the generic type [T] | 975 * the events unmodified. If using the default [handleData] the generic type [T] |
| 962 * needs to be assignable to [S]. | 976 * needs to be assignable to [S]. |
| 963 */ | 977 */ |
| 964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 978 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| 965 const StreamEventTransformer(); | 979 const StreamEventTransformer(); |
| 966 | 980 |
| 967 Stream<T> bind(Stream<S> source) { | 981 Stream<T> bind(Stream<S> source) { |
| 968 return new EventTransformStream<S, T>(source, this); | 982 return new EventTransformStream<S, T>(source, this); |
| 969 } | 983 } |
| 970 | 984 |
| 971 /** | 985 /** |
| 972 * Act on incoming data event. | 986 * Act on incoming data event. |
| 973 * | 987 * |
| 974 * The method may generate any number of events on the sink, but should | 988 * The method may generate any number of events on the sink, but should |
| 975 * not throw. | 989 * not throw. |
| 976 */ | 990 */ |
| 977 void handleData(S event, StreamSink<T> sink) { | 991 void handleData(S event, EventSink<T> sink) { |
| 978 var data = event; | 992 var data = event; |
| 979 sink.add(data); | 993 sink.add(data); |
| 980 } | 994 } |
| 981 | 995 |
| 982 /** | 996 /** |
| 983 * Act on incoming error event. | 997 * Act on incoming error event. |
| 984 * | 998 * |
| 985 * The method may generate any number of events on the sink, but should | 999 * The method may generate any number of events on the sink, but should |
| 986 * not throw. | 1000 * not throw. |
| 987 */ | 1001 */ |
| 988 void handleError(AsyncError error, StreamSink<T> sink) { | 1002 void handleError(AsyncError error, EventSink<T> sink) { |
| 989 sink.signalError(error); | 1003 sink.addError(error); |
| 990 } | 1004 } |
| 991 | 1005 |
| 992 /** | 1006 /** |
| 993 * Act on incoming done event. | 1007 * Act on incoming done event. |
| 994 * | 1008 * |
| 995 * The method may generate any number of events on the sink, but should | 1009 * The method may generate any number of events on the sink, but should |
| 996 * not throw. | 1010 * not throw. |
| 997 */ | 1011 */ |
| 998 void handleDone(StreamSink<T> sink){ | 1012 void handleDone(EventSink<T> sink){ |
| 999 sink.close(); | 1013 sink.close(); |
| 1000 } | 1014 } |
| 1001 } | 1015 } |
| 1002 | 1016 |
| 1003 | 1017 |
| 1004 /** | 1018 /** |
| 1005 * Stream that transforms another stream by intercepting and replacing events. | 1019 * Stream that transforms another stream by intercepting and replacing events. |
| 1006 * | 1020 * |
| 1007 * This [Stream] is a transformation of a source stream. Listening on this | 1021 * This [Stream] is a transformation of a source stream. Listening on this |
| 1008 * stream is the same as listening on the source stream, except that events | 1022 * stream is the same as listening on the source stream, except that events |
| (...skipping 13 matching lines...) Expand all Loading... |
| 1022 bool unsubscribeOnError }) { | 1036 bool unsubscribeOnError }) { |
| 1023 unsubscribeOnError = identical(true, unsubscribeOnError); | 1037 unsubscribeOnError = identical(true, unsubscribeOnError); |
| 1024 return new _EventTransformStreamSubscription(_source, _transformer, | 1038 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1025 onData, onError, onDone, | 1039 onData, onError, onDone, |
| 1026 unsubscribeOnError); | 1040 unsubscribeOnError); |
| 1027 } | 1041 } |
| 1028 } | 1042 } |
| 1029 | 1043 |
| 1030 class _EventTransformStreamSubscription<S, T> | 1044 class _EventTransformStreamSubscription<S, T> |
| 1031 extends _BaseStreamSubscription<T> | 1045 extends _BaseStreamSubscription<T> |
| 1032 implements _StreamOutputSink<T> { | 1046 implements _EventOutputSink<T> { |
| 1033 /** The transformer used to transform events. */ | 1047 /** The transformer used to transform events. */ |
| 1034 final StreamEventTransformer<S, T> _transformer; | 1048 final StreamEventTransformer<S, T> _transformer; |
| 1035 /** Whether to unsubscribe when emitting an error. */ | 1049 /** Whether to unsubscribe when emitting an error. */ |
| 1036 final bool _unsubscribeOnError; | 1050 final bool _unsubscribeOnError; |
| 1037 /** Source of incoming events. */ | 1051 /** Source of incoming events. */ |
| 1038 StreamSubscription<S> _subscription; | 1052 StreamSubscription<S> _subscription; |
| 1039 /** Cached StreamSink wrapper for this class. */ | 1053 /** Cached EventSink wrapper for this class. */ |
| 1040 StreamSink<T> _sink; | 1054 EventSink<T> _sink; |
| 1041 | 1055 |
| 1042 _EventTransformStreamSubscription(Stream<S> source, | 1056 _EventTransformStreamSubscription(Stream<S> source, |
| 1043 this._transformer, | 1057 this._transformer, |
| 1044 void onData(T data), | 1058 void onData(T data), |
| 1045 void onError(AsyncError error), | 1059 void onError(AsyncError error), |
| 1046 void onDone(), | 1060 void onDone(), |
| 1047 this._unsubscribeOnError) | 1061 this._unsubscribeOnError) |
| 1048 : super(onData, onError, onDone) { | 1062 : super(onData, onError, onDone) { |
| 1049 _sink = new _StreamOutputSinkWrapper<T>(this); | 1063 _sink = new _EventOutputSinkWrapper<T>(this); |
| 1050 _subscription = source.listen(_handleData, | 1064 _subscription = source.listen(_handleData, |
| 1051 onError: _handleError, | 1065 onError: _handleError, |
| 1052 onDone: _handleDone); | 1066 onDone: _handleDone); |
| 1053 } | 1067 } |
| 1054 | 1068 |
| 1055 void pause([Future pauseSignal]) { | 1069 void pause([Future pauseSignal]) { |
| 1056 if (_subscription != null) _subscription.pause(pauseSignal); | 1070 if (_subscription != null) _subscription.pause(pauseSignal); |
| 1057 } | 1071 } |
| 1058 | 1072 |
| 1059 void resume() { | 1073 void resume() { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 1085 | 1099 |
| 1086 void _handleDone() { | 1100 void _handleDone() { |
| 1087 _subscription = null; | 1101 _subscription = null; |
| 1088 try { | 1102 try { |
| 1089 _transformer.handleDone(_sink); | 1103 _transformer.handleDone(_sink); |
| 1090 } catch (e, s) { | 1104 } catch (e, s) { |
| 1091 _sendError(_asyncError(e, s)); | 1105 _sendError(_asyncError(e, s)); |
| 1092 } | 1106 } |
| 1093 } | 1107 } |
| 1094 | 1108 |
| 1095 // StreamOutputSink interface. | 1109 // EventOutputSink interface. |
| 1096 void _sendData(T data) { | 1110 void _sendData(T data) { |
| 1097 _onData(data); | 1111 _onData(data); |
| 1098 } | 1112 } |
| 1099 | 1113 |
| 1100 void _sendError(AsyncError error) { | 1114 void _sendError(AsyncError error) { |
| 1101 _onError(error); | 1115 _onError(error); |
| 1102 if (_unsubscribeOnError) { | 1116 if (_unsubscribeOnError) { |
| 1103 cancel(); | 1117 cancel(); |
| 1104 } | 1118 } |
| 1105 } | 1119 } |
| 1106 | 1120 |
| 1107 void _sendDone() { | 1121 void _sendDone() { |
| 1108 // It's ok to cancel even if we have been unsubscribed already. | 1122 // It's ok to cancel even if we have been unsubscribed already. |
| 1109 cancel(); | 1123 cancel(); |
| 1110 _onDone(); | 1124 _onDone(); |
| 1111 } | 1125 } |
| 1112 } | 1126 } |
| 1113 | 1127 |
| 1114 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { | 1128 /* TODO(8997): Implement EventSink instead, */ |
| 1115 _StreamOutputSink _sink; | 1129 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
| 1116 _StreamOutputSinkWrapper(this._sink); | 1130 _EventOutputSink _sink; |
| 1131 _EventOutputSinkWrapper(this._sink); |
| 1117 | 1132 |
| 1118 void add(T data) => _sink._sendData(data); | 1133 void add(T data) { _sink._sendData(data); } |
| 1119 void signalError(AsyncError error) => _sink._sendError(error); | 1134 void addError(AsyncError error) { _sink._sendError(error); } |
| 1120 void close() => _sink._sendDone(); | 1135 void close() { _sink._sendDone(); } |
| 1121 } | 1136 } |
| OLD | NEW |