Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index d29fae354464dc064baf62cd8626f49040f1ecd9..4b88281fd0fe2f605f4553511af567649d53a4ea 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -91,14 +91,15 @@ abstract class StreamController<T> implements StreamSink<T> {
* If the stream is canceled before the controller needs new data the
* [onResume] call might not be executed.
*/
- factory StreamController({void onListen(),
- void onPause(),
- void onResume(),
- onCancel(),
- bool sync: false}) {
+ factory StreamController(
+ {void onListen(),
+ void onPause(),
+ void onResume(),
+ onCancel(),
+ bool sync: false}) {
return sync
- ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
- : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
+ ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
+ : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
/**
@@ -152,9 +153,8 @@ abstract class StreamController<T> implements StreamSink<T> {
* If a listener is added again later, after the [onCancel] was called,
* the [onListen] will be called again.
*/
- factory StreamController.broadcast({void onListen(),
- void onCancel(),
- bool sync: false}) {
+ factory StreamController.broadcast(
+ {void onListen(), void onCancel(), bool sync: false}) {
return sync
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
@@ -262,7 +262,6 @@ abstract class StreamController<T> implements StreamSink<T> {
Future addStream(Stream<T> source, {bool cancelOnError: true});
}
-
/**
* A stream controller that delivers its events synchronously.
*
@@ -362,10 +361,7 @@ abstract class SynchronousStreamController<T> implements StreamController<T> {
abstract class _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError);
+ void onData(T data), Function onError, void onDone(), bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription) {}
void _recordResume(StreamSubscription<T> subscription) {}
Future _recordCancel(StreamSubscription<T> subscription) => null;
@@ -376,10 +372,12 @@ abstract class _StreamControllerLifecycle<T> {
*
* Controls a stream that only supports a single controller.
*/
-abstract class _StreamController<T> implements StreamController<T>,
- _StreamControllerLifecycle<T>,
- _EventSink<T>,
- _EventDispatch<T> {
+abstract class _StreamController<T>
+ implements
+ StreamController<T>,
+ _StreamControllerLifecycle<T>,
+ _EventSink<T>,
+ _EventDispatch<T> {
// The states are bit-flags. More than one can be set at a time.
//
// The "subscription state" goes through the states:
@@ -450,10 +448,7 @@ abstract class _StreamController<T> implements StreamController<T>,
ControllerCallback onResume;
ControllerCancelCallback onCancel;
- _StreamController(this.onListen,
- this.onPause,
- this.onResume,
- this.onCancel);
+ _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel);
// Return a new stream every time. The streams are equal, but not identical.
Stream<T> get stream => new _ControllerStream<T>(this);
@@ -479,8 +474,8 @@ abstract class _StreamController<T> implements StreamController<T>,
bool get isClosed => (_state & _STATE_CLOSED) != 0;
- bool get isPaused => hasListener ? _subscription._isInputPaused
- : !_isCanceled;
+ bool get isPaused =>
+ hasListener ? _subscription._isInputPaused : !_isCanceled;
bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
@@ -497,11 +492,11 @@ abstract class _StreamController<T> implements StreamController<T>,
_PendingEvents<T> get _pendingEvents {
assert(_isInitialState);
if (!_isAddingStream) {
- return _varData as Object /*=_PendingEvents<T>*/;
+ return _varData as Object/*=_PendingEvents<T>*/;
}
_StreamControllerAddStreamState<T> state =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
- return state.varData as Object /*=_PendingEvents<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
+ return state.varData as Object/*=_PendingEvents<T>*/;
}
// Returns the pending events, and creates the object if necessary.
@@ -509,12 +504,12 @@ abstract class _StreamController<T> implements StreamController<T>,
assert(_isInitialState);
if (!_isAddingStream) {
if (_varData == null) _varData = new _StreamImplEvents<T>();
- return _varData as Object /*=_StreamImplEvents<T>*/;
+ return _varData as Object/*=_StreamImplEvents<T>*/;
}
_StreamControllerAddStreamState<T> state =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
if (state.varData == null) state.varData = new _StreamImplEvents<T>();
- return state.varData as Object /*=_StreamImplEvents<T>*/;
+ return state.varData as Object/*=_StreamImplEvents<T>*/;
}
// Get the current subscription.
@@ -524,10 +519,10 @@ abstract class _StreamController<T> implements StreamController<T>,
assert(hasListener);
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
- return addState.varData as Object /*=_ControllerSubscription<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
+ return addState.varData as Object/*=_ControllerSubscription<T>*/;
}
- return _varData as Object /*=_ControllerSubscription<T>*/;
+ return _varData as Object/*=_ControllerSubscription<T>*/;
}
/**
@@ -548,10 +543,8 @@ abstract class _StreamController<T> implements StreamController<T>,
if (!_mayAddEvent) throw _badEventState();
if (_isCanceled) return new _Future.immediate(null);
_StreamControllerAddStreamState<T> addState =
- new _StreamControllerAddStreamState<T>(this,
- _varData,
- source,
- cancelOnError);
+ new _StreamControllerAddStreamState<T>(
+ this, _varData, source, cancelOnError);
_varData = addState;
_state |= _STATE_ADDSTREAM;
return addState.addStreamFuture;
@@ -650,7 +643,7 @@ abstract class _StreamController<T> implements StreamController<T>,
// End of addStream stream.
assert(_isAddingStream);
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
_varData = addState.varData;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
@@ -658,23 +651,19 @@ abstract class _StreamController<T> implements StreamController<T>,
// _StreamControllerLifeCycle interface
- StreamSubscription<T> _subscribe(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _subscribe(void onData(T data), Function onError,
+ void onDone(), bool cancelOnError) {
if (!_isInitialState) {
throw new StateError("Stream has already been listened to.");
}
- _ControllerSubscription<T> subscription =
- new _ControllerSubscription<T>(this, onData, onError, onDone,
- cancelOnError);
+ _ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
+ this, onData, onError, onDone, cancelOnError);
_PendingEvents<T> pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
addState.varData = subscription;
addState.resume();
} else {
@@ -700,7 +689,7 @@ abstract class _StreamController<T> implements StreamController<T>,
Future result;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
result = addState.cancel();
}
_varData = null;
@@ -743,7 +732,7 @@ abstract class _StreamController<T> implements StreamController<T>,
void _recordPause(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
addState.pause();
}
_runGuarded(onPause);
@@ -752,7 +741,7 @@ abstract class _StreamController<T> implements StreamController<T>,
void _recordResume(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState =
- _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/;
addState.resume();
}
_runGuarded(onResume);
@@ -796,10 +785,10 @@ abstract class _AsyncStreamControllerDispatch<T>
// constructors in mixin superclasses.
class _AsyncStreamController<T> = _StreamController<T>
- with _AsyncStreamControllerDispatch<T>;
+ with _AsyncStreamControllerDispatch<T>;
class _SyncStreamController<T> = _StreamController<T>
- with _SyncStreamControllerDispatch<T>;
+ with _SyncStreamControllerDispatch<T>;
typedef _NotificationHandler();
@@ -819,12 +808,9 @@ class _ControllerStream<T> extends _StreamImpl<T> {
_ControllerStream(this._controller);
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) =>
- _controller._subscribe(onData, onError, onDone, cancelOnError);
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) =>
+ _controller._subscribe(onData, onError, onDone, cancelOnError);
// Override == and hashCode so that new streams returned by the same
// controller are considered equal. The controller returns a new stream
@@ -832,7 +818,7 @@ class _ControllerStream<T> extends _StreamImpl<T> {
int get hashCode => _controller.hashCode ^ 0x35323532;
- bool operator==(Object other) {
+ bool operator ==(Object other) {
if (identical(this, other)) return true;
if (other is! _ControllerStream) return false;
_ControllerStream otherStream = other;
@@ -844,7 +830,7 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
final _StreamControllerLifecycle<T> _controller;
_ControllerSubscription(this._controller, void onData(T data),
- Function onError, void onDone(), bool cancelOnError)
+ Function onError, void onDone(), bool cancelOnError)
: super(onData, onError, onDone, cancelOnError);
Future _onCancel() {
@@ -860,15 +846,18 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
}
}
-
/** A class that exposes only the [StreamSink] interface of an object. */
class _StreamSinkWrapper<T> implements StreamSink<T> {
final StreamController _target;
_StreamSinkWrapper(this._target);
- void add(T data) { _target.add(data); }
+ void add(T data) {
+ _target.add(data);
+ }
+
void addError(Object error, [StackTrace stackTrace]) {
_target.addError(error, stackTrace);
}
+
Future close() => _target.close();
Future addStream(Stream<T> source, {bool cancelOnError: true}) =>
_target.addStream(source, cancelOnError: cancelOnError);
@@ -888,14 +877,13 @@ class _AddStreamState<T> {
_AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError)
: addStreamFuture = new _Future(),
addSubscription = source.listen(controller._add,
- onError: cancelOnError
- ? makeErrorHandler(controller)
- : controller._addError,
- onDone: controller._close,
- cancelOnError: cancelOnError);
-
- static makeErrorHandler(_EventSink controller) =>
- (e, StackTrace s) {
+ onError: cancelOnError
+ ? makeErrorHandler(controller)
+ : controller._addError,
+ onDone: controller._close,
+ cancelOnError: cancelOnError);
+
+ static makeErrorHandler(_EventSink controller) => (e, StackTrace s) {
controller._addError(e, s);
controller._close();
};
@@ -922,7 +910,9 @@ class _AddStreamState<T> {
addStreamFuture._asyncComplete(null);
return null;
}
- return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); });
+ return cancel.whenComplete(() {
+ addStreamFuture._asyncComplete(null);
+ });
}
void complete() {
@@ -936,10 +926,8 @@ class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
// to store this state object.
var varData;
- _StreamControllerAddStreamState(_StreamController<T> controller,
- this.varData,
- Stream source,
- bool cancelOnError)
+ _StreamControllerAddStreamState(_StreamController<T> controller, this.varData,
+ Stream source, bool cancelOnError)
: super(controller, source, cancelOnError) {
if (controller.isPaused) {
addSubscription.pause();
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698