OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | 7 /** |
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
9 */ | 9 */ |
10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
(...skipping 10 matching lines...) Expand all Loading... |
21 /** | 21 /** |
22 * A StreamSubscription that pipes data through a sink. | 22 * A StreamSubscription that pipes data through a sink. |
23 * | 23 * |
24 * The constructor of this class takes a [_SinkMapper] which maps from | 24 * The constructor of this class takes a [_SinkMapper] which maps from |
25 * [EventSink] to [EventSink]. The input to the mapper is the output of | 25 * [EventSink] to [EventSink]. The input to the mapper is the output of |
26 * the transformation. The returned sink is the transformation's input. | 26 * the transformation. The returned sink is the transformation's input. |
27 */ | 27 */ |
28 class _SinkTransformerStreamSubscription<S, T> | 28 class _SinkTransformerStreamSubscription<S, T> |
29 extends _BufferingStreamSubscription<T> { | 29 extends _BufferingStreamSubscription<T> { |
30 /// The transformer's input sink. | 30 /// The transformer's input sink. |
31 EventSink _transformerSink; | 31 EventSink<S> _transformerSink; |
32 | 32 |
33 /// The subscription to the input stream. | 33 /// The subscription to the input stream. |
34 StreamSubscription<S> _subscription; | 34 StreamSubscription<S> _subscription; |
35 | 35 |
36 _SinkTransformerStreamSubscription(Stream<S> source, | 36 _SinkTransformerStreamSubscription(Stream<S> source, |
37 _SinkMapper<S, T> mapper, | 37 _SinkMapper<S, T> mapper, |
38 void onData(T data), | 38 void onData(T data), |
39 Function onError, | 39 Function onError, |
40 void onDone(), | 40 void onDone(), |
41 bool cancelOnError) | 41 bool cancelOnError) |
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
176 | 176 |
177 bool get isBroadcast => _stream.isBroadcast; | 177 bool get isBroadcast => _stream.isBroadcast; |
178 | 178 |
179 _BoundSinkStream(this._stream, this._sinkMapper); | 179 _BoundSinkStream(this._stream, this._sinkMapper); |
180 | 180 |
181 StreamSubscription<T> listen(void onData(T event), | 181 StreamSubscription<T> listen(void onData(T event), |
182 { Function onError, | 182 { Function onError, |
183 void onDone(), | 183 void onDone(), |
184 bool cancelOnError }) { | 184 bool cancelOnError }) { |
185 cancelOnError = identical(true, cancelOnError); | 185 cancelOnError = identical(true, cancelOnError); |
186 StreamSubscription<T> subscription = new _SinkTransformerStreamSubscription( | 186 StreamSubscription<T> subscription = |
187 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); | 187 new _SinkTransformerStreamSubscription<S, T>( |
| 188 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
188 return subscription; | 189 return subscription; |
189 } | 190 } |
190 } | 191 } |
191 | 192 |
192 /// Data-handler coming from [StreamTransformer.fromHandlers]. | 193 /// Data-handler coming from [StreamTransformer.fromHandlers]. |
193 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 194 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
194 /// Error-handler coming from [StreamTransformer.fromHandlers]. | 195 /// Error-handler coming from [StreamTransformer.fromHandlers]. |
195 typedef void _TransformErrorHandler<T>( | 196 typedef void _TransformErrorHandler<T>( |
196 Object error, StackTrace stackTrace, EventSink<T> sink); | 197 Object error, StackTrace stackTrace, EventSink<T> sink); |
197 /// Done-handler coming from [StreamTransformer.fromHandlers]. | 198 /// Done-handler coming from [StreamTransformer.fromHandlers]. |
198 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
199 | 200 |
200 /** | 201 /** |
201 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. | 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. |
202 * | 203 * |
203 * This way we can reuse the code from [_StreamSinkTransformer]. | 204 * This way we can reuse the code from [_StreamSinkTransformer]. |
204 */ | 205 */ |
205 class _HandlerEventSink<S, T> implements EventSink<S> { | 206 class _HandlerEventSink<S, T> implements EventSink<S> { |
206 final _TransformDataHandler<S, T> _handleData; | 207 final _TransformDataHandler<S, T> _handleData; |
207 final _TransformErrorHandler<T> _handleError; | 208 final _TransformErrorHandler<T> _handleError; |
208 final _TransformDoneHandler<T> _handleDone; | 209 final _TransformDoneHandler<T> _handleDone; |
209 | 210 |
210 /// The output sink where the handlers should send their data into. | 211 /// The output sink where the handlers should send their data into. |
211 final EventSink<T> _sink; | 212 final EventSink<T> _sink; |
212 | 213 |
213 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, | 214 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, |
214 this._sink); | 215 this._sink); |
215 | 216 |
216 void add(S data) => _handleData(data, _sink); | 217 void add(S data) { _handleData(data, _sink); } |
217 void addError(Object error, [StackTrace stackTrace]) | 218 void addError(Object error, [StackTrace stackTrace]) { |
218 => _handleError(error, stackTrace, _sink); | 219 _handleError(error, stackTrace, _sink); |
219 void close() => _handleDone(_sink); | 220 } |
| 221 void close() { _handleDone(_sink); } |
220 } | 222 } |
221 | 223 |
222 /** | 224 /** |
223 * A StreamTransformer that transformers events with the given handlers. | 225 * A StreamTransformer that transformers events with the given handlers. |
224 * | 226 * |
225 * Note that this transformer can only be used once. | 227 * Note that this transformer can only be used once. |
226 */ | 228 */ |
227 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { | 229 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { |
228 | 230 |
229 _StreamHandlerTransformer({ | 231 _StreamHandlerTransformer({ |
(...skipping 13 matching lines...) Expand all Loading... |
243 } | 245 } |
244 | 246 |
245 /** Default data handler forwards all data. */ | 247 /** Default data handler forwards all data. */ |
246 static void _defaultHandleData(var data, EventSink sink) { | 248 static void _defaultHandleData(var data, EventSink sink) { |
247 sink.add(data); | 249 sink.add(data); |
248 } | 250 } |
249 | 251 |
250 /** Default error handler forwards all errors. */ | 252 /** Default error handler forwards all errors. */ |
251 static void _defaultHandleError(error, StackTrace stackTrace, | 253 static void _defaultHandleError(error, StackTrace stackTrace, |
252 EventSink sink) { | 254 EventSink sink) { |
253 sink.addError(error); | 255 sink.addError(error, stackTrace); |
254 } | 256 } |
255 | 257 |
256 /** Default done handler forwards done. */ | 258 /** Default done handler forwards done. */ |
257 static void _defaultHandleDone(EventSink sink) { | 259 static void _defaultHandleDone(EventSink sink) { |
258 sink.close(); | 260 sink.close(); |
259 } | 261 } |
260 } | 262 } |
261 | 263 |
262 /// A closure mapping a stream and cancelOnError to a StreamSubscription. | 264 /// A closure mapping a stream and cancelOnError to a StreamSubscription. |
263 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>( | 265 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>( |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
302 void onDone(), | 304 void onDone(), |
303 bool cancelOnError }) { | 305 bool cancelOnError }) { |
304 cancelOnError = identical(true, cancelOnError); | 306 cancelOnError = identical(true, cancelOnError); |
305 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
306 result.onData(onData); | 308 result.onData(onData); |
307 result.onError(onError); | 309 result.onError(onError); |
308 result.onDone(onDone); | 310 result.onDone(onDone); |
309 return result; | 311 return result; |
310 } | 312 } |
311 } | 313 } |
OLD | NEW |