Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(539)

Side by Side Diff: tool/input_sdk/lib/async/stream_pipe.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Runs user code and takes actions depending on success or failure. */ 7 /** Runs user code and takes actions depending on success or failure. */
8 _runUserCode(userCode(), 8 _runUserCode(userCode(),
9 onSuccess(value), 9 onSuccess(value),
10 onError(error, StackTrace stackTrace)) { 10 onError(error, StackTrace stackTrace)) {
(...skipping 29 matching lines...) Expand all
40 _Future future, 40 _Future future,
41 error, StackTrace stackTrace) { 41 error, StackTrace stackTrace) {
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
43 if (replacement != null) { 43 if (replacement != null) {
44 error = _nonNullError(replacement.error); 44 error = _nonNullError(replacement.error);
45 stackTrace = replacement.stackTrace; 45 stackTrace = replacement.stackTrace;
46 } 46 }
47 _cancelAndError(subscription, future, error, stackTrace); 47 _cancelAndError(subscription, future, error, stackTrace);
48 } 48 }
49 49
50 typedef void _ErrorCallback(error, StackTrace stackTrace);
51
50 /** Helper function to make an onError argument to [_runUserCode]. */ 52 /** Helper function to make an onError argument to [_runUserCode]. */
51 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => 53 _ErrorCallback _cancelAndErrorClosure(
52 ((error, StackTrace stackTrace) => _cancelAndError( 54 StreamSubscription subscription, _Future future) {
53 subscription, future, error, stackTrace)); 55 return (error, StackTrace stackTrace) {
56 _cancelAndError(subscription, future, error, stackTrace);
57 };
58 }
54 59
55 /** Helper function to cancel a subscription and wait for the potential future, 60 /** Helper function to cancel a subscription and wait for the potential future,
56 before completing with a value. */ 61 before completing with a value. */
57 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { 62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
58 var cancelFuture = subscription.cancel(); 63 var cancelFuture = subscription.cancel();
59 if (cancelFuture is Future) { 64 if (cancelFuture is Future) {
60 cancelFuture.whenComplete(() => future._complete(value)); 65 cancelFuture.whenComplete(() => future._complete(value));
61 } else { 66 } else {
62 future._complete(value); 67 future._complete(value);
63 } 68 }
(...skipping 29 matching lines...) Expand all
93 Function onError, 98 Function onError,
94 void onDone(), 99 void onDone(),
95 bool cancelOnError) { 100 bool cancelOnError) {
96 return new _ForwardingStreamSubscription<S, T>( 101 return new _ForwardingStreamSubscription<S, T>(
97 this, onData, onError, onDone, cancelOnError); 102 this, onData, onError, onDone, cancelOnError);
98 } 103 }
99 104
100 // Override the following methods in subclasses to change the behavior. 105 // Override the following methods in subclasses to change the behavior.
101 106
102 void _handleData(S data, _EventSink<T> sink) { 107 void _handleData(S data, _EventSink<T> sink) {
103 dynamic outputData = data; 108 sink._add(data as Object /*=T*/);
104 sink._add(outputData);
105 } 109 }
106 110
107 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { 111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
108 sink._addError(error, stackTrace); 112 sink._addError(error, stackTrace);
109 } 113 }
110 114
111 void _handleDone(_EventSink<T> sink) { 115 void _handleDone(_EventSink<T> sink) {
112 sink._close(); 116 sink._close();
113 } 117 }
114 } 118 }
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
154 158
155 void _onResume() { 159 void _onResume() {
156 if (_subscription == null) return; 160 if (_subscription == null) return;
157 _subscription.resume(); 161 _subscription.resume();
158 } 162 }
159 163
160 Future _onCancel() { 164 Future _onCancel() {
161 if (_subscription != null) { 165 if (_subscription != null) {
162 StreamSubscription subscription = _subscription; 166 StreamSubscription subscription = _subscription;
163 _subscription = null; 167 _subscription = null;
164 subscription.cancel(); 168 return subscription.cancel();
165 } 169 }
166 return null; 170 return null;
167 } 171 }
168 172
169 // Methods used as listener on source subscription. 173 // Methods used as listener on source subscription.
170 174
171 void _handleData(S data) { 175 void _handleData(S data) {
172 _stream._handleData(data, this); 176 _stream._handleData(data, this);
173 } 177 }
174 178
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
217 } 221 }
218 } 222 }
219 223
220 224
221 typedef T _Transformation<S, T>(S value); 225 typedef T _Transformation<S, T>(S value);
222 226
223 /** 227 /**
224 * A stream pipe that converts data events before passing them on. 228 * A stream pipe that converts data events before passing them on.
225 */ 229 */
226 class _MapStream<S, T> extends _ForwardingStream<S, T> { 230 class _MapStream<S, T> extends _ForwardingStream<S, T> {
227 final _Transformation _transform; 231 final _Transformation<S, T> _transform;
228 232
229 _MapStream(Stream<S> source, T transform(S event)) 233 _MapStream(Stream<S> source, T transform(S event))
230 : this._transform = transform, super(source); 234 : this._transform = transform, super(source);
231 235
232 void _handleData(S inputEvent, _EventSink<T> sink) { 236 void _handleData(S inputEvent, _EventSink<T> sink) {
233 T outputEvent; 237 T outputEvent;
234 try { 238 try {
235 outputEvent = _transform(inputEvent); 239 outputEvent = _transform(inputEvent);
236 } catch (e, s) { 240 } catch (e, s) {
237 _addErrorWithReplacement(sink, e, s); 241 _addErrorWithReplacement(sink, e, s);
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 return; 305 return;
302 } 306 }
303 } else { 307 } else {
304 sink._addError(error, stackTrace); 308 sink._addError(error, stackTrace);
305 } 309 }
306 } 310 }
307 } 311 }
308 312
309 313
310 class _TakeStream<T> extends _ForwardingStream<T, T> { 314 class _TakeStream<T> extends _ForwardingStream<T, T> {
311 int _remaining; 315 final int _count;
312 316
313 _TakeStream(Stream<T> source, int count) 317 _TakeStream(Stream<T> source, int count)
314 : this._remaining = count, super(source) { 318 : this._count = count, super(source) {
315 // This test is done early to avoid handling an async error 319 // This test is done early to avoid handling an async error
316 // in the _handleData method. 320 // in the _handleData method.
317 if (count is! int) throw new ArgumentError(count); 321 if (count is! int) throw new ArgumentError(count);
318 } 322 }
319 323
324 StreamSubscription<T> _createSubscription(
325 void onData(T data),
326 Function onError,
327 void onDone(),
328 bool cancelOnError) {
329 return new _StateStreamSubscription<T>(
330 this, onData, onError, onDone, cancelOnError, _count);
331 }
332
320 void _handleData(T inputEvent, _EventSink<T> sink) { 333 void _handleData(T inputEvent, _EventSink<T> sink) {
321 if (_remaining > 0) { 334 _StateStreamSubscription<T> subscription = sink;
335 int count = subscription._count;
336 if (count > 0) {
322 sink._add(inputEvent); 337 sink._add(inputEvent);
323 _remaining -= 1; 338 count -= 1;
324 if (_remaining == 0) { 339 subscription._count = count;
340 if (count == 0) {
325 // Closing also unsubscribes all subscribers, which unsubscribes 341 // Closing also unsubscribes all subscribers, which unsubscribes
326 // this from source. 342 // this from source.
327 sink._close(); 343 sink._close();
328 } 344 }
329 } 345 }
330 } 346 }
331 } 347 }
332 348
349 /**
350 * A [_ForwardingStreamSubscription] with one extra state field.
351 *
352 * Use by several different classes, some storing an integer, others a bool.
353 */
354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
355 // Raw state field. Typed access provided by getters and setters below.
356 var _sharedState;
357
358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
359 Function onError, void onDone(),
360 bool cancelOnError, this._sharedState)
361 : super(stream, onData, onError, onDone, cancelOnError);
362
363 bool get _flag => _sharedState;
364 void set _flag(bool flag) { _sharedState = flag; }
365 int get _count => _sharedState;
366 void set _count(int count) { _sharedState = count; }
367 }
368
333 369
334 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
335 final _Predicate<T> _test; 371 final _Predicate<T> _test;
336 372
337 _TakeWhileStream(Stream<T> source, bool test(T value)) 373 _TakeWhileStream(Stream<T> source, bool test(T value))
338 : this._test = test, super(source); 374 : this._test = test, super(source);
339 375
340 void _handleData(T inputEvent, _EventSink<T> sink) { 376 void _handleData(T inputEvent, _EventSink<T> sink) {
341 bool satisfies; 377 bool satisfies;
342 try { 378 try {
343 satisfies = _test(inputEvent); 379 satisfies = _test(inputEvent);
344 } catch (e, s) { 380 } catch (e, s) {
345 _addErrorWithReplacement(sink, e, s); 381 _addErrorWithReplacement(sink, e, s);
346 // The test didn't say true. Didn't say false either, but we stop anyway. 382 // The test didn't say true. Didn't say false either, but we stop anyway.
347 sink._close(); 383 sink._close();
348 return; 384 return;
349 } 385 }
350 if (satisfies) { 386 if (satisfies) {
351 sink._add(inputEvent); 387 sink._add(inputEvent);
352 } else { 388 } else {
353 sink._close(); 389 sink._close();
354 } 390 }
355 } 391 }
356 } 392 }
357 393
358 class _SkipStream<T> extends _ForwardingStream<T, T> { 394 class _SkipStream<T> extends _ForwardingStream<T, T> {
359 int _remaining; 395 final int _count;
360 396
361 _SkipStream(Stream<T> source, int count) 397 _SkipStream(Stream<T> source, int count)
362 : this._remaining = count, super(source) { 398 : this._count = count, super(source) {
363 // This test is done early to avoid handling an async error 399 // This test is done early to avoid handling an async error
364 // in the _handleData method. 400 // in the _handleData method.
365 if (count is! int || count < 0) throw new ArgumentError(count); 401 if (count is! int || count < 0) throw new ArgumentError(count);
366 } 402 }
367 403
404 StreamSubscription<T> _createSubscription(
405 void onData(T data),
406 Function onError,
407 void onDone(),
408 bool cancelOnError) {
409 return new _StateStreamSubscription<T>(
410 this, onData, onError, onDone, cancelOnError, _count);
411 }
412
368 void _handleData(T inputEvent, _EventSink<T> sink) { 413 void _handleData(T inputEvent, _EventSink<T> sink) {
369 if (_remaining > 0) { 414 _StateStreamSubscription<T> subscription = sink;
370 _remaining--; 415 int count = subscription._count;
416 if (count > 0) {
417 subscription._count = count - 1;
371 return; 418 return;
372 } 419 }
373 sink._add(inputEvent); 420 sink._add(inputEvent);
374 } 421 }
375 } 422 }
376 423
377 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { 424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
378 final _Predicate<T> _test; 425 final _Predicate<T> _test;
379 bool _hasFailed = false;
380 426
381 _SkipWhileStream(Stream<T> source, bool test(T value)) 427 _SkipWhileStream(Stream<T> source, bool test(T value))
382 : this._test = test, super(source); 428 : this._test = test, super(source);
383 429
430 StreamSubscription<T> _createSubscription(
431 void onData(T data),
432 Function onError,
433 void onDone(),
434 bool cancelOnError) {
435 return new _StateStreamSubscription<T>(
436 this, onData, onError, onDone, cancelOnError, false);
437 }
438
384 void _handleData(T inputEvent, _EventSink<T> sink) { 439 void _handleData(T inputEvent, _EventSink<T> sink) {
385 if (_hasFailed) { 440 _StateStreamSubscription<T> subscription = sink;
441 bool hasFailed = subscription._flag;
442 if (hasFailed) {
386 sink._add(inputEvent); 443 sink._add(inputEvent);
387 return; 444 return;
388 } 445 }
389 bool satisfies; 446 bool satisfies;
390 try { 447 try {
391 satisfies = _test(inputEvent); 448 satisfies = _test(inputEvent);
392 } catch (e, s) { 449 } catch (e, s) {
393 _addErrorWithReplacement(sink, e, s); 450 _addErrorWithReplacement(sink, e, s);
394 // A failure to return a boolean is considered "not matching". 451 // A failure to return a boolean is considered "not matching".
395 _hasFailed = true; 452 subscription._flag = true;
396 return; 453 return;
397 } 454 }
398 if (!satisfies) { 455 if (!satisfies) {
399 _hasFailed = true; 456 subscription._flag = true;
400 sink._add(inputEvent); 457 sink._add(inputEvent);
401 } 458 }
402 } 459 }
403 } 460 }
404 461
405 typedef bool _Equality<T>(T a, T b); 462 typedef bool _Equality<T>(T a, T b);
406 463
407 class _DistinctStream<T> extends _ForwardingStream<T, T> { 464 class _DistinctStream<T> extends _ForwardingStream<T, T> {
408 static var _SENTINEL = new Object(); 465 static var _SENTINEL = new Object();
409 466
410 _Equality<T> _equals; 467 _Equality<T> _equals;
411 var _previous = _SENTINEL; 468 var _previous = _SENTINEL;
412 469
413 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 470 _DistinctStream(Stream<T> source, bool equals(T a, T b))
414 : _equals = equals, super(source); 471 : _equals = equals, super(source);
415 472
416 void _handleData(T inputEvent, _EventSink<T> sink) { 473 void _handleData(T inputEvent, _EventSink<T> sink) {
417 if (identical(_previous, _SENTINEL)) { 474 if (identical(_previous, _SENTINEL)) {
418 _previous = inputEvent; 475 _previous = inputEvent;
419 return sink._add(inputEvent); 476 return sink._add(inputEvent);
420 } else { 477 } else {
421 bool isEqual; 478 bool isEqual;
422 try { 479 try {
423 if (_equals == null) { 480 if (_equals == null) {
424 isEqual = (_previous == inputEvent); 481 isEqual = (_previous == inputEvent);
425 } else { 482 } else {
426 isEqual = _equals(_previous, inputEvent); 483 isEqual = _equals(_previous as Object /*=T*/, inputEvent);
427 } 484 }
428 } catch (e, s) { 485 } catch (e, s) {
429 _addErrorWithReplacement(sink, e, s); 486 _addErrorWithReplacement(sink, e, s);
430 return null; 487 return null;
431 } 488 }
432 if (!isEqual) { 489 if (!isEqual) {
433 sink._add(inputEvent); 490 sink._add(inputEvent);
434 _previous = inputEvent; 491 _previous = inputEvent;
435 } 492 }
436 } 493 }
437 } 494 }
438 } 495 }
OLDNEW
« no previous file with comments | « tool/input_sdk/lib/async/stream_impl.dart ('k') | tool/input_sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698