Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(387)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 48483002: Remove deprecated parts of dart:async. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /**
8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 * one already.
10 */
11 _asyncError(Object error, StackTrace stackTrace) {
12 if (stackTrace == null) return error;
13 if (getAttachedStackTrace(error) != null) return error;
14 _attachStackTrace(error, stackTrace);
15 return error;
16 }
17
18 /** Runs user code and takes actions depending on success or failure. */ 7 /** Runs user code and takes actions depending on success or failure. */
19 _runUserCode(userCode(), 8 _runUserCode(userCode(),
20 onSuccess(value), 9 onSuccess(value),
21 onError(error, StackTrace stackTrace)) { 10 onError(error, StackTrace stackTrace)) {
22 try { 11 try {
23 onSuccess(userCode()); 12 onSuccess(userCode());
24 } catch (e, s) { 13 } catch (e, s) {
25 onError(_asyncError(e, s), s); 14 onError(e, s);
26 } 15 }
27 } 16 }
28 17
29 /** Helper function to cancel a subscription and wait for the potential future, 18 /** Helper function to cancel a subscription and wait for the potential future,
30 before completing with an error. */ 19 before completing with an error. */
31 void _cancelAndError(StreamSubscription subscription, 20 void _cancelAndError(StreamSubscription subscription,
32 _Future future, 21 _Future future,
33 error, 22 error,
34 StackTrace stackTrace) { 23 StackTrace stackTrace) {
35 var cancelFuture = subscription.cancel(); 24 var cancelFuture = subscription.cancel();
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
180 final _Predicate<T> _test; 169 final _Predicate<T> _test;
181 170
182 _WhereStream(Stream<T> source, bool test(T value)) 171 _WhereStream(Stream<T> source, bool test(T value))
183 : _test = test, super(source); 172 : _test = test, super(source);
184 173
185 void _handleData(T inputEvent, _EventSink<T> sink) { 174 void _handleData(T inputEvent, _EventSink<T> sink) {
186 bool satisfies; 175 bool satisfies;
187 try { 176 try {
188 satisfies = _test(inputEvent); 177 satisfies = _test(inputEvent);
189 } catch (e, s) { 178 } catch (e, s) {
190 sink._addError(_asyncError(e, s), s); 179 sink._addError(e, s);
191 return; 180 return;
192 } 181 }
193 if (satisfies) { 182 if (satisfies) {
194 sink._add(inputEvent); 183 sink._add(inputEvent);
195 } 184 }
196 } 185 }
197 } 186 }
198 187
199 188
200 typedef T _Transformation<S, T>(S value); 189 typedef T _Transformation<S, T>(S value);
201 190
202 /** 191 /**
203 * A stream pipe that converts data events before passing them on. 192 * A stream pipe that converts data events before passing them on.
204 */ 193 */
205 class _MapStream<S, T> extends _ForwardingStream<S, T> { 194 class _MapStream<S, T> extends _ForwardingStream<S, T> {
206 final _Transformation _transform; 195 final _Transformation _transform;
207 196
208 _MapStream(Stream<S> source, T transform(S event)) 197 _MapStream(Stream<S> source, T transform(S event))
209 : this._transform = transform, super(source); 198 : this._transform = transform, super(source);
210 199
211 void _handleData(S inputEvent, _EventSink<T> sink) { 200 void _handleData(S inputEvent, _EventSink<T> sink) {
212 T outputEvent; 201 T outputEvent;
213 try { 202 try {
214 outputEvent = _transform(inputEvent); 203 outputEvent = _transform(inputEvent);
215 } catch (e, s) { 204 } catch (e, s) {
216 sink._addError(_asyncError(e, s), s); 205 sink._addError(e, s);
217 return; 206 return;
218 } 207 }
219 sink._add(outputEvent); 208 sink._add(outputEvent);
220 } 209 }
221 } 210 }
222 211
223 /** 212 /**
224 * A stream pipe that converts data events before passing them on. 213 * A stream pipe that converts data events before passing them on.
225 */ 214 */
226 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 215 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
227 final _Transformation<S, Iterable<T>> _expand; 216 final _Transformation<S, Iterable<T>> _expand;
228 217
229 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 218 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
230 : this._expand = expand, super(source); 219 : this._expand = expand, super(source);
231 220
232 void _handleData(S inputEvent, _EventSink<T> sink) { 221 void _handleData(S inputEvent, _EventSink<T> sink) {
233 try { 222 try {
234 for (T value in _expand(inputEvent)) { 223 for (T value in _expand(inputEvent)) {
235 sink._add(value); 224 sink._add(value);
236 } 225 }
237 } catch (e, s) { 226 } catch (e, s) {
238 // If either _expand or iterating the generated iterator throws, 227 // If either _expand or iterating the generated iterator throws,
239 // we abort the iteration. 228 // we abort the iteration.
240 sink._addError(_asyncError(e, s), s); 229 sink._addError(e, s);
241 } 230 }
242 } 231 }
243 } 232 }
244 233
245 234
246 typedef bool _ErrorTest(error); 235 typedef bool _ErrorTest(error);
247 236
248 /** 237 /**
249 * A stream pipe that converts or disposes error events 238 * A stream pipe that converts or disposes error events
250 * before passing them on. 239 * before passing them on.
251 */ 240 */
252 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 241 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
253 final Function _transform; 242 final Function _transform;
254 final _ErrorTest _test; 243 final _ErrorTest _test;
255 244
256 _HandleErrorStream(Stream<T> source, 245 _HandleErrorStream(Stream<T> source,
257 Function onError, 246 Function onError,
258 bool test(error)) 247 bool test(error))
259 : this._transform = onError, this._test = test, super(source); 248 : this._transform = onError, this._test = test, super(source);
260 249
261 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { 250 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
262 bool matches = true; 251 bool matches = true;
263 if (_test != null) { 252 if (_test != null) {
264 try { 253 try {
265 matches = _test(error); 254 matches = _test(error);
266 } catch (e, s) { 255 } catch (e, s) {
267 sink._addError(_asyncError(e, s), s); 256 sink._addError(e, s);
268 return; 257 return;
269 } 258 }
270 } 259 }
271 if (matches) { 260 if (matches) {
272 try { 261 try {
273 _invokeErrorHandler(_transform, error, stackTrace); 262 _invokeErrorHandler(_transform, error, stackTrace);
274 } catch (e, s) { 263 } catch (e, s) {
275 if (identical(e, error)) { 264 if (identical(e, error)) {
276 sink._addError(error, stackTrace); 265 sink._addError(error, stackTrace);
277 } else { 266 } else {
278 sink._addError(_asyncError(e, s), s); 267 sink._addError(e, s);
279 } 268 }
280 return; 269 return;
281 } 270 }
282 } else { 271 } else {
283 sink._addError(error, stackTrace); 272 sink._addError(error, stackTrace);
284 } 273 }
285 } 274 }
286 } 275 }
287 276
288 277
(...skipping 25 matching lines...) Expand all
314 final _Predicate<T> _test; 303 final _Predicate<T> _test;
315 304
316 _TakeWhileStream(Stream<T> source, bool test(T value)) 305 _TakeWhileStream(Stream<T> source, bool test(T value))
317 : this._test = test, super(source); 306 : this._test = test, super(source);
318 307
319 void _handleData(T inputEvent, _EventSink<T> sink) { 308 void _handleData(T inputEvent, _EventSink<T> sink) {
320 bool satisfies; 309 bool satisfies;
321 try { 310 try {
322 satisfies = _test(inputEvent); 311 satisfies = _test(inputEvent);
323 } catch (e, s) { 312 } catch (e, s) {
324 sink._addError(_asyncError(e, s), s); 313 sink._addError(e, s);
325 // The test didn't say true. Didn't say false either, but we stop anyway. 314 // The test didn't say true. Didn't say false either, but we stop anyway.
326 sink._close(); 315 sink._close();
327 return; 316 return;
328 } 317 }
329 if (satisfies) { 318 if (satisfies) {
330 sink._add(inputEvent); 319 sink._add(inputEvent);
331 } else { 320 } else {
332 sink._close(); 321 sink._close();
333 } 322 }
334 } 323 }
(...skipping 27 matching lines...) Expand all
362 351
363 void _handleData(T inputEvent, _EventSink<T> sink) { 352 void _handleData(T inputEvent, _EventSink<T> sink) {
364 if (_hasFailed) { 353 if (_hasFailed) {
365 sink._add(inputEvent); 354 sink._add(inputEvent);
366 return; 355 return;
367 } 356 }
368 bool satisfies; 357 bool satisfies;
369 try { 358 try {
370 satisfies = _test(inputEvent); 359 satisfies = _test(inputEvent);
371 } catch (e, s) { 360 } catch (e, s) {
372 sink._addError(_asyncError(e, s), s); 361 sink._addError(e, s);
373 // A failure to return a boolean is considered "not matching". 362 // A failure to return a boolean is considered "not matching".
374 _hasFailed = true; 363 _hasFailed = true;
375 return; 364 return;
376 } 365 }
377 if (!satisfies) { 366 if (!satisfies) {
378 _hasFailed = true; 367 _hasFailed = true;
379 sink._add(inputEvent); 368 sink._add(inputEvent);
380 } 369 }
381 } 370 }
382 } 371 }
(...skipping 15 matching lines...) Expand all
398 return sink._add(inputEvent); 387 return sink._add(inputEvent);
399 } else { 388 } else {
400 bool isEqual; 389 bool isEqual;
401 try { 390 try {
402 if (_equals == null) { 391 if (_equals == null) {
403 isEqual = (_previous == inputEvent); 392 isEqual = (_previous == inputEvent);
404 } else { 393 } else {
405 isEqual = _equals(_previous, inputEvent); 394 isEqual = _equals(_previous, inputEvent);
406 } 395 }
407 } catch (e, s) { 396 } catch (e, s) {
408 sink._addError(_asyncError(e, s), s); 397 sink._addError(e, s);
409 return null; 398 return null;
410 } 399 }
411 if (!isEqual) { 400 if (!isEqual) {
412 sink._add(inputEvent); 401 sink._add(inputEvent);
413 _previous = inputEvent; 402 _previous = inputEvent;
414 } 403 }
415 } 404 }
416 } 405 }
417 } 406 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698