Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index 73ed6b1839fe16d6a5639e60b9c987734e1605c9..6ccb2a67ca23e68820a0bf765622469b2e5ab889 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -338,7 +338,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
/** |
* A [_ForwardingStreamSubscription] with one extra state field. |
* |
- * Use by several different classes, some storing an integer, others a bool. |
+ * Use by several different classes, storing an integer, bool or general. |
*/ |
class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
// Raw state field. Typed access provided by getters and setters below. |
@@ -357,6 +357,11 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
void set _count(int count) { |
_sharedState = count; |
} |
+ |
+ Object get _value => _sharedState; |
+ void set _value(Object value) { |
+ _sharedState = value; |
+ } |
} |
class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
@@ -453,32 +458,41 @@ typedef bool _Equality<T>(T a, T b); |
class _DistinctStream<T> extends _ForwardingStream<T, T> { |
static var _SENTINEL = new Object(); |
- _Equality<T> _equals; |
- var _previous = _SENTINEL; |
+ final _Equality<T> _equals; |
_DistinctStream(Stream<T> source, bool equals(T a, T b)) |
: _equals = equals, |
super(source); |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
+ return new _StateStreamSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError, _SENTINEL); |
+ } |
+ |
void _handleData(T inputEvent, _EventSink<T> sink) { |
- if (identical(_previous, _SENTINEL)) { |
- _previous = inputEvent; |
- return sink._add(inputEvent); |
+ _StateStreamSubscription<T> subscription = sink; |
+ var previous = subscription._value; |
+ if (identical(previous, _SENTINEL)) { |
+ // First event. |
+ subscription._value = inputEvent; |
+ sink._add(inputEvent); |
} else { |
+ T previousEvent = previous; |
bool isEqual; |
try { |
if (_equals == null) { |
- isEqual = (_previous == inputEvent); |
+ isEqual = (previousEvent == inputEvent); |
} else { |
- isEqual = _equals(_previous as Object/*=T*/, inputEvent); |
+ isEqual = _equals(previousEvent, inputEvent); |
} |
} catch (e, s) { |
_addErrorWithReplacement(sink, e, s); |
- return null; |
+ return; |
} |
if (!isEqual) { |
sink._add(inputEvent); |
- _previous = inputEvent; |
+ subscription._value = inputEvent; |
} |
} |
} |