Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(340)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 184
185 /** 185 /**
186 * Adds a subscription to this stream. 186 * Adds a subscription to this stream.
187 * 187 *
188 * On each data event from this stream, the subscriber's [onData] handler 188 * On each data event from this stream, the subscriber's [onData] handler
189 * is called. If [onData] is null, nothing happens. 189 * is called. If [onData] is null, nothing happens.
190 * 190 *
191 * On errors from this stream, the [onError] handler is given a 191 * On errors from this stream, the [onError] handler is given a
192 * object describing the error. 192 * object describing the error.
193 * 193 *
194 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
195 * two arguments it is called with the stack trace (which could be `null` if
196 * the stream itself received an error without stack trace).
197 * Otherwise it is called with just the error object.
198 *
194 * If this stream closes, the [onDone] handler is called. 199 * If this stream closes, the [onDone] handler is called.
195 * 200 *
196 * If [cancelOnError] is true, the subscription is ended when 201 * If [cancelOnError] is true, the subscription is ended when
197 * the first error is reported. The default is false. 202 * the first error is reported. The default is false.
198 */ 203 */
199 StreamSubscription<T> listen(void onData(T event), 204 StreamSubscription<T> listen(void onData(T event),
200 { void onError(error), 205 { Function onError,
201 void onDone(), 206 void onDone(),
202 bool cancelOnError}); 207 bool cancelOnError});
203 208
204 /** 209 /**
205 * Creates a new stream from this stream that discards some data events. 210 * Creates a new stream from this stream that discards some data events.
206 * 211 *
207 * The new stream sends the same error and done events as this stream, 212 * The new stream sends the same error and done events as this stream,
208 * but it only sends the data events that satisfy the [test]. 213 * but it only sends the data events that satisfy the [test].
209 */ 214 */
210 Stream<T> where(bool test(T event)) { 215 Stream<T> where(bool test(T event)) {
211 return new _WhereStream<T>(this, test); 216 return new _WhereStream<T>(this, test);
212 } 217 }
213 218
214 /** 219 /**
215 * Creates a new stream that converts each element of this stream 220 * Creates a new stream that converts each element of this stream
216 * to a new value using the [convert] function. 221 * to a new value using the [convert] function.
217 */ 222 */
218 Stream map(convert(T event)) { 223 Stream map(convert(T event)) {
219 return new _MapStream<T, dynamic>(this, convert); 224 return new _MapStream<T, dynamic>(this, convert);
220 } 225 }
221 226
222 /** 227 /**
223 * Creates a wrapper Stream that intercepts some errors from this stream. 228 * Creates a wrapper Stream that intercepts some errors from this stream.
224 * 229 *
225 * If this stream sends an error that matches [test], then it is intercepted 230 * If this stream sends an error that matches [test], then it is intercepted
226 * by the [handle] function. 231 * by the [handle] function.
227 * 232 *
233 * The [onError] callback must be of type `void onError(error)` or
234 * `void onError(error, StackTrace stackTrace)`. Depending on the function
235 * type the the stream either invokes [onError] with or without a stack
236 * trace. The stack trace argument might be `null` if the stream itself
237 * received an error without stack trace.
238 *
228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns 239 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
229 * true. If [test] is omitted, every error is considered matching. 240 * true. If [test] is omitted, every error is considered matching.
230 * 241 *
231 * If the error is intercepted, the [handle] function can decide what to do 242 * If the error is intercepted, the [handle] function can decide what to do
232 * with it. It can throw if it wants to raise a new (or the same) error, 243 * with it. It can throw if it wants to raise a new (or the same) error,
233 * or simply return to make the stream forget the error. 244 * or simply return to make the stream forget the error.
234 * 245 *
235 * If you need to transform an error into a data event, use the more generic 246 * If you need to transform an error into a data event, use the more generic
236 * [Stream.transform] to handle the event by writing a data event to 247 * [Stream.transform] to handle the event by writing a data event to
237 * the output sink 248 * the output sink
238 */ 249 */
239 Stream<T> handleError(void handle( error), { bool test(error) }) { 250 Stream<T> handleError(Function onError, { bool test(error) }) {
240 return new _HandleErrorStream<T>(this, handle, test); 251 return new _HandleErrorStream<T>(this, onError, test);
241 } 252 }
242 253
243 /** 254 /**
244 * Creates a new stream from this stream that converts each element 255 * Creates a new stream from this stream that converts each element
245 * into zero or more events. 256 * into zero or more events.
246 * 257 *
247 * Each incoming event is converted to an [Iterable] of new events, 258 * Each incoming event is converted to an [Iterable] of new events,
248 * and each of these new events are then sent by the returned stream 259 * and each of these new events are then sent by the returned stream
249 * in order. 260 * in order.
250 */ 261 */
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 var value = initialValue; 317 var value = initialValue;
307 StreamSubscription subscription; 318 StreamSubscription subscription;
308 subscription = this.listen( 319 subscription = this.listen(
309 (T element) { 320 (T element) {
310 _runUserCode( 321 _runUserCode(
311 () => combine(value, element), 322 () => combine(value, element),
312 (newValue) { value = newValue; }, 323 (newValue) { value = newValue; },
313 _cancelAndError(subscription, result) 324 _cancelAndError(subscription, result)
314 ); 325 );
315 }, 326 },
316 onError: (e) { 327 onError: (e, st) {
317 result._completeError(e); 328 result._completeError(e, st);
318 }, 329 },
319 onDone: () { 330 onDone: () {
320 result._complete(value); 331 result._complete(value);
321 }, 332 },
322 cancelOnError: true); 333 cancelOnError: true);
323 return result; 334 return result;
324 } 335 }
325 336
326 /** 337 /**
327 * Collects string of data events' string representations. 338 * Collects string of data events' string representations.
(...skipping 552 matching lines...) Expand 10 before | Expand all | Expand 10 after
880 * 891 *
881 * If an event is currently firing, this unsubscription will only 892 * If an event is currently firing, this unsubscription will only
882 * take effect after all subscribers have received the current event. 893 * take effect after all subscribers have received the current event.
883 */ 894 */
884 void cancel(); 895 void cancel();
885 896
886 /** Set or override the data event handler of this subscription. */ 897 /** Set or override the data event handler of this subscription. */
887 void onData(void handleData(T data)); 898 void onData(void handleData(T data));
888 899
889 /** Set or override the error event handler of this subscription. */ 900 /** Set or override the error event handler of this subscription. */
890 void onError(void handleError(error)); 901 void onError(Function handleError);
891 902
892 /** Set or override the done event handler of this subscription. */ 903 /** Set or override the done event handler of this subscription. */
893 void onDone(void handleDone()); 904 void onDone(void handleDone());
894 905
895 /** 906 /**
896 * Request that the stream pauses events until further notice. 907 * Request that the stream pauses events until further notice.
897 * 908 *
898 * If [resumeSignal] is provided, the stream will undo the pause 909 * If [resumeSignal] is provided, the stream will undo the pause
899 * when the future completes. If the future completes with an error, 910 * when the future completes. If the future completes with an error,
900 * it will not be handled! 911 * it will not be handled!
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
951 962
952 StreamView(this._stream); 963 StreamView(this._stream);
953 964
954 bool get isBroadcast => _stream.isBroadcast; 965 bool get isBroadcast => _stream.isBroadcast;
955 966
956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), 967 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription),
957 void onCancel(StreamSubscription subscription)}) 968 void onCancel(StreamSubscription subscription)})
958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); 969 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
959 970
960 StreamSubscription<T> listen(void onData(T value), 971 StreamSubscription<T> listen(void onData(T value),
961 { void onError(error), 972 { Function onError,
962 void onDone(), 973 void onDone(),
963 bool cancelOnError }) { 974 bool cancelOnError }) {
964 return _stream.listen(onData, onError: onError, onDone: onDone, 975 return _stream.listen(onData, onError: onError, onDone: onDone,
965 cancelOnError: cancelOnError); 976 cancelOnError: cancelOnError);
966 } 977 }
967 } 978 }
968 979
969 980
970 /** 981 /**
971 * The target of a [Stream.pipe] call. 982 * The target of a [Stream.pipe] call.
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
1034 * 1045 *
1035 * stringStream.transform(new StreamTransformer<String, String>( 1046 * stringStream.transform(new StreamTransformer<String, String>(
1036 * handleData: (String value, EventSink<String> sink) { 1047 * handleData: (String value, EventSink<String> sink) {
1037 * sink.add(value); 1048 * sink.add(value);
1038 * sink.add(value); // Duplicate the incoming events. 1049 * sink.add(value); // Duplicate the incoming events.
1039 * })); 1050 * }));
1040 * 1051 *
1041 */ 1052 */
1042 factory StreamTransformer({ 1053 factory StreamTransformer({
1043 void handleData(S data, EventSink<T> sink), 1054 void handleData(S data, EventSink<T> sink),
1044 void handleError(error, EventSink<T> sink), 1055 Function handleError,
1045 void handleDone(EventSink<T> sink)}) { 1056 void handleDone(EventSink<T> sink)}) {
1046 return new _StreamTransformerImpl<S, T>(handleData, 1057 return new _StreamTransformerImpl<S, T>(handleData,
1047 handleError, 1058 handleError,
1048 handleDone); 1059 handleDone);
1049 } 1060 }
1050 1061
1051 Stream<T> bind(Stream<S> stream); 1062 Stream<T> bind(Stream<S> stream);
1052 } 1063 }
1053 1064
1054 1065
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
1127 * events on this stream. 1138 * events on this stream.
1128 */ 1139 */
1129 class EventTransformStream<S, T> extends Stream<T> { 1140 class EventTransformStream<S, T> extends Stream<T> {
1130 final Stream<S> _source; 1141 final Stream<S> _source;
1131 final StreamEventTransformer _transformer; 1142 final StreamEventTransformer _transformer;
1132 EventTransformStream(Stream<S> source, 1143 EventTransformStream(Stream<S> source,
1133 StreamEventTransformer<S, T> transformer) 1144 StreamEventTransformer<S, T> transformer)
1134 : _source = source, _transformer = transformer; 1145 : _source = source, _transformer = transformer;
1135 1146
1136 StreamSubscription<T> listen(void onData(T data), 1147 StreamSubscription<T> listen(void onData(T data),
1137 { void onError(error), 1148 { Function onError,
1138 void onDone(), 1149 void onDone(),
1139 bool cancelOnError }) { 1150 bool cancelOnError }) {
1140 if (onData == null) onData = _nullDataHandler; 1151 if (onData == null) onData = _nullDataHandler;
1141 if (onError == null) onError = _nullErrorHandler; 1152 if (onError == null) onError = _nullErrorHandler;
1142 if (onDone == null) onDone = _nullDoneHandler; 1153 if (onDone == null) onDone = _nullDoneHandler;
1143 cancelOnError = identical(true, cancelOnError); 1154 cancelOnError = identical(true, cancelOnError);
1144 return new _EventTransformStreamSubscription(_source, _transformer, 1155 return new _EventTransformStreamSubscription(_source, _transformer,
1145 onData, onError, onDone, 1156 onData, onError, onDone,
1146 cancelOnError); 1157 cancelOnError);
1147 } 1158 }
1148 } 1159 }
1149 1160
1150 class _EventTransformStreamSubscription<S, T> 1161 class _EventTransformStreamSubscription<S, T>
1151 extends _BufferingStreamSubscription<T> { 1162 extends _BufferingStreamSubscription<T> {
1152 /** The transformer used to transform events. */ 1163 /** The transformer used to transform events. */
1153 final StreamEventTransformer<S, T> _transformer; 1164 final StreamEventTransformer<S, T> _transformer;
1154 1165
1155 /** Whether this stream has sent a done event. */ 1166 /** Whether this stream has sent a done event. */
1156 bool _isClosed = false; 1167 bool _isClosed = false;
1157 1168
1158 /** Source of incoming events. */ 1169 /** Source of incoming events. */
1159 StreamSubscription<S> _subscription; 1170 StreamSubscription<S> _subscription;
1160 1171
1161 /** Cached EventSink wrapper for this class. */ 1172 /** Cached EventSink wrapper for this class. */
1162 EventSink<T> _sink; 1173 EventSink<T> _sink;
1163 1174
1164 _EventTransformStreamSubscription(Stream<S> source, 1175 _EventTransformStreamSubscription(Stream<S> source,
1165 this._transformer, 1176 this._transformer,
1166 void onData(T data), 1177 void onData(T data),
1167 void onError(error), 1178 Function onError,
1168 void onDone(), 1179 void onDone(),
1169 bool cancelOnError) 1180 bool cancelOnError)
1170 : super(onData, onError, onDone, cancelOnError) { 1181 : super(onData, onError, onDone, cancelOnError) {
1171 _sink = new _EventSinkAdapter<T>(this); 1182 _sink = new _EventSinkAdapter<T>(this);
1172 _subscription = source.listen(_handleData, 1183 _subscription = source.listen(_handleData,
1173 onError: _handleError, 1184 onError: _handleError,
1174 onDone: _handleDone); 1185 onDone: _handleDone);
1175 } 1186 }
1176 1187
1177 /** Whether this subscription is still subscribed to its source. */ 1188 /** Whether this subscription is still subscribed to its source. */
(...skipping 13 matching lines...) Expand all
1191 _subscription = null; 1202 _subscription = null;
1192 subscription.cancel(); 1203 subscription.cancel();
1193 } 1204 }
1194 _isClosed = true; 1205 _isClosed = true;
1195 } 1206 }
1196 1207
1197 void _handleData(S data) { 1208 void _handleData(S data) {
1198 try { 1209 try {
1199 _transformer.handleData(data, _sink); 1210 _transformer.handleData(data, _sink);
1200 } catch (e, s) { 1211 } catch (e, s) {
1201 _addError(_asyncError(e, s)); 1212 _addError(_asyncError(e, s), s);
1202 } 1213 }
1203 } 1214 }
1204 1215
1205 void _handleError(error) { 1216 void _handleError(error, [stackTrace]) {
1206 try { 1217 try {
1207 _transformer.handleError(error, _sink); 1218 _transformer.handleError(error, _sink);
1208 } catch (e, s) { 1219 } catch (e, s) {
1209 _addError(_asyncError(e, s)); 1220 if (identical(e, error)) {
1221 _addError(error, stackTrace);
1222 } else {
1223 _addError(_asyncError(e, s), s);
1224 }
1210 } 1225 }
1211 } 1226 }
1212 1227
1213 void _handleDone() { 1228 void _handleDone() {
1214 try { 1229 try {
1215 _subscription = null; 1230 _subscription = null;
1216 _transformer.handleDone(_sink); 1231 _transformer.handleDone(_sink);
1217 } catch (e, s) { 1232 } catch (e, s) {
1218 _addError(_asyncError(e, s)); 1233 _addError(_asyncError(e, s), s);
1219 } 1234 }
1220 } 1235 }
1221 } 1236 }
1222 1237
1223 class _EventSinkAdapter<T> implements EventSink<T> { 1238 class _EventSinkAdapter<T> implements EventSink<T> {
1224 _EventSink _sink; 1239 _EventSink _sink;
1225 _EventSinkAdapter(this._sink); 1240 _EventSinkAdapter(this._sink);
1226 1241
1227 void add(T data) { _sink._add(data); } 1242 void add(T data) { _sink._add(data); }
1228 void addError(error) { _sink._addError(error); } 1243 void addError(error, [StackTrace stackTrace]) {
1244 _sink._addError(error, stackTrace);
1245 }
1229 void close() { _sink._close(); } 1246 void close() { _sink._close(); }
1230 } 1247 }
1231 1248
1232 1249
1233 /** 1250 /**
1234 * An [Iterable] like interface for the values of a [Stream]. 1251 * An [Iterable] like interface for the values of a [Stream].
1235 * 1252 *
1236 * This wraps a [Stream] and a subscription on the stream. It listens 1253 * This wraps a [Stream] and a subscription on the stream. It listens
1237 * on the stream, and completes the future returned by [moveNext] when the 1254 * on the stream, and completes the future returned by [moveNext] when the
1238 * next value becomes available. 1255 * next value becomes available.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
1274 * 1291 *
1275 * If a [moveNext] call has been made, it will complete with `false` as value, 1292 * If a [moveNext] call has been made, it will complete with `false` as value,
1276 * as will all further calls to [moveNext]. 1293 * as will all further calls to [moveNext].
1277 * 1294 *
1278 * If you need to stop listening for values before the stream iterator is 1295 * If you need to stop listening for values before the stream iterator is
1279 * automatically closed, you must call [cancel] to ensure that the stream 1296 * automatically closed, you must call [cancel] to ensure that the stream
1280 * is properly closed. 1297 * is properly closed.
1281 */ 1298 */
1282 void cancel(); 1299 void cancel();
1283 } 1300 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698