| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| 9 */ | 9 */ |
| 10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
| 11 _EventSink _sink; | 11 _EventSink _sink; |
| 12 _EventSinkWrapper(this._sink); | 12 _EventSinkWrapper(this._sink); |
| 13 | 13 |
| 14 void add(T data) { _sink._add(data); } | 14 void add(T data) { |
| 15 _sink._add(data); |
| 16 } |
| 17 |
| 15 void addError(error, [StackTrace stackTrace]) { | 18 void addError(error, [StackTrace stackTrace]) { |
| 16 _sink._addError(error, stackTrace); | 19 _sink._addError(error, stackTrace); |
| 17 } | 20 } |
| 18 void close() { _sink._close(); } | 21 |
| 22 void close() { |
| 23 _sink._close(); |
| 24 } |
| 19 } | 25 } |
| 20 | 26 |
| 21 /** | 27 /** |
| 22 * A StreamSubscription that pipes data through a sink. | 28 * A StreamSubscription that pipes data through a sink. |
| 23 * | 29 * |
| 24 * The constructor of this class takes a [_SinkMapper] which maps from | 30 * The constructor of this class takes a [_SinkMapper] which maps from |
| 25 * [EventSink] to [EventSink]. The input to the mapper is the output of | 31 * [EventSink] to [EventSink]. The input to the mapper is the output of |
| 26 * the transformation. The returned sink is the transformation's input. | 32 * the transformation. The returned sink is the transformation's input. |
| 27 */ | 33 */ |
| 28 class _SinkTransformerStreamSubscription<S, T> | 34 class _SinkTransformerStreamSubscription<S, T> |
| 29 extends _BufferingStreamSubscription<T> { | 35 extends _BufferingStreamSubscription<T> { |
| 30 /// The transformer's input sink. | 36 /// The transformer's input sink. |
| 31 EventSink<S> _transformerSink; | 37 EventSink<S> _transformerSink; |
| 32 | 38 |
| 33 /// The subscription to the input stream. | 39 /// The subscription to the input stream. |
| 34 StreamSubscription<S> _subscription; | 40 StreamSubscription<S> _subscription; |
| 35 | 41 |
| 36 _SinkTransformerStreamSubscription(Stream<S> source, | 42 _SinkTransformerStreamSubscription(Stream<S> source, _SinkMapper<S, T> mapper, |
| 37 _SinkMapper<S, T> mapper, | 43 void onData(T data), Function onError, void onDone(), bool cancelOnError) |
| 38 void onData(T data), | |
| 39 Function onError, | |
| 40 void onDone(), | |
| 41 bool cancelOnError) | |
| 42 // We set the adapter's target only when the user is allowed to send data. | 44 // We set the adapter's target only when the user is allowed to send data. |
| 43 : super(onData, onError, onDone, cancelOnError) { | 45 : super(onData, onError, onDone, cancelOnError) { |
| 44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); | 46 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); |
| 45 _transformerSink = mapper(eventSink); | 47 _transformerSink = mapper(eventSink); |
| 46 _subscription = source.listen(_handleData, | 48 _subscription = |
| 47 onError: _handleError, | 49 source.listen(_handleData, onError: _handleError, onDone: _handleDone); |
| 48 onDone: _handleDone); | |
| 49 } | 50 } |
| 50 | 51 |
| 51 /** Whether this subscription is still subscribed to its source. */ | 52 /** Whether this subscription is still subscribed to its source. */ |
| 52 bool get _isSubscribed => _subscription != null; | 53 bool get _isSubscribed => _subscription != null; |
| 53 | 54 |
| 54 // _EventSink interface. | 55 // _EventSink interface. |
| 55 | 56 |
| 56 /** | 57 /** |
| 57 * Adds an event to this subscriptions. | 58 * Adds an event to this subscriptions. |
| 58 * | 59 * |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 137 void _handleDone() { | 138 void _handleDone() { |
| 138 try { | 139 try { |
| 139 _subscription = null; | 140 _subscription = null; |
| 140 _transformerSink.close(); | 141 _transformerSink.close(); |
| 141 } catch (e, s) { | 142 } catch (e, s) { |
| 142 _addError(e, s); | 143 _addError(e, s); |
| 143 } | 144 } |
| 144 } | 145 } |
| 145 } | 146 } |
| 146 | 147 |
| 147 | |
| 148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); | 148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); |
| 149 | 149 |
| 150 /** | 150 /** |
| 151 * A StreamTransformer for Sink-mappers. | 151 * A StreamTransformer for Sink-mappers. |
| 152 * | 152 * |
| 153 * A Sink-mapper takes an [EventSink] (its output) and returns another | 153 * A Sink-mapper takes an [EventSink] (its output) and returns another |
| 154 * EventSink (its input). | 154 * EventSink (its input). |
| 155 * | 155 * |
| 156 * Note that this class can be `const`. | 156 * Note that this class can be `const`. |
| 157 */ | 157 */ |
| 158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { | 158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { |
| 159 final _SinkMapper<S, T> _sinkMapper; | 159 final _SinkMapper<S, T> _sinkMapper; |
| 160 const _StreamSinkTransformer(this._sinkMapper); | 160 const _StreamSinkTransformer(this._sinkMapper); |
| 161 | 161 |
| 162 Stream<T> bind(Stream<S> stream) | 162 Stream<T> bind(Stream<S> stream) => |
| 163 => new _BoundSinkStream<S, T>(stream, _sinkMapper); | 163 new _BoundSinkStream<S, T>(stream, _sinkMapper); |
| 164 } | 164 } |
| 165 | 165 |
| 166 /** | 166 /** |
| 167 * The result of binding a StreamTransformer for Sink-mappers. | 167 * The result of binding a StreamTransformer for Sink-mappers. |
| 168 * | 168 * |
| 169 * It contains the bound Stream and the sink-mapper. Only when the user starts | 169 * It contains the bound Stream and the sink-mapper. Only when the user starts |
| 170 * listening to this stream is the sink-mapper invoked. The result is used | 170 * listening to this stream is the sink-mapper invoked. The result is used |
| 171 * to create a StreamSubscription that transforms events. | 171 * to create a StreamSubscription that transforms events. |
| 172 */ | 172 */ |
| 173 class _BoundSinkStream<S, T> extends Stream<T> { | 173 class _BoundSinkStream<S, T> extends Stream<T> { |
| 174 final _SinkMapper<S, T> _sinkMapper; | 174 final _SinkMapper<S, T> _sinkMapper; |
| 175 final Stream<S> _stream; | 175 final Stream<S> _stream; |
| 176 | 176 |
| 177 bool get isBroadcast => _stream.isBroadcast; | 177 bool get isBroadcast => _stream.isBroadcast; |
| 178 | 178 |
| 179 _BoundSinkStream(this._stream, this._sinkMapper); | 179 _BoundSinkStream(this._stream, this._sinkMapper); |
| 180 | 180 |
| 181 StreamSubscription<T> listen(void onData(T event), | 181 StreamSubscription<T> listen(void onData(T event), |
| 182 { Function onError, | 182 {Function onError, void onDone(), bool cancelOnError}) { |
| 183 void onDone(), | |
| 184 bool cancelOnError }) { | |
| 185 cancelOnError = identical(true, cancelOnError); | 183 cancelOnError = identical(true, cancelOnError); |
| 186 StreamSubscription<T> subscription = | 184 StreamSubscription<T> subscription = |
| 187 new _SinkTransformerStreamSubscription<S, T>( | 185 new _SinkTransformerStreamSubscription<S, T>( |
| 188 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); | 186 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
| 189 return subscription; | 187 return subscription; |
| 190 } | 188 } |
| 191 } | 189 } |
| 192 | 190 |
| 193 /// Data-handler coming from [StreamTransformer.fromHandlers]. | 191 /// Data-handler coming from [StreamTransformer.fromHandlers]. |
| 194 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 192 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 193 |
| 195 /// Error-handler coming from [StreamTransformer.fromHandlers]. | 194 /// Error-handler coming from [StreamTransformer.fromHandlers]. |
| 196 typedef void _TransformErrorHandler<T>( | 195 typedef void _TransformErrorHandler<T>( |
| 197 Object error, StackTrace stackTrace, EventSink<T> sink); | 196 Object error, StackTrace stackTrace, EventSink<T> sink); |
| 197 |
| 198 /// Done-handler coming from [StreamTransformer.fromHandlers]. | 198 /// Done-handler coming from [StreamTransformer.fromHandlers]. |
| 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| 200 | 200 |
| 201 /** | 201 /** |
| 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. | 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. |
| 203 * | 203 * |
| 204 * This way we can reuse the code from [_StreamSinkTransformer]. | 204 * This way we can reuse the code from [_StreamSinkTransformer]. |
| 205 */ | 205 */ |
| 206 class _HandlerEventSink<S, T> implements EventSink<S> { | 206 class _HandlerEventSink<S, T> implements EventSink<S> { |
| 207 final _TransformDataHandler<S, T> _handleData; | 207 final _TransformDataHandler<S, T> _handleData; |
| 208 final _TransformErrorHandler<T> _handleError; | 208 final _TransformErrorHandler<T> _handleError; |
| 209 final _TransformDoneHandler<T> _handleDone; | 209 final _TransformDoneHandler<T> _handleDone; |
| 210 | 210 |
| 211 /// The output sink where the handlers should send their data into. | 211 /// The output sink where the handlers should send their data into. |
| 212 final EventSink<T> _sink; | 212 final EventSink<T> _sink; |
| 213 | 213 |
| 214 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, | 214 _HandlerEventSink( |
| 215 this._sink); | 215 this._handleData, this._handleError, this._handleDone, this._sink); |
| 216 | 216 |
| 217 void add(S data) { _handleData(data, _sink); } | 217 void add(S data) { |
| 218 _handleData(data, _sink); |
| 219 } |
| 220 |
| 218 void addError(Object error, [StackTrace stackTrace]) { | 221 void addError(Object error, [StackTrace stackTrace]) { |
| 219 _handleError(error, stackTrace, _sink); | 222 _handleError(error, stackTrace, _sink); |
| 220 } | 223 } |
| 221 void close() { _handleDone(_sink); } | 224 |
| 225 void close() { |
| 226 _handleDone(_sink); |
| 227 } |
| 222 } | 228 } |
| 223 | 229 |
| 224 /** | 230 /** |
| 225 * A StreamTransformer that transformers events with the given handlers. | 231 * A StreamTransformer that transformers events with the given handlers. |
| 226 * | 232 * |
| 227 * Note that this transformer can only be used once. | 233 * Note that this transformer can only be used once. |
| 228 */ | 234 */ |
| 229 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { | 235 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { |
| 230 | 236 _StreamHandlerTransformer( |
| 231 _StreamHandlerTransformer({ | 237 {void handleData(S data, EventSink<T> sink), |
| 232 void handleData(S data, EventSink<T> sink), | |
| 233 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 238 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| 234 void handleDone(EventSink<T> sink)}) | 239 void handleDone(EventSink<T> sink)}) |
| 235 : super((EventSink<T> outputSink) { | 240 : super((EventSink<T> outputSink) { |
| 236 if (handleData == null) handleData = _defaultHandleData; | 241 if (handleData == null) handleData = _defaultHandleData; |
| 237 if (handleError == null) handleError = _defaultHandleError; | 242 if (handleError == null) handleError = _defaultHandleError; |
| 238 if (handleDone == null) handleDone = _defaultHandleDone; | 243 if (handleDone == null) handleDone = _defaultHandleDone; |
| 239 return new _HandlerEventSink<S, T>( | 244 return new _HandlerEventSink<S, T>( |
| 240 handleData, handleError, handleDone, outputSink); | 245 handleData, handleError, handleDone, outputSink); |
| 241 }); | 246 }); |
| 242 | 247 |
| 243 Stream<T> bind(Stream<S> stream) { | 248 Stream<T> bind(Stream<S> stream) { |
| 244 return super.bind(stream); | 249 return super.bind(stream); |
| 245 } | 250 } |
| 246 | 251 |
| 247 /** Default data handler forwards all data. */ | 252 /** Default data handler forwards all data. */ |
| 248 static void _defaultHandleData(var data, EventSink sink) { | 253 static void _defaultHandleData(var data, EventSink sink) { |
| 249 sink.add(data); | 254 sink.add(data); |
| 250 } | 255 } |
| 251 | 256 |
| 252 /** Default error handler forwards all errors. */ | 257 /** Default error handler forwards all errors. */ |
| 253 static void _defaultHandleError(error, StackTrace stackTrace, | 258 static void _defaultHandleError( |
| 254 EventSink sink) { | 259 error, StackTrace stackTrace, EventSink sink) { |
| 255 sink.addError(error, stackTrace); | 260 sink.addError(error, stackTrace); |
| 256 } | 261 } |
| 257 | 262 |
| 258 /** Default done handler forwards done. */ | 263 /** Default done handler forwards done. */ |
| 259 static void _defaultHandleDone(EventSink sink) { | 264 static void _defaultHandleDone(EventSink sink) { |
| 260 sink.close(); | 265 sink.close(); |
| 261 } | 266 } |
| 262 } | 267 } |
| 263 | 268 |
| 264 /// A closure mapping a stream and cancelOnError to a StreamSubscription. | 269 /// A closure mapping a stream and cancelOnError to a StreamSubscription. |
| (...skipping 28 matching lines...) Expand all Loading... |
| 293 * the stored [_stream]. Usually the transformer starts listening at this | 298 * the stored [_stream]. Usually the transformer starts listening at this |
| 294 * moment. | 299 * moment. |
| 295 */ | 300 */ |
| 296 class _BoundSubscriptionStream<S, T> extends Stream<T> { | 301 class _BoundSubscriptionStream<S, T> extends Stream<T> { |
| 297 final _SubscriptionTransformer<S, T> _transformer; | 302 final _SubscriptionTransformer<S, T> _transformer; |
| 298 final Stream<S> _stream; | 303 final Stream<S> _stream; |
| 299 | 304 |
| 300 _BoundSubscriptionStream(this._stream, this._transformer); | 305 _BoundSubscriptionStream(this._stream, this._transformer); |
| 301 | 306 |
| 302 StreamSubscription<T> listen(void onData(T event), | 307 StreamSubscription<T> listen(void onData(T event), |
| 303 { Function onError, | 308 {Function onError, void onDone(), bool cancelOnError}) { |
| 304 void onDone(), | |
| 305 bool cancelOnError }) { | |
| 306 cancelOnError = identical(true, cancelOnError); | 309 cancelOnError = identical(true, cancelOnError); |
| 307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
| 308 result.onData(onData); | 311 result.onData(onData); |
| 309 result.onError(onError); | 312 result.onError(onError); |
| 310 result.onDone(onDone); | 313 result.onDone(onDone); |
| 311 return result; | 314 return result; |
| 312 } | 315 } |
| 313 } | 316 } |
| OLD | NEW |