OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1128 } | 1128 } |
1129 } | 1129 } |
1130 | 1130 |
1131 class _EventTransformStreamSubscription<S, T> | 1131 class _EventTransformStreamSubscription<S, T> |
1132 extends _BaseStreamSubscription<T> | 1132 extends _BaseStreamSubscription<T> |
1133 implements _EventOutputSink<T> { | 1133 implements _EventOutputSink<T> { |
1134 /** The transformer used to transform events. */ | 1134 /** The transformer used to transform events. */ |
1135 final StreamEventTransformer<S, T> _transformer; | 1135 final StreamEventTransformer<S, T> _transformer; |
1136 /** Whether to unsubscribe when emitting an error. */ | 1136 /** Whether to unsubscribe when emitting an error. */ |
1137 final bool _unsubscribeOnError; | 1137 final bool _unsubscribeOnError; |
| 1138 /** Whether this stream has sent a done event. */ |
| 1139 bool _isClosed = false; |
1138 /** Source of incoming events. */ | 1140 /** Source of incoming events. */ |
1139 StreamSubscription<S> _subscription; | 1141 StreamSubscription<S> _subscription; |
1140 /** Cached EventSink wrapper for this class. */ | 1142 /** Cached EventSink wrapper for this class. */ |
1141 EventSink<T> _sink; | 1143 EventSink<T> _sink; |
1142 | 1144 |
1143 _EventTransformStreamSubscription(Stream<S> source, | 1145 _EventTransformStreamSubscription(Stream<S> source, |
1144 this._transformer, | 1146 this._transformer, |
1145 void onData(T data), | 1147 void onData(T data), |
1146 void onError(AsyncError error), | 1148 void onError(AsyncError error), |
1147 void onDone(), | 1149 void onDone(), |
(...skipping 11 matching lines...) Expand all Loading... |
1159 void pause([Future pauseSignal]) { | 1161 void pause([Future pauseSignal]) { |
1160 if (_isSubscribed) _subscription.pause(pauseSignal); | 1162 if (_isSubscribed) _subscription.pause(pauseSignal); |
1161 } | 1163 } |
1162 | 1164 |
1163 void resume() { | 1165 void resume() { |
1164 if (_isSubscribed) _subscription.resume(); | 1166 if (_isSubscribed) _subscription.resume(); |
1165 } | 1167 } |
1166 | 1168 |
1167 void cancel() { | 1169 void cancel() { |
1168 if (_isSubscribed) { | 1170 if (_isSubscribed) { |
1169 _subscription.cancel(); | 1171 StreamSubscription subscription = _subscription; |
1170 _subscription = null; | 1172 _subscription = null; |
| 1173 subscription.cancel(); |
1171 } | 1174 } |
| 1175 _isClosed = true; |
1172 } | 1176 } |
1173 | 1177 |
1174 void _handleData(S data) { | 1178 void _handleData(S data) { |
1175 try { | 1179 try { |
1176 _transformer.handleData(data, _sink); | 1180 _transformer.handleData(data, _sink); |
1177 } catch (e, s) { | 1181 } catch (e, s) { |
1178 _sendError(_asyncError(e, s)); | 1182 _sendError(_asyncError(e, s)); |
1179 } | 1183 } |
1180 } | 1184 } |
1181 | 1185 |
1182 void _handleError(AsyncError error) { | 1186 void _handleError(AsyncError error) { |
1183 try { | 1187 try { |
1184 _transformer.handleError(error, _sink); | 1188 _transformer.handleError(error, _sink); |
1185 } catch (e, s) { | 1189 } catch (e, s) { |
1186 _sendError(_asyncError(e, s, error)); | 1190 _sendError(_asyncError(e, s, error)); |
1187 } | 1191 } |
1188 } | 1192 } |
1189 | 1193 |
1190 void _handleDone() { | 1194 void _handleDone() { |
1191 try { | 1195 try { |
| 1196 _subscription = null; |
1192 _transformer.handleDone(_sink); | 1197 _transformer.handleDone(_sink); |
1193 } catch (e, s) { | 1198 } catch (e, s) { |
1194 _sendError(_asyncError(e, s)); | 1199 _sendError(_asyncError(e, s)); |
1195 } | 1200 } |
1196 } | 1201 } |
1197 | 1202 |
1198 // EventOutputSink interface. | 1203 // EventOutputSink interface. |
1199 void _sendData(T data) { | 1204 void _sendData(T data) { |
1200 if (!_isSubscribed) return; | 1205 if (_isClosed) return; |
1201 _onData(data); | 1206 _onData(data); |
1202 } | 1207 } |
1203 | 1208 |
1204 void _sendError(AsyncError error) { | 1209 void _sendError(AsyncError error) { |
1205 if (!_isSubscribed) return; | 1210 if (_isClosed) return; |
1206 _onError(error); | 1211 _onError(error); |
1207 if (_unsubscribeOnError) { | 1212 if (_unsubscribeOnError) { |
1208 cancel(); | 1213 cancel(); |
1209 } | 1214 } |
1210 } | 1215 } |
1211 | 1216 |
1212 void _sendDone() { | 1217 void _sendDone() { |
1213 if (!_isSubscribed) return; | 1218 if (_isClosed) throw new StateError("Already closed."); |
1214 _subscription.cancel(); | 1219 _isClosed = true; |
1215 _subscription = null; | 1220 if (_isSubscribed) { |
| 1221 _subscription.cancel(); |
| 1222 _subscription = null; |
| 1223 } |
1216 _onDone(); | 1224 _onDone(); |
1217 } | 1225 } |
1218 } | 1226 } |
1219 | 1227 |
1220 /* TODO(8997): Implement EventSink instead, */ | 1228 /* TODO(8997): Implement EventSink instead, */ |
1221 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
1222 _EventOutputSink _sink; | 1230 _EventOutputSink _sink; |
1223 _EventOutputSinkWrapper(this._sink); | 1231 _EventOutputSinkWrapper(this._sink); |
1224 | 1232 |
1225 void add(T data) { _sink._sendData(data); } | 1233 void add(T data) { _sink._sendData(data); } |
1226 void addError(AsyncError error) { _sink._sendError(error); } | 1234 void addError(AsyncError error) { _sink._sendError(error); } |
1227 void close() { _sink._sendDone(); } | 1235 void close() { _sink._sendDone(); } |
1228 } | 1236 } |
OLD | NEW |