Index: sdk/lib/async/collection_sink.dart |
diff --git a/sdk/lib/core/sink.dart b/sdk/lib/async/collection_sink.dart |
similarity index 53% |
rename from sdk/lib/core/sink.dart |
rename to sdk/lib/async/collection_sink.dart |
index d0dafc0c2c340ae721ef25629d4bd1d3c2d0efa6..a023fd1d0bbae6abd9fdfd7096c74ed830530511 100644 |
--- a/sdk/lib/core/sink.dart |
+++ b/sdk/lib/async/collection_sink.dart |
@@ -2,27 +2,16 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-part of dart.core; |
-/** |
- * An interface for an object that can receive a sequence of values. |
- */ |
-abstract class Sink<T> { |
- /** Write a value to the sink. */ |
- add(T value); |
- /** Tell the sink that no further values will be written. */ |
- void close(); |
-} |
- |
-// ---------------------------------------------------------------------- |
-// Collections/Sink interoperability |
-// ---------------------------------------------------------------------- |
+part of dart.async; |
typedef void _CollectionSinkCallback<T>(Collection<T> collection); |
+typedef void _CollectionSinkErrorCallback(AsyncError error); |
-/** Sink that stores incoming data in a collection. */ |
-class CollectionSink<T> implements Sink<T> { |
+/** StreamSink that stores incoming data in a collection. */ |
+class CollectionSink<T> implements StreamSink<T> { |
final Collection<T> collection; |
- final _CollectionSinkCallback<T> callback; |
+ final _CollectionSinkCallback<T> _callback; |
+ final _CollectionSinkErrorCallback _errorCallback; |
bool _isClosed = false; |
/** |
@@ -33,17 +22,25 @@ class CollectionSink<T> implements Sink<T> { |
* If [callback] is provided, then it's called with the collection as arugment |
* when the sink's [close] method is called. |
*/ |
- CollectionSink(this.collection, [void callback(Collection<T> collection)]) |
- : this.callback = callback; |
+ CollectionSink(this.collection, |
+ { void onClose(Collection<T> collection), |
+ void onError(AsyncError error) }) |
+ : this._callback = onClose, |
+ this._errorCallback = onError; |
add(T value) { |
if (_isClosed) throw new StateError("Adding to closed sink"); |
collection.add(value); |
} |
+ void signalError(AsyncError error) { |
+ if (_isClosed) throw new StateError("Singalling error on closed sink"); |
+ if (_errorCallback != null) _errorCallback(error); |
+ } |
+ |
void close() { |
if (_isClosed) throw new StateError("Closing closed sink"); |
_isClosed = true; |
- if (callback != null) callback(collection); |
+ if (_callback != null) _callback(collection); |
} |
} |