| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
| 8 _runUserCode<T>( | 8 _runUserCode<T>( |
| 9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) { | 9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) { |
| 10 try { | 10 try { |
| (...skipping 320 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 331 // this from source. | 331 // this from source. |
| 332 sink._close(); | 332 sink._close(); |
| 333 } | 333 } |
| 334 } | 334 } |
| 335 } | 335 } |
| 336 } | 336 } |
| 337 | 337 |
| 338 /** | 338 /** |
| 339 * A [_ForwardingStreamSubscription] with one extra state field. | 339 * A [_ForwardingStreamSubscription] with one extra state field. |
| 340 * | 340 * |
| 341 * Use by several different classes, some storing an integer, others a bool. | 341 * Use by several different classes, storing an integer, bool or general. |
| 342 */ | 342 */ |
| 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
| 344 // Raw state field. Typed access provided by getters and setters below. | 344 // Raw state field. Typed access provided by getters and setters below. |
| 345 var _sharedState; | 345 var _sharedState; |
| 346 | 346 |
| 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
| 348 Function onError, void onDone(), bool cancelOnError, this._sharedState) | 348 Function onError, void onDone(), bool cancelOnError, this._sharedState) |
| 349 : super(stream, onData, onError, onDone, cancelOnError); | 349 : super(stream, onData, onError, onDone, cancelOnError); |
| 350 | 350 |
| 351 bool get _flag => _sharedState; | 351 bool get _flag => _sharedState; |
| 352 void set _flag(bool flag) { | 352 void set _flag(bool flag) { |
| 353 _sharedState = flag; | 353 _sharedState = flag; |
| 354 } | 354 } |
| 355 | 355 |
| 356 int get _count => _sharedState; | 356 int get _count => _sharedState; |
| 357 void set _count(int count) { | 357 void set _count(int count) { |
| 358 _sharedState = count; | 358 _sharedState = count; |
| 359 } | 359 } |
| 360 |
| 361 Object get _value => _sharedState; |
| 362 void set _value(Object value) { |
| 363 _sharedState = value; |
| 364 } |
| 360 } | 365 } |
| 361 | 366 |
| 362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 367 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 363 final _Predicate<T> _test; | 368 final _Predicate<T> _test; |
| 364 | 369 |
| 365 _TakeWhileStream(Stream<T> source, bool test(T value)) | 370 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 366 : this._test = test, | 371 : this._test = test, |
| 367 super(source); | 372 super(source); |
| 368 | 373 |
| 369 void _handleData(T inputEvent, _EventSink<T> sink) { | 374 void _handleData(T inputEvent, _EventSink<T> sink) { |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 446 sink._add(inputEvent); | 451 sink._add(inputEvent); |
| 447 } | 452 } |
| 448 } | 453 } |
| 449 } | 454 } |
| 450 | 455 |
| 451 typedef bool _Equality<T>(T a, T b); | 456 typedef bool _Equality<T>(T a, T b); |
| 452 | 457 |
| 453 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 458 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 454 static var _SENTINEL = new Object(); | 459 static var _SENTINEL = new Object(); |
| 455 | 460 |
| 456 _Equality<T> _equals; | 461 final _Equality<T> _equals; |
| 457 var _previous = _SENTINEL; | |
| 458 | 462 |
| 459 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 463 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 460 : _equals = equals, | 464 : _equals = equals, |
| 461 super(source); | 465 super(source); |
| 462 | 466 |
| 467 StreamSubscription<T> _createSubscription(void onData(T data), |
| 468 Function onError, void onDone(), bool cancelOnError) { |
| 469 return new _StateStreamSubscription<T>( |
| 470 this, onData, onError, onDone, cancelOnError, _SENTINEL); |
| 471 } |
| 472 |
| 463 void _handleData(T inputEvent, _EventSink<T> sink) { | 473 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 464 if (identical(_previous, _SENTINEL)) { | 474 _StateStreamSubscription<T> subscription = sink; |
| 465 _previous = inputEvent; | 475 var previous = subscription._value; |
| 466 return sink._add(inputEvent); | 476 if (identical(previous, _SENTINEL)) { |
| 477 // First event. |
| 478 subscription._value = inputEvent; |
| 479 sink._add(inputEvent); |
| 467 } else { | 480 } else { |
| 481 T previousEvent = previous; |
| 468 bool isEqual; | 482 bool isEqual; |
| 469 try { | 483 try { |
| 470 if (_equals == null) { | 484 if (_equals == null) { |
| 471 isEqual = (_previous == inputEvent); | 485 isEqual = (previousEvent == inputEvent); |
| 472 } else { | 486 } else { |
| 473 isEqual = _equals(_previous as Object/*=T*/, inputEvent); | 487 isEqual = _equals(previousEvent, inputEvent); |
| 474 } | 488 } |
| 475 } catch (e, s) { | 489 } catch (e, s) { |
| 476 _addErrorWithReplacement(sink, e, s); | 490 _addErrorWithReplacement(sink, e, s); |
| 477 return null; | 491 return; |
| 478 } | 492 } |
| 479 if (!isEqual) { | 493 if (!isEqual) { |
| 480 sink._add(inputEvent); | 494 sink._add(inputEvent); |
| 481 _previous = inputEvent; | 495 subscription._value = inputEvent; |
| 482 } | 496 } |
| 483 } | 497 } |
| 484 } | 498 } |
| 485 } | 499 } |
| OLD | NEW |