OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
108 } | 108 } |
109 | 109 |
110 void startPeriodicTimer() { | 110 void startPeriodicTimer() { |
111 assert(timer == null); | 111 assert(timer == null); |
112 timer = new Timer.periodic(period, (Timer timer) { | 112 timer = new Timer.periodic(period, (Timer timer) { |
113 sendEvent(); | 113 sendEvent(); |
114 }); | 114 }); |
115 } | 115 } |
116 | 116 |
117 controller = new StreamController<T>( | 117 controller = new StreamController<T>( |
118 onPauseStateChange: () { | 118 onListen: () { |
119 if (controller.isPaused) { | 119 watch.start(); |
120 timer.cancel(); | 120 startPeriodicTimer(); |
| 121 }, |
| 122 onPause: () { |
| 123 timer.cancel(); |
| 124 timer = null; |
| 125 watch.stop(); |
| 126 }, |
| 127 onResume: () { |
| 128 assert(timer == null); |
| 129 Duration elapsed = watch.elapsed; |
| 130 watch.start(); |
| 131 timer = new Timer(period - elapsed, () { |
121 timer = null; | 132 timer = null; |
122 watch.stop(); | 133 startPeriodicTimer(); |
123 } else { | 134 sendEvent(); |
124 assert(timer == null); | 135 }); |
125 Duration elapsed = watch.elapsed; | |
126 watch.start(); | |
127 timer = new Timer(period - elapsed, () { | |
128 timer = null; | |
129 startPeriodicTimer(); | |
130 sendEvent(); | |
131 }); | |
132 } | |
133 }, | 136 }, |
134 onSubscriptionStateChange: () { | 137 onCancel: () { |
135 if (controller.hasListener) { | 138 if (timer != null) timer.cancel(); |
136 watch.start(); | 139 timer = null; |
137 startPeriodicTimer(); | |
138 } else { | |
139 if (timer != null) timer.cancel(); | |
140 timer = null; | |
141 } | |
142 }); | 140 }); |
143 return controller.stream; | 141 return controller.stream; |
144 } | 142 } |
145 | 143 |
146 /** | 144 /** |
147 * Reports whether this stream is a broadcast stream. | 145 * Reports whether this stream is a broadcast stream. |
148 */ | 146 */ |
149 bool get isBroadcast => false; | 147 bool get isBroadcast => false; |
150 | 148 |
151 /** | 149 /** |
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
966 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
967 const StreamEventTransformer(); | 965 const StreamEventTransformer(); |
968 | 966 |
969 Stream<T> bind(Stream<S> source) { | 967 Stream<T> bind(Stream<S> source) { |
970 // Hackish way of buffering data that goes out of the event-transformer. | 968 // Hackish way of buffering data that goes out of the event-transformer. |
971 // TODO(floitsch): replace this with a correct solution. | 969 // TODO(floitsch): replace this with a correct solution. |
972 Stream transformingStream = new EventTransformStream<S, T>(source, this); | 970 Stream transformingStream = new EventTransformStream<S, T>(source, this); |
973 StreamController controller; | 971 StreamController controller; |
974 StreamSubscription subscription; | 972 StreamSubscription subscription; |
975 controller = new StreamController<T>( | 973 controller = new StreamController<T>( |
976 onPauseStateChange: () { | 974 onListen: () { |
977 if (controller.isPaused) { | 975 subscription = transformingStream.listen( |
978 subscription.pause(); | 976 controller.add, |
979 } else { | 977 onError: controller.addError, |
980 subscription.resume(); | 978 onDone: controller.close); |
981 } | |
982 }, | 979 }, |
983 onSubscriptionStateChange: () { | 980 onPause: () => subscription.pause(), |
984 if (controller.hasListener) { | 981 onResume: () => subscription.resume(), |
985 subscription = transformingStream.listen( | 982 onCancel: () => subscription.cancel()); |
986 controller.add, | |
987 onError: controller.addError, | |
988 onDone: controller.close); | |
989 } else { | |
990 subscription.cancel(); | |
991 } | |
992 }); | |
993 return controller.stream; | 983 return controller.stream; |
994 } | 984 } |
995 | 985 |
996 /** | 986 /** |
997 * Act on incoming data event. | 987 * Act on incoming data event. |
998 * | 988 * |
999 * The method may generate any number of events on the sink, but should | 989 * The method may generate any number of events on the sink, but should |
1000 * not throw. | 990 * not throw. |
1001 */ | 991 */ |
1002 void handleData(S event, EventSink<T> sink) { | 992 void handleData(S event, EventSink<T> sink) { |
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1150 } | 1140 } |
1151 | 1141 |
1152 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1142 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
1153 _EventOutputSink _sink; | 1143 _EventOutputSink _sink; |
1154 _EventOutputSinkWrapper(this._sink); | 1144 _EventOutputSinkWrapper(this._sink); |
1155 | 1145 |
1156 void add(T data) { _sink._sendData(data); } | 1146 void add(T data) { _sink._sendData(data); } |
1157 void addError(AsyncError error) { _sink._sendError(error); } | 1147 void addError(AsyncError error) { _sink._sendError(error); } |
1158 void close() { _sink._sendDone(); } | 1148 void close() { _sink._sendDone(); } |
1159 } | 1149 } |
OLD | NEW |