Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(362)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2202533003: Return futures on Stream.cancel when possible. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Don't make Pipe.cancel wait for the null future. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 watch.start(); 225 watch.start();
226 timer = new Timer(period - elapsed, () { 226 timer = new Timer(period - elapsed, () {
227 timer = null; 227 timer = null;
228 startPeriodicTimer(); 228 startPeriodicTimer();
229 sendEvent(); 229 sendEvent();
230 }); 230 });
231 }, 231 },
232 onCancel: () { 232 onCancel: () {
233 if (timer != null) timer.cancel(); 233 if (timer != null) timer.cancel();
234 timer = null; 234 timer = null;
235 return Future._nullFuture;
235 }); 236 });
236 return controller.stream; 237 return controller.stream;
237 } 238 }
238 239
239 /** 240 /**
240 * Creates a stream where all events of an existing stream are piped through 241 * Creates a stream where all events of an existing stream are piped through
241 * a sink-transformation. 242 * a sink-transformation.
242 * 243 *
243 * The given [mapSink] closure is invoked when the returned stream is 244 * The given [mapSink] closure is invoked when the returned stream is
244 * listened to. All events from the [source] are added into the event sink 245 * listened to. All events from the [source] are added into the event sink
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
434 controller = new StreamController/*<E>*/.broadcast( 435 controller = new StreamController/*<E>*/.broadcast(
435 onListen: onListen, 436 onListen: onListen,
436 onCancel: () { subscription.cancel(); }, 437 onCancel: () { subscription.cancel(); },
437 sync: true 438 sync: true
438 ); 439 );
439 } else { 440 } else {
440 controller = new StreamController/*<E>*/( 441 controller = new StreamController/*<E>*/(
441 onListen: onListen, 442 onListen: onListen,
442 onPause: () { subscription.pause(); }, 443 onPause: () { subscription.pause(); },
443 onResume: () { subscription.resume(); }, 444 onResume: () { subscription.resume(); },
444 onCancel: () { subscription.cancel(); }, 445 onCancel: () => subscription.cancel(),
445 sync: true 446 sync: true
446 ); 447 );
447 } 448 }
448 return controller.stream; 449 return controller.stream;
449 } 450 }
450 451
451 /** 452 /**
452 * Creates a new stream with the events of a stream per original event. 453 * Creates a new stream with the events of a stream per original event.
453 * 454 *
454 * This acts like [expand], except that [convert] returns a [Stream] 455 * This acts like [expand], except that [convert] returns a [Stream]
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
492 controller = new StreamController/*<E>*/.broadcast( 493 controller = new StreamController/*<E>*/.broadcast(
493 onListen: onListen, 494 onListen: onListen,
494 onCancel: () { subscription.cancel(); }, 495 onCancel: () { subscription.cancel(); },
495 sync: true 496 sync: true
496 ); 497 );
497 } else { 498 } else {
498 controller = new StreamController/*<E>*/( 499 controller = new StreamController/*<E>*/(
499 onListen: onListen, 500 onListen: onListen,
500 onPause: () { subscription.pause(); }, 501 onPause: () { subscription.pause(); },
501 onResume: () { subscription.resume(); }, 502 onResume: () { subscription.resume(); },
502 onCancel: () { subscription.cancel(); }, 503 onCancel: () => subscription.cancel(),
503 sync: true 504 sync: true
504 ); 505 );
505 } 506 }
506 return controller.stream; 507 return controller.stream;
507 } 508 }
508 509
509 /** 510 /**
510 * Creates a wrapper Stream that intercepts some errors from this stream. 511 * Creates a wrapper Stream that intercepts some errors from this stream.
511 * 512 *
512 * If this stream sends an error that matches [test], then it is intercepted 513 * If this stream sends an error that matches [test], then it is intercepted
(...skipping 887 matching lines...) Expand 10 before | Expand all | Expand 10 after
1400 abstract class StreamSubscription<T> { 1401 abstract class StreamSubscription<T> {
1401 /** 1402 /**
1402 * Cancels this subscription. 1403 * Cancels this subscription.
1403 * 1404 *
1404 * After this call, the subscription no longer receives events. 1405 * After this call, the subscription no longer receives events.
1405 * 1406 *
1406 * The stream may need to shut down the source of events and clean up after 1407 * The stream may need to shut down the source of events and clean up after
1407 * the subscription is canceled. 1408 * the subscription is canceled.
1408 * 1409 *
1409 * Returns a future that is completed once the stream has finished 1410 * Returns a future that is completed once the stream has finished
1410 * its cleanup. May also return `null` if no cleanup was necessary. 1411 * its cleanup.
1412 *
1413 * For historical reasons, may also return `null` if no cleanup was necessary.
1414 * Returning `null` is deprecated and should be avoided.
1411 * 1415 *
1412 * Typically, futures are returned when the stream needs to release resources. 1416 * Typically, futures are returned when the stream needs to release resources.
1413 * For example, a stream might need to close an open file (as an asynchronous 1417 * For example, a stream might need to close an open file (as an asynchronous
1414 * operation). If the listener wants to delete the file after having 1418 * operation). If the listener wants to delete the file after having
1415 * canceled the subscription, it must wait for the cleanup future to complete. 1419 * canceled the subscription, it must wait for the cleanup future to complete.
1416 * 1420 *
1417 * A returned future completes with a `null` value. 1421 * A returned future completes with a `null` value.
1418 * If the cleanup throws, which it really shouldn't, the returned future 1422 * If the cleanup throws, which it really shouldn't, the returned future
1419 * completes with that error. 1423 * completes with that error.
1420 */ 1424 */
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after
1704 * // Duplicate the data. 1708 * // Duplicate the data.
1705 * controller.add(data); 1709 * controller.add(data);
1706 * controller.add(data); 1710 * controller.add(data);
1707 * }, 1711 * },
1708 * onError: controller.addError, 1712 * onError: controller.addError,
1709 * onDone: controller.close, 1713 * onDone: controller.close,
1710 * cancelOnError: cancelOnError); 1714 * cancelOnError: cancelOnError);
1711 * }, 1715 * },
1712 * onPause: () { subscription.pause(); }, 1716 * onPause: () { subscription.pause(); },
1713 * onResume: () { subscription.resume(); }, 1717 * onResume: () { subscription.resume(); },
1714 * onCancel: () { subscription.cancel(); }, 1718 * onCancel: () => subscription.cancel(),
1715 * sync: true); 1719 * sync: true);
1716 * return controller.stream.listen(null); 1720 * return controller.stream.listen(null);
1717 * }); 1721 * });
1718 */ 1722 */
1719 const factory StreamTransformer( 1723 const factory StreamTransformer(
1720 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) 1724 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1721 = _StreamSubscriptionTransformer<S, T>; 1725 = _StreamSubscriptionTransformer<S, T>;
1722 1726
1723 /** 1727 /**
1724 * Creates a [StreamTransformer] that delegates events to the given functions. 1728 * Creates a [StreamTransformer] that delegates events to the given functions.
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1822 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1826 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1823 EventSink _sink; 1827 EventSink _sink;
1824 _ControllerEventSinkWrapper(this._sink); 1828 _ControllerEventSinkWrapper(this._sink);
1825 1829
1826 void add(T data) { _sink.add(data); } 1830 void add(T data) { _sink.add(data); }
1827 void addError(error, [StackTrace stackTrace]) { 1831 void addError(error, [StackTrace stackTrace]) {
1828 _sink.addError(error, stackTrace); 1832 _sink.addError(error, stackTrace);
1829 } 1833 }
1830 void close() { _sink.close(); } 1834 void close() { _sink.close(); }
1831 } 1835 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698