| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 108 } | 108 } |
| 109 | 109 |
| 110 void startPeriodicTimer() { | 110 void startPeriodicTimer() { |
| 111 assert(timer == null); | 111 assert(timer == null); |
| 112 timer = new Timer.periodic(period, (Timer timer) { | 112 timer = new Timer.periodic(period, (Timer timer) { |
| 113 sendEvent(); | 113 sendEvent(); |
| 114 }); | 114 }); |
| 115 } | 115 } |
| 116 | 116 |
| 117 controller = new StreamController<T>( | 117 controller = new StreamController<T>( |
| 118 onPauseStateChange: () { | 118 onListen: () { |
| 119 if (controller.isPaused) { | 119 watch.start(); |
| 120 timer.cancel(); | 120 startPeriodicTimer(); |
| 121 }, |
| 122 onPause: () { |
| 123 timer.cancel(); |
| 124 timer = null; |
| 125 watch.stop(); |
| 126 }, |
| 127 onResume: () { |
| 128 assert(timer == null); |
| 129 Duration elapsed = watch.elapsed; |
| 130 watch.start(); |
| 131 timer = new Timer(period - elapsed, () { |
| 121 timer = null; | 132 timer = null; |
| 122 watch.stop(); | 133 startPeriodicTimer(); |
| 123 } else { | 134 sendEvent(); |
| 124 assert(timer == null); | 135 }); |
| 125 Duration elapsed = watch.elapsed; | |
| 126 watch.start(); | |
| 127 timer = new Timer(period - elapsed, () { | |
| 128 timer = null; | |
| 129 startPeriodicTimer(); | |
| 130 sendEvent(); | |
| 131 }); | |
| 132 } | |
| 133 }, | 136 }, |
| 134 onSubscriptionStateChange: () { | 137 onCancel: () { |
| 135 if (controller.hasListener) { | 138 if (timer != null) timer.cancel(); |
| 136 watch.start(); | 139 timer = null; |
| 137 startPeriodicTimer(); | |
| 138 } else { | |
| 139 if (timer != null) timer.cancel(); | |
| 140 timer = null; | |
| 141 } | |
| 142 }); | 140 }); |
| 143 return controller.stream; | 141 return controller.stream; |
| 144 } | 142 } |
| 145 | 143 |
| 146 /** | 144 /** |
| 147 * Reports whether this stream is a broadcast stream. | 145 * Reports whether this stream is a broadcast stream. |
| 148 */ | 146 */ |
| 149 bool get isBroadcast => false; | 147 bool get isBroadcast => false; |
| 150 | 148 |
| 151 /** | 149 /** |
| (...skipping 855 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1007 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 1005 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| 1008 const StreamEventTransformer(); | 1006 const StreamEventTransformer(); |
| 1009 | 1007 |
| 1010 Stream<T> bind(Stream<S> source) { | 1008 Stream<T> bind(Stream<S> source) { |
| 1011 // Hackish way of buffering data that goes out of the event-transformer. | 1009 // Hackish way of buffering data that goes out of the event-transformer. |
| 1012 // TODO(floitsch): replace this with a correct solution. | 1010 // TODO(floitsch): replace this with a correct solution. |
| 1013 Stream transformingStream = new EventTransformStream<S, T>(source, this); | 1011 Stream transformingStream = new EventTransformStream<S, T>(source, this); |
| 1014 StreamController controller; | 1012 StreamController controller; |
| 1015 StreamSubscription subscription; | 1013 StreamSubscription subscription; |
| 1016 controller = new StreamController<T>( | 1014 controller = new StreamController<T>( |
| 1017 onPauseStateChange: () { | 1015 onListen: () { |
| 1018 if (controller.isPaused) { | 1016 subscription = transformingStream.listen( |
| 1019 subscription.pause(); | 1017 controller.add, |
| 1020 } else { | 1018 onError: controller.addError, |
| 1021 subscription.resume(); | 1019 onDone: controller.close); |
| 1022 } | |
| 1023 }, | 1020 }, |
| 1024 onSubscriptionStateChange: () { | 1021 onPause: () => subscription.pause(), |
| 1025 if (controller.hasListener) { | 1022 onResume: () => subscription.resume(), |
| 1026 subscription = transformingStream.listen( | 1023 onCancel: () => subscription.cancel()); |
| 1027 controller.add, | |
| 1028 onError: controller.addError, | |
| 1029 onDone: controller.close); | |
| 1030 } else { | |
| 1031 subscription.cancel(); | |
| 1032 } | |
| 1033 }); | |
| 1034 return controller.stream; | 1024 return controller.stream; |
| 1035 } | 1025 } |
| 1036 | 1026 |
| 1037 /** | 1027 /** |
| 1038 * Act on incoming data event. | 1028 * Act on incoming data event. |
| 1039 * | 1029 * |
| 1040 * The method may generate any number of events on the sink, but should | 1030 * The method may generate any number of events on the sink, but should |
| 1041 * not throw. | 1031 * not throw. |
| 1042 */ | 1032 */ |
| 1043 void handleData(S event, EventSink<T> sink) { | 1033 void handleData(S event, EventSink<T> sink) { |
| (...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1191 } | 1181 } |
| 1192 | 1182 |
| 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1183 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
| 1194 _EventOutputSink _sink; | 1184 _EventOutputSink _sink; |
| 1195 _EventOutputSinkWrapper(this._sink); | 1185 _EventOutputSinkWrapper(this._sink); |
| 1196 | 1186 |
| 1197 void add(T data) { _sink._sendData(data); } | 1187 void add(T data) { _sink._sendData(data); } |
| 1198 void addError(AsyncError error) { _sink._sendError(error); } | 1188 void addError(AsyncError error) { _sink._sendError(error); } |
| 1199 void close() { _sink._sendDone(); } | 1189 void close() { _sink._sendDone(); } |
| 1200 } | 1190 } |
| OLD | NEW |