| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 1117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1128 } | 1128 } |
| 1129 } | 1129 } |
| 1130 | 1130 |
| 1131 class _EventTransformStreamSubscription<S, T> | 1131 class _EventTransformStreamSubscription<S, T> |
| 1132 extends _BaseStreamSubscription<T> | 1132 extends _BaseStreamSubscription<T> |
| 1133 implements _EventOutputSink<T> { | 1133 implements _EventOutputSink<T> { |
| 1134 /** The transformer used to transform events. */ | 1134 /** The transformer used to transform events. */ |
| 1135 final StreamEventTransformer<S, T> _transformer; | 1135 final StreamEventTransformer<S, T> _transformer; |
| 1136 /** Whether to unsubscribe when emitting an error. */ | 1136 /** Whether to unsubscribe when emitting an error. */ |
| 1137 final bool _unsubscribeOnError; | 1137 final bool _unsubscribeOnError; |
| 1138 /** Whether this stream has sent a done event. */ |
| 1139 bool _isClosed = false; |
| 1138 /** Source of incoming events. */ | 1140 /** Source of incoming events. */ |
| 1139 StreamSubscription<S> _subscription; | 1141 StreamSubscription<S> _subscription; |
| 1140 /** Cached EventSink wrapper for this class. */ | 1142 /** Cached EventSink wrapper for this class. */ |
| 1141 EventSink<T> _sink; | 1143 EventSink<T> _sink; |
| 1142 | 1144 |
| 1143 _EventTransformStreamSubscription(Stream<S> source, | 1145 _EventTransformStreamSubscription(Stream<S> source, |
| 1144 this._transformer, | 1146 this._transformer, |
| 1145 void onData(T data), | 1147 void onData(T data), |
| 1146 void onError(AsyncError error), | 1148 void onError(AsyncError error), |
| 1147 void onDone(), | 1149 void onDone(), |
| (...skipping 11 matching lines...) Expand all Loading... |
| 1159 void pause([Future pauseSignal]) { | 1161 void pause([Future pauseSignal]) { |
| 1160 if (_isSubscribed) _subscription.pause(pauseSignal); | 1162 if (_isSubscribed) _subscription.pause(pauseSignal); |
| 1161 } | 1163 } |
| 1162 | 1164 |
| 1163 void resume() { | 1165 void resume() { |
| 1164 if (_isSubscribed) _subscription.resume(); | 1166 if (_isSubscribed) _subscription.resume(); |
| 1165 } | 1167 } |
| 1166 | 1168 |
| 1167 void cancel() { | 1169 void cancel() { |
| 1168 if (_isSubscribed) { | 1170 if (_isSubscribed) { |
| 1169 _subscription.cancel(); | 1171 StreamSubscription subscription = _subscription; |
| 1170 _subscription = null; | 1172 _subscription = null; |
| 1173 subscription.cancel(); |
| 1171 } | 1174 } |
| 1175 _isClosed = true; |
| 1172 } | 1176 } |
| 1173 | 1177 |
| 1174 void _handleData(S data) { | 1178 void _handleData(S data) { |
| 1175 try { | 1179 try { |
| 1176 _transformer.handleData(data, _sink); | 1180 _transformer.handleData(data, _sink); |
| 1177 } catch (e, s) { | 1181 } catch (e, s) { |
| 1178 _sendError(_asyncError(e, s)); | 1182 _sendError(_asyncError(e, s)); |
| 1179 } | 1183 } |
| 1180 } | 1184 } |
| 1181 | 1185 |
| 1182 void _handleError(AsyncError error) { | 1186 void _handleError(AsyncError error) { |
| 1183 try { | 1187 try { |
| 1184 _transformer.handleError(error, _sink); | 1188 _transformer.handleError(error, _sink); |
| 1185 } catch (e, s) { | 1189 } catch (e, s) { |
| 1186 _sendError(_asyncError(e, s, error)); | 1190 _sendError(_asyncError(e, s, error)); |
| 1187 } | 1191 } |
| 1188 } | 1192 } |
| 1189 | 1193 |
| 1190 void _handleDone() { | 1194 void _handleDone() { |
| 1191 try { | 1195 try { |
| 1196 _subscription = null; |
| 1192 _transformer.handleDone(_sink); | 1197 _transformer.handleDone(_sink); |
| 1193 } catch (e, s) { | 1198 } catch (e, s) { |
| 1194 _sendError(_asyncError(e, s)); | 1199 _sendError(_asyncError(e, s)); |
| 1195 } | 1200 } |
| 1196 } | 1201 } |
| 1197 | 1202 |
| 1198 // EventOutputSink interface. | 1203 // EventOutputSink interface. |
| 1199 void _sendData(T data) { | 1204 void _sendData(T data) { |
| 1200 if (!_isSubscribed) return; | 1205 if (_isClosed) return; |
| 1201 _onData(data); | 1206 _onData(data); |
| 1202 } | 1207 } |
| 1203 | 1208 |
| 1204 void _sendError(AsyncError error) { | 1209 void _sendError(AsyncError error) { |
| 1205 if (!_isSubscribed) return; | 1210 if (_isClosed) return; |
| 1206 _onError(error); | 1211 _onError(error); |
| 1207 if (_unsubscribeOnError) { | 1212 if (_unsubscribeOnError) { |
| 1208 cancel(); | 1213 cancel(); |
| 1209 } | 1214 } |
| 1210 } | 1215 } |
| 1211 | 1216 |
| 1212 void _sendDone() { | 1217 void _sendDone() { |
| 1213 if (!_isSubscribed) return; | 1218 if (_isClosed) throw new StateError("Already closed."); |
| 1214 _subscription.cancel(); | 1219 _isClosed = true; |
| 1215 _subscription = null; | 1220 if (_isSubscribed) { |
| 1221 _subscription.cancel(); |
| 1222 _subscription = null; |
| 1223 } |
| 1216 _onDone(); | 1224 _onDone(); |
| 1217 } | 1225 } |
| 1218 } | 1226 } |
| 1219 | 1227 |
| 1220 /* TODO(8997): Implement EventSink instead, */ | 1228 /* TODO(8997): Implement EventSink instead, */ |
| 1221 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
| 1222 _EventOutputSink _sink; | 1230 _EventOutputSink _sink; |
| 1223 _EventOutputSinkWrapper(this._sink); | 1231 _EventOutputSinkWrapper(this._sink); |
| 1224 | 1232 |
| 1225 void add(T data) { _sink._sendData(data); } | 1233 void add(T data) { _sink._sendData(data); } |
| 1226 void addError(AsyncError error) { _sink._sendError(error); } | 1234 void addError(AsyncError error) { _sink._sendError(error); } |
| 1227 void close() { _sink._sendDone(); } | 1235 void close() { _sink._sendDone(); } |
| 1228 } | 1236 } |
| OLD | NEW |