Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1746)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index aaa34b059362638f8f9b04fad2f50f6eaccc9eea..69210ecabd1f2259571bfe6db85f32fc8942bd19 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -74,16 +74,16 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
// Override the following methods in subclasses to change the behavior.
- void _handleData(S data, _StreamOutputSink<T> sink) {
+ void _handleData(S data, _EventOutputSink<T> sink) {
var outputData = data;
sink._sendData(outputData);
}
- void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
+ void _handleError(AsyncError error, _EventOutputSink<T> sink) {
sink._sendError(error);
}
- void _handleDone(_StreamOutputSink<T> sink) {
+ void _handleDone(_EventOutputSink<T> sink) {
sink._sendDone();
}
}
@@ -136,7 +136,7 @@ abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
* Abstract superclass for subscriptions that forward to other subscriptions.
*/
class _ForwardingStreamSubscription<S, T>
- extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> {
+ extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
final _ForwardingStream<S, T> _stream;
final bool _unsubscribeOnError;
@@ -179,7 +179,7 @@ class _ForwardingStreamSubscription<S, T>
_subscription = null;
}
- // _StreamOutputSink interface. Sends data to this subscription.
+ // _EventOutputSink interface. Sends data to this subscription.
void _sendData(T data) {
_onData(data);
@@ -233,7 +233,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
_WhereStream(Stream<T> source, bool test(T value))
: _test = test, super(source);
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
@@ -259,7 +259,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
_MapStream(Stream<S> source, T transform(S event))
: this._transform = transform, super(source);
- void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
@@ -280,7 +280,7 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
_ExpandStream(Stream<S> source, Iterable<T> expand(S event))
: this._expand = expand, super(source);
- void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) {
try {
for (T value in _expand(inputEvent)) {
sink._sendData(value);
@@ -310,7 +310,7 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
bool test(error))
: this._transform = transform, this._test = test, super(source);
- void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
+ void _handleError(AsyncError error, _EventOutputSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
@@ -344,7 +344,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
if (count is! int) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_remaining > 0) {
sink._sendData(inputEvent);
_remaining -= 1;
@@ -364,7 +364,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
_TakeWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
@@ -392,7 +392,7 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
if (count is! int || count < 0) throw new ArgumentError(count);
}
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_remaining > 0) {
_remaining--;
return;
@@ -408,7 +408,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
_SkipWhileStream(Stream<T> source, bool test(T value))
: this._test = test, super(source);
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (_hasFailed) {
sink._sendData(inputEvent);
}
@@ -439,7 +439,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
_DistinctStream(Stream<T> source, bool equals(T a, T b))
: _equals = equals, super(source);
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) {
if (identical(_previous, _SENTINEL)) {
_previous = inputEvent;
return sink._sendData(inputEvent);
@@ -465,35 +465,26 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
// Stream transformations and event transformations.
-typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
-typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
-typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
+typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
+typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink);
+typedef void _TransformDoneHandler<T>(EventSink<T> sink);
/** Default data handler forwards all data. */
-void _defaultHandleData(var data, StreamSink sink) {
+void _defaultHandleData(var data, EventSink sink) {
sink.add(data);
}
/** Default error handler forwards all errors. */
-void _defaultHandleError(AsyncError error, StreamSink sink) {
- sink.signalError(error);
+void _defaultHandleError(AsyncError error, EventSink sink) {
+ sink.addError(error);
}
/** Default done handler forwards done. */
-void _defaultHandleDone(StreamSink sink) {
+void _defaultHandleDone(EventSink sink) {
sink.close();
}
-/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
-class _StreamImplSink<T> implements StreamSink<T> {
- _StreamImpl<T> _target;
- _StreamImplSink(this._target);
- void add(T data) { _target._add(data); }
- void signalError(AsyncError error) { _target._signalError(error); }
- void close() { _target._close(); }
-}
-
/**
* A [StreamTransformer] that modifies stream events.
*
@@ -511,9 +502,9 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
final _TransformErrorHandler<T> _handleError;
final _TransformDoneHandler<T> _handleDone;
- _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink),
- void handleError(AsyncError data, StreamSink<T> sink),
- void handleDone(StreamSink<T> sink))
+ _StreamTransformerImpl(void handleData(S data, EventSink<T> sink),
+ void handleError(AsyncError data, EventSink<T> sink),
+ void handleDone(EventSink<T> sink))
: this._handleData = (handleData == null ? _defaultHandleData
: handleData),
this._handleError = (handleError == null ? _defaultHandleError
@@ -521,15 +512,15 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
this._handleDone = (handleDone == null ? _defaultHandleDone
: handleDone);
- void handleData(S data, StreamSink<T> sink) {
+ void handleData(S data, EventSink<T> sink) {
_handleData(data, sink);
}
- void handleError(AsyncError error, StreamSink<T> sink) {
+ void handleError(AsyncError error, EventSink<T> sink) {
_handleError(error, sink);
}
- void handleDone(StreamSink<T> sink) {
+ void handleDone(EventSink<T> sink) {
_handleDone(sink);
}
}

Powered by Google App Engine
This is Rietveld 408576698