OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.async; | |
6 | |
7 /** Runs user code and takes actions depending on success or failure. */ | |
8 _runUserCode(userCode(), | |
9 onSuccess(value), | |
10 onError(error, StackTrace stackTrace)) { | |
11 try { | |
12 onSuccess(userCode()); | |
13 } catch (e, s) { | |
14 AsyncError replacement = Zone.current.errorCallback(e, s); | |
15 if (replacement == null) { | |
16 onError(e, s); | |
17 } else { | |
18 var error = _nonNullError(replacement.error); | |
19 var stackTrace = replacement.stackTrace; | |
20 onError(error, stackTrace); | |
21 } | |
22 } | |
23 } | |
24 | |
25 /** Helper function to cancel a subscription and wait for the potential future, | |
26 before completing with an error. */ | |
27 void _cancelAndError(StreamSubscription subscription, | |
28 _Future future, | |
29 error, | |
30 StackTrace stackTrace) { | |
31 var cancelFuture = subscription.cancel(); | |
32 if (cancelFuture is Future) { | |
33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | |
34 } else { | |
35 future._completeError(error, stackTrace); | |
36 } | |
37 } | |
38 | |
39 void _cancelAndErrorWithReplacement(StreamSubscription subscription, | |
40 _Future future, | |
41 error, StackTrace stackTrace) { | |
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
43 if (replacement != null) { | |
44 error = _nonNullError(replacement.error); | |
45 stackTrace = replacement.stackTrace; | |
46 } | |
47 _cancelAndError(subscription, future, error, stackTrace); | |
48 } | |
49 | |
50 typedef void _ErrorCallback(error, StackTrace stackTrace); | |
51 | |
52 /** Helper function to make an onError argument to [_runUserCode]. */ | |
53 _ErrorCallback _cancelAndErrorClosure( | |
54 StreamSubscription subscription, _Future future) { | |
55 return (error, StackTrace stackTrace) { | |
56 _cancelAndError(subscription, future, error, stackTrace); | |
57 }; | |
58 } | |
59 | |
60 /** Helper function to cancel a subscription and wait for the potential future, | |
61 before completing with a value. */ | |
62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | |
63 var cancelFuture = subscription.cancel(); | |
64 if (cancelFuture is Future) { | |
65 cancelFuture.whenComplete(() => future._complete(value)); | |
66 } else { | |
67 future._complete(value); | |
68 } | |
69 } | |
70 | |
71 | |
72 /** | |
73 * A [Stream] that forwards subscriptions to another stream. | |
74 * | |
75 * This stream implements [Stream], but forwards all subscriptions | |
76 * to an underlying stream, and wraps the returned subscription to | |
77 * modify the events on the way. | |
78 * | |
79 * This class is intended for internal use only. | |
80 */ | |
81 abstract class _ForwardingStream<S, T> extends Stream<T> { | |
82 final Stream<S> _source; | |
83 | |
84 _ForwardingStream(this._source); | |
85 | |
86 bool get isBroadcast => _source.isBroadcast; | |
87 | |
88 StreamSubscription<T> listen(void onData(T value), | |
89 { Function onError, | |
90 void onDone(), | |
91 bool cancelOnError }) { | |
92 cancelOnError = identical(true, cancelOnError); | |
93 return _createSubscription(onData, onError, onDone, cancelOnError); | |
94 } | |
95 | |
96 StreamSubscription<T> _createSubscription( | |
97 void onData(T data), | |
98 Function onError, | |
99 void onDone(), | |
100 bool cancelOnError) { | |
101 return new _ForwardingStreamSubscription<S, T>( | |
102 this, onData, onError, onDone, cancelOnError); | |
103 } | |
104 | |
105 // Override the following methods in subclasses to change the behavior. | |
106 | |
107 void _handleData(S data, _EventSink<T> sink) { | |
108 sink._add(data as Object /*=T*/); | |
109 } | |
110 | |
111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | |
112 sink._addError(error, stackTrace); | |
113 } | |
114 | |
115 void _handleDone(_EventSink<T> sink) { | |
116 sink._close(); | |
117 } | |
118 } | |
119 | |
120 /** | |
121 * Abstract superclass for subscriptions that forward to other subscriptions. | |
122 */ | |
123 class _ForwardingStreamSubscription<S, T> | |
124 extends _BufferingStreamSubscription<T> { | |
125 final _ForwardingStream<S, T> _stream; | |
126 | |
127 StreamSubscription<S> _subscription; | |
128 | |
129 _ForwardingStreamSubscription(this._stream, void onData(T data), | |
130 Function onError, void onDone(), | |
131 bool cancelOnError) | |
132 : super(onData, onError, onDone, cancelOnError) { | |
133 _subscription = _stream._source.listen(_handleData, | |
134 onError: _handleError, | |
135 onDone: _handleDone); | |
136 } | |
137 | |
138 // _StreamSink interface. | |
139 // Transformers sending more than one event have no way to know if the stream | |
140 // is canceled or closed after the first, so we just ignore remaining events. | |
141 | |
142 void _add(T data) { | |
143 if (_isClosed) return; | |
144 super._add(data); | |
145 } | |
146 | |
147 void _addError(Object error, StackTrace stackTrace) { | |
148 if (_isClosed) return; | |
149 super._addError(error, stackTrace); | |
150 } | |
151 | |
152 // StreamSubscription callbacks. | |
153 | |
154 void _onPause() { | |
155 if (_subscription == null) return; | |
156 _subscription.pause(); | |
157 } | |
158 | |
159 void _onResume() { | |
160 if (_subscription == null) return; | |
161 _subscription.resume(); | |
162 } | |
163 | |
164 Future _onCancel() { | |
165 if (_subscription != null) { | |
166 StreamSubscription subscription = _subscription; | |
167 _subscription = null; | |
168 return subscription.cancel(); | |
169 } | |
170 return null; | |
171 } | |
172 | |
173 // Methods used as listener on source subscription. | |
174 | |
175 void _handleData(S data) { | |
176 _stream._handleData(data, this); | |
177 } | |
178 | |
179 void _handleError(error, StackTrace stackTrace) { | |
180 _stream._handleError(error, stackTrace, this); | |
181 } | |
182 | |
183 void _handleDone() { | |
184 _stream._handleDone(this); | |
185 } | |
186 } | |
187 | |
188 // ------------------------------------------------------------------- | |
189 // Stream transformers used by the default Stream implementation. | |
190 // ------------------------------------------------------------------- | |
191 | |
192 typedef bool _Predicate<T>(T value); | |
193 | |
194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | |
195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
196 if (replacement != null) { | |
197 error = _nonNullError(replacement.error); | |
198 stackTrace = replacement.stackTrace; | |
199 } | |
200 sink._addError(error, stackTrace); | |
201 } | |
202 | |
203 | |
204 class _WhereStream<T> extends _ForwardingStream<T, T> { | |
205 final _Predicate<T> _test; | |
206 | |
207 _WhereStream(Stream<T> source, bool test(T value)) | |
208 : _test = test, super(source); | |
209 | |
210 void _handleData(T inputEvent, _EventSink<T> sink) { | |
211 bool satisfies; | |
212 try { | |
213 satisfies = _test(inputEvent); | |
214 } catch (e, s) { | |
215 _addErrorWithReplacement(sink, e, s); | |
216 return; | |
217 } | |
218 if (satisfies) { | |
219 sink._add(inputEvent); | |
220 } | |
221 } | |
222 } | |
223 | |
224 | |
225 typedef T _Transformation<S, T>(S value); | |
226 | |
227 /** | |
228 * A stream pipe that converts data events before passing them on. | |
229 */ | |
230 class _MapStream<S, T> extends _ForwardingStream<S, T> { | |
231 final _Transformation<S, T> _transform; | |
232 | |
233 _MapStream(Stream<S> source, T transform(S event)) | |
234 : this._transform = transform, super(source); | |
235 | |
236 void _handleData(S inputEvent, _EventSink<T> sink) { | |
237 T outputEvent; | |
238 try { | |
239 outputEvent = _transform(inputEvent); | |
240 } catch (e, s) { | |
241 _addErrorWithReplacement(sink, e, s); | |
242 return; | |
243 } | |
244 sink._add(outputEvent); | |
245 } | |
246 } | |
247 | |
248 /** | |
249 * A stream pipe that converts data events before passing them on. | |
250 */ | |
251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | |
252 final _Transformation<S, Iterable<T>> _expand; | |
253 | |
254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | |
255 : this._expand = expand, super(source); | |
256 | |
257 void _handleData(S inputEvent, _EventSink<T> sink) { | |
258 try { | |
259 for (T value in _expand(inputEvent)) { | |
260 sink._add(value); | |
261 } | |
262 } catch (e, s) { | |
263 // If either _expand or iterating the generated iterator throws, | |
264 // we abort the iteration. | |
265 _addErrorWithReplacement(sink, e, s); | |
266 } | |
267 } | |
268 } | |
269 | |
270 | |
271 typedef bool _ErrorTest(error); | |
272 | |
273 /** | |
274 * A stream pipe that converts or disposes error events | |
275 * before passing them on. | |
276 */ | |
277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | |
278 final Function _transform; | |
279 final _ErrorTest _test; | |
280 | |
281 _HandleErrorStream(Stream<T> source, | |
282 Function onError, | |
283 bool test(error)) | |
284 : this._transform = onError, this._test = test, super(source); | |
285 | |
286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | |
287 bool matches = true; | |
288 if (_test != null) { | |
289 try { | |
290 matches = _test(error); | |
291 } catch (e, s) { | |
292 _addErrorWithReplacement(sink, e, s); | |
293 return; | |
294 } | |
295 } | |
296 if (matches) { | |
297 try { | |
298 _invokeErrorHandler(_transform, error, stackTrace); | |
299 } catch (e, s) { | |
300 if (identical(e, error)) { | |
301 sink._addError(error, stackTrace); | |
302 } else { | |
303 _addErrorWithReplacement(sink, e, s); | |
304 } | |
305 return; | |
306 } | |
307 } else { | |
308 sink._addError(error, stackTrace); | |
309 } | |
310 } | |
311 } | |
312 | |
313 | |
314 class _TakeStream<T> extends _ForwardingStream<T, T> { | |
315 final int _count; | |
316 | |
317 _TakeStream(Stream<T> source, int count) | |
318 : this._count = count, super(source) { | |
319 // This test is done early to avoid handling an async error | |
320 // in the _handleData method. | |
321 if (count is! int) throw new ArgumentError(count); | |
322 } | |
323 | |
324 StreamSubscription<T> _createSubscription( | |
325 void onData(T data), | |
326 Function onError, | |
327 void onDone(), | |
328 bool cancelOnError) { | |
329 return new _StateStreamSubscription<T>( | |
330 this, onData, onError, onDone, cancelOnError, _count); | |
331 } | |
332 | |
333 void _handleData(T inputEvent, _EventSink<T> sink) { | |
334 _StateStreamSubscription<T> subscription = sink; | |
335 int count = subscription._count; | |
336 if (count > 0) { | |
337 sink._add(inputEvent); | |
338 count -= 1; | |
339 subscription._count = count; | |
340 if (count == 0) { | |
341 // Closing also unsubscribes all subscribers, which unsubscribes | |
342 // this from source. | |
343 sink._close(); | |
344 } | |
345 } | |
346 } | |
347 } | |
348 | |
349 /** | |
350 * A [_ForwardingStreamSubscription] with one extra state field. | |
351 * | |
352 * Use by several different classes, some storing an integer, others a bool. | |
353 */ | |
354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | |
355 // Raw state field. Typed access provided by getters and setters below. | |
356 var _sharedState; | |
357 | |
358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | |
359 Function onError, void onDone(), | |
360 bool cancelOnError, this._sharedState) | |
361 : super(stream, onData, onError, onDone, cancelOnError); | |
362 | |
363 bool get _flag => _sharedState; | |
364 void set _flag(bool flag) { _sharedState = flag; } | |
365 int get _count => _sharedState; | |
366 void set _count(int count) { _sharedState = count; } | |
367 } | |
368 | |
369 | |
370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | |
371 final _Predicate<T> _test; | |
372 | |
373 _TakeWhileStream(Stream<T> source, bool test(T value)) | |
374 : this._test = test, super(source); | |
375 | |
376 void _handleData(T inputEvent, _EventSink<T> sink) { | |
377 bool satisfies; | |
378 try { | |
379 satisfies = _test(inputEvent); | |
380 } catch (e, s) { | |
381 _addErrorWithReplacement(sink, e, s); | |
382 // The test didn't say true. Didn't say false either, but we stop anyway. | |
383 sink._close(); | |
384 return; | |
385 } | |
386 if (satisfies) { | |
387 sink._add(inputEvent); | |
388 } else { | |
389 sink._close(); | |
390 } | |
391 } | |
392 } | |
393 | |
394 class _SkipStream<T> extends _ForwardingStream<T, T> { | |
395 final int _count; | |
396 | |
397 _SkipStream(Stream<T> source, int count) | |
398 : this._count = count, super(source) { | |
399 // This test is done early to avoid handling an async error | |
400 // in the _handleData method. | |
401 if (count is! int || count < 0) throw new ArgumentError(count); | |
402 } | |
403 | |
404 StreamSubscription<T> _createSubscription( | |
405 void onData(T data), | |
406 Function onError, | |
407 void onDone(), | |
408 bool cancelOnError) { | |
409 return new _StateStreamSubscription<T>( | |
410 this, onData, onError, onDone, cancelOnError, _count); | |
411 } | |
412 | |
413 void _handleData(T inputEvent, _EventSink<T> sink) { | |
414 _StateStreamSubscription<T> subscription = sink; | |
415 int count = subscription._count; | |
416 if (count > 0) { | |
417 subscription._count = count - 1; | |
418 return; | |
419 } | |
420 sink._add(inputEvent); | |
421 } | |
422 } | |
423 | |
424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | |
425 final _Predicate<T> _test; | |
426 | |
427 _SkipWhileStream(Stream<T> source, bool test(T value)) | |
428 : this._test = test, super(source); | |
429 | |
430 StreamSubscription<T> _createSubscription( | |
431 void onData(T data), | |
432 Function onError, | |
433 void onDone(), | |
434 bool cancelOnError) { | |
435 return new _StateStreamSubscription<T>( | |
436 this, onData, onError, onDone, cancelOnError, false); | |
437 } | |
438 | |
439 void _handleData(T inputEvent, _EventSink<T> sink) { | |
440 _StateStreamSubscription<T> subscription = sink; | |
441 bool hasFailed = subscription._flag; | |
442 if (hasFailed) { | |
443 sink._add(inputEvent); | |
444 return; | |
445 } | |
446 bool satisfies; | |
447 try { | |
448 satisfies = _test(inputEvent); | |
449 } catch (e, s) { | |
450 _addErrorWithReplacement(sink, e, s); | |
451 // A failure to return a boolean is considered "not matching". | |
452 subscription._flag = true; | |
453 return; | |
454 } | |
455 if (!satisfies) { | |
456 subscription._flag = true; | |
457 sink._add(inputEvent); | |
458 } | |
459 } | |
460 } | |
461 | |
462 typedef bool _Equality<T>(T a, T b); | |
463 | |
464 class _DistinctStream<T> extends _ForwardingStream<T, T> { | |
465 static var _SENTINEL = new Object(); | |
466 | |
467 _Equality<T> _equals; | |
468 var _previous = _SENTINEL; | |
469 | |
470 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | |
471 : _equals = equals, super(source); | |
472 | |
473 void _handleData(T inputEvent, _EventSink<T> sink) { | |
474 if (identical(_previous, _SENTINEL)) { | |
475 _previous = inputEvent; | |
476 return sink._add(inputEvent); | |
477 } else { | |
478 bool isEqual; | |
479 try { | |
480 if (_equals == null) { | |
481 isEqual = (_previous == inputEvent); | |
482 } else { | |
483 isEqual = _equals(_previous as Object /*=T*/, inputEvent); | |
484 } | |
485 } catch (e, s) { | |
486 _addErrorWithReplacement(sink, e, s); | |
487 return null; | |
488 } | |
489 if (!isEqual) { | |
490 sink._add(inputEvent); | |
491 _previous = inputEvent; | |
492 } | |
493 } | |
494 } | |
495 } | |
OLD | NEW |