OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
125 Duration elapsed = watch.elapsed; | 125 Duration elapsed = watch.elapsed; |
126 watch.start(); | 126 watch.start(); |
127 timer = new Timer(period - elapsed, () { | 127 timer = new Timer(period - elapsed, () { |
128 timer = null; | 128 timer = null; |
129 startPeriodicTimer(); | 129 startPeriodicTimer(); |
130 sendEvent(); | 130 sendEvent(); |
131 }); | 131 }); |
132 } | 132 } |
133 }, | 133 }, |
134 onSubscriptionStateChange: () { | 134 onSubscriptionStateChange: () { |
135 if (controller.hasSubscribers) { | 135 if (controller.hasListener) { |
136 watch.start(); | 136 watch.start(); |
137 startPeriodicTimer(); | 137 startPeriodicTimer(); |
138 } else { | 138 } else { |
139 if (timer != null) timer.cancel(); | 139 if (timer != null) timer.cancel(); |
140 timer = null; | 140 timer = null; |
141 } | 141 } |
142 }); | 142 }); |
143 return controller.stream; | 143 return controller.stream; |
144 } | 144 } |
145 | 145 |
(...skipping 828 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
974 StreamSubscription subscription; | 974 StreamSubscription subscription; |
975 controller = new StreamController<T>( | 975 controller = new StreamController<T>( |
976 onPauseStateChange: () { | 976 onPauseStateChange: () { |
977 if (controller.isPaused) { | 977 if (controller.isPaused) { |
978 subscription.pause(); | 978 subscription.pause(); |
979 } else { | 979 } else { |
980 subscription.resume(); | 980 subscription.resume(); |
981 } | 981 } |
982 }, | 982 }, |
983 onSubscriptionStateChange: () { | 983 onSubscriptionStateChange: () { |
984 if (controller.hasSubscribers) { | 984 if (controller.hasListener) { |
985 subscription = transformingStream.listen( | 985 subscription = transformingStream.listen( |
986 controller.add, | 986 controller.add, |
987 onError: controller.addError, | 987 onError: controller.addError, |
988 onDone: controller.close); | 988 onDone: controller.close); |
989 } else { | 989 } else { |
990 subscription.cancel(); | 990 subscription.cancel(); |
991 } | 991 } |
992 }); | 992 }); |
993 return controller.stream; | 993 return controller.stream; |
994 } | 994 } |
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1150 } | 1150 } |
1151 | 1151 |
1152 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1152 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
1153 _EventOutputSink _sink; | 1153 _EventOutputSink _sink; |
1154 _EventOutputSinkWrapper(this._sink); | 1154 _EventOutputSinkWrapper(this._sink); |
1155 | 1155 |
1156 void add(T data) { _sink._sendData(data); } | 1156 void add(T data) { _sink._sendData(data); } |
1157 void addError(AsyncError error) { _sink._sendError(error); } | 1157 void addError(AsyncError error) { _sink._sendError(error); } |
1158 void close() { _sink._sendDone(); } | 1158 void close() { _sink._sendDone(); } |
1159 } | 1159 } |
OLD | NEW |