Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(350)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/convert/chunked_conversion.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** 7 /**
8 * Utility function to attach a stack trace to an [error] if it doesn't have 8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 * one already. 9 * one already.
10 */ 10 */
11 _asyncError(Object error, Object stackTrace) { 11 _asyncError(Object error, Object stackTrace) {
12 if (stackTrace == null) return error; 12 if (stackTrace == null) return error;
13 if (getAttachedStackTrace(error) != null) return error; 13 if (getAttachedStackTrace(error) != null) return error;
14 _attachStackTrace(error, stackTrace); 14 _attachStackTrace(error, stackTrace);
15 return error; 15 return error;
16 } 16 }
17 17
18 /** Runs user code and takes actions depending on success or failure. */ 18 /** Runs user code and takes actions depending on success or failure. */
19 _runUserCode(userCode(), onSuccess(value), onError(error)) { 19 _runUserCode(userCode(),
20 onSuccess(value),
21 onError(error, StackTrace stackTrace)) {
20 try { 22 try {
21 onSuccess(userCode()); 23 onSuccess(userCode());
22 } catch (e, s) { 24 } catch (e, s) {
23 onError(_asyncError(e, s)); 25 onError(_asyncError(e, s), s);
24 } 26 }
25 } 27 }
26 28
27 /** Helper function to make an onError argument to [_runUserCode]. */ 29 /** Helper function to make an onError argument to [_runUserCode]. */
28 _cancelAndError(StreamSubscription subscription, _Future future) => 30 _cancelAndError(StreamSubscription subscription, _Future future) =>
29 (error) { 31 (error, StackTrace stackTrace) {
30 subscription.cancel(); 32 subscription.cancel();
31 future._completeError(error); 33 future._completeError(error, stackTrace);
32 }; 34 };
33 35
34 36
35 /** 37 /**
36 * A [Stream] that forwards subscriptions to another stream. 38 * A [Stream] that forwards subscriptions to another stream.
37 * 39 *
38 * This stream implements [Stream], but forwards all subscriptions 40 * This stream implements [Stream], but forwards all subscriptions
39 * to an underlying stream, and wraps the returned subscription to 41 * to an underlying stream, and wraps the returned subscription to
40 * modify the events on the way. 42 * modify the events on the way.
41 * 43 *
42 * This class is intended for internal use only. 44 * This class is intended for internal use only.
43 */ 45 */
44 abstract class _ForwardingStream<S, T> extends Stream<T> { 46 abstract class _ForwardingStream<S, T> extends Stream<T> {
45 final Stream<S> _source; 47 final Stream<S> _source;
46 48
47 _ForwardingStream(this._source); 49 _ForwardingStream(this._source);
48 50
49 bool get isBroadcast => _source.isBroadcast; 51 bool get isBroadcast => _source.isBroadcast;
50 52
51 StreamSubscription<T> listen(void onData(T value), 53 StreamSubscription<T> listen(void onData(T value),
52 { void onError(error), 54 { Function onError,
53 void onDone(), 55 void onDone(),
54 bool cancelOnError }) { 56 bool cancelOnError }) {
55 if (onData == null) onData = _nullDataHandler; 57 if (onData == null) onData = _nullDataHandler;
56 if (onError == null) onError = _nullErrorHandler; 58 if (onError == null) onError = _nullErrorHandler;
57 if (onDone == null) onDone = _nullDoneHandler; 59 if (onDone == null) onDone = _nullDoneHandler;
58 cancelOnError = identical(true, cancelOnError); 60 cancelOnError = identical(true, cancelOnError);
59 return _createSubscription(onData, onError, onDone, cancelOnError); 61 return _createSubscription(onData, onError, onDone, cancelOnError);
60 } 62 }
61 63
62 StreamSubscription<T> _createSubscription(void onData(T value), 64 StreamSubscription<T> _createSubscription(void onData(T value),
63 void onError(error), 65 Function onError,
64 void onDone(), 66 void onDone(),
65 bool cancelOnError) { 67 bool cancelOnError) {
66 return new _ForwardingStreamSubscription<S, T>( 68 return new _ForwardingStreamSubscription<S, T>(
67 this, onData, onError, onDone, cancelOnError); 69 this, onData, onError, onDone, cancelOnError);
68 } 70 }
69 71
70 // Override the following methods in subclasses to change the behavior. 72 // Override the following methods in subclasses to change the behavior.
71 73
72 void _handleData(S data, _EventSink<T> sink) { 74 void _handleData(S data, _EventSink<T> sink) {
73 var outputData = data; 75 var outputData = data;
74 sink._add(outputData); 76 sink._add(outputData);
75 } 77 }
76 78
77 void _handleError(error, _EventSink<T> sink) { 79 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
78 sink._addError(error); 80 sink._addError(error, stackTrace);
79 } 81 }
80 82
81 void _handleDone(_EventSink<T> sink) { 83 void _handleDone(_EventSink<T> sink) {
82 sink._close(); 84 sink._close();
83 } 85 }
84 } 86 }
85 87
86 /** 88 /**
87 * Abstract superclass for subscriptions that forward to other subscriptions. 89 * Abstract superclass for subscriptions that forward to other subscriptions.
88 */ 90 */
89 class _ForwardingStreamSubscription<S, T> 91 class _ForwardingStreamSubscription<S, T>
90 extends _BufferingStreamSubscription<T> { 92 extends _BufferingStreamSubscription<T> {
91 final _ForwardingStream<S, T> _stream; 93 final _ForwardingStream<S, T> _stream;
92 94
93 StreamSubscription<S> _subscription; 95 StreamSubscription<S> _subscription;
94 96
95 _ForwardingStreamSubscription(this._stream, 97 _ForwardingStreamSubscription(this._stream,
96 void onData(T data), 98 void onData(T data),
97 void onError(error), 99 Function onError,
98 void onDone(), 100 void onDone(),
99 bool cancelOnError) 101 bool cancelOnError)
100 : super(onData, onError, onDone, cancelOnError) { 102 : super(onData, onError, onDone, cancelOnError) {
101 _subscription = 103 _subscription =
102 _stream._source.listen(_handleData, 104 _stream._source.listen(_handleData,
103 onError: _handleError, 105 onError: _handleError,
104 onDone: _handleDone); 106 onDone: _handleDone);
105 } 107 }
106 108
107 // _StreamSink interface. 109 // _StreamSink interface.
108 // Transformers sending more than one event have no way to know if the stream 110 // Transformers sending more than one event have no way to know if the stream
109 // is canceled or closed after the first, so we just ignore remaining events. 111 // is canceled or closed after the first, so we just ignore remaining events.
110 112
111 void _add(T data) { 113 void _add(T data) {
112 if (_isClosed) return; 114 if (_isClosed) return;
113 super._add(data); 115 super._add(data);
114 } 116 }
115 117
116 void _addError(Object error) { 118 void _addError(Object error, StackTrace stackTrace) {
117 if (_isClosed) return; 119 if (_isClosed) return;
118 super._addError(error); 120 super._addError(error, stackTrace);
119 } 121 }
120 122
121 // StreamSubscription callbacks. 123 // StreamSubscription callbacks.
122 124
123 void _onPause() { 125 void _onPause() {
124 if (_subscription == null) return; 126 if (_subscription == null) return;
125 _subscription.pause(); 127 _subscription.pause();
126 } 128 }
127 129
128 void _onResume() { 130 void _onResume() {
129 if (_subscription == null) return; 131 if (_subscription == null) return;
130 _subscription.resume(); 132 _subscription.resume();
131 } 133 }
132 134
133 void _onCancel() { 135 void _onCancel() {
134 if (_subscription != null) { 136 if (_subscription != null) {
135 StreamSubscription subscription = _subscription; 137 StreamSubscription subscription = _subscription;
136 _subscription = null; 138 _subscription = null;
137 subscription.cancel(); 139 subscription.cancel();
138 } 140 }
139 } 141 }
140 142
141 // Methods used as listener on source subscription. 143 // Methods used as listener on source subscription.
142 144
143 void _handleData(S data) { 145 void _handleData(S data) {
144 _stream._handleData(data, this); 146 _stream._handleData(data, this);
145 } 147 }
146 148
147 void _handleError(error) { 149 void _handleError(error, StackTrace stackTrace) {
148 _stream._handleError(error, this); 150 _stream._handleError(error, stackTrace, this);
149 } 151 }
150 152
151 void _handleDone() { 153 void _handleDone() {
152 _stream._handleDone(this); 154 _stream._handleDone(this);
153 } 155 }
154 } 156 }
155 157
156 // ------------------------------------------------------------------- 158 // -------------------------------------------------------------------
157 // Stream transformers used by the default Stream implementation. 159 // Stream transformers used by the default Stream implementation.
158 // ------------------------------------------------------------------- 160 // -------------------------------------------------------------------
159 161
160 typedef bool _Predicate<T>(T value); 162 typedef bool _Predicate<T>(T value);
161 163
162 class _WhereStream<T> extends _ForwardingStream<T, T> { 164 class _WhereStream<T> extends _ForwardingStream<T, T> {
163 final _Predicate<T> _test; 165 final _Predicate<T> _test;
164 166
165 _WhereStream(Stream<T> source, bool test(T value)) 167 _WhereStream(Stream<T> source, bool test(T value))
166 : _test = test, super(source); 168 : _test = test, super(source);
167 169
168 void _handleData(T inputEvent, _EventSink<T> sink) { 170 void _handleData(T inputEvent, _EventSink<T> sink) {
169 bool satisfies; 171 bool satisfies;
170 try { 172 try {
171 satisfies = _test(inputEvent); 173 satisfies = _test(inputEvent);
172 } catch (e, s) { 174 } catch (e, s) {
173 sink._addError(_asyncError(e, s)); 175 sink._addError(_asyncError(e, s), s);
174 return; 176 return;
175 } 177 }
176 if (satisfies) { 178 if (satisfies) {
177 sink._add(inputEvent); 179 sink._add(inputEvent);
178 } 180 }
179 } 181 }
180 } 182 }
181 183
182 184
183 typedef T _Transformation<S, T>(S value); 185 typedef T _Transformation<S, T>(S value);
184 186
185 /** 187 /**
186 * A stream pipe that converts data events before passing them on. 188 * A stream pipe that converts data events before passing them on.
187 */ 189 */
188 class _MapStream<S, T> extends _ForwardingStream<S, T> { 190 class _MapStream<S, T> extends _ForwardingStream<S, T> {
189 final _Transformation _transform; 191 final _Transformation _transform;
190 192
191 _MapStream(Stream<S> source, T transform(S event)) 193 _MapStream(Stream<S> source, T transform(S event))
192 : this._transform = transform, super(source); 194 : this._transform = transform, super(source);
193 195
194 void _handleData(S inputEvent, _EventSink<T> sink) { 196 void _handleData(S inputEvent, _EventSink<T> sink) {
195 T outputEvent; 197 T outputEvent;
196 try { 198 try {
197 outputEvent = _transform(inputEvent); 199 outputEvent = _transform(inputEvent);
198 } catch (e, s) { 200 } catch (e, s) {
199 sink._addError(_asyncError(e, s)); 201 sink._addError(_asyncError(e, s), s);
200 return; 202 return;
201 } 203 }
202 sink._add(outputEvent); 204 sink._add(outputEvent);
203 } 205 }
204 } 206 }
205 207
206 /** 208 /**
207 * A stream pipe that converts data events before passing them on. 209 * A stream pipe that converts data events before passing them on.
208 */ 210 */
209 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 211 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
210 final _Transformation<S, Iterable<T>> _expand; 212 final _Transformation<S, Iterable<T>> _expand;
211 213
212 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 214 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
213 : this._expand = expand, super(source); 215 : this._expand = expand, super(source);
214 216
215 void _handleData(S inputEvent, _EventSink<T> sink) { 217 void _handleData(S inputEvent, _EventSink<T> sink) {
216 try { 218 try {
217 for (T value in _expand(inputEvent)) { 219 for (T value in _expand(inputEvent)) {
218 sink._add(value); 220 sink._add(value);
219 } 221 }
220 } catch (e, s) { 222 } catch (e, s) {
221 // If either _expand or iterating the generated iterator throws, 223 // If either _expand or iterating the generated iterator throws,
222 // we abort the iteration. 224 // we abort the iteration.
223 sink._addError(_asyncError(e, s)); 225 sink._addError(_asyncError(e, s), s);
224 } 226 }
225 } 227 }
226 } 228 }
227 229
228 230
229 typedef void _ErrorTransformation(error);
230 typedef bool _ErrorTest(error); 231 typedef bool _ErrorTest(error);
231 232
232 /** 233 /**
233 * A stream pipe that converts or disposes error events 234 * A stream pipe that converts or disposes error events
234 * before passing them on. 235 * before passing them on.
235 */ 236 */
236 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 237 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
237 final _ErrorTransformation _transform; 238 final Function _transform;
238 final _ErrorTest _test; 239 final _ErrorTest _test;
239 240
240 _HandleErrorStream(Stream<T> source, 241 _HandleErrorStream(Stream<T> source,
241 void transform(event), 242 Function onError,
242 bool test(error)) 243 bool test(error))
243 : this._transform = transform, this._test = test, super(source); 244 : this._transform = onError, this._test = test, super(source);
244 245
245 void _handleError(Object error, _EventSink<T> sink) { 246 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
246 bool matches = true; 247 bool matches = true;
247 if (_test != null) { 248 if (_test != null) {
248 try { 249 try {
249 matches = _test(error); 250 matches = _test(error);
250 } catch (e, s) { 251 } catch (e, s) {
251 sink._addError(_asyncError(e, s)); 252 sink._addError(_asyncError(e, s), s);
252 return; 253 return;
253 } 254 }
254 } 255 }
255 if (matches) { 256 if (matches) {
256 try { 257 try {
257 _transform(error); 258 _invokeErrorHandler(_transform, error, stackTrace);
258 } catch (e, s) { 259 } catch (e, s) {
259 sink._addError(_asyncError(e, s)); 260 if (identical(e, error)) {
261 sink._addError(error, stackTrace);
262 } else {
263 sink._addError(_asyncError(e, s), s);
264 }
260 return; 265 return;
261 } 266 }
262 } else { 267 } else {
263 sink._addError(error); 268 sink._addError(error, stackTrace);
264 } 269 }
265 } 270 }
266 } 271 }
267 272
268 273
269 class _TakeStream<T> extends _ForwardingStream<T, T> { 274 class _TakeStream<T> extends _ForwardingStream<T, T> {
270 int _remaining; 275 int _remaining;
271 276
272 _TakeStream(Stream<T> source, int count) 277 _TakeStream(Stream<T> source, int count)
273 : this._remaining = count, super(source) { 278 : this._remaining = count, super(source) {
(...skipping 20 matching lines...) Expand all
294 final _Predicate<T> _test; 299 final _Predicate<T> _test;
295 300
296 _TakeWhileStream(Stream<T> source, bool test(T value)) 301 _TakeWhileStream(Stream<T> source, bool test(T value))
297 : this._test = test, super(source); 302 : this._test = test, super(source);
298 303
299 void _handleData(T inputEvent, _EventSink<T> sink) { 304 void _handleData(T inputEvent, _EventSink<T> sink) {
300 bool satisfies; 305 bool satisfies;
301 try { 306 try {
302 satisfies = _test(inputEvent); 307 satisfies = _test(inputEvent);
303 } catch (e, s) { 308 } catch (e, s) {
304 sink._addError(_asyncError(e, s)); 309 sink._addError(_asyncError(e, s), s);
305 // The test didn't say true. Didn't say false either, but we stop anyway. 310 // The test didn't say true. Didn't say false either, but we stop anyway.
306 sink._close(); 311 sink._close();
307 return; 312 return;
308 } 313 }
309 if (satisfies) { 314 if (satisfies) {
310 sink._add(inputEvent); 315 sink._add(inputEvent);
311 } else { 316 } else {
312 sink._close(); 317 sink._close();
313 } 318 }
314 } 319 }
(...skipping 27 matching lines...) Expand all
342 347
343 void _handleData(T inputEvent, _EventSink<T> sink) { 348 void _handleData(T inputEvent, _EventSink<T> sink) {
344 if (_hasFailed) { 349 if (_hasFailed) {
345 sink._add(inputEvent); 350 sink._add(inputEvent);
346 return; 351 return;
347 } 352 }
348 bool satisfies; 353 bool satisfies;
349 try { 354 try {
350 satisfies = _test(inputEvent); 355 satisfies = _test(inputEvent);
351 } catch (e, s) { 356 } catch (e, s) {
352 sink._addError(_asyncError(e, s)); 357 sink._addError(_asyncError(e, s), s);
353 // A failure to return a boolean is considered "not matching". 358 // A failure to return a boolean is considered "not matching".
354 _hasFailed = true; 359 _hasFailed = true;
355 return; 360 return;
356 } 361 }
357 if (!satisfies) { 362 if (!satisfies) {
358 _hasFailed = true; 363 _hasFailed = true;
359 sink._add(inputEvent); 364 sink._add(inputEvent);
360 } 365 }
361 } 366 }
362 } 367 }
(...skipping 15 matching lines...) Expand all
378 return sink._add(inputEvent); 383 return sink._add(inputEvent);
379 } else { 384 } else {
380 bool isEqual; 385 bool isEqual;
381 try { 386 try {
382 if (_equals == null) { 387 if (_equals == null) {
383 isEqual = (_previous == inputEvent); 388 isEqual = (_previous == inputEvent);
384 } else { 389 } else {
385 isEqual = _equals(_previous, inputEvent); 390 isEqual = _equals(_previous, inputEvent);
386 } 391 }
387 } catch (e, s) { 392 } catch (e, s) {
388 sink._addError(_asyncError(e, s)); 393 sink._addError(_asyncError(e, s), s);
389 return null; 394 return null;
390 } 395 }
391 if (!isEqual) { 396 if (!isEqual) {
392 sink._add(inputEvent); 397 sink._add(inputEvent);
393 _previous = inputEvent; 398 _previous = inputEvent;
394 } 399 }
395 } 400 }
396 } 401 }
397 } 402 }
398 403
399 // Stream transformations and event transformations. 404 // Stream transformations and event transformations.
400 405
401 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); 406 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
402 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); 407 typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink);
403 typedef void _TransformDoneHandler<T>(EventSink<T> sink); 408 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
404 409
405 /** Default data handler forwards all data. */ 410 /** Default data handler forwards all data. */
406 void _defaultHandleData(var data, EventSink sink) { 411 void _defaultHandleData(var data, EventSink sink) {
407 sink.add(data); 412 sink.add(data);
408 } 413 }
409 414
410 /** Default error handler forwards all errors. */ 415 /** Default error handler forwards all errors. */
411 void _defaultHandleError(error, EventSink sink) { 416 void _defaultHandleError(error, EventSink sink) {
412 sink.addError(error); 417 sink.addError(error);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
449 454
450 void handleError(error, EventSink<T> sink) { 455 void handleError(error, EventSink<T> sink) {
451 _handleError(error, sink); 456 _handleError(error, sink);
452 } 457 }
453 458
454 void handleDone(EventSink<T> sink) { 459 void handleDone(EventSink<T> sink) {
455 _handleDone(sink); 460 _handleDone(sink);
456 } 461 }
457 } 462 }
458 463
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/convert/chunked_conversion.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698