Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(459)

Side by Side Diff: test/dart_codegen/expect/async/stream_pipe.dart

Issue 1148283010: Remove dart backend (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 part of dart.async;
2 _runUserCode(userCode(), onSuccess(value), onError(error, StackTrace stackTrace )) {
3 try {
4 onSuccess(userCode());
5 }
6 catch (e, s) {
7 AsyncError replacement = Zone.current.errorCallback(e, s);
8 if (replacement == null) {
9 onError(e, s);
10 }
11 else {
12 var error = _nonNullError(replacement.error);
13 var stackTrace = replacement.stackTrace;
14 onError(error, stackTrace);
15 }
16 }
17 }
18 void _cancelAndError(StreamSubscription subscription, _Future future, error, St ackTrace stackTrace) {
19 var cancelFuture = subscription.cancel();
20 if (cancelFuture is Future) {
21 cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
22 }
23 else {
24 future._completeError(error, stackTrace);
25 }
26 }
27 void _cancelAndErrorWithReplacement(StreamSubscription subscription, _Future fu ture, error, StackTrace stackTrace) {
28 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
29 if (replacement != null) {
30 error = _nonNullError(replacement.error);
31 stackTrace = replacement.stackTrace;
32 }
33 _cancelAndError(subscription, future, error, stackTrace);
34 }
35 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => ((er ror, StackTrace stackTrace) => _cancelAndError(subscription, future, error, stac kTrace));
36 void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
37 var cancelFuture = subscription.cancel();
38 if (cancelFuture is Future) {
39 cancelFuture.whenComplete(() => future._complete(value));
40 }
41 else {
42 future._complete(value);
43 }
44 }
45 abstract class _ForwardingStream<S, T> extends Stream<T> {final Stream<S> _sour ce;
46 _ForwardingStream(this._source);
47 bool get isBroadcast => _source.isBroadcast;
48 StreamSubscription<T> listen(void onData(T value), {
49 Function onError, void onDone(), bool cancelOnError}
50 ) {
51 cancelOnError = identical(true, cancelOnError);
52 return _createSubscription(onData, onError, onDone, cancelOnError);
53 }
54 StreamSubscription<T> _createSubscription(void onData(T data), Function onError , void onDone(), bool cancelOnError) {
55 return new _ForwardingStreamSubscription<S, T>(this, onData, onError, onDone, cancelOnError);
56 }
57 void _handleData(S data, _EventSink<T> sink) {
58 dynamic outputData = data;
59 sink._add(DEVC$RT.cast(outputData, dynamic, T, "CompositeCast", """line 104, column 15 of dart:async/stream_pipe.dart: """, outputData is T, false));
60 }
61 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
62 sink._addError(error, stackTrace);
63 }
64 void _handleDone(_EventSink<T> sink) {
65 sink._close();
66 }
67 }
68 class _ForwardingStreamSubscription<S, T> extends _BufferingStreamSubscription< T> {final _ForwardingStream<S, T> _stream;
69 StreamSubscription<S> _subscription;
70 _ForwardingStreamSubscription(this._stream, void onData(T data), Function onErr or, void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOn Error) {
71 _subscription = _stream._source.listen(_handleData, onError: _handleError, onDon e: _handleDone);
72 }
73 void _add(T data) {
74 if (_isClosed) return; super._add(data);
75 }
76 void _addError(Object error, StackTrace stackTrace) {
77 if (_isClosed) return; super._addError(error, stackTrace);
78 }
79 void _onPause() {
80 if (_subscription == null) return; _subscription.pause();
81 }
82 void _onResume() {
83 if (_subscription == null) return; _subscription.resume();
84 }
85 Future _onCancel() {
86 if (_subscription != null) {
87 StreamSubscription subscription = _subscription;
88 _subscription = null;
89 subscription.cancel();
90 }
91 return null;
92 }
93 void _handleData(S data) {
94 _stream._handleData(data, this);
95 }
96 void _handleError(error, StackTrace stackTrace) {
97 _stream._handleError(error, stackTrace, this);
98 }
99 void _handleDone() {
100 _stream._handleDone(this);
101 }
102 }
103 typedef bool _Predicate<T>(T value);
104 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
105 AsyncError replacement = Zone.current.errorCallback(error, DEVC$RT.cast(stackTra ce, dynamic, StackTrace, "DynamicCast", """line 191, column 62 of dart:async/str eam_pipe.dart: """, stackTrace is StackTrace, true));
106 if (replacement != null) {
107 error = _nonNullError(replacement.error);
108 stackTrace = replacement.stackTrace;
109 }
110 sink._addError(error, DEVC$RT.cast(stackTrace, dynamic, StackTrace, "DynamicCas t", """line 196, column 25 of dart:async/stream_pipe.dart: """, stackTrace is St ackTrace, true));
111 }
112 class _WhereStream<T> extends _ForwardingStream<T, T> {final _Predicate<T> _tes t;
113 _WhereStream(Stream<T> source, bool test(T value)) : _test = test, super(source );
114 void _handleData(T inputEvent, _EventSink<T> sink) {
115 bool satisfies;
116 try {
117 satisfies = _test(inputEvent);
118 }
119 catch (e, s) {
120 _addErrorWithReplacement(sink, e, s);
121 return;}
122 if (satisfies) {
123 sink._add(inputEvent);
124 }
125 }
126 }
127 typedef T _Transformation<S, T>(S value);
128 class _MapStream<S, T> extends _ForwardingStream<S, T> {final _Transformation _ transform;
129 _MapStream(Stream<S> source, T transform(S event)) : this._transform = transfor m, super(source);
130 void _handleData(S inputEvent, _EventSink<T> sink) {
131 T outputEvent;
132 try {
133 outputEvent = ((__x58) => DEVC$RT.cast(__x58, dynamic, T, "CompositeCast", """li ne 235, column 21 of dart:async/stream_pipe.dart: """, __x58 is T, false))(_tran sform(inputEvent));
134 }
135 catch (e, s) {
136 _addErrorWithReplacement(sink, e, s);
137 return;}
138 sink._add(outputEvent);
139 }
140 }
141 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {final _Transformatio n<S, Iterable<T>> _expand;
142 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) : this._expand = e xpand, super(source);
143 void _handleData(S inputEvent, _EventSink<T> sink) {
144 try {
145 for (T value in _expand(inputEvent)) {
146 sink._add(value);
147 }
148 }
149 catch (e, s) {
150 _addErrorWithReplacement(sink, e, s);
151 }
152 }
153 }
154 typedef bool _ErrorTest(error);
155 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {final Function _tr ansform;
156 final _ErrorTest _test;
157 _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) : this ._transform = onError, this._test = test, super(source);
158 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
159 bool matches = true;
160 if (_test != null) {
161 try {
162 matches = _test(error);
163 }
164 catch (e, s) {
165 _addErrorWithReplacement(sink, e, s);
166 return;}
167 }
168 if (matches) {
169 try {
170 _invokeErrorHandler(_transform, error, stackTrace);
171 }
172 catch (e, s) {
173 if (identical(e, error)) {
174 sink._addError(error, stackTrace);
175 }
176 else {
177 _addErrorWithReplacement(sink, e, s);
178 }
179 return;}
180 }
181 else {
182 sink._addError(error, stackTrace);
183 }
184 }
185 }
186 class _TakeStream<T> extends _ForwardingStream<T, T> {int _remaining;
187 _TakeStream(Stream<T> source, int count) : this._remaining = count, super(sourc e) {
188 if (count is! int) throw new ArgumentError(count);
189 }
190 void _handleData(T inputEvent, _EventSink<T> sink) {
191 if (_remaining > 0) {
192 sink._add(inputEvent);
193 _remaining -= 1;
194 if (_remaining == 0) {
195 sink._close();
196 }
197 }
198 }
199 }
200 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T> _test;
201 _TakeWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup er(source);
202 void _handleData(T inputEvent, _EventSink<T> sink) {
203 bool satisfies;
204 try {
205 satisfies = _test(inputEvent);
206 }
207 catch (e, s) {
208 _addErrorWithReplacement(sink, e, s);
209 sink._close();
210 return;}
211 if (satisfies) {
212 sink._add(inputEvent);
213 }
214 else {
215 sink._close();
216 }
217 }
218 }
219 class _SkipStream<T> extends _ForwardingStream<T, T> {int _remaining;
220 _SkipStream(Stream<T> source, int count) : this._remaining = count, super(sourc e) {
221 if (count is! int || count < 0) throw new ArgumentError(count);
222 }
223 void _handleData(T inputEvent, _EventSink<T> sink) {
224 if (_remaining > 0) {
225 _remaining--;
226 return;}
227 sink._add(inputEvent);
228 }
229 }
230 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T> _test;
231 bool _hasFailed = false;
232 _SkipWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup er(source);
233 void _handleData(T inputEvent, _EventSink<T> sink) {
234 if (_hasFailed) {
235 sink._add(inputEvent);
236 return;}
237 bool satisfies;
238 try {
239 satisfies = _test(inputEvent);
240 }
241 catch (e, s) {
242 _addErrorWithReplacement(sink, e, s);
243 _hasFailed = true;
244 return;}
245 if (!satisfies) {
246 _hasFailed = true;
247 sink._add(inputEvent);
248 }
249 }
250 }
251 typedef bool _Equality<T>(T a, T b);
252 class _DistinctStream<T> extends _ForwardingStream<T, T> {static var _SENTINEL = new Object();
253 _Equality<T> _equals;
254 var _previous = _SENTINEL;
255 _DistinctStream(Stream<T> source, bool equals(T a, T b)) : _equals = equals, su per(source);
256 void _handleData(T inputEvent, _EventSink<T> sink) {
257 if (identical(_previous, _SENTINEL)) {
258 _previous = inputEvent;
259 return sink._add(inputEvent);
260 }
261 else {
262 bool isEqual;
263 try {
264 if (_equals == null) {
265 isEqual = (_previous == inputEvent);
266 }
267 else {
268 isEqual = _equals(DEVC$RT.cast(_previous, Object, T, "CompositeCast", """line 42 6, column 29 of dart:async/stream_pipe.dart: """, _previous is T, false), inputE vent);
269 }
270 }
271 catch (e, s) {
272 _addErrorWithReplacement(sink, e, s);
273 return null;
274 }
275 if (!isEqual) {
276 sink._add(inputEvent);
277 _previous = inputEvent;
278 }
279 }
280 }
281 }
OLDNEW
« no previous file with comments | « test/dart_codegen/expect/async/stream_impl.dart ('k') | test/dart_codegen/expect/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698