| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| 9 */ | 9 */ |
| 10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
| (...skipping 10 matching lines...) Expand all Loading... |
| 21 /** | 21 /** |
| 22 * A StreamSubscription that pipes data through a sink. | 22 * A StreamSubscription that pipes data through a sink. |
| 23 * | 23 * |
| 24 * The constructor of this class takes a [_SinkMapper] which maps from | 24 * The constructor of this class takes a [_SinkMapper] which maps from |
| 25 * [EventSink] to [EventSink]. The input to the mapper is the output of | 25 * [EventSink] to [EventSink]. The input to the mapper is the output of |
| 26 * the transformation. The returned sink is the transformation's input. | 26 * the transformation. The returned sink is the transformation's input. |
| 27 */ | 27 */ |
| 28 class _SinkTransformerStreamSubscription<S, T> | 28 class _SinkTransformerStreamSubscription<S, T> |
| 29 extends _BufferingStreamSubscription<T> { | 29 extends _BufferingStreamSubscription<T> { |
| 30 /// The transformer's input sink. | 30 /// The transformer's input sink. |
| 31 EventSink _transformerSink; | 31 EventSink<S> _transformerSink; |
| 32 | 32 |
| 33 /// The subscription to the input stream. | 33 /// The subscription to the input stream. |
| 34 StreamSubscription<S> _subscription; | 34 StreamSubscription<S> _subscription; |
| 35 | 35 |
| 36 _SinkTransformerStreamSubscription(Stream<S> source, | 36 _SinkTransformerStreamSubscription(Stream<S> source, |
| 37 _SinkMapper<S, T> mapper, | 37 _SinkMapper<S, T> mapper, |
| 38 void onData(T data), | 38 void onData(T data), |
| 39 Function onError, | 39 Function onError, |
| 40 void onDone(), | 40 void onDone(), |
| 41 bool cancelOnError) | 41 bool cancelOnError) |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 176 | 176 |
| 177 bool get isBroadcast => _stream.isBroadcast; | 177 bool get isBroadcast => _stream.isBroadcast; |
| 178 | 178 |
| 179 _BoundSinkStream(this._stream, this._sinkMapper); | 179 _BoundSinkStream(this._stream, this._sinkMapper); |
| 180 | 180 |
| 181 StreamSubscription<T> listen(void onData(T event), | 181 StreamSubscription<T> listen(void onData(T event), |
| 182 { Function onError, | 182 { Function onError, |
| 183 void onDone(), | 183 void onDone(), |
| 184 bool cancelOnError }) { | 184 bool cancelOnError }) { |
| 185 cancelOnError = identical(true, cancelOnError); | 185 cancelOnError = identical(true, cancelOnError); |
| 186 StreamSubscription<T> subscription = new _SinkTransformerStreamSubscription( | 186 StreamSubscription<T> subscription = |
| 187 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); | 187 new _SinkTransformerStreamSubscription<S, T>( |
| 188 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
| 188 return subscription; | 189 return subscription; |
| 189 } | 190 } |
| 190 } | 191 } |
| 191 | 192 |
| 192 /// Data-handler coming from [StreamTransformer.fromHandlers]. | 193 /// Data-handler coming from [StreamTransformer.fromHandlers]. |
| 193 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 194 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 194 /// Error-handler coming from [StreamTransformer.fromHandlers]. | 195 /// Error-handler coming from [StreamTransformer.fromHandlers]. |
| 195 typedef void _TransformErrorHandler<T>( | 196 typedef void _TransformErrorHandler<T>( |
| 196 Object error, StackTrace stackTrace, EventSink<T> sink); | 197 Object error, StackTrace stackTrace, EventSink<T> sink); |
| 197 /// Done-handler coming from [StreamTransformer.fromHandlers]. | 198 /// Done-handler coming from [StreamTransformer.fromHandlers]. |
| 198 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| 199 | 200 |
| 200 /** | 201 /** |
| 201 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. | 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. |
| 202 * | 203 * |
| 203 * This way we can reuse the code from [_StreamSinkTransformer]. | 204 * This way we can reuse the code from [_StreamSinkTransformer]. |
| 204 */ | 205 */ |
| 205 class _HandlerEventSink<S, T> implements EventSink<S> { | 206 class _HandlerEventSink<S, T> implements EventSink<S> { |
| 206 final _TransformDataHandler<S, T> _handleData; | 207 final _TransformDataHandler<S, T> _handleData; |
| 207 final _TransformErrorHandler<T> _handleError; | 208 final _TransformErrorHandler<T> _handleError; |
| 208 final _TransformDoneHandler<T> _handleDone; | 209 final _TransformDoneHandler<T> _handleDone; |
| 209 | 210 |
| 210 /// The output sink where the handlers should send their data into. | 211 /// The output sink where the handlers should send their data into. |
| 211 final EventSink<T> _sink; | 212 final EventSink<T> _sink; |
| 212 | 213 |
| 213 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, | 214 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, |
| 214 this._sink); | 215 this._sink); |
| 215 | 216 |
| 216 void add(S data) => _handleData(data, _sink); | 217 void add(S data) { _handleData(data, _sink); } |
| 217 void addError(Object error, [StackTrace stackTrace]) | 218 void addError(Object error, [StackTrace stackTrace]) { |
| 218 => _handleError(error, stackTrace, _sink); | 219 _handleError(error, stackTrace, _sink); |
| 219 void close() => _handleDone(_sink); | 220 } |
| 221 void close() { _handleDone(_sink); } |
| 220 } | 222 } |
| 221 | 223 |
| 222 /** | 224 /** |
| 223 * A StreamTransformer that transformers events with the given handlers. | 225 * A StreamTransformer that transformers events with the given handlers. |
| 224 * | 226 * |
| 225 * Note that this transformer can only be used once. | 227 * Note that this transformer can only be used once. |
| 226 */ | 228 */ |
| 227 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { | 229 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { |
| 228 | 230 |
| 229 _StreamHandlerTransformer({ | 231 _StreamHandlerTransformer({ |
| (...skipping 13 matching lines...) Expand all Loading... |
| 243 } | 245 } |
| 244 | 246 |
| 245 /** Default data handler forwards all data. */ | 247 /** Default data handler forwards all data. */ |
| 246 static void _defaultHandleData(var data, EventSink sink) { | 248 static void _defaultHandleData(var data, EventSink sink) { |
| 247 sink.add(data); | 249 sink.add(data); |
| 248 } | 250 } |
| 249 | 251 |
| 250 /** Default error handler forwards all errors. */ | 252 /** Default error handler forwards all errors. */ |
| 251 static void _defaultHandleError(error, StackTrace stackTrace, | 253 static void _defaultHandleError(error, StackTrace stackTrace, |
| 252 EventSink sink) { | 254 EventSink sink) { |
| 253 sink.addError(error); | 255 sink.addError(error, stackTrace); |
| 254 } | 256 } |
| 255 | 257 |
| 256 /** Default done handler forwards done. */ | 258 /** Default done handler forwards done. */ |
| 257 static void _defaultHandleDone(EventSink sink) { | 259 static void _defaultHandleDone(EventSink sink) { |
| 258 sink.close(); | 260 sink.close(); |
| 259 } | 261 } |
| 260 } | 262 } |
| 261 | 263 |
| 262 /// A closure mapping a stream and cancelOnError to a StreamSubscription. | 264 /// A closure mapping a stream and cancelOnError to a StreamSubscription. |
| 263 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>( | 265 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>( |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 302 void onDone(), | 304 void onDone(), |
| 303 bool cancelOnError }) { | 305 bool cancelOnError }) { |
| 304 cancelOnError = identical(true, cancelOnError); | 306 cancelOnError = identical(true, cancelOnError); |
| 305 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
| 306 result.onData(onData); | 308 result.onData(onData); |
| 307 result.onError(onError); | 309 result.onError(onError); |
| 308 result.onDone(onDone); | 310 result.onDone(onDone); |
| 309 return result; | 311 return result; |
| 310 } | 312 } |
| 311 } | 313 } |
| OLD | NEW |