Index: lib/src/close_guarantee_channel.dart |
diff --git a/lib/src/close_guarantee_channel.dart b/lib/src/close_guarantee_channel.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a2c69bcb8bf2e98ccb76e8a756a1db22c475be80 |
--- /dev/null |
+++ b/lib/src/close_guarantee_channel.dart |
@@ -0,0 +1,86 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+ |
+import '../stream_channel.dart'; |
+ |
+/// A [StreamChannel] that specifically enforces the stream channel guarantee |
+/// that closing the sink causes the stream to close before it emits any more |
+/// events |
+/// |
+/// This is exposed via [new StreamChannel.withCloseGuarantee]. |
+class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> { |
+ Stream<T> get stream => _stream; |
+ _CloseGuaranteeStream<T> _stream; |
+ |
+ StreamSink<T> get sink => _sink; |
+ _CloseGuaranteeSink<T> _sink; |
+ |
+ /// The subscription to the inner stream. |
+ StreamSubscription<T> _subscription; |
+ |
+ /// Whether the sink has closed, causing the underlying channel to disconnect. |
+ bool _disconnected = false; |
+ |
+ CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
+ _sink = new _CloseGuaranteeSink<T>(innerSink, this); |
+ _stream = new _CloseGuaranteeStream<T>(innerStream, this); |
+ } |
+} |
+ |
+/// The stream for [CloseGuaranteeChannel]. |
+/// |
+/// This wraps the inner stream to save the subscription on the channel when |
+/// [listen] is called. |
+class _CloseGuaranteeStream<T> extends Stream<T> { |
+ /// The inner stream this is delegating to. |
+ final Stream<T> _inner; |
+ |
+ /// The [CloseGuaranteeChannel] this belongs to. |
+ final CloseGuaranteeChannel<T> _channel; |
+ |
+ _CloseGuaranteeStream(this._inner, this._channel); |
+ |
+ StreamSubscription<T> listen(void onData(T event), |
+ {Function onError, void onDone(), bool cancelOnError}) { |
+ // If the channel is already disconnected, we shouldn't dispatch anything |
+ // but a done event. |
+ if (_channel._disconnected) { |
+ onData = null; |
+ onError = null; |
+ } |
+ |
+ var subscription = _inner.listen(onData, |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
+ if (!_channel._disconnected) { |
+ _channel._subscription = subscription; |
+ } |
+ return subscription; |
+ } |
+} |
+ |
+/// The sink for [CloseGuaranteeChannel]. |
+/// |
+/// This wraps the inner sink to cancel the stream subscription when the sink is |
+/// canceled. |
+class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> { |
+ /// The [CloseGuaranteeChannel] this belongs to. |
+ final CloseGuaranteeChannel<T> _channel; |
+ |
+ _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner); |
+ |
+ Future close() { |
+ var done = super.close(); |
+ _channel._disconnected = true; |
+ if (_channel._subscription != null) { |
+ // Don't dispatch anything but a done event. |
+ _channel._subscription.onData(null); |
+ _channel._subscription.onError(null); |
+ } |
+ return done; |
+ } |
+} |