Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(274)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2213193004: Revert "Return futures on Stream.cancel when possible." (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 watch.start(); 225 watch.start();
226 timer = new Timer(period - elapsed, () { 226 timer = new Timer(period - elapsed, () {
227 timer = null; 227 timer = null;
228 startPeriodicTimer(); 228 startPeriodicTimer();
229 sendEvent(); 229 sendEvent();
230 }); 230 });
231 }, 231 },
232 onCancel: () { 232 onCancel: () {
233 if (timer != null) timer.cancel(); 233 if (timer != null) timer.cancel();
234 timer = null; 234 timer = null;
235 return Future._nullFuture;
236 }); 235 });
237 return controller.stream; 236 return controller.stream;
238 } 237 }
239 238
240 /** 239 /**
241 * Creates a stream where all events of an existing stream are piped through 240 * Creates a stream where all events of an existing stream are piped through
242 * a sink-transformation. 241 * a sink-transformation.
243 * 242 *
244 * The given [mapSink] closure is invoked when the returned stream is 243 * The given [mapSink] closure is invoked when the returned stream is
245 * listened to. All events from the [source] are added into the event sink 244 * listened to. All events from the [source] are added into the event sink
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 controller = new StreamController/*<E>*/.broadcast( 434 controller = new StreamController/*<E>*/.broadcast(
436 onListen: onListen, 435 onListen: onListen,
437 onCancel: () { subscription.cancel(); }, 436 onCancel: () { subscription.cancel(); },
438 sync: true 437 sync: true
439 ); 438 );
440 } else { 439 } else {
441 controller = new StreamController/*<E>*/( 440 controller = new StreamController/*<E>*/(
442 onListen: onListen, 441 onListen: onListen,
443 onPause: () { subscription.pause(); }, 442 onPause: () { subscription.pause(); },
444 onResume: () { subscription.resume(); }, 443 onResume: () { subscription.resume(); },
445 onCancel: () => subscription.cancel(), 444 onCancel: () { subscription.cancel(); },
446 sync: true 445 sync: true
447 ); 446 );
448 } 447 }
449 return controller.stream; 448 return controller.stream;
450 } 449 }
451 450
452 /** 451 /**
453 * Creates a new stream with the events of a stream per original event. 452 * Creates a new stream with the events of a stream per original event.
454 * 453 *
455 * This acts like [expand], except that [convert] returns a [Stream] 454 * This acts like [expand], except that [convert] returns a [Stream]
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
493 controller = new StreamController/*<E>*/.broadcast( 492 controller = new StreamController/*<E>*/.broadcast(
494 onListen: onListen, 493 onListen: onListen,
495 onCancel: () { subscription.cancel(); }, 494 onCancel: () { subscription.cancel(); },
496 sync: true 495 sync: true
497 ); 496 );
498 } else { 497 } else {
499 controller = new StreamController/*<E>*/( 498 controller = new StreamController/*<E>*/(
500 onListen: onListen, 499 onListen: onListen,
501 onPause: () { subscription.pause(); }, 500 onPause: () { subscription.pause(); },
502 onResume: () { subscription.resume(); }, 501 onResume: () { subscription.resume(); },
503 onCancel: () => subscription.cancel(), 502 onCancel: () { subscription.cancel(); },
504 sync: true 503 sync: true
505 ); 504 );
506 } 505 }
507 return controller.stream; 506 return controller.stream;
508 } 507 }
509 508
510 /** 509 /**
511 * Creates a wrapper Stream that intercepts some errors from this stream. 510 * Creates a wrapper Stream that intercepts some errors from this stream.
512 * 511 *
513 * If this stream sends an error that matches [test], then it is intercepted 512 * If this stream sends an error that matches [test], then it is intercepted
(...skipping 887 matching lines...) Expand 10 before | Expand all | Expand 10 after
1401 abstract class StreamSubscription<T> { 1400 abstract class StreamSubscription<T> {
1402 /** 1401 /**
1403 * Cancels this subscription. 1402 * Cancels this subscription.
1404 * 1403 *
1405 * After this call, the subscription no longer receives events. 1404 * After this call, the subscription no longer receives events.
1406 * 1405 *
1407 * The stream may need to shut down the source of events and clean up after 1406 * The stream may need to shut down the source of events and clean up after
1408 * the subscription is canceled. 1407 * the subscription is canceled.
1409 * 1408 *
1410 * Returns a future that is completed once the stream has finished 1409 * Returns a future that is completed once the stream has finished
1411 * its cleanup. 1410 * its cleanup. May also return `null` if no cleanup was necessary.
1412 *
1413 * For historical reasons, may also return `null` if no cleanup was necessary.
1414 * Returning `null` is deprecated and should be avoided.
1415 * 1411 *
1416 * Typically, futures are returned when the stream needs to release resources. 1412 * Typically, futures are returned when the stream needs to release resources.
1417 * For example, a stream might need to close an open file (as an asynchronous 1413 * For example, a stream might need to close an open file (as an asynchronous
1418 * operation). If the listener wants to delete the file after having 1414 * operation). If the listener wants to delete the file after having
1419 * canceled the subscription, it must wait for the cleanup future to complete. 1415 * canceled the subscription, it must wait for the cleanup future to complete.
1420 * 1416 *
1421 * A returned future completes with a `null` value. 1417 * A returned future completes with a `null` value.
1422 * If the cleanup throws, which it really shouldn't, the returned future 1418 * If the cleanup throws, which it really shouldn't, the returned future
1423 * completes with that error. 1419 * completes with that error.
1424 */ 1420 */
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after
1708 * // Duplicate the data. 1704 * // Duplicate the data.
1709 * controller.add(data); 1705 * controller.add(data);
1710 * controller.add(data); 1706 * controller.add(data);
1711 * }, 1707 * },
1712 * onError: controller.addError, 1708 * onError: controller.addError,
1713 * onDone: controller.close, 1709 * onDone: controller.close,
1714 * cancelOnError: cancelOnError); 1710 * cancelOnError: cancelOnError);
1715 * }, 1711 * },
1716 * onPause: () { subscription.pause(); }, 1712 * onPause: () { subscription.pause(); },
1717 * onResume: () { subscription.resume(); }, 1713 * onResume: () { subscription.resume(); },
1718 * onCancel: () => subscription.cancel(), 1714 * onCancel: () { subscription.cancel(); },
1719 * sync: true); 1715 * sync: true);
1720 * return controller.stream.listen(null); 1716 * return controller.stream.listen(null);
1721 * }); 1717 * });
1722 */ 1718 */
1723 const factory StreamTransformer( 1719 const factory StreamTransformer(
1724 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) 1720 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1725 = _StreamSubscriptionTransformer<S, T>; 1721 = _StreamSubscriptionTransformer<S, T>;
1726 1722
1727 /** 1723 /**
1728 * Creates a [StreamTransformer] that delegates events to the given functions. 1724 * Creates a [StreamTransformer] that delegates events to the given functions.
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1826 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1822 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1827 EventSink _sink; 1823 EventSink _sink;
1828 _ControllerEventSinkWrapper(this._sink); 1824 _ControllerEventSinkWrapper(this._sink);
1829 1825
1830 void add(T data) { _sink.add(data); } 1826 void add(T data) { _sink.add(data); }
1831 void addError(error, [StackTrace stackTrace]) { 1827 void addError(error, [StackTrace stackTrace]) {
1832 _sink.addError(error, stackTrace); 1828 _sink.addError(error, stackTrace);
1833 } 1829 }
1834 void close() { _sink.close(); } 1830 void close() { _sink.close(); }
1835 } 1831 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698