Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Small fixes and tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 }); 140 });
141 }, 141 },
142 onCancel: () { 142 onCancel: () {
143 if (timer != null) timer.cancel(); 143 if (timer != null) timer.cancel();
144 timer = null; 144 timer = null;
145 }); 145 });
146 return controller.stream; 146 return controller.stream;
147 } 147 }
148 148
149 /** 149 /**
150 * Creates a stream where all events of an existing stream are piped through
151 * a sink-transformation.
152 *
153 * The given [mapSink] closure is invoked when the returned stream is
154 * listened to. All events from the [source] are added into the event sink
155 * that is returned from the invocation. The transformation puts all
156 * transformed events into the sink the [mapSink] closure received during
157 * its invocation. Conceptually the [mapSink] creates a transformation pipe
158 * with the input sink being the returned [EventSink] and the output sink
159 * being the sink it received.
160 *
161 * This constructor is frequently used to build transformers.
162 *
163 * Example use for a duplicating transformer:
164 *
165 * class DuplicationSink implements EventSink<String> {
166 * final EventSink<String> _outputSink;
167 * DuplicationSink(this._outputSink);
168 *
169 * void add(String data) {
170 * _outputSink.add(data);
171 * _outputSink.add(data);
172 * }
173 *
174 * void addError(e, [st]) => _outputSink(e, st);
175 * void close() => _outputSink.close();
176 * }
177 *
178 * class DuplicationTransformer implements StreamTransformer<String, Strin g> {
179 * // Some generic types ommitted for brevety.
180 * Stream bind(Stream stream) => new Stream<String>.eventTransform(
181 * stream,
182 * (EventSink sink) => new DuplicationSink(sink));
183 * }
184 *
185 * stringStream.transform(new DuplicationTransformer());
186 */
187 factory Stream.eventTransformed(Stream source,
188 EventSink mapSink(EventSink<T> sink)) {
189 return new _BoundSinkStream(source, mapSink);
190 }
191
192 /**
150 * Reports whether this stream is a broadcast stream. 193 * Reports whether this stream is a broadcast stream.
151 */ 194 */
152 bool get isBroadcast => false; 195 bool get isBroadcast => false;
153 196
154 /** 197 /**
155 * Returns a multi-subscription stream that produces the same events as this. 198 * Returns a multi-subscription stream that produces the same events as this.
156 * 199 *
157 * If this stream is already a broadcast stream, it is returned unmodified. 200 * If this stream is already a broadcast stream, it is returned unmodified.
158 * 201 *
159 * If this stream is single-subscription, return a new stream that allows 202 * If this stream is single-subscription, return a new stream that allows
(...skipping 783 matching lines...) Expand 10 before | Expand all | Expand 10 after
943 } 986 }
944 987
945 988
946 /** 989 /**
947 * An interface that abstracts creation or handling of [Stream] events. 990 * An interface that abstracts creation or handling of [Stream] events.
948 */ 991 */
949 abstract class EventSink<T> { 992 abstract class EventSink<T> {
950 /** Create a data event */ 993 /** Create a data event */
951 void add(T event); 994 void add(T event);
952 /** Create an async error. */ 995 /** Create an async error. */
953 void addError(errorEvent); 996 void addError(errorEvent, [StackTrace stackTrace]);
954 /** Request a stream to close. */ 997 /** Request a stream to close. */
955 void close(); 998 void close();
956 } 999 }
957 1000
958 1001
959 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1002 /** [Stream] wrapper that only exposes the [Stream] interface. */
960 class StreamView<T> extends Stream<T> { 1003 class StreamView<T> extends Stream<T> {
961 Stream<T> _stream; 1004 Stream<T> _stream;
962 1005
963 StreamView(this._stream); 1006 StreamView(this._stream);
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
1024 */ 1067 */
1025 Future get done; 1068 Future get done;
1026 } 1069 }
1027 1070
1028 1071
1029 /** 1072 /**
1030 * The target of a [Stream.transform] call. 1073 * The target of a [Stream.transform] call.
1031 * 1074 *
1032 * The [Stream.transform] call will pass itself to this object and then return 1075 * The [Stream.transform] call will pass itself to this object and then return
1033 * the resulting stream. 1076 * the resulting stream.
1077 *
1078 * It is good practice to create transformers that can be used multiple times.
Lasse Reichstein Nielsen 2013/10/07 11:47:00 By "create", do you mean that it is good practice
floitsch 2013/10/10 15:39:57 Done.
1034 */ 1079 */
1035 abstract class StreamTransformer<S, T> { 1080 abstract class StreamTransformer<S, T> {
1081
1036 /** 1082 /**
1037 * Create a [StreamTransformer] that delegates events to the given functions. 1083 * Creates a [StreamTransformer].
1038 * 1084 *
1039 * This is actually a [StreamEventTransformer] where the event handling is 1085 * The returned instance takes responsibility of binding ([bind]) and only
1040 * performed by the function arguments. 1086 * invokes the given closure [transformer] when a user starts listening on
1041 * If an argument is omitted, it acts as the corresponding default method from 1087 * the bound stream. At that point the transformer should start listening
Lasse Reichstein Nielsen 2013/10/07 11:47:00 User listens on the returned stream, not the bound
floitsch 2013/10/10 15:39:57 rewritten.
1042 * [StreamEventTransformer]. 1088 * to the bound stream (which is given as argument) and return a
1089 * [StreamSubscription]. The bound stream-transformer then sets the handlers
Lasse Reichstein Nielsen 2013/10/07 11:47:00 How is the stream-transformer bound, and to what?
floitsch 2013/10/10 15:39:57 rewritten.
1090 * it received as part of the `listen` call. Missing handlers (or the ones
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Sounds like the handlers, which were received as p
floitsch 2013/10/10 15:39:57 rewritten.
1091 * that are `null`) are also set (with `null`).
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Last sentence seems superflous.
floitsch 2013/10/10 15:39:57 It was meant to say that the Subscription can rely
1043 * 1092 *
1044 * Example use: 1093 * There are two common ways to create a StreamSubscription:
1094 *
1095 * 1. by creating a new class that implements [StreamSubscription].
1096 * Note that the subscription should run callbacks in the [Zone] the
1097 * stream was listened to.
1098 * 2. by allocating a [StreamController] and to return the result of
1099 * listening to its stream.
1100 *
1101 * Example use of a duplicating transformer:
1045 * 1102 *
1046 * stringStream.transform(new StreamTransformer<String, String>( 1103 * stringStream.transform(new StreamTransformer<String, String>(
1104 * (Stream<String> input, bool cancelOnError) {
1105 * StreamController<String> controller;
1106 * StreamSubscription<String> subscription;
1107 * controller = new StreamController<String>(
1108 * onListen: () {
1109 * subscription = input.listen((data) {
1110 * // Duplicate the data.
1111 * controller.add(data);
1112 * controller.add(data);
1113 * },
1114 * onError: controller.addError,
1115 * onDone: controller.close,
1116 * cancelOnError: cancelOnError);
1117 * },
1118 * onPause: subscription.pause,
1119 * onResume: subscription.resume,
1120 * onCancel: subscription.cancel,
1121 * sync: true); // One of the few places, where sync is correct.
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Either remove the comment or elaborate on why it i
floitsch 2013/10/10 15:39:57 Removed.
1122 * return controller.stream.listen(null);
1123 * });
1124 */
1125 const factory StreamTransformer(
1126 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1127 = _StreamSubscriptionTransformer;
1128
1129 /**
1130 * Creates a [StreamTransformer] that delegates events to the given functions.
1131 *
1132 * This constructor returns a transformer that can only be used once.
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Why can it only be used once? If the handlers them
floitsch 2013/10/10 15:39:57 Done.
1133 *
1134 * Example use of a duplicating transformer:
1135 *
1136 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1047 * handleData: (String value, EventSink<String> sink) { 1137 * handleData: (String value, EventSink<String> sink) {
1048 * sink.add(value); 1138 * sink.add(value);
1049 * sink.add(value); // Duplicate the incoming events. 1139 * sink.add(value); // Duplicate the incoming events.
1050 * })); 1140 * }));
1051 *
1052 */ 1141 */
1053 factory StreamTransformer({ 1142 factory StreamTransformer.fromHandlers({
1054 void handleData(S data, EventSink<T> sink), 1143 void handleData(S data, EventSink<T> sink),
1055 Function handleError, 1144 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1056 void handleDone(EventSink<T> sink)}) { 1145 void handleDone(EventSink<T> sink)})
1057 return new _StreamTransformerImpl<S, T>(handleData, 1146 = _StreamHandlerTransformer;
1058 handleError,
1059 handleDone);
1060 }
1061 1147
1062 Stream<T> bind(Stream<S> stream); 1148 Stream<T> bind(Stream<S> stream);
1063 } 1149 }
1064 1150
1065
1066 /**
1067 * Base class for transformers that modifies stream events.
1068 *
1069 * A [StreamEventTransformer] transforms incoming Stream
1070 * events of one kind into outgoing events of (possibly) another kind.
1071 *
1072 * Subscribing on the stream returned by [bind] is the same as subscribing on
1073 * the source stream, except that events are passed through the [transformer]
1074 * before being emitted. The transformer may generate any number and
1075 * types of events for each incoming event. Pauses on the returned
1076 * subscription are forwarded to this stream.
1077 *
1078 * An example that duplicates all data events:
1079 *
1080 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> {
1081 * void handleData(T data, EventSink<T> sink) {
1082 * sink.add(value);
1083 * sink.add(value);
1084 * }
1085 * }
1086 * someTypeStream.transform(new DoubleTransformer<Type>());
1087 *
1088 * The default implementations of the "handle" methods forward
1089 * the events unmodified. If using the default [handleData] the generic type [T]
1090 * needs to be assignable to [S].
1091 */
1092 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1093 const StreamEventTransformer();
1094
1095 Stream<T> bind(Stream<S> source) {
1096 return new EventTransformStream<S, T>(source, this);
1097 }
1098
1099 /**
1100 * Act on incoming data event.
1101 *
1102 * The method may generate any number of events on the sink, but should
1103 * not throw.
1104 */
1105 void handleData(S event, EventSink<T> sink) {
1106 var data = event;
1107 sink.add(data);
1108 }
1109
1110 /**
1111 * Act on incoming error event.
1112 *
1113 * The method may generate any number of events on the sink, but should
1114 * not throw.
1115 */
1116 void handleError(error, EventSink<T> sink) {
1117 sink.addError(error);
1118 }
1119
1120 /**
1121 * Act on incoming done event.
1122 *
1123 * The method may generate any number of events on the sink, but should
1124 * not throw.
1125 */
1126 void handleDone(EventSink<T> sink){
1127 sink.close();
1128 }
1129 }
1130
1131
1132 /**
1133 * Stream that transforms another stream by intercepting and replacing events.
1134 *
1135 * This [Stream] is a transformation of a source stream. Listening on this
1136 * stream is the same as listening on the source stream, except that events
1137 * are intercepted and modified by a [StreamEventTransformer] before becoming
1138 * events on this stream.
1139 */
1140 class EventTransformStream<S, T> extends Stream<T> {
1141 final Stream<S> _source;
1142 final StreamEventTransformer _transformer;
1143 EventTransformStream(Stream<S> source,
1144 StreamEventTransformer<S, T> transformer)
1145 : _source = source, _transformer = transformer;
1146
1147 StreamSubscription<T> listen(void onData(T data),
1148 { Function onError,
1149 void onDone(),
1150 bool cancelOnError }) {
1151 if (onData == null) onData = _nullDataHandler;
1152 if (onError == null) onError = _nullErrorHandler;
1153 if (onDone == null) onDone = _nullDoneHandler;
1154 cancelOnError = identical(true, cancelOnError);
1155 return new _EventTransformStreamSubscription(_source, _transformer,
1156 onData, onError, onDone,
1157 cancelOnError);
1158 }
1159 }
1160
1161 class _EventTransformStreamSubscription<S, T>
1162 extends _BufferingStreamSubscription<T> {
1163 /** The transformer used to transform events. */
1164 final StreamEventTransformer<S, T> _transformer;
1165
1166 /** Whether this stream has sent a done event. */
1167 bool _isClosed = false;
1168
1169 /** Source of incoming events. */
1170 StreamSubscription<S> _subscription;
1171
1172 /** Cached EventSink wrapper for this class. */
1173 EventSink<T> _sink;
1174
1175 _EventTransformStreamSubscription(Stream<S> source,
1176 this._transformer,
1177 void onData(T data),
1178 Function onError,
1179 void onDone(),
1180 bool cancelOnError)
1181 : super(onData, onError, onDone, cancelOnError) {
1182 _sink = new _EventSinkAdapter<T>(this);
1183 _subscription = source.listen(_handleData,
1184 onError: _handleError,
1185 onDone: _handleDone);
1186 }
1187
1188 /** Whether this subscription is still subscribed to its source. */
1189 bool get _isSubscribed => _subscription != null;
1190
1191 void _onPause() {
1192 if (_isSubscribed) _subscription.pause();
1193 }
1194
1195 void _onResume() {
1196 if (_isSubscribed) _subscription.resume();
1197 }
1198
1199 void _onCancel() {
1200 if (_isSubscribed) {
1201 StreamSubscription subscription = _subscription;
1202 _subscription = null;
1203 subscription.cancel();
1204 }
1205 _isClosed = true;
1206 }
1207
1208 void _handleData(S data) {
1209 try {
1210 _transformer.handleData(data, _sink);
1211 } catch (e, s) {
1212 _addError(_asyncError(e, s), s);
1213 }
1214 }
1215
1216 void _handleError(error, [stackTrace]) {
1217 try {
1218 _transformer.handleError(error, _sink);
1219 } catch (e, s) {
1220 if (identical(e, error)) {
1221 _addError(error, stackTrace);
1222 } else {
1223 _addError(_asyncError(e, s), s);
1224 }
1225 }
1226 }
1227
1228 void _handleDone() {
1229 try {
1230 _subscription = null;
1231 _transformer.handleDone(_sink);
1232 } catch (e, s) {
1233 _addError(_asyncError(e, s), s);
1234 }
1235 }
1236 }
1237
1238 class _EventSinkAdapter<T> implements EventSink<T> {
1239 _EventSink _sink;
1240 _EventSinkAdapter(this._sink);
1241
1242 void add(T data) { _sink._add(data); }
1243 void addError(error, [StackTrace stackTrace]) {
1244 _sink._addError(error, stackTrace);
1245 }
1246 void close() { _sink._close(); }
1247 }
1248
1249
1250 /** 1151 /**
1251 * An [Iterable] like interface for the values of a [Stream]. 1152 * An [Iterable] like interface for the values of a [Stream].
1252 * 1153 *
1253 * This wraps a [Stream] and a subscription on the stream. It listens 1154 * This wraps a [Stream] and a subscription on the stream. It listens
1254 * on the stream, and completes the future returned by [moveNext] when the 1155 * on the stream, and completes the future returned by [moveNext] when the
1255 * next value becomes available. 1156 * next value becomes available.
1256 * 1157 *
1257 * NOTICE: This is a tentative design. This class may change. 1158 * NOTICE: This is a tentative design. This class may change.
1258 */ 1159 */
1259 abstract class StreamIterator<T> { 1160 abstract class StreamIterator<T> {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1291 * 1192 *
1292 * If a [moveNext] call has been made, it will complete with `false` as value, 1193 * If a [moveNext] call has been made, it will complete with `false` as value,
1293 * as will all further calls to [moveNext]. 1194 * as will all further calls to [moveNext].
1294 * 1195 *
1295 * If you need to stop listening for values before the stream iterator is 1196 * If you need to stop listening for values before the stream iterator is
1296 * automatically closed, you must call [cancel] to ensure that the stream 1197 * automatically closed, you must call [cancel] to ensure that the stream
1297 * is properly closed. 1198 * is properly closed.
1298 */ 1199 */
1299 void cancel(); 1200 void cancel();
1300 } 1201 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698