Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index d1fe15b67e2edd1645fdbeed4e3772cbd1e5d27b..9f0f6a7ffb71f323955b13acb285445a4e536e61 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -115,30 +115,28 @@ abstract class Stream<T> { |
} |
controller = new StreamController<T>( |
- onPauseStateChange: () { |
- if (controller.isPaused) { |
- timer.cancel(); |
- timer = null; |
- watch.stop(); |
- } else { |
- assert(timer == null); |
- Duration elapsed = watch.elapsed; |
- watch.start(); |
- timer = new Timer(period - elapsed, () { |
- timer = null; |
- startPeriodicTimer(); |
- sendEvent(); |
- }); |
- } |
+ onListen: () { |
+ watch.start(); |
+ startPeriodicTimer(); |
}, |
- onSubscriptionStateChange: () { |
- if (controller.hasListener) { |
- watch.start(); |
- startPeriodicTimer(); |
- } else { |
- if (timer != null) timer.cancel(); |
+ onPause: () { |
+ timer.cancel(); |
+ timer = null; |
+ watch.stop(); |
+ }, |
+ onResume: () { |
+ assert(timer == null); |
+ Duration elapsed = watch.elapsed; |
+ watch.start(); |
+ timer = new Timer(period - elapsed, () { |
timer = null; |
- } |
+ startPeriodicTimer(); |
+ sendEvent(); |
+ }); |
+ }, |
+ onCancel: () { |
+ if (timer != null) timer.cancel(); |
+ timer = null; |
}); |
return controller.stream; |
} |
@@ -1014,23 +1012,15 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
StreamController controller; |
StreamSubscription subscription; |
controller = new StreamController<T>( |
- onPauseStateChange: () { |
- if (controller.isPaused) { |
- subscription.pause(); |
- } else { |
- subscription.resume(); |
- } |
+ onListen: () { |
+ subscription = transformingStream.listen( |
+ controller.add, |
+ onError: controller.addError, |
+ onDone: controller.close); |
}, |
- onSubscriptionStateChange: () { |
- if (controller.hasListener) { |
- subscription = transformingStream.listen( |
- controller.add, |
- onError: controller.addError, |
- onDone: controller.close); |
- } else { |
- subscription.cancel(); |
- } |
- }); |
+ onPause: () => subscription.pause(), |
+ onResume: () => subscription.resume(), |
+ onCancel: () => subscription.cancel()); |
return controller.stream; |
} |