OLD | NEW |
| (Empty) |
1 part of dart.async; | |
2 _runUserCode(userCode(), onSuccess(value), onError(error, StackTrace stackTrace
)) { | |
3 try { | |
4 onSuccess(userCode()); | |
5 } | |
6 catch (e, s) { | |
7 AsyncError replacement = Zone.current.errorCallback(e, s); | |
8 if (replacement == null) { | |
9 onError(e, s); | |
10 } | |
11 else { | |
12 var error = _nonNullError(replacement.error); | |
13 var stackTrace = replacement.stackTrace; | |
14 onError(error, stackTrace); | |
15 } | |
16 } | |
17 } | |
18 void _cancelAndError(StreamSubscription subscription, _Future future, error, St
ackTrace stackTrace) { | |
19 var cancelFuture = subscription.cancel(); | |
20 if (cancelFuture is Future) { | |
21 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | |
22 } | |
23 else { | |
24 future._completeError(error, stackTrace); | |
25 } | |
26 } | |
27 void _cancelAndErrorWithReplacement(StreamSubscription subscription, _Future fu
ture, error, StackTrace stackTrace) { | |
28 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
29 if (replacement != null) { | |
30 error = _nonNullError(replacement.error); | |
31 stackTrace = replacement.stackTrace; | |
32 } | |
33 _cancelAndError(subscription, future, error, stackTrace); | |
34 } | |
35 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => ((er
ror, StackTrace stackTrace) => _cancelAndError(subscription, future, error, stac
kTrace)); | |
36 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | |
37 var cancelFuture = subscription.cancel(); | |
38 if (cancelFuture is Future) { | |
39 cancelFuture.whenComplete(() => future._complete(value)); | |
40 } | |
41 else { | |
42 future._complete(value); | |
43 } | |
44 } | |
45 abstract class _ForwardingStream<S, T> extends Stream<T> {final Stream<S> _sour
ce; | |
46 _ForwardingStream(this._source); | |
47 bool get isBroadcast => _source.isBroadcast; | |
48 StreamSubscription<T> listen(void onData(T value), { | |
49 Function onError, void onDone(), bool cancelOnError} | |
50 ) { | |
51 cancelOnError = identical(true, cancelOnError); | |
52 return _createSubscription(onData, onError, onDone, cancelOnError); | |
53 } | |
54 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) { | |
55 return new _ForwardingStreamSubscription<S, T>(this, onData, onError, onDone,
cancelOnError); | |
56 } | |
57 void _handleData(S data, _EventSink<T> sink) { | |
58 dynamic outputData = data; | |
59 sink._add(DEVC$RT.cast(outputData, dynamic, T, "CompositeCast", """line 104,
column 15 of dart:async/stream_pipe.dart: """, outputData is T, false)); | |
60 } | |
61 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | |
62 sink._addError(error, stackTrace); | |
63 } | |
64 void _handleDone(_EventSink<T> sink) { | |
65 sink._close(); | |
66 } | |
67 } | |
68 class _ForwardingStreamSubscription<S, T> extends _BufferingStreamSubscription<
T> {final _ForwardingStream<S, T> _stream; | |
69 StreamSubscription<S> _subscription; | |
70 _ForwardingStreamSubscription(this._stream, void onData(T data), Function onErr
or, void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOn
Error) { | |
71 _subscription = _stream._source.listen(_handleData, onError: _handleError, onDon
e: _handleDone); | |
72 } | |
73 void _add(T data) { | |
74 if (_isClosed) return; super._add(data); | |
75 } | |
76 void _addError(Object error, StackTrace stackTrace) { | |
77 if (_isClosed) return; super._addError(error, stackTrace); | |
78 } | |
79 void _onPause() { | |
80 if (_subscription == null) return; _subscription.pause(); | |
81 } | |
82 void _onResume() { | |
83 if (_subscription == null) return; _subscription.resume(); | |
84 } | |
85 Future _onCancel() { | |
86 if (_subscription != null) { | |
87 StreamSubscription subscription = _subscription; | |
88 _subscription = null; | |
89 subscription.cancel(); | |
90 } | |
91 return null; | |
92 } | |
93 void _handleData(S data) { | |
94 _stream._handleData(data, this); | |
95 } | |
96 void _handleError(error, StackTrace stackTrace) { | |
97 _stream._handleError(error, stackTrace, this); | |
98 } | |
99 void _handleDone() { | |
100 _stream._handleDone(this); | |
101 } | |
102 } | |
103 typedef bool _Predicate<T>(T value); | |
104 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | |
105 AsyncError replacement = Zone.current.errorCallback(error, DEVC$RT.cast(stackTra
ce, dynamic, StackTrace, "DynamicCast", """line 191, column 62 of dart:async/str
eam_pipe.dart: """, stackTrace is StackTrace, true)); | |
106 if (replacement != null) { | |
107 error = _nonNullError(replacement.error); | |
108 stackTrace = replacement.stackTrace; | |
109 } | |
110 sink._addError(error, DEVC$RT.cast(stackTrace, dynamic, StackTrace, "DynamicCas
t", """line 196, column 25 of dart:async/stream_pipe.dart: """, stackTrace is St
ackTrace, true)); | |
111 } | |
112 class _WhereStream<T> extends _ForwardingStream<T, T> {final _Predicate<T> _tes
t; | |
113 _WhereStream(Stream<T> source, bool test(T value)) : _test = test, super(source
); | |
114 void _handleData(T inputEvent, _EventSink<T> sink) { | |
115 bool satisfies; | |
116 try { | |
117 satisfies = _test(inputEvent); | |
118 } | |
119 catch (e, s) { | |
120 _addErrorWithReplacement(sink, e, s); | |
121 return;} | |
122 if (satisfies) { | |
123 sink._add(inputEvent); | |
124 } | |
125 } | |
126 } | |
127 typedef T _Transformation<S, T>(S value); | |
128 class _MapStream<S, T> extends _ForwardingStream<S, T> {final _Transformation _
transform; | |
129 _MapStream(Stream<S> source, T transform(S event)) : this._transform = transfor
m, super(source); | |
130 void _handleData(S inputEvent, _EventSink<T> sink) { | |
131 T outputEvent; | |
132 try { | |
133 outputEvent = ((__x58) => DEVC$RT.cast(__x58, dynamic, T, "CompositeCast", """li
ne 235, column 21 of dart:async/stream_pipe.dart: """, __x58 is T, false))(_tran
sform(inputEvent)); | |
134 } | |
135 catch (e, s) { | |
136 _addErrorWithReplacement(sink, e, s); | |
137 return;} | |
138 sink._add(outputEvent); | |
139 } | |
140 } | |
141 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {final _Transformatio
n<S, Iterable<T>> _expand; | |
142 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) : this._expand = e
xpand, super(source); | |
143 void _handleData(S inputEvent, _EventSink<T> sink) { | |
144 try { | |
145 for (T value in _expand(inputEvent)) { | |
146 sink._add(value); | |
147 } | |
148 } | |
149 catch (e, s) { | |
150 _addErrorWithReplacement(sink, e, s); | |
151 } | |
152 } | |
153 } | |
154 typedef bool _ErrorTest(error); | |
155 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {final Function _tr
ansform; | |
156 final _ErrorTest _test; | |
157 _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) : this
._transform = onError, this._test = test, super(source); | |
158 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | |
159 bool matches = true; | |
160 if (_test != null) { | |
161 try { | |
162 matches = _test(error); | |
163 } | |
164 catch (e, s) { | |
165 _addErrorWithReplacement(sink, e, s); | |
166 return;} | |
167 } | |
168 if (matches) { | |
169 try { | |
170 _invokeErrorHandler(_transform, error, stackTrace); | |
171 } | |
172 catch (e, s) { | |
173 if (identical(e, error)) { | |
174 sink._addError(error, stackTrace); | |
175 } | |
176 else { | |
177 _addErrorWithReplacement(sink, e, s); | |
178 } | |
179 return;} | |
180 } | |
181 else { | |
182 sink._addError(error, stackTrace); | |
183 } | |
184 } | |
185 } | |
186 class _TakeStream<T> extends _ForwardingStream<T, T> {int _remaining; | |
187 _TakeStream(Stream<T> source, int count) : this._remaining = count, super(sourc
e) { | |
188 if (count is! int) throw new ArgumentError(count); | |
189 } | |
190 void _handleData(T inputEvent, _EventSink<T> sink) { | |
191 if (_remaining > 0) { | |
192 sink._add(inputEvent); | |
193 _remaining -= 1; | |
194 if (_remaining == 0) { | |
195 sink._close(); | |
196 } | |
197 } | |
198 } | |
199 } | |
200 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T>
_test; | |
201 _TakeWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup
er(source); | |
202 void _handleData(T inputEvent, _EventSink<T> sink) { | |
203 bool satisfies; | |
204 try { | |
205 satisfies = _test(inputEvent); | |
206 } | |
207 catch (e, s) { | |
208 _addErrorWithReplacement(sink, e, s); | |
209 sink._close(); | |
210 return;} | |
211 if (satisfies) { | |
212 sink._add(inputEvent); | |
213 } | |
214 else { | |
215 sink._close(); | |
216 } | |
217 } | |
218 } | |
219 class _SkipStream<T> extends _ForwardingStream<T, T> {int _remaining; | |
220 _SkipStream(Stream<T> source, int count) : this._remaining = count, super(sourc
e) { | |
221 if (count is! int || count < 0) throw new ArgumentError(count); | |
222 } | |
223 void _handleData(T inputEvent, _EventSink<T> sink) { | |
224 if (_remaining > 0) { | |
225 _remaining--; | |
226 return;} | |
227 sink._add(inputEvent); | |
228 } | |
229 } | |
230 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T>
_test; | |
231 bool _hasFailed = false; | |
232 _SkipWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup
er(source); | |
233 void _handleData(T inputEvent, _EventSink<T> sink) { | |
234 if (_hasFailed) { | |
235 sink._add(inputEvent); | |
236 return;} | |
237 bool satisfies; | |
238 try { | |
239 satisfies = _test(inputEvent); | |
240 } | |
241 catch (e, s) { | |
242 _addErrorWithReplacement(sink, e, s); | |
243 _hasFailed = true; | |
244 return;} | |
245 if (!satisfies) { | |
246 _hasFailed = true; | |
247 sink._add(inputEvent); | |
248 } | |
249 } | |
250 } | |
251 typedef bool _Equality<T>(T a, T b); | |
252 class _DistinctStream<T> extends _ForwardingStream<T, T> {static var _SENTINEL
= new Object(); | |
253 _Equality<T> _equals; | |
254 var _previous = _SENTINEL; | |
255 _DistinctStream(Stream<T> source, bool equals(T a, T b)) : _equals = equals, su
per(source); | |
256 void _handleData(T inputEvent, _EventSink<T> sink) { | |
257 if (identical(_previous, _SENTINEL)) { | |
258 _previous = inputEvent; | |
259 return sink._add(inputEvent); | |
260 } | |
261 else { | |
262 bool isEqual; | |
263 try { | |
264 if (_equals == null) { | |
265 isEqual = (_previous == inputEvent); | |
266 } | |
267 else { | |
268 isEqual = _equals(DEVC$RT.cast(_previous, Object, T, "CompositeCast", """line 42
6, column 29 of dart:async/stream_pipe.dart: """, _previous is T, false), inputE
vent); | |
269 } | |
270 } | |
271 catch (e, s) { | |
272 _addErrorWithReplacement(sink, e, s); | |
273 return null; | |
274 } | |
275 if (!isEqual) { | |
276 sink._add(inputEvent); | |
277 _previous = inputEvent; | |
278 } | |
279 } | |
280 } | |
281 } | |
OLD | NEW |