Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(424)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 29cfbd1c3ce5cc1055f0f5240734352e7be9b7d3..164d3b242802874e9d1d4e2cbffaeaf6438a3d68 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -49,9 +49,8 @@ abstract class _EventDispatch<T> {
* but if it happens anyway, the subscription will enqueue the events just as
* when new events arrive while still firing an old event.
*/
-class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
- _EventSink<T>,
- _EventDispatch<T> {
+class _BufferingStreamSubscription<T>
+ implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
/** The `cancelOnError` flag from the `listen` call. */
static const int _STATE_CANCEL_ON_ERROR = 1;
/**
@@ -107,10 +106,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
*/
_PendingEvents<T> _pending;
- _BufferingStreamSubscription(void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError)
+ _BufferingStreamSubscription(
+ void onData(T data), Function onError, void onDone(), bool cancelOnError)
: _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
this.onData(onData);
this.onError(onError);
@@ -197,7 +194,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_Future<E> result = new _Future<E>();
// Overwrite the onDone and onError handlers.
- _onDone = () { result._complete(futureValue); };
+ _onDone = () {
+ result._complete(futureValue);
+ };
_onError = (error, stackTrace) {
Future cancelFuture = cancel();
if (!identical(cancelFuture, Future._nullFuture)) {
@@ -264,7 +263,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
void _addError(Object error, StackTrace stackTrace) {
if (_isCanceled) return;
if (_canFire) {
- _sendError(error, stackTrace); // Reports cancel after sending.
+ _sendError(error, stackTrace); // Reports cancel after sending.
} else {
_addPending(new _DelayedError(error, stackTrace));
}
@@ -346,11 +345,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_state |= _STATE_IN_CALLBACK;
if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
- as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
+ as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
_zone.runBinaryGuarded(errorCallback, error, stackTrace);
} else {
_zone.runUnaryGuarded<dynamic, dynamic>(
- _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
+ _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
}
_state &= ~_STATE_IN_CALLBACK;
}
@@ -461,9 +460,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
// Stream interface.
StreamSubscription<T> listen(void onData(T data),
- { Function onError,
- void onDone(),
- bool cancelOnError }) {
+ {Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
@@ -473,13 +470,10 @@ abstract class _StreamImpl<T> extends Stream<T> {
// -------------------------------------------------------------------
/** Create a subscription object. Called by [subcribe]. */
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
- return new _BufferingStreamSubscription<T>(onData, onError, onDone,
- cancelOnError);
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
+ return new _BufferingStreamSubscription<T>(
+ onData, onError, onDone, cancelOnError);
}
/** Hook called when the subscription has been created. */
@@ -500,11 +494,8 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
*/
_GeneratedStreamImpl(this._pending);
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError) {
+ StreamSubscription<T> _createSubscription(void onData(T data),
+ Function onError, void onDone(), bool cancelOnError) {
if (_isUsed) throw new StateError("Stream has already been listened to.");
_isUsed = true;
return new _BufferingStreamSubscription<T>(
@@ -512,7 +503,6 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
}
}
-
/** Pending events object that gets its events from an [Iterable]. */
class _IterablePendingEvents<T> extends _PendingEvents<T> {
// The iterator providing data for data events.
@@ -554,14 +544,12 @@ class _IterablePendingEvents<T> extends _PendingEvents<T> {
}
}
-
// Internal helpers.
// Types of the different handlers on a stream. Types used to type fields.
typedef void _DataHandler<T>(T value);
typedef void _DoneHandler();
-
/** Default data handler, does nothing. */
void _nullDataHandler(var value) {}
@@ -573,7 +561,6 @@ void _nullErrorHandler(error, [StackTrace stackTrace]) {
/** Default done handler, does nothing. */
void _nullDoneHandler() {}
-
/** A delayed event on a buffering stream subscription. */
abstract class _DelayedEvent<T> {
/** Added as a linked list on the [StreamController]. */
@@ -678,11 +665,11 @@ abstract class _PendingEvents<T> {
void clear();
}
-
/** Class holding pending events for a [_StreamImpl]. */
class _StreamImplEvents<T> extends _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent firstPendingEvent = null;
+
/// Last element in the list of pending events. New events are added after it.
_DelayedEvent lastPendingEvent = null;
@@ -742,7 +729,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
void onData(void handleData(T data)) {}
void onError(Function handleError) {}
- void onDone(void handleDone()) { _onDone = handleDone; }
+ void onDone(void handleDone()) {
+ _onDone = handleDone;
+ }
void pause([Future resumeSignal]) {
_state += _PAUSED;
@@ -762,7 +751,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
Future<E> asFuture<E>([E futureValue]) {
_Future<E> result = new _Future<E>();
- _onDone = () { result._completeWithValue(null); };
+ _onDone = () {
+ result._completeWithValue(null);
+ };
return result;
}
@@ -783,15 +774,18 @@ class _AsBroadcastStream<T> extends Stream<T> {
_AsBroadcastStreamController<T> _controller;
StreamSubscription<T> _subscription;
- _AsBroadcastStream(this._source,
- void onListenHandler(StreamSubscription<T> subscription),
- void onCancelHandler(StreamSubscription<T> subscription))
+ _AsBroadcastStream(
+ this._source,
+ void onListenHandler(StreamSubscription<T> subscription),
+ void onCancelHandler(StreamSubscription<T> subscription))
// TODO(floitsch): the return type should be void and should be
// inferred.
- : _onListenHandler = Zone.current.registerUnaryCallback
- <dynamic, StreamSubscription<T>>(onListenHandler),
- _onCancelHandler = Zone.current.registerUnaryCallback
- <dynamic, StreamSubscription<T>>(onCancelHandler),
+ : _onListenHandler = Zone.current
+ .registerUnaryCallback<dynamic, StreamSubscription<T>>(
+ onListenHandler),
+ _onCancelHandler = Zone.current
+ .registerUnaryCallback<dynamic, StreamSubscription<T>>(
+ onCancelHandler),
_zone = Zone.current {
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
}
@@ -799,9 +793,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
bool get isBroadcast => true;
StreamSubscription<T> listen(void onData(T data),
- { Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
if (_controller == null || _controller.isClosed) {
// Return a dummy subscription backed by nothing, since
// it will only ever send one done event.
@@ -809,8 +801,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
}
if (_subscription == null) {
_subscription = _source.listen(_controller.add,
- onError: _controller.addError,
- onDone: _controller.close);
+ onError: _controller.addError, onDone: _controller.close);
}
cancelOnError = identical(true, cancelOnError);
return _controller._subscribe(onData, onError, onDone, cancelOnError);
@@ -843,7 +834,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
// Called by [_controller] when it has no subscribers left.
StreamSubscription subscription = _subscription;
_subscription = null;
- _controller = null; // Marks the stream as no longer listenable.
+ _controller = null; // Marks the stream as no longer listenable.
subscription.cancel();
}
@@ -909,7 +900,6 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
}
}
-
/**
* Simple implementation of [StreamIterator].
*
@@ -969,7 +959,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
T get current {
if (_subscription != null && _isPaused) {
- return _stateData as Object /*=T*/;
+ return _stateData as Object/*=T*/;
}
return null;
}
@@ -997,9 +987,9 @@ class _StreamIterator<T> implements StreamIterator<T> {
assert(_subscription == null);
var stateData = _stateData;
if (stateData != null) {
- Stream<T> stream = stateData as Object /*=Stream<T>*/;
- _subscription = stream.listen(
- _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
+ Stream<T> stream = stateData as Object/*=Stream<T>*/;
+ _subscription = stream.listen(_onData,
+ onError: _onError, onDone: _onDone, cancelOnError: true);
var future = new _Future<bool>();
_stateData = future;
return future;
@@ -1014,7 +1004,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
if (subscription != null) {
_subscription = null;
if (!_isPaused) {
- _Future<bool> future = stateData as Object /*=_Future<bool>*/;
+ _Future<bool> future = stateData as Object/*=_Future<bool>*/;
future._asyncComplete(false);
}
return subscription.cancel();
@@ -1024,7 +1014,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
void _onData(T data) {
assert(_subscription != null && !_isPaused);
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
_stateData = data;
_isPaused = true;
moveNextFuture._complete(true);
@@ -1033,7 +1023,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
void _onError(Object error, [StackTrace stackTrace]) {
assert(_subscription != null && !_isPaused);
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
_subscription = null;
_stateData = null;
moveNextFuture._completeError(error, stackTrace);
@@ -1041,7 +1031,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
void _onDone() {
assert(_subscription != null && !_isPaused);
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
_subscription = null;
_stateData = null;
moveNextFuture._complete(false);
@@ -1053,9 +1043,7 @@ class _EmptyStream<T> extends Stream<T> {
const _EmptyStream() : super._internal();
bool get isBroadcast => true;
StreamSubscription<T> listen(void onData(T data),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
return new _DoneStreamSubscription<T>(onDone);
}
}
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698