Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 48483002: Remove deprecated parts of dart:async. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /**
8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 * one already.
10 */
11 _asyncError(Object error, StackTrace stackTrace) {
12 if (stackTrace == null) return error;
13 if (getAttachedStackTrace(error) != null) return error;
14 _attachStackTrace(error, stackTrace);
15 return error;
16 }
17
18 /** Runs user code and takes actions depending on success or failure. */ 7 /** Runs user code and takes actions depending on success or failure. */
19 _runUserCode(userCode(), 8 _runUserCode(userCode(),
20 onSuccess(value), 9 onSuccess(value),
21 onError(error, StackTrace stackTrace)) { 10 onError(error, StackTrace stackTrace)) {
22 try { 11 try {
23 onSuccess(userCode()); 12 onSuccess(userCode());
24 } catch (e, s) { 13 } catch (e, s) {
25 onError(_asyncError(e, s), s); 14 onError(e, s);
26 } 15 }
27 } 16 }
28 17
29 /** Helper function to cancel a subscription and wait for the potential future, 18 /** Helper function to cancel a subscription and wait for the potential future,
30 before completing with an error. */ 19 before completing with an error. */
31 void _cancelAndError(StreamSubscription subscription, 20 void _cancelAndError(StreamSubscription subscription,
32 _Future future, 21 _Future future,
33 error, 22 error,
34 StackTrace stackTrace) { 23 StackTrace stackTrace) {
35 var cancelFuture = subscription.cancel(); 24 var cancelFuture = subscription.cancel();
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 final _Predicate<T> _test; 170 final _Predicate<T> _test;
182 171
183 _WhereStream(Stream<T> source, bool test(T value)) 172 _WhereStream(Stream<T> source, bool test(T value))
184 : _test = test, super(source); 173 : _test = test, super(source);
185 174
186 void _handleData(T inputEvent, _EventSink<T> sink) { 175 void _handleData(T inputEvent, _EventSink<T> sink) {
187 bool satisfies; 176 bool satisfies;
188 try { 177 try {
189 satisfies = _test(inputEvent); 178 satisfies = _test(inputEvent);
190 } catch (e, s) { 179 } catch (e, s) {
191 sink._addError(_asyncError(e, s), s); 180 sink._addError(e, s);
192 return; 181 return;
193 } 182 }
194 if (satisfies) { 183 if (satisfies) {
195 sink._add(inputEvent); 184 sink._add(inputEvent);
196 } 185 }
197 } 186 }
198 } 187 }
199 188
200 189
201 typedef T _Transformation<S, T>(S value); 190 typedef T _Transformation<S, T>(S value);
202 191
203 /** 192 /**
204 * A stream pipe that converts data events before passing them on. 193 * A stream pipe that converts data events before passing them on.
205 */ 194 */
206 class _MapStream<S, T> extends _ForwardingStream<S, T> { 195 class _MapStream<S, T> extends _ForwardingStream<S, T> {
207 final _Transformation _transform; 196 final _Transformation _transform;
208 197
209 _MapStream(Stream<S> source, T transform(S event)) 198 _MapStream(Stream<S> source, T transform(S event))
210 : this._transform = transform, super(source); 199 : this._transform = transform, super(source);
211 200
212 void _handleData(S inputEvent, _EventSink<T> sink) { 201 void _handleData(S inputEvent, _EventSink<T> sink) {
213 T outputEvent; 202 T outputEvent;
214 try { 203 try {
215 outputEvent = _transform(inputEvent); 204 outputEvent = _transform(inputEvent);
216 } catch (e, s) { 205 } catch (e, s) {
217 sink._addError(_asyncError(e, s), s); 206 sink._addError(e, s);
218 return; 207 return;
219 } 208 }
220 sink._add(outputEvent); 209 sink._add(outputEvent);
221 } 210 }
222 } 211 }
223 212
224 /** 213 /**
225 * A stream pipe that converts data events before passing them on. 214 * A stream pipe that converts data events before passing them on.
226 */ 215 */
227 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 216 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
228 final _Transformation<S, Iterable<T>> _expand; 217 final _Transformation<S, Iterable<T>> _expand;
229 218
230 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 219 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
231 : this._expand = expand, super(source); 220 : this._expand = expand, super(source);
232 221
233 void _handleData(S inputEvent, _EventSink<T> sink) { 222 void _handleData(S inputEvent, _EventSink<T> sink) {
234 try { 223 try {
235 for (T value in _expand(inputEvent)) { 224 for (T value in _expand(inputEvent)) {
236 sink._add(value); 225 sink._add(value);
237 } 226 }
238 } catch (e, s) { 227 } catch (e, s) {
239 // If either _expand or iterating the generated iterator throws, 228 // If either _expand or iterating the generated iterator throws,
240 // we abort the iteration. 229 // we abort the iteration.
241 sink._addError(_asyncError(e, s), s); 230 sink._addError(e, s);
242 } 231 }
243 } 232 }
244 } 233 }
245 234
246 235
247 typedef bool _ErrorTest(error); 236 typedef bool _ErrorTest(error);
248 237
249 /** 238 /**
250 * A stream pipe that converts or disposes error events 239 * A stream pipe that converts or disposes error events
251 * before passing them on. 240 * before passing them on.
252 */ 241 */
253 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 242 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
254 final Function _transform; 243 final Function _transform;
255 final _ErrorTest _test; 244 final _ErrorTest _test;
256 245
257 _HandleErrorStream(Stream<T> source, 246 _HandleErrorStream(Stream<T> source,
258 Function onError, 247 Function onError,
259 bool test(error)) 248 bool test(error))
260 : this._transform = onError, this._test = test, super(source); 249 : this._transform = onError, this._test = test, super(source);
261 250
262 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { 251 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
263 bool matches = true; 252 bool matches = true;
264 if (_test != null) { 253 if (_test != null) {
265 try { 254 try {
266 matches = _test(error); 255 matches = _test(error);
267 } catch (e, s) { 256 } catch (e, s) {
268 sink._addError(_asyncError(e, s), s); 257 sink._addError(e, s);
269 return; 258 return;
270 } 259 }
271 } 260 }
272 if (matches) { 261 if (matches) {
273 try { 262 try {
274 _invokeErrorHandler(_transform, error, stackTrace); 263 _invokeErrorHandler(_transform, error, stackTrace);
275 } catch (e, s) { 264 } catch (e, s) {
276 if (identical(e, error)) { 265 if (identical(e, error)) {
277 sink._addError(error, stackTrace); 266 sink._addError(error, stackTrace);
278 } else { 267 } else {
279 sink._addError(_asyncError(e, s), s); 268 sink._addError(e, s);
280 } 269 }
281 return; 270 return;
282 } 271 }
283 } else { 272 } else {
284 sink._addError(error, stackTrace); 273 sink._addError(error, stackTrace);
285 } 274 }
286 } 275 }
287 } 276 }
288 277
289 278
(...skipping 25 matching lines...) Expand all
315 final _Predicate<T> _test; 304 final _Predicate<T> _test;
316 305
317 _TakeWhileStream(Stream<T> source, bool test(T value)) 306 _TakeWhileStream(Stream<T> source, bool test(T value))
318 : this._test = test, super(source); 307 : this._test = test, super(source);
319 308
320 void _handleData(T inputEvent, _EventSink<T> sink) { 309 void _handleData(T inputEvent, _EventSink<T> sink) {
321 bool satisfies; 310 bool satisfies;
322 try { 311 try {
323 satisfies = _test(inputEvent); 312 satisfies = _test(inputEvent);
324 } catch (e, s) { 313 } catch (e, s) {
325 sink._addError(_asyncError(e, s), s); 314 sink._addError(e, s);
326 // The test didn't say true. Didn't say false either, but we stop anyway. 315 // The test didn't say true. Didn't say false either, but we stop anyway.
327 sink._close(); 316 sink._close();
328 return; 317 return;
329 } 318 }
330 if (satisfies) { 319 if (satisfies) {
331 sink._add(inputEvent); 320 sink._add(inputEvent);
332 } else { 321 } else {
333 sink._close(); 322 sink._close();
334 } 323 }
335 } 324 }
(...skipping 27 matching lines...) Expand all
363 352
364 void _handleData(T inputEvent, _EventSink<T> sink) { 353 void _handleData(T inputEvent, _EventSink<T> sink) {
365 if (_hasFailed) { 354 if (_hasFailed) {
366 sink._add(inputEvent); 355 sink._add(inputEvent);
367 return; 356 return;
368 } 357 }
369 bool satisfies; 358 bool satisfies;
370 try { 359 try {
371 satisfies = _test(inputEvent); 360 satisfies = _test(inputEvent);
372 } catch (e, s) { 361 } catch (e, s) {
373 sink._addError(_asyncError(e, s), s); 362 sink._addError(e, s);
374 // A failure to return a boolean is considered "not matching". 363 // A failure to return a boolean is considered "not matching".
375 _hasFailed = true; 364 _hasFailed = true;
376 return; 365 return;
377 } 366 }
378 if (!satisfies) { 367 if (!satisfies) {
379 _hasFailed = true; 368 _hasFailed = true;
380 sink._add(inputEvent); 369 sink._add(inputEvent);
381 } 370 }
382 } 371 }
383 } 372 }
(...skipping 15 matching lines...) Expand all
399 return sink._add(inputEvent); 388 return sink._add(inputEvent);
400 } else { 389 } else {
401 bool isEqual; 390 bool isEqual;
402 try { 391 try {
403 if (_equals == null) { 392 if (_equals == null) {
404 isEqual = (_previous == inputEvent); 393 isEqual = (_previous == inputEvent);
405 } else { 394 } else {
406 isEqual = _equals(_previous, inputEvent); 395 isEqual = _equals(_previous, inputEvent);
407 } 396 }
408 } catch (e, s) { 397 } catch (e, s) {
409 sink._addError(_asyncError(e, s), s); 398 sink._addError(e, s);
410 return null; 399 return null;
411 } 400 }
412 if (!isEqual) { 401 if (!isEqual) {
413 sink._add(inputEvent); 402 sink._add(inputEvent);
414 _previous = inputEvent; 403 _previous = inputEvent;
415 } 404 }
416 } 405 }
417 } 406 }
418 } 407 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698