Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Unified Diff: tool/input_sdk/lib/async/stream_pipe.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tool/input_sdk/lib/async/stream_impl.dart ('k') | tool/input_sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tool/input_sdk/lib/async/stream_pipe.dart
diff --git a/tool/input_sdk/lib/async/stream_pipe.dart b/tool/input_sdk/lib/async/stream_pipe.dart
index 3683719062d332df6676ca6cca5a0f7cf5ed5792..1125620aacb3b7b445c3745e8a079c725a980f37 100644
--- a/tool/input_sdk/lib/async/stream_pipe.dart
+++ b/tool/input_sdk/lib/async/stream_pipe.dart
@@ -47,10 +47,15 @@ void _cancelAndErrorWithReplacement(StreamSubscription subscription,
_cancelAndError(subscription, future, error, stackTrace);
}
+typedef void _ErrorCallback(error, StackTrace stackTrace);
+
/** Helper function to make an onError argument to [_runUserCode]. */
-_cancelAndErrorClosure(StreamSubscription subscription, _Future future) =>
- ((error, StackTrace stackTrace) => _cancelAndError(
- subscription, future, error, stackTrace));
+_ErrorCallback _cancelAndErrorClosure(
+ StreamSubscription subscription, _Future future) {
+ return (error, StackTrace stackTrace) {
+ _cancelAndError(subscription, future, error, stackTrace);
+ };
+}
/** Helper function to cancel a subscription and wait for the potential future,
before completing with a value. */
@@ -100,8 +105,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
// Override the following methods in subclasses to change the behavior.
void _handleData(S data, _EventSink<T> sink) {
- dynamic outputData = data;
- sink._add(outputData);
+ sink._add(data as Object /*=T*/);
}
void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
@@ -161,7 +165,7 @@ class _ForwardingStreamSubscription<S, T>
if (_subscription != null) {
StreamSubscription subscription = _subscription;
_subscription = null;
- subscription.cancel();
+ return subscription.cancel();
}
return null;
}
@@ -224,7 +228,7 @@ typedef T _Transformation<S, T>(S value);
* A stream pipe that converts data events before passing them on.
*/
class _MapStream<S, T> extends _ForwardingStream<S, T> {
- final _Transformation _transform;
+ final _Transformation<S, T> _transform;
_MapStream(Stream<S> source, T transform(S event))
: this._transform = transform, super(source);
@@ -308,20 +312,32 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
class _TakeStream<T> extends _ForwardingStream<T, T> {
- int _remaining;
+ final int _count;
_TakeStream(Stream<T> source, int count)
- : this._remaining = count, super(source) {
+ : this._count = count, super(source) {
// This test is done early to avoid handling an async error
// in the _handleData method.
if (count is! int) throw new ArgumentError(count);
}
+ StreamSubscription<T> _createSubscription(
+ void onData(T data),
+ Function onError,
+ void onDone(),
+ bool cancelOnError) {
+ return new _StateStreamSubscription<T>(
+ this, onData, onError, onDone, cancelOnError, _count);
+ }
+
void _handleData(T inputEvent, _EventSink<T> sink) {
- if (_remaining > 0) {
+ _StateStreamSubscription<T> subscription = sink;
+ int count = subscription._count;
+ if (count > 0) {
sink._add(inputEvent);
- _remaining -= 1;
- if (_remaining == 0) {
+ count -= 1;
+ subscription._count = count;
+ if (count == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
sink._close();
@@ -330,6 +346,26 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
}
}
+/**
+ * A [_ForwardingStreamSubscription] with one extra state field.
+ *
+ * Use by several different classes, some storing an integer, others a bool.
+ */
+class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
+ // Raw state field. Typed access provided by getters and setters below.
+ var _sharedState;
+
+ _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
+ Function onError, void onDone(),
+ bool cancelOnError, this._sharedState)
+ : super(stream, onData, onError, onDone, cancelOnError);
+
+ bool get _flag => _sharedState;
+ void set _flag(bool flag) { _sharedState = flag; }
+ int get _count => _sharedState;
+ void set _count(int count) { _sharedState = count; }
+}
+
class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
final _Predicate<T> _test;
@@ -356,18 +392,29 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
}
class _SkipStream<T> extends _ForwardingStream<T, T> {
- int _remaining;
+ final int _count;
_SkipStream(Stream<T> source, int count)
- : this._remaining = count, super(source) {
+ : this._count = count, super(source) {
// This test is done early to avoid handling an async error
// in the _handleData method.
if (count is! int || count < 0) throw new ArgumentError(count);
}
+ StreamSubscription<T> _createSubscription(
+ void onData(T data),
+ Function onError,
+ void onDone(),
+ bool cancelOnError) {
+ return new _StateStreamSubscription<T>(
+ this, onData, onError, onDone, cancelOnError, _count);
+ }
+
void _handleData(T inputEvent, _EventSink<T> sink) {
- if (_remaining > 0) {
- _remaining--;
+ _StateStreamSubscription<T> subscription = sink;
+ int count = subscription._count;
+ if (count > 0) {
+ subscription._count = count - 1;
return;
}
sink._add(inputEvent);
@@ -376,13 +423,23 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
final _Predicate<T> _test;
- bool _hasFailed = false;
_SkipWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
+ StreamSubscription<T> _createSubscription(
+ void onData(T data),
+ Function onError,
+ void onDone(),
+ bool cancelOnError) {
+ return new _StateStreamSubscription<T>(
+ this, onData, onError, onDone, cancelOnError, false);
+ }
+
void _handleData(T inputEvent, _EventSink<T> sink) {
- if (_hasFailed) {
+ _StateStreamSubscription<T> subscription = sink;
+ bool hasFailed = subscription._flag;
+ if (hasFailed) {
sink._add(inputEvent);
return;
}
@@ -392,11 +449,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
// A failure to return a boolean is considered "not matching".
- _hasFailed = true;
+ subscription._flag = true;
return;
}
if (!satisfies) {
- _hasFailed = true;
+ subscription._flag = true;
sink._add(inputEvent);
}
}
@@ -423,7 +480,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
if (_equals == null) {
isEqual = (_previous == inputEvent);
} else {
- isEqual = _equals(_previous, inputEvent);
+ isEqual = _equals(_previous as Object /*=T*/, inputEvent);
}
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
« no previous file with comments | « tool/input_sdk/lib/async/stream_impl.dart ('k') | tool/input_sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698