Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index 643d14227afca6b41cda51b7ede42bbf8ae8b465..12063bd928b65cc24970c571c2ad020746740666 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -5,9 +5,8 @@
part of dart.async;
/** Runs user code and takes actions depending on success or failure. */
-_runUserCode(userCode(),
- onSuccess(value),
- onError(error, StackTrace stackTrace)) {
+_runUserCode(
+ userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) {
try {
onSuccess(userCode());
} catch (e, s) {
@@ -24,10 +23,8 @@ _runUserCode(userCode(),
/** Helper function to cancel a subscription and wait for the potential future,
before completing with an error. */
-void _cancelAndError(StreamSubscription subscription,
- _Future future,
- error,
- StackTrace stackTrace) {
+void _cancelAndError(StreamSubscription subscription, _Future future, error,
+ StackTrace stackTrace) {
var cancelFuture = subscription.cancel();
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
@@ -37,8 +34,7 @@ void _cancelAndError(StreamSubscription subscription,
}
void _cancelAndErrorWithReplacement(StreamSubscription subscription,
- _Future future,
- error, StackTrace stackTrace) {
+ _Future future, error, StackTrace stackTrace) {
AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
if (replacement != null) {
error = _nonNullError(replacement.error);
@@ -68,7 +64,6 @@ void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
}
}
-
/**
* A [Stream] that forwards subscriptions to another stream.
*
@@ -86,18 +81,13 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
bool get isBroadcast => _source.isBroadcast;
StreamSubscription<T> listen(void onData(T value),
- { Function onError,
- void onDone(),
- bool cancelOnError }) {
+ {Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
return _createSubscription(onData, onError, onDone, cancelOnError);
}
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
this, onData, onError, onDone, cancelOnError);
}
@@ -105,7 +95,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
// Override the following methods in subclasses to change the behavior.
void _handleData(S data, _EventSink<T> sink) {
- sink._add(data as Object /*=T*/);
+ sink._add(data as Object/*=T*/);
}
void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
@@ -127,12 +117,10 @@ class _ForwardingStreamSubscription<S, T>
StreamSubscription<S> _subscription;
_ForwardingStreamSubscription(this._stream, void onData(T data),
- Function onError, void onDone(),
- bool cancelOnError)
+ Function onError, void onDone(), bool cancelOnError)
: super(onData, onError, onDone, cancelOnError) {
- _subscription = _stream._source.listen(_handleData,
- onError: _handleError,
- onDone: _handleDone);
+ _subscription = _stream._source
+ .listen(_handleData, onError: _handleError, onDone: _handleDone);
}
// _StreamSink interface.
@@ -200,12 +188,12 @@ void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
sink._addError(error, stackTrace);
}
-
class _WhereStream<T> extends _ForwardingStream<T, T> {
final _Predicate<T> _test;
_WhereStream(Stream<T> source, bool test(T value))
- : _test = test, super(source);
+ : _test = test,
+ super(source);
void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
@@ -221,7 +209,6 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
}
}
-
typedef T _Transformation<S, T>(S value);
/**
@@ -231,7 +218,8 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
final _Transformation<S, T> _transform;
_MapStream(Stream<S> source, T transform(S event))
- : this._transform = transform, super(source);
+ : this._transform = transform,
+ super(source);
void _handleData(S inputEvent, _EventSink<T> sink) {
T outputEvent;
@@ -252,7 +240,8 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
final _Transformation<S, Iterable<T>> _expand;
_ExpandStream(Stream<S> source, Iterable<T> expand(S event))
- : this._expand = expand, super(source);
+ : this._expand = expand,
+ super(source);
void _handleData(S inputEvent, _EventSink<T> sink) {
try {
@@ -267,7 +256,6 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
}
}
-
typedef bool _ErrorTest(error);
/**
@@ -278,10 +266,10 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
final Function _transform;
final _ErrorTest _test;
- _HandleErrorStream(Stream<T> source,
- Function onError,
- bool test(error))
- : this._transform = onError, this._test = test, super(source);
+ _HandleErrorStream(Stream<T> source, Function onError, bool test(error))
+ : this._transform = onError,
+ this._test = test,
+ super(source);
void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
bool matches = true;
@@ -310,22 +298,19 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
}
}
-
class _TakeStream<T> extends _ForwardingStream<T, T> {
final int _count;
_TakeStream(Stream<T> source, int count)
- : this._count = count, super(source) {
+ : this._count = count,
+ super(source) {
// This test is done early to avoid handling an async error
// in the _handleData method.
if (count is! int) throw new ArgumentError(count);
}
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
if (_count == 0) {
_source.listen(null).cancel();
return new _DoneStreamSubscription<T>(onDone);
@@ -360,22 +345,26 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
var _sharedState;
_StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
- Function onError, void onDone(),
- bool cancelOnError, this._sharedState)
+ Function onError, void onDone(), bool cancelOnError, this._sharedState)
: super(stream, onData, onError, onDone, cancelOnError);
bool get _flag => _sharedState;
- void set _flag(bool flag) { _sharedState = flag; }
+ void set _flag(bool flag) {
+ _sharedState = flag;
+ }
+
int get _count => _sharedState;
- void set _count(int count) { _sharedState = count; }
+ void set _count(int count) {
+ _sharedState = count;
+ }
}
-
class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
final _Predicate<T> _test;
_TakeWhileStream(Stream<T> source, bool test(T value))
- : this._test = test, super(source);
+ : this._test = test,
+ super(source);
void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
@@ -399,17 +388,15 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
final int _count;
_SkipStream(Stream<T> source, int count)
- : this._count = count, super(source) {
+ : this._count = count,
+ super(source) {
// This test is done early to avoid handling an async error
// in the _handleData method.
if (count is! int || count < 0) throw new ArgumentError(count);
}
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _count);
}
@@ -429,13 +416,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
final _Predicate<T> _test;
_SkipWhileStream(Stream<T> source, bool test(T value))
- : this._test = test, super(source);
+ : this._test = test,
+ super(source);
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, false);
}
@@ -472,7 +457,8 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
var _previous = _SENTINEL;
_DistinctStream(Stream<T> source, bool equals(T a, T b))
- : _equals = equals, super(source);
+ : _equals = equals,
+ super(source);
void _handleData(T inputEvent, _EventSink<T> sink) {
if (identical(_previous, _SENTINEL)) {
@@ -484,7 +470,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
if (_equals == null) {
isEqual = (_previous == inputEvent);
} else {
- isEqual = _equals(_previous as Object /*=T*/, inputEvent);
+ isEqual = _equals(_previous as Object/*=T*/, inputEvent);
}
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698