OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
61 * When the future completes, the stream will fire one event, either | 61 * When the future completes, the stream will fire one event, either |
62 * data or error, and then close with a done-event. | 62 * data or error, and then close with a done-event. |
63 */ | 63 */ |
64 factory Stream.fromFuture(Future<T> future) { | 64 factory Stream.fromFuture(Future<T> future) { |
65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); | 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
66 future.then((value) { | 66 future.then((value) { |
67 stream._add(value); | 67 stream._add(value); |
68 stream._close(); | 68 stream._close(); |
69 }, | 69 }, |
70 onError: (error) { | 70 onError: (error) { |
71 stream._signalError(error); | 71 stream._addError(error); |
72 stream._close(); | 72 stream._close(); |
73 }); | 73 }); |
74 return stream; | 74 return stream; |
75 } | 75 } |
76 | 76 |
77 /** | 77 /** |
78 * Creates a single-subscription stream that gets its data from [data]. | 78 * Creates a single-subscription stream that gets its data from [data]. |
79 */ | 79 */ |
80 factory Stream.fromIterable(Iterable<T> data) { | 80 factory Stream.fromIterable(Iterable<T> data) { |
81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
207 result._setError(e); | 207 result._setError(e); |
208 }, | 208 }, |
209 onDone: () { | 209 onDone: () { |
210 result._setValue(value); | 210 result._setValue(value); |
211 }, | 211 }, |
212 unsubscribeOnError: true); | 212 unsubscribeOnError: true); |
213 return result; | 213 return result; |
214 } | 214 } |
215 | 215 |
216 // Deprecated method, previously called 'pipe', retained for compatibility. | 216 // Deprecated method, previously called 'pipe', retained for compatibility. |
217 Future pipeInto(StreamSink<T> sink, | 217 Future pipeInto(EventSink<T> sink, |
218 {void onError(AsyncError error), | 218 {void onError(AsyncError error), |
219 bool unsubscribeOnError}) { | 219 bool unsubscribeOnError}) { |
220 _FutureImpl<T> result = new _FutureImpl<T>(); | 220 _FutureImpl<T> result = new _FutureImpl<T>(); |
221 this.listen( | 221 this.listen( |
222 sink.add, | 222 sink.add, |
223 onError: sink.signalError, | 223 onError: sink.addError, |
224 onDone: () { | 224 onDone: () { |
225 sink.close(); | 225 sink.close(); |
226 result._setValue(null); | 226 result._setValue(null); |
227 }, | 227 }, |
228 unsubscribeOnError: unsubscribeOnError); | 228 unsubscribeOnError: unsubscribeOnError); |
229 return result; | 229 return result; |
230 } | 230 } |
231 | 231 |
232 | 232 |
233 /** | 233 /** |
(...skipping 605 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
839 /** | 839 /** |
840 * Resume after a pause. | 840 * Resume after a pause. |
841 */ | 841 */ |
842 void resume(); | 842 void resume(); |
843 } | 843 } |
844 | 844 |
845 | 845 |
846 /** | 846 /** |
847 * An interface that abstracts sending events into a [Stream]. | 847 * An interface that abstracts sending events into a [Stream]. |
848 */ | 848 */ |
849 abstract class StreamSink<T> { | 849 abstract class EventSink<T> { |
850 void add(T event); | 850 void add(T event); |
851 /** Signal an async error to the receivers of this sink's values. */ | 851 /** Signal an async error to the receivers of this sink's values. */ |
852 void signalError(AsyncError errorEvent); | 852 void addError(AsyncError errorEvent); |
853 /** *Deprecated*. Use [addError] instead. */ | |
854 void signalError(AsyncError errorEvent) { | |
floitsch
2013/03/07 14:19:34
Move this method to StreamSink below. Users that u
Lasse Reichstein Nielsen
2013/03/08 10:18:25
That's problematic. This is very much a consumed i
| |
855 addError(errorEvent); | |
856 } | |
853 void close(); | 857 void close(); |
854 } | 858 } |
855 | 859 |
860 /** | |
861 * *Deprecated*. Use [EventSink] instead. | |
862 */ | |
863 abstract class StreamSink<T> extends EventSink<T> {} | |
floitsch
2013/03/07 14:19:34
ditto for @deprecated.
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Done.
| |
864 | |
856 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 865 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
857 class StreamView<T> extends Stream<T> { | 866 class StreamView<T> extends Stream<T> { |
858 Stream<T> _stream; | 867 Stream<T> _stream; |
859 | 868 |
860 StreamView(this._stream); | 869 StreamView(this._stream); |
861 | 870 |
862 bool get isBroadcast => _stream.isBroadcast; | 871 bool get isBroadcast => _stream.isBroadcast; |
863 | 872 |
864 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 873 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
865 | 874 |
866 StreamSubscription<T> listen(void onData(T value), | 875 StreamSubscription<T> listen(void onData(T value), |
867 { void onError(AsyncError error), | 876 { void onError(AsyncError error), |
868 void onDone(), | 877 void onDone(), |
869 bool unsubscribeOnError }) { | 878 bool unsubscribeOnError }) { |
870 return _stream.listen(onData, onError: onError, onDone: onDone, | 879 return _stream.listen(onData, onError: onError, onDone: onDone, |
871 unsubscribeOnError: unsubscribeOnError); | 880 unsubscribeOnError: unsubscribeOnError); |
872 } | 881 } |
873 } | 882 } |
874 | 883 |
875 /** | 884 /** |
876 * [StreamSink] wrapper that only exposes the [StreamSink] interface. | 885 * [EventSink] wrapper that only exposes the [EventSink] interface. |
877 */ | 886 */ |
878 class StreamSinkView<T> implements StreamSink<T> { | 887 class EventSinkView<T> implements EventSink<T> { |
879 final StreamSink<T> _sink; | 888 final EventSink<T> _sink; |
880 | 889 |
881 StreamSinkView(this._sink); | 890 EventSinkView(this._sink); |
882 | 891 |
883 void add(T value) { _sink.add(value); } | 892 void add(T value) { _sink.add(value); } |
884 void signalError(AsyncError error) { _sink.signalError(error); } | 893 void addError(AsyncError error) { _sink.addError(error); } |
894 void signalError(AsyncError error) { addError(error); } | |
floitsch
2013/03/07 14:19:34
Add TODO?
Alternatively add comment to EventSink.s
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Added TODO(8997) where relevant (and created bug f
| |
885 void close() { _sink.close(); } | 895 void close() { _sink.close(); } |
886 } | 896 } |
887 | 897 |
888 | 898 |
889 /** | 899 /** |
890 * The target of a [Stream.pipe] call. | 900 * The target of a [Stream.pipe] call. |
891 * | 901 * |
892 * The [Stream.pipe] call will pass itself to this object, and then return | 902 * The [Stream.pipe] call will pass itself to this object, and then return |
893 * the resulting [Future]. The pipe should complete the future when it's | 903 * the resulting [Future]. The pipe should complete the future when it's |
894 * done. | 904 * done. |
(...skipping 14 matching lines...) Expand all Loading... | |
909 * Create a [StreamTransformer] that delegates events to the given functions. | 919 * Create a [StreamTransformer] that delegates events to the given functions. |
910 * | 920 * |
911 * This is actually a [StreamEventTransformer] where the event handling is | 921 * This is actually a [StreamEventTransformer] where the event handling is |
912 * performed by the function arguments. | 922 * performed by the function arguments. |
913 * If an argument is omitted, it acts as the corresponding default method from | 923 * If an argument is omitted, it acts as the corresponding default method from |
914 * [StreamEventTransformer]. | 924 * [StreamEventTransformer]. |
915 * | 925 * |
916 * Example use: | 926 * Example use: |
917 * | 927 * |
918 * stringStream.transform(new StreamTransformer<String, String>( | 928 * stringStream.transform(new StreamTransformer<String, String>( |
919 * handleData: (Strung value, StreamSink<String> sink) { | 929 * handleData: (Strung value, EventSink<String> sink) { |
920 * sink.add(value); | 930 * sink.add(value); |
921 * sink.add(value); // Duplicate the incoming events. | 931 * sink.add(value); // Duplicate the incoming events. |
922 * })); | 932 * })); |
923 * | 933 * |
924 */ | 934 */ |
925 factory StreamTransformer({ | 935 factory StreamTransformer({ |
926 void handleData(S data, StreamSink<T> sink), | 936 void handleData(S data, EventSink<T> sink), |
927 void handleError(AsyncError error, StreamSink<T> sink), | 937 void handleError(AsyncError error, EventSink<T> sink), |
928 void handleDone(StreamSink<T> sink)}) { | 938 void handleDone(EventSink<T> sink)}) { |
929 return new _StreamTransformerImpl<S, T>(handleData, | 939 return new _StreamTransformerImpl<S, T>(handleData, |
930 handleError, | 940 handleError, |
931 handleDone); | 941 handleDone); |
932 } | 942 } |
933 | 943 |
934 Stream<T> bind(Stream<S> stream); | 944 Stream<T> bind(Stream<S> stream); |
935 } | 945 } |
936 | 946 |
937 | 947 |
938 /** | 948 /** |
939 * Base class for transformers that modifies stream events. | 949 * Base class for transformers that modifies stream events. |
940 * | 950 * |
941 * A [StreamEventTransformer] transforms incoming Stream | 951 * A [StreamEventTransformer] transforms incoming Stream |
942 * events of one kind into outgoing events of (possibly) another kind. | 952 * events of one kind into outgoing events of (possibly) another kind. |
943 * | 953 * |
944 * Subscribing on the stream returned by [bind] is the same as subscribing on | 954 * Subscribing on the stream returned by [bind] is the same as subscribing on |
945 * the source stream, except that events are passed through the [transformer] | 955 * the source stream, except that events are passed through the [transformer] |
946 * before being emitted. The transformer may generate any number and | 956 * before being emitted. The transformer may generate any number and |
947 * types of events for each incoming event. Pauses on the returned | 957 * types of events for each incoming event. Pauses on the returned |
948 * subscription are forwarded to this stream. | 958 * subscription are forwarded to this stream. |
949 * | 959 * |
950 * An example that duplicates all data events: | 960 * An example that duplicates all data events: |
951 * | 961 * |
952 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { | 962 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { |
953 * void handleData(T data, StreamSink<T> sink) { | 963 * void handleData(T data, EventSink<T> sink) { |
954 * sink.add(value); | 964 * sink.add(value); |
955 * sink.add(value); | 965 * sink.add(value); |
956 * } | 966 * } |
957 * } | 967 * } |
958 * someTypeStream.transform(new DoubleTransformer<Type>()); | 968 * someTypeStream.transform(new DoubleTransformer<Type>()); |
959 * | 969 * |
960 * The default implementations of the "handle" methods forward | 970 * The default implementations of the "handle" methods forward |
961 * the events unmodified. If using the default [handleData] the generic type [T] | 971 * the events unmodified. If using the default [handleData] the generic type [T] |
962 * needs to be assignable to [S]. | 972 * needs to be assignable to [S]. |
963 */ | 973 */ |
964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 974 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
965 const StreamEventTransformer(); | 975 const StreamEventTransformer(); |
966 | 976 |
967 Stream<T> bind(Stream<S> source) { | 977 Stream<T> bind(Stream<S> source) { |
968 return new EventTransformStream<S, T>(source, this); | 978 return new EventTransformStream<S, T>(source, this); |
969 } | 979 } |
970 | 980 |
971 /** | 981 /** |
972 * Act on incoming data event. | 982 * Act on incoming data event. |
973 * | 983 * |
974 * The method may generate any number of events on the sink, but should | 984 * The method may generate any number of events on the sink, but should |
975 * not throw. | 985 * not throw. |
976 */ | 986 */ |
977 void handleData(S event, StreamSink<T> sink) { | 987 void handleData(S event, EventSink<T> sink) { |
978 var data = event; | 988 var data = event; |
979 sink.add(data); | 989 sink.add(data); |
980 } | 990 } |
981 | 991 |
982 /** | 992 /** |
983 * Act on incoming error event. | 993 * Act on incoming error event. |
984 * | 994 * |
985 * The method may generate any number of events on the sink, but should | 995 * The method may generate any number of events on the sink, but should |
986 * not throw. | 996 * not throw. |
987 */ | 997 */ |
988 void handleError(AsyncError error, StreamSink<T> sink) { | 998 void handleError(AsyncError error, EventSink<T> sink) { |
989 sink.signalError(error); | 999 sink.addError(error); |
990 } | 1000 } |
991 | 1001 |
992 /** | 1002 /** |
993 * Act on incoming done event. | 1003 * Act on incoming done event. |
994 * | 1004 * |
995 * The method may generate any number of events on the sink, but should | 1005 * The method may generate any number of events on the sink, but should |
996 * not throw. | 1006 * not throw. |
997 */ | 1007 */ |
998 void handleDone(StreamSink<T> sink){ | 1008 void handleDone(EventSink<T> sink){ |
999 sink.close(); | 1009 sink.close(); |
1000 } | 1010 } |
1001 } | 1011 } |
1002 | 1012 |
1003 | 1013 |
1004 /** | 1014 /** |
1005 * Stream that transforms another stream by intercepting and replacing events. | 1015 * Stream that transforms another stream by intercepting and replacing events. |
1006 * | 1016 * |
1007 * This [Stream] is a transformation of a source stream. Listening on this | 1017 * This [Stream] is a transformation of a source stream. Listening on this |
1008 * stream is the same as listening on the source stream, except that events | 1018 * stream is the same as listening on the source stream, except that events |
(...skipping 13 matching lines...) Expand all Loading... | |
1022 bool unsubscribeOnError }) { | 1032 bool unsubscribeOnError }) { |
1023 unsubscribeOnError = identical(true, unsubscribeOnError); | 1033 unsubscribeOnError = identical(true, unsubscribeOnError); |
1024 return new _EventTransformStreamSubscription(_source, _transformer, | 1034 return new _EventTransformStreamSubscription(_source, _transformer, |
1025 onData, onError, onDone, | 1035 onData, onError, onDone, |
1026 unsubscribeOnError); | 1036 unsubscribeOnError); |
1027 } | 1037 } |
1028 } | 1038 } |
1029 | 1039 |
1030 class _EventTransformStreamSubscription<S, T> | 1040 class _EventTransformStreamSubscription<S, T> |
1031 extends _BaseStreamSubscription<T> | 1041 extends _BaseStreamSubscription<T> |
1032 implements _StreamOutputSink<T> { | 1042 implements _EventOutputSink<T> { |
1033 /** The transformer used to transform events. */ | 1043 /** The transformer used to transform events. */ |
1034 final StreamEventTransformer<S, T> _transformer; | 1044 final StreamEventTransformer<S, T> _transformer; |
1035 /** Whether to unsubscribe when emitting an error. */ | 1045 /** Whether to unsubscribe when emitting an error. */ |
1036 final bool _unsubscribeOnError; | 1046 final bool _unsubscribeOnError; |
1037 /** Source of incoming events. */ | 1047 /** Source of incoming events. */ |
1038 StreamSubscription<S> _subscription; | 1048 StreamSubscription<S> _subscription; |
1039 /** Cached StreamSink wrapper for this class. */ | 1049 /** Cached EventSink wrapper for this class. */ |
1040 StreamSink<T> _sink; | 1050 EventSink<T> _sink; |
1041 | 1051 |
1042 _EventTransformStreamSubscription(Stream<S> source, | 1052 _EventTransformStreamSubscription(Stream<S> source, |
1043 this._transformer, | 1053 this._transformer, |
1044 void onData(T data), | 1054 void onData(T data), |
1045 void onError(AsyncError error), | 1055 void onError(AsyncError error), |
1046 void onDone(), | 1056 void onDone(), |
1047 this._unsubscribeOnError) | 1057 this._unsubscribeOnError) |
1048 : super(onData, onError, onDone) { | 1058 : super(onData, onError, onDone) { |
1049 _sink = new _StreamOutputSinkWrapper<T>(this); | 1059 _sink = new _EventOutputSinkWrapper<T>(this); |
1050 _subscription = source.listen(_handleData, | 1060 _subscription = source.listen(_handleData, |
1051 onError: _handleError, | 1061 onError: _handleError, |
1052 onDone: _handleDone); | 1062 onDone: _handleDone); |
1053 } | 1063 } |
1054 | 1064 |
1055 void pause([Future pauseSignal]) { | 1065 void pause([Future pauseSignal]) { |
1056 if (_subscription != null) _subscription.pause(pauseSignal); | 1066 if (_subscription != null) _subscription.pause(pauseSignal); |
1057 } | 1067 } |
1058 | 1068 |
1059 void resume() { | 1069 void resume() { |
(...skipping 25 matching lines...) Expand all Loading... | |
1085 | 1095 |
1086 void _handleDone() { | 1096 void _handleDone() { |
1087 _subscription = null; | 1097 _subscription = null; |
1088 try { | 1098 try { |
1089 _transformer.handleDone(_sink); | 1099 _transformer.handleDone(_sink); |
1090 } catch (e, s) { | 1100 } catch (e, s) { |
1091 _sendError(_asyncError(e, s)); | 1101 _sendError(_asyncError(e, s)); |
1092 } | 1102 } |
1093 } | 1103 } |
1094 | 1104 |
1095 // StreamOutputSink interface. | 1105 // EventOutputSink interface. |
1096 void _sendData(T data) { | 1106 void _sendData(T data) { |
1097 _onData(data); | 1107 _onData(data); |
1098 } | 1108 } |
1099 | 1109 |
1100 void _sendError(AsyncError error) { | 1110 void _sendError(AsyncError error) { |
1101 _onError(error); | 1111 _onError(error); |
1102 if (_unsubscribeOnError) { | 1112 if (_unsubscribeOnError) { |
1103 cancel(); | 1113 cancel(); |
1104 } | 1114 } |
1105 } | 1115 } |
1106 | 1116 |
1107 void _sendDone() { | 1117 void _sendDone() { |
1108 // It's ok to cancel even if we have been unsubscribed already. | 1118 // It's ok to cancel even if we have been unsubscribed already. |
1109 cancel(); | 1119 cancel(); |
1110 _onDone(); | 1120 _onDone(); |
1111 } | 1121 } |
1112 } | 1122 } |
1113 | 1123 |
1114 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { | 1124 class _EventOutputSinkWrapper<T> implements EventSink<T> { |
1115 _StreamOutputSink _sink; | 1125 _EventOutputSink _sink; |
1116 _StreamOutputSinkWrapper(this._sink); | 1126 _EventOutputSinkWrapper(this._sink); |
1117 | 1127 |
1118 void add(T data) => _sink._sendData(data); | 1128 void add(T data) { _sink._sendData(data); } |
1119 void signalError(AsyncError error) => _sink._sendError(error); | 1129 void addError(AsyncError error) { _sink._sendError(error); } |
1120 void close() => _sink._sendDone(); | 1130 void signalError(AsyncError error) { addError(error); } |
1131 void close() { _sink._sendDone(); } | |
1121 } | 1132 } |
OLD | NEW |