Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
61 * When the future completes, the stream will fire one event, either 61 * When the future completes, the stream will fire one event, either
62 * data or error, and then close with a done-event. 62 * data or error, and then close with a done-event.
63 */ 63 */
64 factory Stream.fromFuture(Future<T> future) { 64 factory Stream.fromFuture(Future<T> future) {
65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>();
66 future.then((value) { 66 future.then((value) {
67 stream._add(value); 67 stream._add(value);
68 stream._close(); 68 stream._close();
69 }, 69 },
70 onError: (error) { 70 onError: (error) {
71 stream._signalError(error); 71 stream._addError(error);
72 stream._close(); 72 stream._close();
73 }); 73 });
74 return stream; 74 return stream;
75 } 75 }
76 76
77 /** 77 /**
78 * Creates a single-subscription stream that gets its data from [data]. 78 * Creates a single-subscription stream that gets its data from [data].
79 */ 79 */
80 factory Stream.fromIterable(Iterable<T> data) { 80 factory Stream.fromIterable(Iterable<T> data) {
81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 result._setError(e); 207 result._setError(e);
208 }, 208 },
209 onDone: () { 209 onDone: () {
210 result._setValue(value); 210 result._setValue(value);
211 }, 211 },
212 unsubscribeOnError: true); 212 unsubscribeOnError: true);
213 return result; 213 return result;
214 } 214 }
215 215
216 // Deprecated method, previously called 'pipe', retained for compatibility. 216 // Deprecated method, previously called 'pipe', retained for compatibility.
217 Future pipeInto(StreamSink<T> sink, 217 Future pipeInto(EventSink<T> sink,
218 {void onError(AsyncError error), 218 {void onError(AsyncError error),
219 bool unsubscribeOnError}) { 219 bool unsubscribeOnError}) {
220 _FutureImpl<T> result = new _FutureImpl<T>(); 220 _FutureImpl<T> result = new _FutureImpl<T>();
221 this.listen( 221 this.listen(
222 sink.add, 222 sink.add,
223 onError: sink.signalError, 223 onError: sink.addError,
224 onDone: () { 224 onDone: () {
225 sink.close(); 225 sink.close();
226 result._setValue(null); 226 result._setValue(null);
227 }, 227 },
228 unsubscribeOnError: unsubscribeOnError); 228 unsubscribeOnError: unsubscribeOnError);
229 return result; 229 return result;
230 } 230 }
231 231
232 232
233 /** 233 /**
(...skipping 603 matching lines...) Expand 10 before | Expand all | Expand 10 after
837 void pause([Future resumeSignal]); 837 void pause([Future resumeSignal]);
838 838
839 /** 839 /**
840 * Resume after a pause. 840 * Resume after a pause.
841 */ 841 */
842 void resume(); 842 void resume();
843 } 843 }
844 844
845 845
846 /** 846 /**
847 * An interface that abstracts sending events into a [Stream]. 847 * *Deprecated*. Use [EventSink] instead.
848 */ 848 */
849 abstract class StreamSink<T> { 849 abstract class StreamSink<T> extends EventSink<T>{
850 /* TODO(8997): Remove class.*/
851 /** *Deprecated*. Use [EventSink.addError] instead.*/
852 void signalError(AsyncError errorEvent) { addError(errorEvent); }
853 }
854
855
856 /**
857 * An interface that abstracts creation or handling of [Stream] events.
858 */
859 abstract class EventSink<T> {
860 /** Create a data event */
850 void add(T event); 861 void add(T event);
851 /** Signal an async error to the receivers of this sink's values. */ 862 /** Create an async error. */
852 void signalError(AsyncError errorEvent); 863 void addError(AsyncError errorEvent);
864 /** Request a stream to close. */
853 void close(); 865 void close();
854 } 866 }
855 867
868
856 /** [Stream] wrapper that only exposes the [Stream] interface. */ 869 /** [Stream] wrapper that only exposes the [Stream] interface. */
857 class StreamView<T> extends Stream<T> { 870 class StreamView<T> extends Stream<T> {
858 Stream<T> _stream; 871 Stream<T> _stream;
859 872
860 StreamView(this._stream); 873 StreamView(this._stream);
861 874
862 bool get isBroadcast => _stream.isBroadcast; 875 bool get isBroadcast => _stream.isBroadcast;
863 876
864 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); 877 Stream<T> asBroadcastStream() => _stream.asBroadcastStream();
865 878
866 StreamSubscription<T> listen(void onData(T value), 879 StreamSubscription<T> listen(void onData(T value),
867 { void onError(AsyncError error), 880 { void onError(AsyncError error),
868 void onDone(), 881 void onDone(),
869 bool unsubscribeOnError }) { 882 bool unsubscribeOnError }) {
870 return _stream.listen(onData, onError: onError, onDone: onDone, 883 return _stream.listen(onData, onError: onError, onDone: onDone,
871 unsubscribeOnError: unsubscribeOnError); 884 unsubscribeOnError: unsubscribeOnError);
872 } 885 }
873 } 886 }
874 887
875 /** 888 /**
876 * [StreamSink] wrapper that only exposes the [StreamSink] interface. 889 * [EventSink] wrapper that only exposes the [EventSink] interface.
877 */ 890 */
878 class StreamSinkView<T> implements StreamSink<T> { 891 class EventSinkView<T> extends StreamSink<T> {
879 final StreamSink<T> _sink; 892 // TODO(8997): Implment EventSink instead.
893 final EventSink<T> _sink;
880 894
881 StreamSinkView(this._sink); 895 EventSinkView(this._sink);
882 896
883 void add(T value) { _sink.add(value); } 897 void add(T value) { _sink.add(value); }
884 void signalError(AsyncError error) { _sink.signalError(error); } 898 void addError(AsyncError error) { _sink.addError(error); }
885 void close() { _sink.close(); } 899 void close() { _sink.close(); }
886 } 900 }
887 901
888 902
889 /** 903 /**
890 * The target of a [Stream.pipe] call. 904 * The target of a [Stream.pipe] call.
891 * 905 *
892 * The [Stream.pipe] call will pass itself to this object, and then return 906 * The [Stream.pipe] call will pass itself to this object, and then return
893 * the resulting [Future]. The pipe should complete the future when it's 907 * the resulting [Future]. The pipe should complete the future when it's
894 * done. 908 * done.
(...skipping 14 matching lines...) Expand all
909 * Create a [StreamTransformer] that delegates events to the given functions. 923 * Create a [StreamTransformer] that delegates events to the given functions.
910 * 924 *
911 * This is actually a [StreamEventTransformer] where the event handling is 925 * This is actually a [StreamEventTransformer] where the event handling is
912 * performed by the function arguments. 926 * performed by the function arguments.
913 * If an argument is omitted, it acts as the corresponding default method from 927 * If an argument is omitted, it acts as the corresponding default method from
914 * [StreamEventTransformer]. 928 * [StreamEventTransformer].
915 * 929 *
916 * Example use: 930 * Example use:
917 * 931 *
918 * stringStream.transform(new StreamTransformer<String, String>( 932 * stringStream.transform(new StreamTransformer<String, String>(
919 * handleData: (Strung value, StreamSink<String> sink) { 933 * handleData: (Strung value, EventSink<String> sink) {
920 * sink.add(value); 934 * sink.add(value);
921 * sink.add(value); // Duplicate the incoming events. 935 * sink.add(value); // Duplicate the incoming events.
922 * })); 936 * }));
923 * 937 *
924 */ 938 */
925 factory StreamTransformer({ 939 factory StreamTransformer({
926 void handleData(S data, StreamSink<T> sink), 940 void handleData(S data, EventSink<T> sink),
927 void handleError(AsyncError error, StreamSink<T> sink), 941 void handleError(AsyncError error, EventSink<T> sink),
928 void handleDone(StreamSink<T> sink)}) { 942 void handleDone(EventSink<T> sink)}) {
929 return new _StreamTransformerImpl<S, T>(handleData, 943 return new _StreamTransformerImpl<S, T>(handleData,
930 handleError, 944 handleError,
931 handleDone); 945 handleDone);
932 } 946 }
933 947
934 Stream<T> bind(Stream<S> stream); 948 Stream<T> bind(Stream<S> stream);
935 } 949 }
936 950
937 951
938 /** 952 /**
939 * Base class for transformers that modifies stream events. 953 * Base class for transformers that modifies stream events.
940 * 954 *
941 * A [StreamEventTransformer] transforms incoming Stream 955 * A [StreamEventTransformer] transforms incoming Stream
942 * events of one kind into outgoing events of (possibly) another kind. 956 * events of one kind into outgoing events of (possibly) another kind.
943 * 957 *
944 * Subscribing on the stream returned by [bind] is the same as subscribing on 958 * Subscribing on the stream returned by [bind] is the same as subscribing on
945 * the source stream, except that events are passed through the [transformer] 959 * the source stream, except that events are passed through the [transformer]
946 * before being emitted. The transformer may generate any number and 960 * before being emitted. The transformer may generate any number and
947 * types of events for each incoming event. Pauses on the returned 961 * types of events for each incoming event. Pauses on the returned
948 * subscription are forwarded to this stream. 962 * subscription are forwarded to this stream.
949 * 963 *
950 * An example that duplicates all data events: 964 * An example that duplicates all data events:
951 * 965 *
952 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { 966 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> {
953 * void handleData(T data, StreamSink<T> sink) { 967 * void handleData(T data, EventSink<T> sink) {
954 * sink.add(value); 968 * sink.add(value);
955 * sink.add(value); 969 * sink.add(value);
956 * } 970 * }
957 * } 971 * }
958 * someTypeStream.transform(new DoubleTransformer<Type>()); 972 * someTypeStream.transform(new DoubleTransformer<Type>());
959 * 973 *
960 * The default implementations of the "handle" methods forward 974 * The default implementations of the "handle" methods forward
961 * the events unmodified. If using the default [handleData] the generic type [T] 975 * the events unmodified. If using the default [handleData] the generic type [T]
962 * needs to be assignable to [S]. 976 * needs to be assignable to [S].
963 */ 977 */
964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 978 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
965 const StreamEventTransformer(); 979 const StreamEventTransformer();
966 980
967 Stream<T> bind(Stream<S> source) { 981 Stream<T> bind(Stream<S> source) {
968 return new EventTransformStream<S, T>(source, this); 982 return new EventTransformStream<S, T>(source, this);
969 } 983 }
970 984
971 /** 985 /**
972 * Act on incoming data event. 986 * Act on incoming data event.
973 * 987 *
974 * The method may generate any number of events on the sink, but should 988 * The method may generate any number of events on the sink, but should
975 * not throw. 989 * not throw.
976 */ 990 */
977 void handleData(S event, StreamSink<T> sink) { 991 void handleData(S event, EventSink<T> sink) {
978 var data = event; 992 var data = event;
979 sink.add(data); 993 sink.add(data);
980 } 994 }
981 995
982 /** 996 /**
983 * Act on incoming error event. 997 * Act on incoming error event.
984 * 998 *
985 * The method may generate any number of events on the sink, but should 999 * The method may generate any number of events on the sink, but should
986 * not throw. 1000 * not throw.
987 */ 1001 */
988 void handleError(AsyncError error, StreamSink<T> sink) { 1002 void handleError(AsyncError error, EventSink<T> sink) {
989 sink.signalError(error); 1003 sink.addError(error);
990 } 1004 }
991 1005
992 /** 1006 /**
993 * Act on incoming done event. 1007 * Act on incoming done event.
994 * 1008 *
995 * The method may generate any number of events on the sink, but should 1009 * The method may generate any number of events on the sink, but should
996 * not throw. 1010 * not throw.
997 */ 1011 */
998 void handleDone(StreamSink<T> sink){ 1012 void handleDone(EventSink<T> sink){
999 sink.close(); 1013 sink.close();
1000 } 1014 }
1001 } 1015 }
1002 1016
1003 1017
1004 /** 1018 /**
1005 * Stream that transforms another stream by intercepting and replacing events. 1019 * Stream that transforms another stream by intercepting and replacing events.
1006 * 1020 *
1007 * This [Stream] is a transformation of a source stream. Listening on this 1021 * This [Stream] is a transformation of a source stream. Listening on this
1008 * stream is the same as listening on the source stream, except that events 1022 * stream is the same as listening on the source stream, except that events
(...skipping 13 matching lines...) Expand all
1022 bool unsubscribeOnError }) { 1036 bool unsubscribeOnError }) {
1023 unsubscribeOnError = identical(true, unsubscribeOnError); 1037 unsubscribeOnError = identical(true, unsubscribeOnError);
1024 return new _EventTransformStreamSubscription(_source, _transformer, 1038 return new _EventTransformStreamSubscription(_source, _transformer,
1025 onData, onError, onDone, 1039 onData, onError, onDone,
1026 unsubscribeOnError); 1040 unsubscribeOnError);
1027 } 1041 }
1028 } 1042 }
1029 1043
1030 class _EventTransformStreamSubscription<S, T> 1044 class _EventTransformStreamSubscription<S, T>
1031 extends _BaseStreamSubscription<T> 1045 extends _BaseStreamSubscription<T>
1032 implements _StreamOutputSink<T> { 1046 implements _EventOutputSink<T> {
1033 /** The transformer used to transform events. */ 1047 /** The transformer used to transform events. */
1034 final StreamEventTransformer<S, T> _transformer; 1048 final StreamEventTransformer<S, T> _transformer;
1035 /** Whether to unsubscribe when emitting an error. */ 1049 /** Whether to unsubscribe when emitting an error. */
1036 final bool _unsubscribeOnError; 1050 final bool _unsubscribeOnError;
1037 /** Source of incoming events. */ 1051 /** Source of incoming events. */
1038 StreamSubscription<S> _subscription; 1052 StreamSubscription<S> _subscription;
1039 /** Cached StreamSink wrapper for this class. */ 1053 /** Cached EventSink wrapper for this class. */
1040 StreamSink<T> _sink; 1054 EventSink<T> _sink;
1041 1055
1042 _EventTransformStreamSubscription(Stream<S> source, 1056 _EventTransformStreamSubscription(Stream<S> source,
1043 this._transformer, 1057 this._transformer,
1044 void onData(T data), 1058 void onData(T data),
1045 void onError(AsyncError error), 1059 void onError(AsyncError error),
1046 void onDone(), 1060 void onDone(),
1047 this._unsubscribeOnError) 1061 this._unsubscribeOnError)
1048 : super(onData, onError, onDone) { 1062 : super(onData, onError, onDone) {
1049 _sink = new _StreamOutputSinkWrapper<T>(this); 1063 _sink = new _EventOutputSinkWrapper<T>(this);
1050 _subscription = source.listen(_handleData, 1064 _subscription = source.listen(_handleData,
1051 onError: _handleError, 1065 onError: _handleError,
1052 onDone: _handleDone); 1066 onDone: _handleDone);
1053 } 1067 }
1054 1068
1055 void pause([Future pauseSignal]) { 1069 void pause([Future pauseSignal]) {
1056 if (_subscription != null) _subscription.pause(pauseSignal); 1070 if (_subscription != null) _subscription.pause(pauseSignal);
1057 } 1071 }
1058 1072
1059 void resume() { 1073 void resume() {
(...skipping 25 matching lines...) Expand all
1085 1099
1086 void _handleDone() { 1100 void _handleDone() {
1087 _subscription = null; 1101 _subscription = null;
1088 try { 1102 try {
1089 _transformer.handleDone(_sink); 1103 _transformer.handleDone(_sink);
1090 } catch (e, s) { 1104 } catch (e, s) {
1091 _sendError(_asyncError(e, s)); 1105 _sendError(_asyncError(e, s));
1092 } 1106 }
1093 } 1107 }
1094 1108
1095 // StreamOutputSink interface. 1109 // EventOutputSink interface.
1096 void _sendData(T data) { 1110 void _sendData(T data) {
1097 _onData(data); 1111 _onData(data);
1098 } 1112 }
1099 1113
1100 void _sendError(AsyncError error) { 1114 void _sendError(AsyncError error) {
1101 _onError(error); 1115 _onError(error);
1102 if (_unsubscribeOnError) { 1116 if (_unsubscribeOnError) {
1103 cancel(); 1117 cancel();
1104 } 1118 }
1105 } 1119 }
1106 1120
1107 void _sendDone() { 1121 void _sendDone() {
1108 // It's ok to cancel even if we have been unsubscribed already. 1122 // It's ok to cancel even if we have been unsubscribed already.
1109 cancel(); 1123 cancel();
1110 _onDone(); 1124 _onDone();
1111 } 1125 }
1112 } 1126 }
1113 1127
1114 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { 1128 /* TODO(8997): Implement EventSink instead, */
1115 _StreamOutputSink _sink; 1129 class _EventOutputSinkWrapper<T> extends StreamSink<T> {
1116 _StreamOutputSinkWrapper(this._sink); 1130 _EventOutputSink _sink;
1131 _EventOutputSinkWrapper(this._sink);
1117 1132
1118 void add(T data) => _sink._sendData(data); 1133 void add(T data) { _sink._sendData(data); }
1119 void signalError(AsyncError error) => _sink._sendError(error); 1134 void addError(AsyncError error) { _sink._sendError(error); }
1120 void close() => _sink._sendDone(); 1135 void close() { _sink._sendDone(); }
1121 } 1136 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698