Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(303)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index e089f6ce88dce9180a110615d26ec6df7d2b2ab0..e5aca172415406ea65dbe4da256ade70f2649958 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -69,84 +69,26 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
// Override the following methods in subclasses to change the behavior.
- void _handleData(S data, _EventOutputSink<T> sink) {
+ void _handleData(S data, _EventSink<T> sink) {
var outputData = data;
- sink._sendData(outputData);
+ sink._add(outputData);
}
- void _handleError(error, _EventOutputSink<T> sink) {
- sink._sendError(error);
+ void _handleError(error, _EventSink<T> sink) {
+ sink._addError(error);
}
- void _handleDone(_EventOutputSink<T> sink) {
- sink._sendDone();
+ void _handleDone(_EventSink<T> sink) {
+ sink._close();
}
}
/**
- * Common behavior of [StreamSubscription] classes.
- *
- * Stores and allows updating of the event handlers of a [StreamSubscription].
- */
-abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
- // TODO(ahe): Restore type when feature is implemented in dart2js
- // checked mode. http://dartbug.com/7733
- var /* _DataHandler<T> */ _onData;
- _ErrorHandler _onError;
- _DoneHandler _onDone;
-
- _BaseStreamSubscription(this._onData,
- this._onError,
- this._onDone) {
- if (_onData == null) _onData = _nullDataHandler;
- if (_onError == null) _onError = _nullErrorHandler;
- if (_onDone == null) _onDone = _nullDoneHandler;
- }
-
- // StreamSubscription interface.
- void onData(void handleData(T event)) {
- if (handleData == null) handleData = _nullDataHandler;
- _onData = handleData;
- }
-
- void onError(void handleError(error)) {
- if (handleError == null) handleError = _nullErrorHandler;
- _onError = handleError;
- }
-
- void onDone(void handleDone()) {
- if (handleDone == null) handleDone = _nullDoneHandler;
- _onDone = handleDone;
- }
-
- void pause([Future resumeSignal]);
-
- void resume();
-
- void cancel();
-
- Future asFuture([var futureValue]) {
- _FutureImpl<T> result = new _FutureImpl<T>();
-
- // Overwrite the onDone and onError handlers.
- onDone(() { result._setValue(futureValue); });
- onError((error) {
- cancel();
- result._setError(error);
- });
-
- return result;
- }
-}
-
-
-/**
* Abstract superclass for subscriptions that forward to other subscriptions.
*/
class _ForwardingStreamSubscription<S, T>
- extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
+ extends _BufferingStreamSubscription<T> {
final _ForwardingStream<S, T> _stream;
- final bool _cancelOnError;
StreamSubscription<S> _subscription;
@@ -154,60 +96,46 @@ class _ForwardingStreamSubscription<S, T>
void onData(T data),
void onError(error),
void onDone(),
- this._cancelOnError)
- : super(onData, onError, onDone) {
- // Don't unsubscribe on incoming error, only if we send an error forwards.
+ bool cancelOnError)
+ : super(onData, onError, onDone, cancelOnError) {
_subscription =
_stream._source.listen(_handleData,
onError: _handleError,
onDone: _handleDone);
}
- // StreamSubscription interface.
-
- void pause([Future resumeSignal]) {
- if (_subscription == null) return;
- _subscription.pause(resumeSignal);
- }
-
- void resume() {
- if (_subscription == null) return;
- _subscription.resume();
- }
+ // _StreamSink interface.
+ // Transformers sending more than one event have no way to know if the stream
+ // is canceled or closed after the first, so we just ignore remaining events.
- bool get isPaused {
- if (_subscription == null) return false;
- return _subscription.isPaused;
+ void _add(T data) {
+ if (_isClosed) return;
+ super._add(data);
}
- void cancel() {
- if (_subscription != null) {
- _subscription.cancel();
- _subscription = null;
- }
+ void _addError(Object error) {
+ if (_isClosed) return;
+ super._addError(error);
}
- // _EventOutputSink interface. Sends data to this subscription.
+ // StreamSubscription callbacks.
- void _sendData(T data) {
- _onData(data);
+ void _onPause() {
+ if (_subscription == null) return;
+ _subscription.pause();
}
- void _sendError(error) {
- _onError(error);
- if (_cancelOnError) {
- _subscription.cancel();
- _subscription = null;
- }
+ void _onResume() {
+ if (_subscription == null) return;
+ _subscription.resume();
}
- void _sendDone() {
- // If the transformation sends a done signal, we stop the subscription.
+ void _onCancel() {
if (_subscription != null) {
- _subscription.cancel();
+ StreamSubscription subscription = _subscription;
_subscription = null;
+ subscription.cancel();
}
- _onDone();
}
// Methods used as listener on source subscription.
@@ -241,16 +169,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
_WhereStream(Stream<T> source, bool test(T value))
: _test = test, super(source);
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
return;
}
if (satisfies) {
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
}
}
}
@@ -267,15 +195,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
_MapStream(Stream<S> source, T transform(S event))
: this._transform = transform, super(source);
- void _handleData(S inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(S inputEvent, _EventSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
return;
}
- sink._sendData(outputEvent);
+ sink._add(outputEvent);
}
}
@@ -288,15 +216,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
_ExpandStream(Stream<S> source, Iterable<T> expand(S event))
: this._expand = expand, super(source);
- void _handleData(S inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(S inputEvent, _EventSink<T> sink) {
try {
for (T value in _expand(inputEvent)) {
- sink._sendData(value);
+ sink._add(value);
}
} catch (e, s) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
}
}
}
@@ -318,13 +246,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
bool test(error))
: this._transform = transform, this._test = test, super(source);
- void _handleError(Object error, _EventOutputSink<T> sink) {
+ void _handleError(Object error, _EventSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
matches = _test(error);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
return;
}
}
@@ -332,11 +260,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
try {
_transform(error);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
return;
}
} else {
- sink._sendError(error);
+ sink._addError(error);
}
}
}
@@ -352,14 +280,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
if (count is! int) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
if (_remaining > 0) {
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
_remaining -= 1;
if (_remaining == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
- sink._sendDone();
+ sink._close();
}
}
}
@@ -372,20 +300,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
_TakeWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
// The test didn't say true. Didn't say false either, but we stop anyway.
- sink._sendDone();
+ sink._close();
return;
}
if (satisfies) {
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
} else {
- sink._sendDone();
+ sink._close();
}
}
}
@@ -400,12 +328,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
if (count is! int || count < 0) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
if (_remaining > 0) {
_remaining--;
return;
}
- return sink._sendData(inputEvent);
+ return sink._add(inputEvent);
}
}
@@ -416,23 +344,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
_SkipWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
if (_hasFailed) {
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
return;
}
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
// A failure to return a boolean is considered "not matching".
_hasFailed = true;
return;
}
if (!satisfies) {
_hasFailed = true;
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
}
}
}
@@ -448,10 +376,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
_DistinctStream(Stream<T> source, bool equals(T a, T b))
: _equals = equals, super(source);
- void _handleData(T inputEvent, _EventOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventSink<T> sink) {
if (identical(_previous, _SENTINEL)) {
_previous = inputEvent;
- return sink._sendData(inputEvent);
+ return sink._add(inputEvent);
} else {
bool isEqual;
try {
@@ -461,11 +389,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
isEqual = _equals(_previous, inputEvent);
}
} catch (e, s) {
- sink._sendError(_asyncError(e, s));
+ sink._addError(_asyncError(e, s));
return null;
}
if (!isEqual) {
- sink._sendData(inputEvent);
+ sink._add(inputEvent);
_previous = inputEvent;
}
}
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698