Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 2885993005: Fix Stream.distinct. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/lib/async/stream_distinct_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index 73ed6b1839fe16d6a5639e60b9c987734e1605c9..6ccb2a67ca23e68820a0bf765622469b2e5ab889 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -338,7 +338,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
/**
* A [_ForwardingStreamSubscription] with one extra state field.
*
- * Use by several different classes, some storing an integer, others a bool.
+ * Use by several different classes, storing an integer, bool or general.
*/
class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
// Raw state field. Typed access provided by getters and setters below.
@@ -357,6 +357,11 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
void set _count(int count) {
_sharedState = count;
}
+
+ Object get _value => _sharedState;
+ void set _value(Object value) {
+ _sharedState = value;
+ }
}
class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
@@ -453,32 +458,41 @@ typedef bool _Equality<T>(T a, T b);
class _DistinctStream<T> extends _ForwardingStream<T, T> {
static var _SENTINEL = new Object();
- _Equality<T> _equals;
- var _previous = _SENTINEL;
+ final _Equality<T> _equals;
_DistinctStream(Stream<T> source, bool equals(T a, T b))
: _equals = equals,
super(source);
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
+ return new _StateStreamSubscription<T>(
+ this, onData, onError, onDone, cancelOnError, _SENTINEL);
+ }
+
void _handleData(T inputEvent, _EventSink<T> sink) {
- if (identical(_previous, _SENTINEL)) {
- _previous = inputEvent;
- return sink._add(inputEvent);
+ _StateStreamSubscription<T> subscription = sink;
+ var previous = subscription._value;
+ if (identical(previous, _SENTINEL)) {
+ // First event.
+ subscription._value = inputEvent;
+ sink._add(inputEvent);
} else {
+ T previousEvent = previous;
bool isEqual;
try {
if (_equals == null) {
- isEqual = (_previous == inputEvent);
+ isEqual = (previousEvent == inputEvent);
} else {
- isEqual = _equals(_previous as Object/*=T*/, inputEvent);
+ isEqual = _equals(previousEvent, inputEvent);
}
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
- return null;
+ return;
}
if (!isEqual) {
sink._add(inputEvent);
- _previous = inputEvent;
+ subscription._value = inputEvent;
}
}
}
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/lib/async/stream_distinct_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698