Index: lib/stream_channel.dart |
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart |
index 3615d2129510f38bcdfa43d8bdc7584b7049936c..16323b169cdef5a7b116b87e7914a55e43a1e6b3 100644 |
--- a/lib/stream_channel.dart |
+++ b/lib/stream_channel.dart |
@@ -7,6 +7,7 @@ import 'dart:async'; |
import 'package:async/async.dart'; |
import 'src/guarantee_channel.dart'; |
+import 'src/close_guarantee_channel.dart'; |
import 'src/stream_channel_transformer.dart'; |
export 'src/delegating_stream_channel.dart'; |
@@ -87,6 +88,19 @@ abstract class StreamChannel<T> { |
{bool allowSinkErrors: true}) => |
new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors); |
+ /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. |
+ /// |
+ /// This specifically enforces the second guarantee: closing the sink causes |
+ /// the stream to close before it emits any more events. This guarantee is |
+ /// invalidated when an asynchronous gap is added between the original |
+ /// stream's event dispatch and the returned stream's, for example by |
+ /// transforming it with a [StreamTransformer]. This is a lighter-weight way |
+ /// of preserving that guarantee in particular than |
+ /// [StreamChannel.withGuarantees]. |
+ factory StreamChannel.withCloseGuarantee(Stream<T> stream, |
+ StreamSink<T> sink) => |
+ new CloseGuaranteeChannel(stream, sink); |
+ |
/// Connects [this] to [other], so that any values emitted by either are sent |
/// directly to the other. |
void pipe(StreamChannel<T> other); |
@@ -148,10 +162,10 @@ abstract class StreamChannelMixin<T> implements StreamChannel<T> { |
changeSink(transformer.bind); |
StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) => |
- new StreamChannel(change(stream), sink); |
+ new StreamChannel.withCloseGuarantee(change(stream), sink); |
StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) => |
- new StreamChannel(stream, change(sink)); |
+ new StreamChannel.withCloseGuarantee(stream, change(sink)); |
StreamChannel/*<S>*/ cast/*<S>*/() => new StreamChannel( |
DelegatingStream.typed(stream), DelegatingStreamSink.typed(sink)); |