Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1217)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 15989006: Revert until Windows crash is debugged. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index e5aca172415406ea65dbe4da256ade70f2649958..e089f6ce88dce9180a110615d26ec6df7d2b2ab0 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -69,26 +69,84 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
// Override the following methods in subclasses to change the behavior.
- void _handleData(S data, _EventSink<T> sink) {
+ void _handleData(S data, _EventOutputSink<T> sink) {
var outputData = data;
- sink._add(outputData);
+ sink._sendData(outputData);
}
- void _handleError(error, _EventSink<T> sink) {
- sink._addError(error);
+ void _handleError(error, _EventOutputSink<T> sink) {
+ sink._sendError(error);
}
- void _handleDone(_EventSink<T> sink) {
- sink._close();
+ void _handleDone(_EventOutputSink<T> sink) {
+ sink._sendDone();
}
}
/**
+ * Common behavior of [StreamSubscription] classes.
+ *
+ * Stores and allows updating of the event handlers of a [StreamSubscription].
+ */
+abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
+ // TODO(ahe): Restore type when feature is implemented in dart2js
+ // checked mode. http://dartbug.com/7733
+ var /* _DataHandler<T> */ _onData;
+ _ErrorHandler _onError;
+ _DoneHandler _onDone;
+
+ _BaseStreamSubscription(this._onData,
+ this._onError,
+ this._onDone) {
+ if (_onData == null) _onData = _nullDataHandler;
+ if (_onError == null) _onError = _nullErrorHandler;
+ if (_onDone == null) _onDone = _nullDoneHandler;
+ }
+
+ // StreamSubscription interface.
+ void onData(void handleData(T event)) {
+ if (handleData == null) handleData = _nullDataHandler;
+ _onData = handleData;
+ }
+
+ void onError(void handleError(error)) {
+ if (handleError == null) handleError = _nullErrorHandler;
+ _onError = handleError;
+ }
+
+ void onDone(void handleDone()) {
+ if (handleDone == null) handleDone = _nullDoneHandler;
+ _onDone = handleDone;
+ }
+
+ void pause([Future resumeSignal]);
+
+ void resume();
+
+ void cancel();
+
+ Future asFuture([var futureValue]) {
+ _FutureImpl<T> result = new _FutureImpl<T>();
+
+ // Overwrite the onDone and onError handlers.
+ onDone(() { result._setValue(futureValue); });
+ onError((error) {
+ cancel();
+ result._setError(error);
+ });
+
+ return result;
+ }
+}
+
+
+/**
* Abstract superclass for subscriptions that forward to other subscriptions.
*/
class _ForwardingStreamSubscription<S, T>
- extends _BufferingStreamSubscription<T> {
+ extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
final _ForwardingStream<S, T> _stream;
+ final bool _cancelOnError;
StreamSubscription<S> _subscription;
@@ -96,46 +154,60 @@ class _ForwardingStreamSubscription<S, T>
void onData(T data),
void onError(error),
void onDone(),
- bool cancelOnError)
- : super(onData, onError, onDone, cancelOnError) {
+ this._cancelOnError)
+ : super(onData, onError, onDone) {
+ // Don't unsubscribe on incoming error, only if we send an error forwards.
_subscription =
_stream._source.listen(_handleData,
onError: _handleError,
onDone: _handleDone);
}
- // _StreamSink interface.
- // Transformers sending more than one event have no way to know if the stream
- // is canceled or closed after the first, so we just ignore remaining events.
+ // StreamSubscription interface.
- void _add(T data) {
- if (_isClosed) return;
- super._add(data);
+ void pause([Future resumeSignal]) {
+ if (_subscription == null) return;
+ _subscription.pause(resumeSignal);
}
- void _addError(Object error) {
- if (_isClosed) return;
- super._addError(error);
+ void resume() {
+ if (_subscription == null) return;
+ _subscription.resume();
}
- // StreamSubscription callbacks.
+ bool get isPaused {
+ if (_subscription == null) return false;
+ return _subscription.isPaused;
+ }
- void _onPause() {
- if (_subscription == null) return;
- _subscription.pause();
+ void cancel() {
+ if (_subscription != null) {
+ _subscription.cancel();
+ _subscription = null;
+ }
}
- void _onResume() {
- if (_subscription == null) return;
- _subscription.resume();
+ // _EventOutputSink interface. Sends data to this subscription.
+
+ void _sendData(T data) {
+ _onData(data);
+ }
+
+ void _sendError(error) {
+ _onError(error);
+ if (_cancelOnError) {
+ _subscription.cancel();
+ _subscription = null;
+ }
}
- void _onCancel() {
+ void _sendDone() {
+ // If the transformation sends a done signal, we stop the subscription.
if (_subscription != null) {
- StreamSubscription subscription = _subscription;
+ _subscription.cancel();
_subscription = null;
- subscription.cancel();
}
+ _onDone();
}
// Methods used as listener on source subscription.
@@ -169,16 +241,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
_WhereStream(Stream<T> source, bool test(T value))
: _test = test, super(source);
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
return;
}
if (satisfies) {
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
}
}
}
@@ -195,15 +267,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
_MapStream(Stream<S> source, T transform(S event))
: this._transform = transform, super(source);
- void _handleData(S inputEvent, _EventSink<T> sink) {
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
return;
}
- sink._add(outputEvent);
+ sink._sendData(outputEvent);
}
}
@@ -216,15 +288,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
_ExpandStream(Stream<S> source, Iterable<T> expand(S event))
: this._expand = expand, super(source);
- void _handleData(S inputEvent, _EventSink<T> sink) {
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) {
try {
for (T value in _expand(inputEvent)) {
- sink._add(value);
+ sink._sendData(value);
}
} catch (e, s) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
}
}
}
@@ -246,13 +318,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
bool test(error))
: this._transform = transform, this._test = test, super(source);
- void _handleError(Object error, _EventSink<T> sink) {
+ void _handleError(Object error, _EventOutputSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
matches = _test(error);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
return;
}
}
@@ -260,11 +332,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
try {
_transform(error);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
return;
}
} else {
- sink._addError(error);
+ sink._sendError(error);
}
}
}
@@ -280,14 +352,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
if (count is! int) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_remaining > 0) {
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
_remaining -= 1;
if (_remaining == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
- sink._close();
+ sink._sendDone();
}
}
}
@@ -300,20 +372,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
_TakeWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
// The test didn't say true. Didn't say false either, but we stop anyway.
- sink._close();
+ sink._sendDone();
return;
}
if (satisfies) {
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
} else {
- sink._close();
+ sink._sendDone();
}
}
}
@@ -328,12 +400,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
if (count is! int || count < 0) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_remaining > 0) {
_remaining--;
return;
}
- return sink._add(inputEvent);
+ return sink._sendData(inputEvent);
}
}
@@ -344,23 +416,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
_SkipWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_hasFailed) {
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
return;
}
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
// A failure to return a boolean is considered "not matching".
_hasFailed = true;
return;
}
if (!satisfies) {
_hasFailed = true;
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
}
}
}
@@ -376,10 +448,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
_DistinctStream(Stream<T> source, bool equals(T a, T b))
: _equals = equals, super(source);
- void _handleData(T inputEvent, _EventSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (identical(_previous, _SENTINEL)) {
_previous = inputEvent;
- return sink._add(inputEvent);
+ return sink._sendData(inputEvent);
} else {
bool isEqual;
try {
@@ -389,11 +461,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
isEqual = _equals(_previous, inputEvent);
}
} catch (e, s) {
- sink._addError(_asyncError(e, s));
+ sink._sendError(_asyncError(e, s));
return null;
}
if (!isEqual) {
- sink._add(inputEvent);
+ sink._sendData(inputEvent);
_previous = inputEvent;
}
}
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698