Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(353)

Unified Diff: sdk/lib/async/stream.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/schedule_microtask.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index b2a7a215b81caeee9e762136cb308cfe4f61d213..26343b69b35dee57f03ef60331b095f593b3825c 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -104,13 +104,12 @@ abstract class Stream<T> {
// to wait for a listener before doing the `then` on the future.
_StreamController<T> controller = new StreamController<T>(sync: true);
future.then((value) {
- controller._add(value);
- controller._closeUnchecked();
- },
- onError: (error, stackTrace) {
- controller._addError(error, stackTrace);
- controller._closeUnchecked();
- });
+ controller._add(value);
+ controller._closeUnchecked();
+ }, onError: (error, stackTrace) {
+ controller._addError(error, stackTrace);
+ controller._closeUnchecked();
+ });
return controller.stream;
}
@@ -181,7 +180,7 @@ abstract class Stream<T> {
* If [computation] is omitted the event values will all be `null`.
*/
factory Stream.periodic(Duration period,
- [T computation(int computationCount)]) {
+ [T computation(int computationCount)]) {
Timer timer;
int computationCount = 0;
StreamController<T> controller;
@@ -209,7 +208,8 @@ abstract class Stream<T> {
});
}
- controller = new StreamController<T>(sync: true,
+ controller = new StreamController<T>(
+ sync: true,
onListen: () {
watch.start();
startPeriodicTimer();
@@ -277,8 +277,8 @@ abstract class Stream<T> {
*
* The resulting stream is a broadcast stream if [source] is.
*/
- factory Stream.eventTransformed(Stream source,
- EventSink mapSink(EventSink<T> sink)) {
+ factory Stream.eventTransformed(
+ Stream source, EventSink mapSink(EventSink<T> sink)) {
return new _BoundSinkStream(source, mapSink);
}
@@ -308,9 +308,9 @@ abstract class Stream<T> {
* while having no subscribers to prevent losing events, or canceling the
* subscription when there are no listeners.
*/
- Stream<T> asBroadcastStream({
- void onListen(StreamSubscription<T> subscription),
- void onCancel(StreamSubscription<T> subscription) }) {
+ Stream<T> asBroadcastStream(
+ {void onListen(StreamSubscription<T> subscription),
+ void onCancel(StreamSubscription<T> subscription)}) {
return new _AsBroadcastStream<T>(this, onListen, onCancel);
}
@@ -350,9 +350,7 @@ abstract class Stream<T> {
* event handler functions are called.
*/
StreamSubscription<T> listen(void onData(T event),
- { Function onError,
- void onDone(),
- bool cancelOnError});
+ {Function onError, void onDone(), bool cancelOnError});
/**
* Creates a new stream from this stream that discards some data events.
@@ -405,46 +403,46 @@ abstract class Stream<T> {
void onListen() {
final add = controller.add;
assert(controller is _StreamController ||
- controller is _BroadcastStreamController);
- final _EventSink<E> eventSink =
- controller as Object /*=_EventSink<E>*/;
+ controller is _BroadcastStreamController);
+ final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
final addError = eventSink._addError;
- subscription = this.listen(
- (T event) {
- dynamic newValue;
- try {
- newValue = convert(event);
- } catch (e, s) {
- controller.addError(e, s);
- return;
- }
- if (newValue is Future) {
- subscription.pause();
- newValue.then(add, onError: addError)
- .whenComplete(subscription.resume);
- } else {
- controller.add(newValue as Object/*=E*/);
- }
- },
- onError: addError,
- onDone: controller.close
- );
+ subscription = this.listen((T event) {
+ dynamic newValue;
+ try {
+ newValue = convert(event);
+ } catch (e, s) {
+ controller.addError(e, s);
+ return;
+ }
+ if (newValue is Future) {
+ subscription.pause();
+ newValue
+ .then(add, onError: addError)
+ .whenComplete(subscription.resume);
+ } else {
+ controller.add(newValue as Object/*=E*/);
+ }
+ }, onError: addError, onDone: controller.close);
}
if (this.isBroadcast) {
controller = new StreamController<E>.broadcast(
- onListen: onListen,
- onCancel: () { subscription.cancel(); },
- sync: true
- );
+ onListen: onListen,
+ onCancel: () {
+ subscription.cancel();
+ },
+ sync: true);
} else {
controller = new StreamController<E>(
- onListen: onListen,
- onPause: () { subscription.pause(); },
- onResume: () { subscription.resume(); },
- onCancel: () => subscription.cancel(),
- sync: true
- );
+ onListen: onListen,
+ onPause: () {
+ subscription.pause();
+ },
+ onResume: () {
+ subscription.resume();
+ },
+ onCancel: () => subscription.cancel(),
+ sync: true);
}
return controller.stream;
}
@@ -467,42 +465,43 @@ abstract class Stream<T> {
StreamSubscription<T> subscription;
void onListen() {
assert(controller is _StreamController ||
- controller is _BroadcastStreamController);
- final _EventSink<E> eventSink =
- controller as Object /*=_EventSink<E>*/;
- subscription = this.listen(
- (T event) {
- Stream<E> newStream;
- try {
- newStream = convert(event);
- } catch (e, s) {
- controller.addError(e, s);
- return;
- }
- if (newStream != null) {
- subscription.pause();
- controller.addStream(newStream)
- .whenComplete(subscription.resume);
- }
- },
- onError: eventSink._addError, // Avoid Zone error replacement.
- onDone: controller.close
- );
+ controller is _BroadcastStreamController);
+ final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
+ subscription = this.listen((T event) {
+ Stream<E> newStream;
+ try {
+ newStream = convert(event);
+ } catch (e, s) {
+ controller.addError(e, s);
+ return;
+ }
+ if (newStream != null) {
+ subscription.pause();
+ controller.addStream(newStream).whenComplete(subscription.resume);
+ }
+ },
+ onError: eventSink._addError, // Avoid Zone error replacement.
+ onDone: controller.close);
}
+
if (this.isBroadcast) {
controller = new StreamController<E>.broadcast(
- onListen: onListen,
- onCancel: () { subscription.cancel(); },
- sync: true
- );
+ onListen: onListen,
+ onCancel: () {
+ subscription.cancel();
+ },
+ sync: true);
} else {
controller = new StreamController<E>(
- onListen: onListen,
- onPause: () { subscription.pause(); },
- onResume: () { subscription.resume(); },
- onCancel: () => subscription.cancel(),
- sync: true
- );
+ onListen: onListen,
+ onPause: () {
+ subscription.pause();
+ },
+ onResume: () {
+ subscription.resume();
+ },
+ onCancel: () => subscription.cancel(),
+ sync: true);
}
return controller.stream;
}
@@ -535,7 +534,7 @@ abstract class Stream<T> {
* If a broadcast stream is listened to more than once, each subscription
* will individually perform the `test` and handle the error.
*/
- Stream<T> handleError(Function onError, { bool test(error) }) {
+ Stream<T> handleError(Function onError, {bool test(error)}) {
return new _HandleErrorStream<T>(this, onError, test);
}
@@ -585,8 +584,7 @@ abstract class Stream<T> {
* The `streamTransformer` can decide whether it wants to return a
* broadcast stream or not.
*/
- Stream<S> transform<S>(
- StreamTransformer<T, S > streamTransformer) {
+ Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
@@ -599,55 +597,46 @@ abstract class Stream<T> {
T value;
StreamSubscription subscription;
subscription = this.listen(
- (T element) {
- if (seenFirst) {
- _runUserCode(() => combine(value, element),
- (T newValue) { value = newValue; },
- _cancelAndErrorClosure(subscription, result));
- } else {
- value = element;
- seenFirst = true;
- }
- },
- onError: result._completeError,
- onDone: () {
- if (!seenFirst) {
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(result, e, s);
+ (T element) {
+ if (seenFirst) {
+ _runUserCode(() => combine(value, element), (T newValue) {
+ value = newValue;
+ }, _cancelAndErrorClosure(subscription, result));
+ } else {
+ value = element;
+ seenFirst = true;
}
- } else {
- result._complete(value);
- }
- },
- cancelOnError: true
- );
+ },
+ onError: result._completeError,
+ onDone: () {
+ if (!seenFirst) {
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(result, e, s);
+ }
+ } else {
+ result._complete(value);
+ }
+ },
+ cancelOnError: true);
return result;
}
/** Reduces a sequence of values by repeatedly applying [combine]. */
- Future<S> fold<S>(S initialValue,
- S combine(S previous, T element)) {
-
+ Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
_Future<S> result = new _Future<S>();
S value = initialValue;
StreamSubscription subscription;
- subscription = this.listen(
- (T element) {
- _runUserCode(
- () => combine(value, element),
- (S newValue) { value = newValue; },
- _cancelAndErrorClosure(subscription, result)
- );
- },
- onError: (e, st) {
- result._completeError(e, st);
- },
- onDone: () {
- result._complete(value);
- },
- cancelOnError: true);
+ subscription = this.listen((T element) {
+ _runUserCode(() => combine(value, element), (S newValue) {
+ value = newValue;
+ }, _cancelAndErrorClosure(subscription, result));
+ }, onError: (e, st) {
+ result._completeError(e, st);
+ }, onDone: () {
+ result._complete(value);
+ }, cancelOnError: true);
return result;
}
@@ -666,25 +655,21 @@ abstract class Stream<T> {
StringBuffer buffer = new StringBuffer();
StreamSubscription subscription;
bool first = true;
- subscription = this.listen(
- (T element) {
- if (!first) {
- buffer.write(separator);
- }
- first = false;
- try {
- buffer.write(element);
- } catch (e, s) {
- _cancelAndErrorWithReplacement(subscription, result, e, s);
- }
- },
- onError: (e) {
- result._completeError(e);
- },
- onDone: () {
- result._complete(buffer.toString());
- },
- cancelOnError: true);
+ subscription = this.listen((T element) {
+ if (!first) {
+ buffer.write(separator);
+ }
+ first = false;
+ try {
+ buffer.write(element);
+ } catch (e, s) {
+ _cancelAndErrorWithReplacement(subscription, result, e, s);
+ }
+ }, onError: (e) {
+ result._completeError(e);
+ }, onDone: () {
+ result._complete(buffer.toString());
+ }, cancelOnError: true);
return result;
}
@@ -699,15 +684,11 @@ abstract class Stream<T> {
StreamSubscription subscription;
subscription = this.listen(
(T element) {
- _runUserCode(
- () => (element == needle),
- (bool isMatch) {
- if (isMatch) {
- _cancelAndValue(subscription, future, true);
- }
- },
- _cancelAndErrorClosure(subscription, future)
- );
+ _runUserCode(() => (element == needle), (bool isMatch) {
+ if (isMatch) {
+ _cancelAndValue(subscription, future, true);
+ }
+ }, _cancelAndErrorClosure(subscription, future));
},
onError: future._completeError,
onDone: () {
@@ -729,11 +710,8 @@ abstract class Stream<T> {
StreamSubscription subscription;
subscription = this.listen(
(T element) {
- _runUserCode(
- () => action(element),
- (_) {},
- _cancelAndErrorClosure(subscription, future)
- );
+ _runUserCode(() => action(element), (_) {},
+ _cancelAndErrorClosure(subscription, future));
},
onError: future._completeError,
onDone: () {
@@ -754,15 +732,11 @@ abstract class Stream<T> {
StreamSubscription subscription;
subscription = this.listen(
(T element) {
- _runUserCode(
- () => test(element),
- (bool isMatch) {
- if (!isMatch) {
- _cancelAndValue(subscription, future, false);
- }
- },
- _cancelAndErrorClosure(subscription, future)
- );
+ _runUserCode(() => test(element), (bool isMatch) {
+ if (!isMatch) {
+ _cancelAndValue(subscription, future, false);
+ }
+ }, _cancelAndErrorClosure(subscription, future));
},
onError: future._completeError,
onDone: () {
@@ -791,15 +765,11 @@ abstract class Stream<T> {
StreamSubscription subscription;
subscription = this.listen(
(T element) {
- _runUserCode(
- () => test(element),
- (bool isMatch) {
- if (isMatch) {
- _cancelAndValue(subscription, future, true);
- }
- },
- _cancelAndErrorClosure(subscription, future)
- );
+ _runUserCode(() => test(element), (bool isMatch) {
+ if (isMatch) {
+ _cancelAndValue(subscription, future, true);
+ }
+ }, _cancelAndErrorClosure(subscription, future));
},
onError: future._completeError,
onDone: () {
@@ -809,18 +779,19 @@ abstract class Stream<T> {
return future;
}
-
/** Counts the elements in the stream. */
Future<int> get length {
_Future<int> future = new _Future<int>();
int count = 0;
this.listen(
- (_) { count++; },
- onError: future._completeError,
- onDone: () {
- future._complete(count);
- },
- cancelOnError: true);
+ (_) {
+ count++;
+ },
+ onError: future._completeError,
+ onDone: () {
+ future._complete(count);
+ },
+ cancelOnError: true);
return future;
}
@@ -837,14 +808,14 @@ abstract class Stream<T> {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
- (_) {
- _cancelAndValue(subscription, future, false);
- },
- onError: future._completeError,
- onDone: () {
- future._complete(true);
- },
- cancelOnError: true);
+ (_) {
+ _cancelAndValue(subscription, future, false);
+ },
+ onError: future._completeError,
+ onDone: () {
+ future._complete(true);
+ },
+ cancelOnError: true);
return future;
}
@@ -853,14 +824,14 @@ abstract class Stream<T> {
List<T> result = <T>[];
_Future<List<T>> future = new _Future<List<T>>();
this.listen(
- (T data) {
- result.add(data);
- },
- onError: future._completeError,
- onDone: () {
- future._complete(result);
- },
- cancelOnError: true);
+ (T data) {
+ result.add(data);
+ },
+ onError: future._completeError,
+ onDone: () {
+ future._complete(result);
+ },
+ cancelOnError: true);
return future;
}
@@ -877,14 +848,14 @@ abstract class Stream<T> {
Set<T> result = new Set<T>();
_Future<Set<T>> future = new _Future<Set<T>>();
this.listen(
- (T data) {
- result.add(data);
- },
- onError: future._completeError,
- onDone: () {
- future._complete(result);
- },
- cancelOnError: true);
+ (T data) {
+ result.add(data);
+ },
+ onError: future._completeError,
+ onDone: () {
+ future._complete(result);
+ },
+ cancelOnError: true);
return future;
}
@@ -899,8 +870,8 @@ abstract class Stream<T> {
* In case of a `done` event the future completes with the given
* [futureValue].
*/
- Future<E> drain<E>([E futureValue])
- => listen(null, cancelOnError: true).asFuture<E>(futureValue);
+ Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true)
+ .asFuture<E>(futureValue);
/**
* Provides at most the first [count] data events of this stream.
@@ -1014,18 +985,18 @@ abstract class Stream<T> {
_Future<T> future = new _Future<T>();
StreamSubscription subscription;
subscription = this.listen(
- (T value) {
- _cancelAndValue(subscription, future, value);
- },
- onError: future._completeError,
- onDone: () {
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ (T value) {
+ _cancelAndValue(subscription, future, value);
+ },
+ onError: future._completeError,
+ onDone: () {
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(future, e, s);
+ }
+ },
+ cancelOnError: true);
return future;
}
@@ -1043,23 +1014,23 @@ abstract class Stream<T> {
T result = null;
bool foundResult = false;
listen(
- (T value) {
- foundResult = true;
- result = value;
- },
- onError: future._completeError,
- onDone: () {
- if (foundResult) {
- future._complete(result);
- return;
- }
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ (T value) {
+ foundResult = true;
+ result = value;
+ },
+ onError: future._completeError,
+ onDone: () {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(future, e, s);
+ }
+ },
+ cancelOnError: true);
return future;
}
@@ -1077,32 +1048,32 @@ abstract class Stream<T> {
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
- (T value) {
- if (foundResult) {
- // This is the second element we get.
+ (T value) {
+ if (foundResult) {
+ // This is the second element we get.
+ try {
+ throw IterableElementError.tooMany();
+ } catch (e, s) {
+ _cancelAndErrorWithReplacement(subscription, future, e, s);
+ }
+ return;
+ }
+ foundResult = true;
+ result = value;
+ },
+ onError: future._completeError,
+ onDone: () {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
try {
- throw IterableElementError.tooMany();
+ throw IterableElementError.noElement();
} catch (e, s) {
- _cancelAndErrorWithReplacement(subscription, future, e, s);
+ _completeWithErrorCallback(future, e, s);
}
- return;
- }
- foundResult = true;
- result = value;
- },
- onError: future._completeError,
- onDone: () {
- if (foundResult) {
- future._complete(result);
- return;
- }
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ },
+ cancelOnError: true);
return future;
}
@@ -1131,30 +1102,26 @@ abstract class Stream<T> {
_Future<dynamic> future = new _Future();
StreamSubscription subscription;
subscription = this.listen(
- (T value) {
- _runUserCode(
- () => test(value),
- (bool isMatch) {
+ (T value) {
+ _runUserCode(() => test(value), (bool isMatch) {
if (isMatch) {
_cancelAndValue(subscription, future, value);
}
- },
- _cancelAndErrorClosure(subscription, future)
- );
- },
- onError: future._completeError,
- onDone: () {
- if (defaultValue != null) {
- _runUserCode(defaultValue, future._complete, future._completeError);
- return;
- }
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ }, _cancelAndErrorClosure(subscription, future));
+ },
+ onError: future._completeError,
+ onDone: () {
+ if (defaultValue != null) {
+ _runUserCode(defaultValue, future._complete, future._completeError);
+ return;
+ }
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(future, e, s);
+ }
+ },
+ cancelOnError: true);
return future;
}
@@ -1171,35 +1138,31 @@ abstract class Stream<T> {
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
- (T value) {
- _runUserCode(
- () => true == test(value),
- (bool isMatch) {
+ (T value) {
+ _runUserCode(() => true == test(value), (bool isMatch) {
if (isMatch) {
foundResult = true;
result = value;
}
- },
- _cancelAndErrorClosure(subscription, future)
- );
- },
- onError: future._completeError,
- onDone: () {
- if (foundResult) {
- future._complete(result);
- return;
- }
- if (defaultValue != null) {
- _runUserCode(defaultValue, future._complete, future._completeError);
- return;
- }
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ }, _cancelAndErrorClosure(subscription, future));
+ },
+ onError: future._completeError,
+ onDone: () {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ if (defaultValue != null) {
+ _runUserCode(defaultValue, future._complete, future._completeError);
+ return;
+ }
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(future, e, s);
+ }
+ },
+ cancelOnError: true);
return future;
}
@@ -1215,10 +1178,8 @@ abstract class Stream<T> {
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
- (T value) {
- _runUserCode(
- () => true == test(value),
- (bool isMatch) {
+ (T value) {
+ _runUserCode(() => true == test(value), (bool isMatch) {
if (isMatch) {
if (foundResult) {
try {
@@ -1231,23 +1192,21 @@ abstract class Stream<T> {
foundResult = true;
result = value;
}
- },
- _cancelAndErrorClosure(subscription, future)
- );
- },
- onError: future._completeError,
- onDone: () {
- if (foundResult) {
- future._complete(result);
- return;
- }
- try {
- throw IterableElementError.noElement();
- } catch (e, s) {
- _completeWithErrorCallback(future, e, s);
- }
- },
- cancelOnError: true);
+ }, _cancelAndErrorClosure(subscription, future));
+ },
+ onError: future._completeError,
+ onDone: () {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ try {
+ throw IterableElementError.noElement();
+ } catch (e, s) {
+ _completeWithErrorCallback(future, e, s);
+ }
+ },
+ cancelOnError: true);
return future;
}
@@ -1273,19 +1232,19 @@ abstract class Stream<T> {
StreamSubscription subscription;
int elementIndex = 0;
subscription = this.listen(
- (T value) {
- if (index == elementIndex) {
- _cancelAndValue(subscription, future, value);
- return;
- }
- elementIndex += 1;
- },
- onError: future._completeError,
- onDone: () {
- future._completeError(
- new RangeError.index(index, this, "index", null, elementIndex));
- },
- cancelOnError: true);
+ (T value) {
+ if (index == elementIndex) {
+ _cancelAndValue(subscription, future, value);
+ return;
+ }
+ elementIndex += 1;
+ },
+ onError: future._completeError,
+ onDone: () {
+ future._completeError(
+ new RangeError.index(index, this, "index", null, elementIndex));
+ },
+ cancelOnError: true);
return future;
}
@@ -1324,18 +1283,21 @@ abstract class Stream<T> {
controller.add(event);
timer = zone.createTimer(timeLimit, timeout);
}
+
void onError(error, StackTrace stackTrace) {
timer.cancel();
assert(controller is _StreamController ||
- controller is _BroadcastStreamController);
+ controller is _BroadcastStreamController);
dynamic eventSink = controller;
- eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
+ eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
timer = zone.createTimer(timeLimit, timeout);
}
+
void onDone() {
timer.cancel();
controller.close();
}
+
void onListen() {
// This is the onListen callback for of controller.
// It runs in the same zone that the subscription was created in.
@@ -1344,8 +1306,8 @@ abstract class Stream<T> {
zone = Zone.current;
if (onTimeout == null) {
timeout = () {
- controller.addError(new TimeoutException("No stream event",
- timeLimit), null);
+ controller.addError(
+ new TimeoutException("No stream event", timeLimit), null);
};
} else {
// TODO(floitsch): the return type should be 'void', and the type
@@ -1355,7 +1317,7 @@ abstract class Stream<T> {
_ControllerEventSinkWrapper wrapper =
new _ControllerEventSinkWrapper(null);
timeout = () {
- wrapper._sink = controller; // Only valid during call.
+ wrapper._sink = controller; // Only valid during call.
zone.runUnaryGuarded(registeredOnTimeout, wrapper);
wrapper._sink = null;
};
@@ -1364,26 +1326,24 @@ abstract class Stream<T> {
subscription = this.listen(onData, onError: onError, onDone: onDone);
timer = zone.createTimer(timeLimit, timeout);
}
+
Future onCancel() {
timer.cancel();
Future result = subscription.cancel();
subscription = null;
return result;
}
+
controller = isBroadcast
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
- : new _SyncStreamController<T>(
- onListen,
- () {
- // Don't null the timer, onCancel may call cancel again.
- timer.cancel();
- subscription.pause();
- },
- () {
- subscription.resume();
- timer = zone.createTimer(timeLimit, timeout);
- },
- onCancel);
+ : new _SyncStreamController<T>(onListen, () {
+ // Don't null the timer, onCancel may call cancel again.
+ timer.cancel();
+ subscription.pause();
+ }, () {
+ subscription.resume();
+ timer = zone.createTimer(timeLimit, timeout);
+ }, onCancel);
return controller.stream;
}
}
@@ -1501,7 +1461,6 @@ abstract class StreamSubscription<T> {
Future<E> asFuture<E>([E futureValue]);
}
-
/**
* An interface that abstracts creation or handling of [Stream] events.
*/
@@ -1516,30 +1475,28 @@ abstract class EventSink<T> implements Sink<T> {
void close();
}
-
/** [Stream] wrapper that only exposes the [Stream] interface. */
class StreamView<T> extends Stream<T> {
final Stream<T> _stream;
- const StreamView(Stream<T> stream) : _stream = stream, super._internal();
+ const StreamView(Stream<T> stream)
+ : _stream = stream,
+ super._internal();
bool get isBroadcast => _stream.isBroadcast;
Stream<T> asBroadcastStream(
- {void onListen(StreamSubscription<T> subscription),
- void onCancel(StreamSubscription<T> subscription)})
- => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
+ {void onListen(StreamSubscription<T> subscription),
+ void onCancel(StreamSubscription<T> subscription)}) =>
+ _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
StreamSubscription<T> listen(void onData(T value),
- { Function onError,
- void onDone(),
- bool cancelOnError }) {
- return _stream.listen(onData, onError: onError, onDone: onDone,
- cancelOnError: cancelOnError);
+ {Function onError, void onDone(), bool cancelOnError}) {
+ return _stream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
-
/**
* Abstract interface for a "sink" accepting multiple entire streams.
*
@@ -1589,7 +1546,6 @@ abstract class StreamConsumer<S> {
Future close();
}
-
/**
* A object that accepts stream events both synchronously and asynchronously.
*
@@ -1648,7 +1604,6 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
Future get done;
}
-
/**
* The target of a [Stream.transform] call.
*
@@ -1722,8 +1677,9 @@ abstract class StreamTransformer<S, T> {
* });
*/
const factory StreamTransformer(
- StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
- = _StreamSubscriptionTransformer<S, T>;
+ StreamSubscription<T> transformer(
+ Stream<S> stream, bool cancelOnError)) =
+ _StreamSubscriptionTransformer<S, T>;
/**
* Creates a [StreamTransformer] that delegates events to the given functions.
@@ -1736,11 +1692,10 @@ abstract class StreamTransformer<S, T> {
* sink.add(value); // Duplicate the incoming events.
* }));
*/
- factory StreamTransformer.fromHandlers({
- void handleData(S data, EventSink<T> sink),
+ factory StreamTransformer.fromHandlers(
+ {void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
- void handleDone(EventSink<T> sink)})
- = _StreamHandlerTransformer<S, T>;
+ void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
/**
* Transform the incoming [stream]'s events.
@@ -1765,12 +1720,12 @@ abstract class StreamTransformer<S, T> {
* The stream may be paused between calls to [moveNext].
*/
abstract class StreamIterator<T> {
-
/** Create a [StreamIterator] on [stream]. */
factory StreamIterator(Stream<T> stream)
// TODO(lrn): use redirecting factory constructor when type
// arguments are supported.
- => new _StreamIterator<T>(stream);
+ =>
+ new _StreamIterator<T>(stream);
/**
* Wait for the next stream value to be available.
@@ -1822,7 +1777,6 @@ abstract class StreamIterator<T> {
Future cancel();
}
-
/**
* Wraps an [_EventSink] so it exposes only the [EventSink] interface.
*/
@@ -1830,9 +1784,15 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
EventSink _sink;
_ControllerEventSinkWrapper(this._sink);
- void add(T data) { _sink.add(data); }
+ void add(T data) {
+ _sink.add(data);
+ }
+
void addError(error, [StackTrace stackTrace]) {
_sink.addError(error, stackTrace);
}
- void close() { _sink.close(); }
+
+ void close() {
+ _sink.close();
+ }
}
« no previous file with comments | « sdk/lib/async/schedule_microtask.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698