OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | 7 /** |
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
9 */ | 9 */ |
10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
11 _EventSink _sink; | 11 _EventSink _sink; |
12 _EventSinkWrapper(this._sink); | 12 _EventSinkWrapper(this._sink); |
13 | 13 |
14 void add(T data) { _sink._add(data); } | 14 void add(T data) { |
| 15 _sink._add(data); |
| 16 } |
| 17 |
15 void addError(error, [StackTrace stackTrace]) { | 18 void addError(error, [StackTrace stackTrace]) { |
16 _sink._addError(error, stackTrace); | 19 _sink._addError(error, stackTrace); |
17 } | 20 } |
18 void close() { _sink._close(); } | 21 |
| 22 void close() { |
| 23 _sink._close(); |
| 24 } |
19 } | 25 } |
20 | 26 |
21 /** | 27 /** |
22 * A StreamSubscription that pipes data through a sink. | 28 * A StreamSubscription that pipes data through a sink. |
23 * | 29 * |
24 * The constructor of this class takes a [_SinkMapper] which maps from | 30 * The constructor of this class takes a [_SinkMapper] which maps from |
25 * [EventSink] to [EventSink]. The input to the mapper is the output of | 31 * [EventSink] to [EventSink]. The input to the mapper is the output of |
26 * the transformation. The returned sink is the transformation's input. | 32 * the transformation. The returned sink is the transformation's input. |
27 */ | 33 */ |
28 class _SinkTransformerStreamSubscription<S, T> | 34 class _SinkTransformerStreamSubscription<S, T> |
29 extends _BufferingStreamSubscription<T> { | 35 extends _BufferingStreamSubscription<T> { |
30 /// The transformer's input sink. | 36 /// The transformer's input sink. |
31 EventSink<S> _transformerSink; | 37 EventSink<S> _transformerSink; |
32 | 38 |
33 /// The subscription to the input stream. | 39 /// The subscription to the input stream. |
34 StreamSubscription<S> _subscription; | 40 StreamSubscription<S> _subscription; |
35 | 41 |
36 _SinkTransformerStreamSubscription(Stream<S> source, | 42 _SinkTransformerStreamSubscription(Stream<S> source, _SinkMapper<S, T> mapper, |
37 _SinkMapper<S, T> mapper, | 43 void onData(T data), Function onError, void onDone(), bool cancelOnError) |
38 void onData(T data), | |
39 Function onError, | |
40 void onDone(), | |
41 bool cancelOnError) | |
42 // We set the adapter's target only when the user is allowed to send data. | 44 // We set the adapter's target only when the user is allowed to send data. |
43 : super(onData, onError, onDone, cancelOnError) { | 45 : super(onData, onError, onDone, cancelOnError) { |
44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); | 46 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); |
45 _transformerSink = mapper(eventSink); | 47 _transformerSink = mapper(eventSink); |
46 _subscription = source.listen(_handleData, | 48 _subscription = |
47 onError: _handleError, | 49 source.listen(_handleData, onError: _handleError, onDone: _handleDone); |
48 onDone: _handleDone); | |
49 } | 50 } |
50 | 51 |
51 /** Whether this subscription is still subscribed to its source. */ | 52 /** Whether this subscription is still subscribed to its source. */ |
52 bool get _isSubscribed => _subscription != null; | 53 bool get _isSubscribed => _subscription != null; |
53 | 54 |
54 // _EventSink interface. | 55 // _EventSink interface. |
55 | 56 |
56 /** | 57 /** |
57 * Adds an event to this subscriptions. | 58 * Adds an event to this subscriptions. |
58 * | 59 * |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
137 void _handleDone() { | 138 void _handleDone() { |
138 try { | 139 try { |
139 _subscription = null; | 140 _subscription = null; |
140 _transformerSink.close(); | 141 _transformerSink.close(); |
141 } catch (e, s) { | 142 } catch (e, s) { |
142 _addError(e, s); | 143 _addError(e, s); |
143 } | 144 } |
144 } | 145 } |
145 } | 146 } |
146 | 147 |
147 | |
148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); | 148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); |
149 | 149 |
150 /** | 150 /** |
151 * A StreamTransformer for Sink-mappers. | 151 * A StreamTransformer for Sink-mappers. |
152 * | 152 * |
153 * A Sink-mapper takes an [EventSink] (its output) and returns another | 153 * A Sink-mapper takes an [EventSink] (its output) and returns another |
154 * EventSink (its input). | 154 * EventSink (its input). |
155 * | 155 * |
156 * Note that this class can be `const`. | 156 * Note that this class can be `const`. |
157 */ | 157 */ |
158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { | 158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { |
159 final _SinkMapper<S, T> _sinkMapper; | 159 final _SinkMapper<S, T> _sinkMapper; |
160 const _StreamSinkTransformer(this._sinkMapper); | 160 const _StreamSinkTransformer(this._sinkMapper); |
161 | 161 |
162 Stream<T> bind(Stream<S> stream) | 162 Stream<T> bind(Stream<S> stream) => |
163 => new _BoundSinkStream<S, T>(stream, _sinkMapper); | 163 new _BoundSinkStream<S, T>(stream, _sinkMapper); |
164 } | 164 } |
165 | 165 |
166 /** | 166 /** |
167 * The result of binding a StreamTransformer for Sink-mappers. | 167 * The result of binding a StreamTransformer for Sink-mappers. |
168 * | 168 * |
169 * It contains the bound Stream and the sink-mapper. Only when the user starts | 169 * It contains the bound Stream and the sink-mapper. Only when the user starts |
170 * listening to this stream is the sink-mapper invoked. The result is used | 170 * listening to this stream is the sink-mapper invoked. The result is used |
171 * to create a StreamSubscription that transforms events. | 171 * to create a StreamSubscription that transforms events. |
172 */ | 172 */ |
173 class _BoundSinkStream<S, T> extends Stream<T> { | 173 class _BoundSinkStream<S, T> extends Stream<T> { |
174 final _SinkMapper<S, T> _sinkMapper; | 174 final _SinkMapper<S, T> _sinkMapper; |
175 final Stream<S> _stream; | 175 final Stream<S> _stream; |
176 | 176 |
177 bool get isBroadcast => _stream.isBroadcast; | 177 bool get isBroadcast => _stream.isBroadcast; |
178 | 178 |
179 _BoundSinkStream(this._stream, this._sinkMapper); | 179 _BoundSinkStream(this._stream, this._sinkMapper); |
180 | 180 |
181 StreamSubscription<T> listen(void onData(T event), | 181 StreamSubscription<T> listen(void onData(T event), |
182 { Function onError, | 182 {Function onError, void onDone(), bool cancelOnError}) { |
183 void onDone(), | |
184 bool cancelOnError }) { | |
185 cancelOnError = identical(true, cancelOnError); | 183 cancelOnError = identical(true, cancelOnError); |
186 StreamSubscription<T> subscription = | 184 StreamSubscription<T> subscription = |
187 new _SinkTransformerStreamSubscription<S, T>( | 185 new _SinkTransformerStreamSubscription<S, T>( |
188 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); | 186 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
189 return subscription; | 187 return subscription; |
190 } | 188 } |
191 } | 189 } |
192 | 190 |
193 /// Data-handler coming from [StreamTransformer.fromHandlers]. | 191 /// Data-handler coming from [StreamTransformer.fromHandlers]. |
194 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 192 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 193 |
195 /// Error-handler coming from [StreamTransformer.fromHandlers]. | 194 /// Error-handler coming from [StreamTransformer.fromHandlers]. |
196 typedef void _TransformErrorHandler<T>( | 195 typedef void _TransformErrorHandler<T>( |
197 Object error, StackTrace stackTrace, EventSink<T> sink); | 196 Object error, StackTrace stackTrace, EventSink<T> sink); |
| 197 |
198 /// Done-handler coming from [StreamTransformer.fromHandlers]. | 198 /// Done-handler coming from [StreamTransformer.fromHandlers]. |
199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
200 | 200 |
201 /** | 201 /** |
202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. | 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. |
203 * | 203 * |
204 * This way we can reuse the code from [_StreamSinkTransformer]. | 204 * This way we can reuse the code from [_StreamSinkTransformer]. |
205 */ | 205 */ |
206 class _HandlerEventSink<S, T> implements EventSink<S> { | 206 class _HandlerEventSink<S, T> implements EventSink<S> { |
207 final _TransformDataHandler<S, T> _handleData; | 207 final _TransformDataHandler<S, T> _handleData; |
208 final _TransformErrorHandler<T> _handleError; | 208 final _TransformErrorHandler<T> _handleError; |
209 final _TransformDoneHandler<T> _handleDone; | 209 final _TransformDoneHandler<T> _handleDone; |
210 | 210 |
211 /// The output sink where the handlers should send their data into. | 211 /// The output sink where the handlers should send their data into. |
212 final EventSink<T> _sink; | 212 final EventSink<T> _sink; |
213 | 213 |
214 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, | 214 _HandlerEventSink( |
215 this._sink); | 215 this._handleData, this._handleError, this._handleDone, this._sink); |
216 | 216 |
217 void add(S data) { _handleData(data, _sink); } | 217 void add(S data) { |
| 218 _handleData(data, _sink); |
| 219 } |
| 220 |
218 void addError(Object error, [StackTrace stackTrace]) { | 221 void addError(Object error, [StackTrace stackTrace]) { |
219 _handleError(error, stackTrace, _sink); | 222 _handleError(error, stackTrace, _sink); |
220 } | 223 } |
221 void close() { _handleDone(_sink); } | 224 |
| 225 void close() { |
| 226 _handleDone(_sink); |
| 227 } |
222 } | 228 } |
223 | 229 |
224 /** | 230 /** |
225 * A StreamTransformer that transformers events with the given handlers. | 231 * A StreamTransformer that transformers events with the given handlers. |
226 * | 232 * |
227 * Note that this transformer can only be used once. | 233 * Note that this transformer can only be used once. |
228 */ | 234 */ |
229 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { | 235 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { |
230 | 236 _StreamHandlerTransformer( |
231 _StreamHandlerTransformer({ | 237 {void handleData(S data, EventSink<T> sink), |
232 void handleData(S data, EventSink<T> sink), | |
233 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 238 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
234 void handleDone(EventSink<T> sink)}) | 239 void handleDone(EventSink<T> sink)}) |
235 : super((EventSink<T> outputSink) { | 240 : super((EventSink<T> outputSink) { |
236 if (handleData == null) handleData = _defaultHandleData; | 241 if (handleData == null) handleData = _defaultHandleData; |
237 if (handleError == null) handleError = _defaultHandleError; | 242 if (handleError == null) handleError = _defaultHandleError; |
238 if (handleDone == null) handleDone = _defaultHandleDone; | 243 if (handleDone == null) handleDone = _defaultHandleDone; |
239 return new _HandlerEventSink<S, T>( | 244 return new _HandlerEventSink<S, T>( |
240 handleData, handleError, handleDone, outputSink); | 245 handleData, handleError, handleDone, outputSink); |
241 }); | 246 }); |
242 | 247 |
243 Stream<T> bind(Stream<S> stream) { | 248 Stream<T> bind(Stream<S> stream) { |
244 return super.bind(stream); | 249 return super.bind(stream); |
245 } | 250 } |
246 | 251 |
247 /** Default data handler forwards all data. */ | 252 /** Default data handler forwards all data. */ |
248 static void _defaultHandleData(var data, EventSink sink) { | 253 static void _defaultHandleData(var data, EventSink sink) { |
249 sink.add(data); | 254 sink.add(data); |
250 } | 255 } |
251 | 256 |
252 /** Default error handler forwards all errors. */ | 257 /** Default error handler forwards all errors. */ |
253 static void _defaultHandleError(error, StackTrace stackTrace, | 258 static void _defaultHandleError( |
254 EventSink sink) { | 259 error, StackTrace stackTrace, EventSink sink) { |
255 sink.addError(error, stackTrace); | 260 sink.addError(error, stackTrace); |
256 } | 261 } |
257 | 262 |
258 /** Default done handler forwards done. */ | 263 /** Default done handler forwards done. */ |
259 static void _defaultHandleDone(EventSink sink) { | 264 static void _defaultHandleDone(EventSink sink) { |
260 sink.close(); | 265 sink.close(); |
261 } | 266 } |
262 } | 267 } |
263 | 268 |
264 /// A closure mapping a stream and cancelOnError to a StreamSubscription. | 269 /// A closure mapping a stream and cancelOnError to a StreamSubscription. |
(...skipping 28 matching lines...) Expand all Loading... |
293 * the stored [_stream]. Usually the transformer starts listening at this | 298 * the stored [_stream]. Usually the transformer starts listening at this |
294 * moment. | 299 * moment. |
295 */ | 300 */ |
296 class _BoundSubscriptionStream<S, T> extends Stream<T> { | 301 class _BoundSubscriptionStream<S, T> extends Stream<T> { |
297 final _SubscriptionTransformer<S, T> _transformer; | 302 final _SubscriptionTransformer<S, T> _transformer; |
298 final Stream<S> _stream; | 303 final Stream<S> _stream; |
299 | 304 |
300 _BoundSubscriptionStream(this._stream, this._transformer); | 305 _BoundSubscriptionStream(this._stream, this._transformer); |
301 | 306 |
302 StreamSubscription<T> listen(void onData(T event), | 307 StreamSubscription<T> listen(void onData(T event), |
303 { Function onError, | 308 {Function onError, void onDone(), bool cancelOnError}) { |
304 void onDone(), | |
305 bool cancelOnError }) { | |
306 cancelOnError = identical(true, cancelOnError); | 309 cancelOnError = identical(true, cancelOnError); |
307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
308 result.onData(onData); | 311 result.onData(onData); |
309 result.onError(onError); | 312 result.onError(onError); |
310 result.onDone(onDone); | 313 result.onDone(onDone); |
311 return result; | 314 return result; |
312 } | 315 } |
313 } | 316 } |
OLD | NEW |