Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(36)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
61 * When the future completes, the stream will fire one event, either 61 * When the future completes, the stream will fire one event, either
62 * data or error, and then close with a done-event. 62 * data or error, and then close with a done-event.
63 */ 63 */
64 factory Stream.fromFuture(Future<T> future) { 64 factory Stream.fromFuture(Future<T> future) {
65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>();
66 future.then((value) { 66 future.then((value) {
67 stream._add(value); 67 stream._add(value);
68 stream._close(); 68 stream._close();
69 }, 69 },
70 onError: (error) { 70 onError: (error) {
71 stream._signalError(error); 71 stream._addError(error);
72 stream._close(); 72 stream._close();
73 }); 73 });
74 return stream; 74 return stream;
75 } 75 }
76 76
77 /** 77 /**
78 * Creates a single-subscription stream that gets its data from [data]. 78 * Creates a single-subscription stream that gets its data from [data].
79 */ 79 */
80 factory Stream.fromIterable(Iterable<T> data) { 80 factory Stream.fromIterable(Iterable<T> data) {
81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 result._setError(e); 207 result._setError(e);
208 }, 208 },
209 onDone: () { 209 onDone: () {
210 result._setValue(value); 210 result._setValue(value);
211 }, 211 },
212 unsubscribeOnError: true); 212 unsubscribeOnError: true);
213 return result; 213 return result;
214 } 214 }
215 215
216 // Deprecated method, previously called 'pipe', retained for compatibility. 216 // Deprecated method, previously called 'pipe', retained for compatibility.
217 Future pipeInto(StreamSink<T> sink, 217 Future pipeInto(EventSink<T> sink,
218 {void onError(AsyncError error), 218 {void onError(AsyncError error),
219 bool unsubscribeOnError}) { 219 bool unsubscribeOnError}) {
220 _FutureImpl<T> result = new _FutureImpl<T>(); 220 _FutureImpl<T> result = new _FutureImpl<T>();
221 this.listen( 221 this.listen(
222 sink.add, 222 sink.add,
223 onError: sink.signalError, 223 onError: sink.addError,
224 onDone: () { 224 onDone: () {
225 sink.close(); 225 sink.close();
226 result._setValue(null); 226 result._setValue(null);
227 }, 227 },
228 unsubscribeOnError: unsubscribeOnError); 228 unsubscribeOnError: unsubscribeOnError);
229 return result; 229 return result;
230 } 230 }
231 231
232 232
233 /** 233 /**
(...skipping 605 matching lines...) Expand 10 before | Expand all | Expand 10 after
839 /** 839 /**
840 * Resume after a pause. 840 * Resume after a pause.
841 */ 841 */
842 void resume(); 842 void resume();
843 } 843 }
844 844
845 845
846 /** 846 /**
847 * An interface that abstracts sending events into a [Stream]. 847 * An interface that abstracts sending events into a [Stream].
848 */ 848 */
849 abstract class StreamSink<T> { 849 abstract class EventSink<T> {
850 void add(T event); 850 void add(T event);
851 /** Signal an async error to the receivers of this sink's values. */ 851 /** Signal an async error to the receivers of this sink's values. */
852 void signalError(AsyncError errorEvent); 852 void addError(AsyncError errorEvent);
853 /** *Deprecated*. Use [addError] instead. */
854 void signalError(AsyncError errorEvent) {
floitsch 2013/03/07 14:19:34 Move this method to StreamSink below. Users that u
Lasse Reichstein Nielsen 2013/03/08 10:18:25 That's problematic. This is very much a consumed i
855 addError(errorEvent);
856 }
853 void close(); 857 void close();
854 } 858 }
855 859
860 /**
861 * *Deprecated*. Use [EventSink] instead.
862 */
863 abstract class StreamSink<T> extends EventSink<T> {}
floitsch 2013/03/07 14:19:34 ditto for @deprecated.
Lasse Reichstein Nielsen 2013/03/08 10:18:25 Done.
864
856 /** [Stream] wrapper that only exposes the [Stream] interface. */ 865 /** [Stream] wrapper that only exposes the [Stream] interface. */
857 class StreamView<T> extends Stream<T> { 866 class StreamView<T> extends Stream<T> {
858 Stream<T> _stream; 867 Stream<T> _stream;
859 868
860 StreamView(this._stream); 869 StreamView(this._stream);
861 870
862 bool get isBroadcast => _stream.isBroadcast; 871 bool get isBroadcast => _stream.isBroadcast;
863 872
864 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); 873 Stream<T> asBroadcastStream() => _stream.asBroadcastStream();
865 874
866 StreamSubscription<T> listen(void onData(T value), 875 StreamSubscription<T> listen(void onData(T value),
867 { void onError(AsyncError error), 876 { void onError(AsyncError error),
868 void onDone(), 877 void onDone(),
869 bool unsubscribeOnError }) { 878 bool unsubscribeOnError }) {
870 return _stream.listen(onData, onError: onError, onDone: onDone, 879 return _stream.listen(onData, onError: onError, onDone: onDone,
871 unsubscribeOnError: unsubscribeOnError); 880 unsubscribeOnError: unsubscribeOnError);
872 } 881 }
873 } 882 }
874 883
875 /** 884 /**
876 * [StreamSink] wrapper that only exposes the [StreamSink] interface. 885 * [EventSink] wrapper that only exposes the [EventSink] interface.
877 */ 886 */
878 class StreamSinkView<T> implements StreamSink<T> { 887 class EventSinkView<T> implements EventSink<T> {
879 final StreamSink<T> _sink; 888 final EventSink<T> _sink;
880 889
881 StreamSinkView(this._sink); 890 EventSinkView(this._sink);
882 891
883 void add(T value) { _sink.add(value); } 892 void add(T value) { _sink.add(value); }
884 void signalError(AsyncError error) { _sink.signalError(error); } 893 void addError(AsyncError error) { _sink.addError(error); }
894 void signalError(AsyncError error) { addError(error); }
floitsch 2013/03/07 14:19:34 Add TODO? Alternatively add comment to EventSink.s
Lasse Reichstein Nielsen 2013/03/08 10:18:25 Added TODO(8997) where relevant (and created bug f
885 void close() { _sink.close(); } 895 void close() { _sink.close(); }
886 } 896 }
887 897
888 898
889 /** 899 /**
890 * The target of a [Stream.pipe] call. 900 * The target of a [Stream.pipe] call.
891 * 901 *
892 * The [Stream.pipe] call will pass itself to this object, and then return 902 * The [Stream.pipe] call will pass itself to this object, and then return
893 * the resulting [Future]. The pipe should complete the future when it's 903 * the resulting [Future]. The pipe should complete the future when it's
894 * done. 904 * done.
(...skipping 14 matching lines...) Expand all
909 * Create a [StreamTransformer] that delegates events to the given functions. 919 * Create a [StreamTransformer] that delegates events to the given functions.
910 * 920 *
911 * This is actually a [StreamEventTransformer] where the event handling is 921 * This is actually a [StreamEventTransformer] where the event handling is
912 * performed by the function arguments. 922 * performed by the function arguments.
913 * If an argument is omitted, it acts as the corresponding default method from 923 * If an argument is omitted, it acts as the corresponding default method from
914 * [StreamEventTransformer]. 924 * [StreamEventTransformer].
915 * 925 *
916 * Example use: 926 * Example use:
917 * 927 *
918 * stringStream.transform(new StreamTransformer<String, String>( 928 * stringStream.transform(new StreamTransformer<String, String>(
919 * handleData: (Strung value, StreamSink<String> sink) { 929 * handleData: (Strung value, EventSink<String> sink) {
920 * sink.add(value); 930 * sink.add(value);
921 * sink.add(value); // Duplicate the incoming events. 931 * sink.add(value); // Duplicate the incoming events.
922 * })); 932 * }));
923 * 933 *
924 */ 934 */
925 factory StreamTransformer({ 935 factory StreamTransformer({
926 void handleData(S data, StreamSink<T> sink), 936 void handleData(S data, EventSink<T> sink),
927 void handleError(AsyncError error, StreamSink<T> sink), 937 void handleError(AsyncError error, EventSink<T> sink),
928 void handleDone(StreamSink<T> sink)}) { 938 void handleDone(EventSink<T> sink)}) {
929 return new _StreamTransformerImpl<S, T>(handleData, 939 return new _StreamTransformerImpl<S, T>(handleData,
930 handleError, 940 handleError,
931 handleDone); 941 handleDone);
932 } 942 }
933 943
934 Stream<T> bind(Stream<S> stream); 944 Stream<T> bind(Stream<S> stream);
935 } 945 }
936 946
937 947
938 /** 948 /**
939 * Base class for transformers that modifies stream events. 949 * Base class for transformers that modifies stream events.
940 * 950 *
941 * A [StreamEventTransformer] transforms incoming Stream 951 * A [StreamEventTransformer] transforms incoming Stream
942 * events of one kind into outgoing events of (possibly) another kind. 952 * events of one kind into outgoing events of (possibly) another kind.
943 * 953 *
944 * Subscribing on the stream returned by [bind] is the same as subscribing on 954 * Subscribing on the stream returned by [bind] is the same as subscribing on
945 * the source stream, except that events are passed through the [transformer] 955 * the source stream, except that events are passed through the [transformer]
946 * before being emitted. The transformer may generate any number and 956 * before being emitted. The transformer may generate any number and
947 * types of events for each incoming event. Pauses on the returned 957 * types of events for each incoming event. Pauses on the returned
948 * subscription are forwarded to this stream. 958 * subscription are forwarded to this stream.
949 * 959 *
950 * An example that duplicates all data events: 960 * An example that duplicates all data events:
951 * 961 *
952 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { 962 * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> {
953 * void handleData(T data, StreamSink<T> sink) { 963 * void handleData(T data, EventSink<T> sink) {
954 * sink.add(value); 964 * sink.add(value);
955 * sink.add(value); 965 * sink.add(value);
956 * } 966 * }
957 * } 967 * }
958 * someTypeStream.transform(new DoubleTransformer<Type>()); 968 * someTypeStream.transform(new DoubleTransformer<Type>());
959 * 969 *
960 * The default implementations of the "handle" methods forward 970 * The default implementations of the "handle" methods forward
961 * the events unmodified. If using the default [handleData] the generic type [T] 971 * the events unmodified. If using the default [handleData] the generic type [T]
962 * needs to be assignable to [S]. 972 * needs to be assignable to [S].
963 */ 973 */
964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 974 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
965 const StreamEventTransformer(); 975 const StreamEventTransformer();
966 976
967 Stream<T> bind(Stream<S> source) { 977 Stream<T> bind(Stream<S> source) {
968 return new EventTransformStream<S, T>(source, this); 978 return new EventTransformStream<S, T>(source, this);
969 } 979 }
970 980
971 /** 981 /**
972 * Act on incoming data event. 982 * Act on incoming data event.
973 * 983 *
974 * The method may generate any number of events on the sink, but should 984 * The method may generate any number of events on the sink, but should
975 * not throw. 985 * not throw.
976 */ 986 */
977 void handleData(S event, StreamSink<T> sink) { 987 void handleData(S event, EventSink<T> sink) {
978 var data = event; 988 var data = event;
979 sink.add(data); 989 sink.add(data);
980 } 990 }
981 991
982 /** 992 /**
983 * Act on incoming error event. 993 * Act on incoming error event.
984 * 994 *
985 * The method may generate any number of events on the sink, but should 995 * The method may generate any number of events on the sink, but should
986 * not throw. 996 * not throw.
987 */ 997 */
988 void handleError(AsyncError error, StreamSink<T> sink) { 998 void handleError(AsyncError error, EventSink<T> sink) {
989 sink.signalError(error); 999 sink.addError(error);
990 } 1000 }
991 1001
992 /** 1002 /**
993 * Act on incoming done event. 1003 * Act on incoming done event.
994 * 1004 *
995 * The method may generate any number of events on the sink, but should 1005 * The method may generate any number of events on the sink, but should
996 * not throw. 1006 * not throw.
997 */ 1007 */
998 void handleDone(StreamSink<T> sink){ 1008 void handleDone(EventSink<T> sink){
999 sink.close(); 1009 sink.close();
1000 } 1010 }
1001 } 1011 }
1002 1012
1003 1013
1004 /** 1014 /**
1005 * Stream that transforms another stream by intercepting and replacing events. 1015 * Stream that transforms another stream by intercepting and replacing events.
1006 * 1016 *
1007 * This [Stream] is a transformation of a source stream. Listening on this 1017 * This [Stream] is a transformation of a source stream. Listening on this
1008 * stream is the same as listening on the source stream, except that events 1018 * stream is the same as listening on the source stream, except that events
(...skipping 13 matching lines...) Expand all
1022 bool unsubscribeOnError }) { 1032 bool unsubscribeOnError }) {
1023 unsubscribeOnError = identical(true, unsubscribeOnError); 1033 unsubscribeOnError = identical(true, unsubscribeOnError);
1024 return new _EventTransformStreamSubscription(_source, _transformer, 1034 return new _EventTransformStreamSubscription(_source, _transformer,
1025 onData, onError, onDone, 1035 onData, onError, onDone,
1026 unsubscribeOnError); 1036 unsubscribeOnError);
1027 } 1037 }
1028 } 1038 }
1029 1039
1030 class _EventTransformStreamSubscription<S, T> 1040 class _EventTransformStreamSubscription<S, T>
1031 extends _BaseStreamSubscription<T> 1041 extends _BaseStreamSubscription<T>
1032 implements _StreamOutputSink<T> { 1042 implements _EventOutputSink<T> {
1033 /** The transformer used to transform events. */ 1043 /** The transformer used to transform events. */
1034 final StreamEventTransformer<S, T> _transformer; 1044 final StreamEventTransformer<S, T> _transformer;
1035 /** Whether to unsubscribe when emitting an error. */ 1045 /** Whether to unsubscribe when emitting an error. */
1036 final bool _unsubscribeOnError; 1046 final bool _unsubscribeOnError;
1037 /** Source of incoming events. */ 1047 /** Source of incoming events. */
1038 StreamSubscription<S> _subscription; 1048 StreamSubscription<S> _subscription;
1039 /** Cached StreamSink wrapper for this class. */ 1049 /** Cached EventSink wrapper for this class. */
1040 StreamSink<T> _sink; 1050 EventSink<T> _sink;
1041 1051
1042 _EventTransformStreamSubscription(Stream<S> source, 1052 _EventTransformStreamSubscription(Stream<S> source,
1043 this._transformer, 1053 this._transformer,
1044 void onData(T data), 1054 void onData(T data),
1045 void onError(AsyncError error), 1055 void onError(AsyncError error),
1046 void onDone(), 1056 void onDone(),
1047 this._unsubscribeOnError) 1057 this._unsubscribeOnError)
1048 : super(onData, onError, onDone) { 1058 : super(onData, onError, onDone) {
1049 _sink = new _StreamOutputSinkWrapper<T>(this); 1059 _sink = new _EventOutputSinkWrapper<T>(this);
1050 _subscription = source.listen(_handleData, 1060 _subscription = source.listen(_handleData,
1051 onError: _handleError, 1061 onError: _handleError,
1052 onDone: _handleDone); 1062 onDone: _handleDone);
1053 } 1063 }
1054 1064
1055 void pause([Future pauseSignal]) { 1065 void pause([Future pauseSignal]) {
1056 if (_subscription != null) _subscription.pause(pauseSignal); 1066 if (_subscription != null) _subscription.pause(pauseSignal);
1057 } 1067 }
1058 1068
1059 void resume() { 1069 void resume() {
(...skipping 25 matching lines...) Expand all
1085 1095
1086 void _handleDone() { 1096 void _handleDone() {
1087 _subscription = null; 1097 _subscription = null;
1088 try { 1098 try {
1089 _transformer.handleDone(_sink); 1099 _transformer.handleDone(_sink);
1090 } catch (e, s) { 1100 } catch (e, s) {
1091 _sendError(_asyncError(e, s)); 1101 _sendError(_asyncError(e, s));
1092 } 1102 }
1093 } 1103 }
1094 1104
1095 // StreamOutputSink interface. 1105 // EventOutputSink interface.
1096 void _sendData(T data) { 1106 void _sendData(T data) {
1097 _onData(data); 1107 _onData(data);
1098 } 1108 }
1099 1109
1100 void _sendError(AsyncError error) { 1110 void _sendError(AsyncError error) {
1101 _onError(error); 1111 _onError(error);
1102 if (_unsubscribeOnError) { 1112 if (_unsubscribeOnError) {
1103 cancel(); 1113 cancel();
1104 } 1114 }
1105 } 1115 }
1106 1116
1107 void _sendDone() { 1117 void _sendDone() {
1108 // It's ok to cancel even if we have been unsubscribed already. 1118 // It's ok to cancel even if we have been unsubscribed already.
1109 cancel(); 1119 cancel();
1110 _onDone(); 1120 _onDone();
1111 } 1121 }
1112 } 1122 }
1113 1123
1114 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { 1124 class _EventOutputSinkWrapper<T> implements EventSink<T> {
1115 _StreamOutputSink _sink; 1125 _EventOutputSink _sink;
1116 _StreamOutputSinkWrapper(this._sink); 1126 _EventOutputSinkWrapper(this._sink);
1117 1127
1118 void add(T data) => _sink._sendData(data); 1128 void add(T data) { _sink._sendData(data); }
1119 void signalError(AsyncError error) => _sink._sendError(error); 1129 void addError(AsyncError error) { _sink._sendError(error); }
1120 void close() => _sink._sendDone(); 1130 void signalError(AsyncError error) { addError(error); }
1131 void close() { _sink._sendDone(); }
1121 } 1132 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698