Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 61 * When the future completes, the stream will fire one event, either | 61 * When the future completes, the stream will fire one event, either |
| 62 * data or error, and then close with a done-event. | 62 * data or error, and then close with a done-event. |
| 63 */ | 63 */ |
| 64 factory Stream.fromFuture(Future<T> future) { | 64 factory Stream.fromFuture(Future<T> future) { |
| 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); | 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
| 66 future.then((value) { | 66 future.then((value) { |
| 67 stream._add(value); | 67 stream._add(value); |
| 68 stream._close(); | 68 stream._close(); |
| 69 }, | 69 }, |
| 70 onError: (error) { | 70 onError: (error) { |
| 71 stream._signalError(error); | 71 stream._addError(error); |
| 72 stream._close(); | 72 stream._close(); |
| 73 }); | 73 }); |
| 74 return stream; | 74 return stream; |
| 75 } | 75 } |
| 76 | 76 |
| 77 /** | 77 /** |
| 78 * Creates a single-subscription stream that gets its data from [data]. | 78 * Creates a single-subscription stream that gets its data from [data]. |
| 79 */ | 79 */ |
| 80 factory Stream.fromIterable(Iterable<T> data) { | 80 factory Stream.fromIterable(Iterable<T> data) { |
| 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 207 result._setError(e); | 207 result._setError(e); |
| 208 }, | 208 }, |
| 209 onDone: () { | 209 onDone: () { |
| 210 result._setValue(value); | 210 result._setValue(value); |
| 211 }, | 211 }, |
| 212 unsubscribeOnError: true); | 212 unsubscribeOnError: true); |
| 213 return result; | 213 return result; |
| 214 } | 214 } |
| 215 | 215 |
| 216 // Deprecated method, previously called 'pipe', retained for compatibility. | 216 // Deprecated method, previously called 'pipe', retained for compatibility. |
| 217 Future pipeInto(StreamSink<T> sink, | 217 Future pipeInto(EventSink<T> sink, |
| 218 {void onError(AsyncError error), | 218 {void onError(AsyncError error), |
| 219 bool unsubscribeOnError}) { | 219 bool unsubscribeOnError}) { |
| 220 _FutureImpl<T> result = new _FutureImpl<T>(); | 220 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 221 this.listen( | 221 this.listen( |
| 222 sink.add, | 222 sink.add, |
| 223 onError: sink.signalError, | 223 onError: sink.addError, |
| 224 onDone: () { | 224 onDone: () { |
| 225 sink.close(); | 225 sink.close(); |
| 226 result._setValue(null); | 226 result._setValue(null); |
| 227 }, | 227 }, |
| 228 unsubscribeOnError: unsubscribeOnError); | 228 unsubscribeOnError: unsubscribeOnError); |
| 229 return result; | 229 return result; |
| 230 } | 230 } |
| 231 | 231 |
| 232 | 232 |
| 233 /** | 233 /** |
| (...skipping 605 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 839 /** | 839 /** |
| 840 * Resume after a pause. | 840 * Resume after a pause. |
| 841 */ | 841 */ |
| 842 void resume(); | 842 void resume(); |
| 843 } | 843 } |
| 844 | 844 |
| 845 | 845 |
| 846 /** | 846 /** |
| 847 * An interface that abstracts sending events into a [Stream]. | 847 * An interface that abstracts sending events into a [Stream]. |
| 848 */ | 848 */ |
| 849 abstract class StreamSink<T> { | 849 abstract class EventSink<T> { |
| 850 void add(T event); | 850 void add(T event); |
| 851 /** Signal an async error to the receivers of this sink's values. */ | 851 /** Signal an async error to the receivers of this sink's values. */ |
| 852 void signalError(AsyncError errorEvent); | 852 void addError(AsyncError errorEvent); |
| 853 /** *Deprecated*. Use [addError] instead. */ | |
| 854 void signalError(AsyncError errorEvent) { | |
|
floitsch
2013/03/07 14:19:34
Move this method to StreamSink below. Users that u
Lasse Reichstein Nielsen
2013/03/08 10:18:25
That's problematic. This is very much a consumed i
| |
| 855 addError(errorEvent); | |
| 856 } | |
| 853 void close(); | 857 void close(); |
| 854 } | 858 } |
| 855 | 859 |
| 860 /** | |
| 861 * *Deprecated*. Use [EventSink] instead. | |
| 862 */ | |
| 863 abstract class StreamSink<T> extends EventSink<T> {} | |
|
floitsch
2013/03/07 14:19:34
ditto for @deprecated.
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Done.
| |
| 864 | |
| 856 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 865 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 857 class StreamView<T> extends Stream<T> { | 866 class StreamView<T> extends Stream<T> { |
| 858 Stream<T> _stream; | 867 Stream<T> _stream; |
| 859 | 868 |
| 860 StreamView(this._stream); | 869 StreamView(this._stream); |
| 861 | 870 |
| 862 bool get isBroadcast => _stream.isBroadcast; | 871 bool get isBroadcast => _stream.isBroadcast; |
| 863 | 872 |
| 864 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 873 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
| 865 | 874 |
| 866 StreamSubscription<T> listen(void onData(T value), | 875 StreamSubscription<T> listen(void onData(T value), |
| 867 { void onError(AsyncError error), | 876 { void onError(AsyncError error), |
| 868 void onDone(), | 877 void onDone(), |
| 869 bool unsubscribeOnError }) { | 878 bool unsubscribeOnError }) { |
| 870 return _stream.listen(onData, onError: onError, onDone: onDone, | 879 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 871 unsubscribeOnError: unsubscribeOnError); | 880 unsubscribeOnError: unsubscribeOnError); |
| 872 } | 881 } |
| 873 } | 882 } |
| 874 | 883 |
| 875 /** | 884 /** |
| 876 * [StreamSink] wrapper that only exposes the [StreamSink] interface. | 885 * [EventSink] wrapper that only exposes the [EventSink] interface. |
| 877 */ | 886 */ |
| 878 class StreamSinkView<T> implements StreamSink<T> { | 887 class EventSinkView<T> implements EventSink<T> { |
| 879 final StreamSink<T> _sink; | 888 final EventSink<T> _sink; |
| 880 | 889 |
| 881 StreamSinkView(this._sink); | 890 EventSinkView(this._sink); |
| 882 | 891 |
| 883 void add(T value) { _sink.add(value); } | 892 void add(T value) { _sink.add(value); } |
| 884 void signalError(AsyncError error) { _sink.signalError(error); } | 893 void addError(AsyncError error) { _sink.addError(error); } |
| 894 void signalError(AsyncError error) { addError(error); } | |
|
floitsch
2013/03/07 14:19:34
Add TODO?
Alternatively add comment to EventSink.s
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Added TODO(8997) where relevant (and created bug f
| |
| 885 void close() { _sink.close(); } | 895 void close() { _sink.close(); } |
| 886 } | 896 } |
| 887 | 897 |
| 888 | 898 |
| 889 /** | 899 /** |
| 890 * The target of a [Stream.pipe] call. | 900 * The target of a [Stream.pipe] call. |
| 891 * | 901 * |
| 892 * The [Stream.pipe] call will pass itself to this object, and then return | 902 * The [Stream.pipe] call will pass itself to this object, and then return |
| 893 * the resulting [Future]. The pipe should complete the future when it's | 903 * the resulting [Future]. The pipe should complete the future when it's |
| 894 * done. | 904 * done. |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 909 * Create a [StreamTransformer] that delegates events to the given functions. | 919 * Create a [StreamTransformer] that delegates events to the given functions. |
| 910 * | 920 * |
| 911 * This is actually a [StreamEventTransformer] where the event handling is | 921 * This is actually a [StreamEventTransformer] where the event handling is |
| 912 * performed by the function arguments. | 922 * performed by the function arguments. |
| 913 * If an argument is omitted, it acts as the corresponding default method from | 923 * If an argument is omitted, it acts as the corresponding default method from |
| 914 * [StreamEventTransformer]. | 924 * [StreamEventTransformer]. |
| 915 * | 925 * |
| 916 * Example use: | 926 * Example use: |
| 917 * | 927 * |
| 918 * stringStream.transform(new StreamTransformer<String, String>( | 928 * stringStream.transform(new StreamTransformer<String, String>( |
| 919 * handleData: (Strung value, StreamSink<String> sink) { | 929 * handleData: (Strung value, EventSink<String> sink) { |
| 920 * sink.add(value); | 930 * sink.add(value); |
| 921 * sink.add(value); // Duplicate the incoming events. | 931 * sink.add(value); // Duplicate the incoming events. |
| 922 * })); | 932 * })); |
| 923 * | 933 * |
| 924 */ | 934 */ |
| 925 factory StreamTransformer({ | 935 factory StreamTransformer({ |
| 926 void handleData(S data, StreamSink<T> sink), | 936 void handleData(S data, EventSink<T> sink), |
| 927 void handleError(AsyncError error, StreamSink<T> sink), | 937 void handleError(AsyncError error, EventSink<T> sink), |
| 928 void handleDone(StreamSink<T> sink)}) { | 938 void handleDone(EventSink<T> sink)}) { |
| 929 return new _StreamTransformerImpl<S, T>(handleData, | 939 return new _StreamTransformerImpl<S, T>(handleData, |
| 930 handleError, | 940 handleError, |
| 931 handleDone); | 941 handleDone); |
| 932 } | 942 } |
| 933 | 943 |
| 934 Stream<T> bind(Stream<S> stream); | 944 Stream<T> bind(Stream<S> stream); |
| 935 } | 945 } |
| 936 | 946 |
| 937 | 947 |
| 938 /** | 948 /** |
| 939 * Base class for transformers that modifies stream events. | 949 * Base class for transformers that modifies stream events. |
| 940 * | 950 * |
| 941 * A [StreamEventTransformer] transforms incoming Stream | 951 * A [StreamEventTransformer] transforms incoming Stream |
| 942 * events of one kind into outgoing events of (possibly) another kind. | 952 * events of one kind into outgoing events of (possibly) another kind. |
| 943 * | 953 * |
| 944 * Subscribing on the stream returned by [bind] is the same as subscribing on | 954 * Subscribing on the stream returned by [bind] is the same as subscribing on |
| 945 * the source stream, except that events are passed through the [transformer] | 955 * the source stream, except that events are passed through the [transformer] |
| 946 * before being emitted. The transformer may generate any number and | 956 * before being emitted. The transformer may generate any number and |
| 947 * types of events for each incoming event. Pauses on the returned | 957 * types of events for each incoming event. Pauses on the returned |
| 948 * subscription are forwarded to this stream. | 958 * subscription are forwarded to this stream. |
| 949 * | 959 * |
| 950 * An example that duplicates all data events: | 960 * An example that duplicates all data events: |
| 951 * | 961 * |
| 952 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { | 962 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { |
| 953 * void handleData(T data, StreamSink<T> sink) { | 963 * void handleData(T data, EventSink<T> sink) { |
| 954 * sink.add(value); | 964 * sink.add(value); |
| 955 * sink.add(value); | 965 * sink.add(value); |
| 956 * } | 966 * } |
| 957 * } | 967 * } |
| 958 * someTypeStream.transform(new DoubleTransformer<Type>()); | 968 * someTypeStream.transform(new DoubleTransformer<Type>()); |
| 959 * | 969 * |
| 960 * The default implementations of the "handle" methods forward | 970 * The default implementations of the "handle" methods forward |
| 961 * the events unmodified. If using the default [handleData] the generic type [T] | 971 * the events unmodified. If using the default [handleData] the generic type [T] |
| 962 * needs to be assignable to [S]. | 972 * needs to be assignable to [S]. |
| 963 */ | 973 */ |
| 964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 974 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| 965 const StreamEventTransformer(); | 975 const StreamEventTransformer(); |
| 966 | 976 |
| 967 Stream<T> bind(Stream<S> source) { | 977 Stream<T> bind(Stream<S> source) { |
| 968 return new EventTransformStream<S, T>(source, this); | 978 return new EventTransformStream<S, T>(source, this); |
| 969 } | 979 } |
| 970 | 980 |
| 971 /** | 981 /** |
| 972 * Act on incoming data event. | 982 * Act on incoming data event. |
| 973 * | 983 * |
| 974 * The method may generate any number of events on the sink, but should | 984 * The method may generate any number of events on the sink, but should |
| 975 * not throw. | 985 * not throw. |
| 976 */ | 986 */ |
| 977 void handleData(S event, StreamSink<T> sink) { | 987 void handleData(S event, EventSink<T> sink) { |
| 978 var data = event; | 988 var data = event; |
| 979 sink.add(data); | 989 sink.add(data); |
| 980 } | 990 } |
| 981 | 991 |
| 982 /** | 992 /** |
| 983 * Act on incoming error event. | 993 * Act on incoming error event. |
| 984 * | 994 * |
| 985 * The method may generate any number of events on the sink, but should | 995 * The method may generate any number of events on the sink, but should |
| 986 * not throw. | 996 * not throw. |
| 987 */ | 997 */ |
| 988 void handleError(AsyncError error, StreamSink<T> sink) { | 998 void handleError(AsyncError error, EventSink<T> sink) { |
| 989 sink.signalError(error); | 999 sink.addError(error); |
| 990 } | 1000 } |
| 991 | 1001 |
| 992 /** | 1002 /** |
| 993 * Act on incoming done event. | 1003 * Act on incoming done event. |
| 994 * | 1004 * |
| 995 * The method may generate any number of events on the sink, but should | 1005 * The method may generate any number of events on the sink, but should |
| 996 * not throw. | 1006 * not throw. |
| 997 */ | 1007 */ |
| 998 void handleDone(StreamSink<T> sink){ | 1008 void handleDone(EventSink<T> sink){ |
| 999 sink.close(); | 1009 sink.close(); |
| 1000 } | 1010 } |
| 1001 } | 1011 } |
| 1002 | 1012 |
| 1003 | 1013 |
| 1004 /** | 1014 /** |
| 1005 * Stream that transforms another stream by intercepting and replacing events. | 1015 * Stream that transforms another stream by intercepting and replacing events. |
| 1006 * | 1016 * |
| 1007 * This [Stream] is a transformation of a source stream. Listening on this | 1017 * This [Stream] is a transformation of a source stream. Listening on this |
| 1008 * stream is the same as listening on the source stream, except that events | 1018 * stream is the same as listening on the source stream, except that events |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 1022 bool unsubscribeOnError }) { | 1032 bool unsubscribeOnError }) { |
| 1023 unsubscribeOnError = identical(true, unsubscribeOnError); | 1033 unsubscribeOnError = identical(true, unsubscribeOnError); |
| 1024 return new _EventTransformStreamSubscription(_source, _transformer, | 1034 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1025 onData, onError, onDone, | 1035 onData, onError, onDone, |
| 1026 unsubscribeOnError); | 1036 unsubscribeOnError); |
| 1027 } | 1037 } |
| 1028 } | 1038 } |
| 1029 | 1039 |
| 1030 class _EventTransformStreamSubscription<S, T> | 1040 class _EventTransformStreamSubscription<S, T> |
| 1031 extends _BaseStreamSubscription<T> | 1041 extends _BaseStreamSubscription<T> |
| 1032 implements _StreamOutputSink<T> { | 1042 implements _EventOutputSink<T> { |
| 1033 /** The transformer used to transform events. */ | 1043 /** The transformer used to transform events. */ |
| 1034 final StreamEventTransformer<S, T> _transformer; | 1044 final StreamEventTransformer<S, T> _transformer; |
| 1035 /** Whether to unsubscribe when emitting an error. */ | 1045 /** Whether to unsubscribe when emitting an error. */ |
| 1036 final bool _unsubscribeOnError; | 1046 final bool _unsubscribeOnError; |
| 1037 /** Source of incoming events. */ | 1047 /** Source of incoming events. */ |
| 1038 StreamSubscription<S> _subscription; | 1048 StreamSubscription<S> _subscription; |
| 1039 /** Cached StreamSink wrapper for this class. */ | 1049 /** Cached EventSink wrapper for this class. */ |
| 1040 StreamSink<T> _sink; | 1050 EventSink<T> _sink; |
| 1041 | 1051 |
| 1042 _EventTransformStreamSubscription(Stream<S> source, | 1052 _EventTransformStreamSubscription(Stream<S> source, |
| 1043 this._transformer, | 1053 this._transformer, |
| 1044 void onData(T data), | 1054 void onData(T data), |
| 1045 void onError(AsyncError error), | 1055 void onError(AsyncError error), |
| 1046 void onDone(), | 1056 void onDone(), |
| 1047 this._unsubscribeOnError) | 1057 this._unsubscribeOnError) |
| 1048 : super(onData, onError, onDone) { | 1058 : super(onData, onError, onDone) { |
| 1049 _sink = new _StreamOutputSinkWrapper<T>(this); | 1059 _sink = new _EventOutputSinkWrapper<T>(this); |
| 1050 _subscription = source.listen(_handleData, | 1060 _subscription = source.listen(_handleData, |
| 1051 onError: _handleError, | 1061 onError: _handleError, |
| 1052 onDone: _handleDone); | 1062 onDone: _handleDone); |
| 1053 } | 1063 } |
| 1054 | 1064 |
| 1055 void pause([Future pauseSignal]) { | 1065 void pause([Future pauseSignal]) { |
| 1056 if (_subscription != null) _subscription.pause(pauseSignal); | 1066 if (_subscription != null) _subscription.pause(pauseSignal); |
| 1057 } | 1067 } |
| 1058 | 1068 |
| 1059 void resume() { | 1069 void resume() { |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 1085 | 1095 |
| 1086 void _handleDone() { | 1096 void _handleDone() { |
| 1087 _subscription = null; | 1097 _subscription = null; |
| 1088 try { | 1098 try { |
| 1089 _transformer.handleDone(_sink); | 1099 _transformer.handleDone(_sink); |
| 1090 } catch (e, s) { | 1100 } catch (e, s) { |
| 1091 _sendError(_asyncError(e, s)); | 1101 _sendError(_asyncError(e, s)); |
| 1092 } | 1102 } |
| 1093 } | 1103 } |
| 1094 | 1104 |
| 1095 // StreamOutputSink interface. | 1105 // EventOutputSink interface. |
| 1096 void _sendData(T data) { | 1106 void _sendData(T data) { |
| 1097 _onData(data); | 1107 _onData(data); |
| 1098 } | 1108 } |
| 1099 | 1109 |
| 1100 void _sendError(AsyncError error) { | 1110 void _sendError(AsyncError error) { |
| 1101 _onError(error); | 1111 _onError(error); |
| 1102 if (_unsubscribeOnError) { | 1112 if (_unsubscribeOnError) { |
| 1103 cancel(); | 1113 cancel(); |
| 1104 } | 1114 } |
| 1105 } | 1115 } |
| 1106 | 1116 |
| 1107 void _sendDone() { | 1117 void _sendDone() { |
| 1108 // It's ok to cancel even if we have been unsubscribed already. | 1118 // It's ok to cancel even if we have been unsubscribed already. |
| 1109 cancel(); | 1119 cancel(); |
| 1110 _onDone(); | 1120 _onDone(); |
| 1111 } | 1121 } |
| 1112 } | 1122 } |
| 1113 | 1123 |
| 1114 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { | 1124 class _EventOutputSinkWrapper<T> implements EventSink<T> { |
| 1115 _StreamOutputSink _sink; | 1125 _EventOutputSink _sink; |
| 1116 _StreamOutputSinkWrapper(this._sink); | 1126 _EventOutputSinkWrapper(this._sink); |
| 1117 | 1127 |
| 1118 void add(T data) => _sink._sendData(data); | 1128 void add(T data) { _sink._sendData(data); } |
| 1119 void signalError(AsyncError error) => _sink._sendError(error); | 1129 void addError(AsyncError error) { _sink._sendError(error); } |
| 1120 void close() => _sink._sendDone(); | 1130 void signalError(AsyncError error) { addError(error); } |
| 1131 void close() { _sink._sendDone(); } | |
| 1121 } | 1132 } |
| OLD | NEW |