Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(486)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/async/stream_pipe.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 /** Runs user code and takes actions depending on success or failure. */
8 _runUserCode(userCode(),
9 onSuccess(value),
10 onError(error, StackTrace stackTrace)) {
11 try {
12 onSuccess(userCode());
13 } catch (e, s) {
14 AsyncError replacement = Zone.current.errorCallback(e, s);
15 if (replacement == null) {
16 onError(e, s);
17 } else {
18 var error = _nonNullError(replacement.error);
19 var stackTrace = replacement.stackTrace;
20 onError(error, stackTrace);
21 }
22 }
23 }
24
25 /** Helper function to cancel a subscription and wait for the potential future,
26 before completing with an error. */
27 void _cancelAndError(StreamSubscription subscription,
28 _Future future,
29 error,
30 StackTrace stackTrace) {
31 var cancelFuture = subscription.cancel();
32 if (cancelFuture is Future) {
33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
34 } else {
35 future._completeError(error, stackTrace);
36 }
37 }
38
39 void _cancelAndErrorWithReplacement(StreamSubscription subscription,
40 _Future future,
41 error, StackTrace stackTrace) {
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
43 if (replacement != null) {
44 error = _nonNullError(replacement.error);
45 stackTrace = replacement.stackTrace;
46 }
47 _cancelAndError(subscription, future, error, stackTrace);
48 }
49
50 typedef void _ErrorCallback(error, StackTrace stackTrace);
51
52 /** Helper function to make an onError argument to [_runUserCode]. */
53 _ErrorCallback _cancelAndErrorClosure(
54 StreamSubscription subscription, _Future future) {
55 return (error, StackTrace stackTrace) {
56 _cancelAndError(subscription, future, error, stackTrace);
57 };
58 }
59
60 /** Helper function to cancel a subscription and wait for the potential future,
61 before completing with a value. */
62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
63 var cancelFuture = subscription.cancel();
64 if (cancelFuture is Future) {
65 cancelFuture.whenComplete(() => future._complete(value));
66 } else {
67 future._complete(value);
68 }
69 }
70
71
72 /**
73 * A [Stream] that forwards subscriptions to another stream.
74 *
75 * This stream implements [Stream], but forwards all subscriptions
76 * to an underlying stream, and wraps the returned subscription to
77 * modify the events on the way.
78 *
79 * This class is intended for internal use only.
80 */
81 abstract class _ForwardingStream<S, T> extends Stream<T> {
82 final Stream<S> _source;
83
84 _ForwardingStream(this._source);
85
86 bool get isBroadcast => _source.isBroadcast;
87
88 StreamSubscription<T> listen(void onData(T value),
89 { Function onError,
90 void onDone(),
91 bool cancelOnError }) {
92 cancelOnError = identical(true, cancelOnError);
93 return _createSubscription(onData, onError, onDone, cancelOnError);
94 }
95
96 StreamSubscription<T> _createSubscription(
97 void onData(T data),
98 Function onError,
99 void onDone(),
100 bool cancelOnError) {
101 return new _ForwardingStreamSubscription<S, T>(
102 this, onData, onError, onDone, cancelOnError);
103 }
104
105 // Override the following methods in subclasses to change the behavior.
106
107 void _handleData(S data, _EventSink<T> sink) {
108 sink._add(data as Object /*=T*/);
109 }
110
111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
112 sink._addError(error, stackTrace);
113 }
114
115 void _handleDone(_EventSink<T> sink) {
116 sink._close();
117 }
118 }
119
120 /**
121 * Abstract superclass for subscriptions that forward to other subscriptions.
122 */
123 class _ForwardingStreamSubscription<S, T>
124 extends _BufferingStreamSubscription<T> {
125 final _ForwardingStream<S, T> _stream;
126
127 StreamSubscription<S> _subscription;
128
129 _ForwardingStreamSubscription(this._stream, void onData(T data),
130 Function onError, void onDone(),
131 bool cancelOnError)
132 : super(onData, onError, onDone, cancelOnError) {
133 _subscription = _stream._source.listen(_handleData,
134 onError: _handleError,
135 onDone: _handleDone);
136 }
137
138 // _StreamSink interface.
139 // Transformers sending more than one event have no way to know if the stream
140 // is canceled or closed after the first, so we just ignore remaining events.
141
142 void _add(T data) {
143 if (_isClosed) return;
144 super._add(data);
145 }
146
147 void _addError(Object error, StackTrace stackTrace) {
148 if (_isClosed) return;
149 super._addError(error, stackTrace);
150 }
151
152 // StreamSubscription callbacks.
153
154 void _onPause() {
155 if (_subscription == null) return;
156 _subscription.pause();
157 }
158
159 void _onResume() {
160 if (_subscription == null) return;
161 _subscription.resume();
162 }
163
164 Future _onCancel() {
165 if (_subscription != null) {
166 StreamSubscription subscription = _subscription;
167 _subscription = null;
168 return subscription.cancel();
169 }
170 return null;
171 }
172
173 // Methods used as listener on source subscription.
174
175 void _handleData(S data) {
176 _stream._handleData(data, this);
177 }
178
179 void _handleError(error, StackTrace stackTrace) {
180 _stream._handleError(error, stackTrace, this);
181 }
182
183 void _handleDone() {
184 _stream._handleDone(this);
185 }
186 }
187
188 // -------------------------------------------------------------------
189 // Stream transformers used by the default Stream implementation.
190 // -------------------------------------------------------------------
191
192 typedef bool _Predicate<T>(T value);
193
194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
196 if (replacement != null) {
197 error = _nonNullError(replacement.error);
198 stackTrace = replacement.stackTrace;
199 }
200 sink._addError(error, stackTrace);
201 }
202
203
204 class _WhereStream<T> extends _ForwardingStream<T, T> {
205 final _Predicate<T> _test;
206
207 _WhereStream(Stream<T> source, bool test(T value))
208 : _test = test, super(source);
209
210 void _handleData(T inputEvent, _EventSink<T> sink) {
211 bool satisfies;
212 try {
213 satisfies = _test(inputEvent);
214 } catch (e, s) {
215 _addErrorWithReplacement(sink, e, s);
216 return;
217 }
218 if (satisfies) {
219 sink._add(inputEvent);
220 }
221 }
222 }
223
224
225 typedef T _Transformation<S, T>(S value);
226
227 /**
228 * A stream pipe that converts data events before passing them on.
229 */
230 class _MapStream<S, T> extends _ForwardingStream<S, T> {
231 final _Transformation<S, T> _transform;
232
233 _MapStream(Stream<S> source, T transform(S event))
234 : this._transform = transform, super(source);
235
236 void _handleData(S inputEvent, _EventSink<T> sink) {
237 T outputEvent;
238 try {
239 outputEvent = _transform(inputEvent);
240 } catch (e, s) {
241 _addErrorWithReplacement(sink, e, s);
242 return;
243 }
244 sink._add(outputEvent);
245 }
246 }
247
248 /**
249 * A stream pipe that converts data events before passing them on.
250 */
251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
252 final _Transformation<S, Iterable<T>> _expand;
253
254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
255 : this._expand = expand, super(source);
256
257 void _handleData(S inputEvent, _EventSink<T> sink) {
258 try {
259 for (T value in _expand(inputEvent)) {
260 sink._add(value);
261 }
262 } catch (e, s) {
263 // If either _expand or iterating the generated iterator throws,
264 // we abort the iteration.
265 _addErrorWithReplacement(sink, e, s);
266 }
267 }
268 }
269
270
271 typedef bool _ErrorTest(error);
272
273 /**
274 * A stream pipe that converts or disposes error events
275 * before passing them on.
276 */
277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
278 final Function _transform;
279 final _ErrorTest _test;
280
281 _HandleErrorStream(Stream<T> source,
282 Function onError,
283 bool test(error))
284 : this._transform = onError, this._test = test, super(source);
285
286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
287 bool matches = true;
288 if (_test != null) {
289 try {
290 matches = _test(error);
291 } catch (e, s) {
292 _addErrorWithReplacement(sink, e, s);
293 return;
294 }
295 }
296 if (matches) {
297 try {
298 _invokeErrorHandler(_transform, error, stackTrace);
299 } catch (e, s) {
300 if (identical(e, error)) {
301 sink._addError(error, stackTrace);
302 } else {
303 _addErrorWithReplacement(sink, e, s);
304 }
305 return;
306 }
307 } else {
308 sink._addError(error, stackTrace);
309 }
310 }
311 }
312
313
314 class _TakeStream<T> extends _ForwardingStream<T, T> {
315 final int _count;
316
317 _TakeStream(Stream<T> source, int count)
318 : this._count = count, super(source) {
319 // This test is done early to avoid handling an async error
320 // in the _handleData method.
321 if (count is! int) throw new ArgumentError(count);
322 }
323
324 StreamSubscription<T> _createSubscription(
325 void onData(T data),
326 Function onError,
327 void onDone(),
328 bool cancelOnError) {
329 return new _StateStreamSubscription<T>(
330 this, onData, onError, onDone, cancelOnError, _count);
331 }
332
333 void _handleData(T inputEvent, _EventSink<T> sink) {
334 _StateStreamSubscription<T> subscription = sink;
335 int count = subscription._count;
336 if (count > 0) {
337 sink._add(inputEvent);
338 count -= 1;
339 subscription._count = count;
340 if (count == 0) {
341 // Closing also unsubscribes all subscribers, which unsubscribes
342 // this from source.
343 sink._close();
344 }
345 }
346 }
347 }
348
349 /**
350 * A [_ForwardingStreamSubscription] with one extra state field.
351 *
352 * Use by several different classes, some storing an integer, others a bool.
353 */
354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
355 // Raw state field. Typed access provided by getters and setters below.
356 var _sharedState;
357
358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
359 Function onError, void onDone(),
360 bool cancelOnError, this._sharedState)
361 : super(stream, onData, onError, onDone, cancelOnError);
362
363 bool get _flag => _sharedState;
364 void set _flag(bool flag) { _sharedState = flag; }
365 int get _count => _sharedState;
366 void set _count(int count) { _sharedState = count; }
367 }
368
369
370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
371 final _Predicate<T> _test;
372
373 _TakeWhileStream(Stream<T> source, bool test(T value))
374 : this._test = test, super(source);
375
376 void _handleData(T inputEvent, _EventSink<T> sink) {
377 bool satisfies;
378 try {
379 satisfies = _test(inputEvent);
380 } catch (e, s) {
381 _addErrorWithReplacement(sink, e, s);
382 // The test didn't say true. Didn't say false either, but we stop anyway.
383 sink._close();
384 return;
385 }
386 if (satisfies) {
387 sink._add(inputEvent);
388 } else {
389 sink._close();
390 }
391 }
392 }
393
394 class _SkipStream<T> extends _ForwardingStream<T, T> {
395 final int _count;
396
397 _SkipStream(Stream<T> source, int count)
398 : this._count = count, super(source) {
399 // This test is done early to avoid handling an async error
400 // in the _handleData method.
401 if (count is! int || count < 0) throw new ArgumentError(count);
402 }
403
404 StreamSubscription<T> _createSubscription(
405 void onData(T data),
406 Function onError,
407 void onDone(),
408 bool cancelOnError) {
409 return new _StateStreamSubscription<T>(
410 this, onData, onError, onDone, cancelOnError, _count);
411 }
412
413 void _handleData(T inputEvent, _EventSink<T> sink) {
414 _StateStreamSubscription<T> subscription = sink;
415 int count = subscription._count;
416 if (count > 0) {
417 subscription._count = count - 1;
418 return;
419 }
420 sink._add(inputEvent);
421 }
422 }
423
424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
425 final _Predicate<T> _test;
426
427 _SkipWhileStream(Stream<T> source, bool test(T value))
428 : this._test = test, super(source);
429
430 StreamSubscription<T> _createSubscription(
431 void onData(T data),
432 Function onError,
433 void onDone(),
434 bool cancelOnError) {
435 return new _StateStreamSubscription<T>(
436 this, onData, onError, onDone, cancelOnError, false);
437 }
438
439 void _handleData(T inputEvent, _EventSink<T> sink) {
440 _StateStreamSubscription<T> subscription = sink;
441 bool hasFailed = subscription._flag;
442 if (hasFailed) {
443 sink._add(inputEvent);
444 return;
445 }
446 bool satisfies;
447 try {
448 satisfies = _test(inputEvent);
449 } catch (e, s) {
450 _addErrorWithReplacement(sink, e, s);
451 // A failure to return a boolean is considered "not matching".
452 subscription._flag = true;
453 return;
454 }
455 if (!satisfies) {
456 subscription._flag = true;
457 sink._add(inputEvent);
458 }
459 }
460 }
461
462 typedef bool _Equality<T>(T a, T b);
463
464 class _DistinctStream<T> extends _ForwardingStream<T, T> {
465 static var _SENTINEL = new Object();
466
467 _Equality<T> _equals;
468 var _previous = _SENTINEL;
469
470 _DistinctStream(Stream<T> source, bool equals(T a, T b))
471 : _equals = equals, super(source);
472
473 void _handleData(T inputEvent, _EventSink<T> sink) {
474 if (identical(_previous, _SENTINEL)) {
475 _previous = inputEvent;
476 return sink._add(inputEvent);
477 } else {
478 bool isEqual;
479 try {
480 if (_equals == null) {
481 isEqual = (_previous == inputEvent);
482 } else {
483 isEqual = _equals(_previous as Object /*=T*/, inputEvent);
484 }
485 } catch (e, s) {
486 _addErrorWithReplacement(sink, e, s);
487 return null;
488 }
489 if (!isEqual) {
490 sink._add(inputEvent);
491 _previous = inputEvent;
492 }
493 }
494 }
495 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698