Index: lib/src/disconnector.dart |
diff --git a/lib/src/disconnector.dart b/lib/src/disconnector.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..35ecd1cc408305e34feec605e1251fced3747c38 |
--- /dev/null |
+++ b/lib/src/disconnector.dart |
@@ -0,0 +1,139 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import '../stream_channel.dart'; |
+ |
+/// Allows the caller to force a channel to disconnect. |
+/// |
+/// When [disconnect] is called, the channel (or channels) transformed by this |
+/// transformer will act as though the remote end had disconnected—the stream |
+/// will emit a done event, and the sink will ignore future inputs. The inner |
+/// sink will also be closed to notify the remote end of the disconnection. |
+/// |
+/// If a channel is transformed after the [disconnect] has been called, it will |
+/// be disconnected immediately. |
+class Disconnector<T> implements StreamChannelTransformer<T, T> { |
+ /// Whether [disconnect] has been called. |
+ bool get isDisconnected => _isDisconnected; |
+ var _isDisconnected = false; |
+ |
+ /// The sinks for transformed channels. |
+ /// |
+ /// Note that we assume that transformed channels provide the stream channel |
+ /// guarantees. This allows us to only track sinks, because we know closing |
+ /// the underlying sink will cause the stream to emit a done event. |
+ final _sinks = <_DisconnectorSink<T>>[]; |
+ |
+ /// Disconnects all channels that have been transformed. |
+ void disconnect() { |
+ _isDisconnected = true; |
+ for (var sink in _sinks) { |
+ sink._disconnect(); |
+ } |
+ _sinks.clear(); |
+ } |
+ |
+ StreamChannel<T> bind(StreamChannel<T> channel) { |
+ return channel.changeSink((innerSink) { |
+ var sink = new _DisconnectorSink(innerSink); |
+ |
+ if (_isDisconnected) { |
+ sink._disconnect(); |
+ } else { |
+ _sinks.add(sink); |
+ } |
+ |
+ return sink; |
+ }); |
+ } |
+} |
+ |
+/// A sink wrapper that can force a disconnection. |
+class _DisconnectorSink<T> implements StreamSink<T> { |
+ /// The inner sink. |
+ final StreamSink<T> _inner; |
+ |
+ Future get done => _inner.done; |
+ |
+ /// Whether [Disconnector.disconnect] has been called. |
+ var _isDisconnected = false; |
tjblasi
2016/02/09 16:45:37
Out of curiosity, what is your rule for typing pro
nweiz
2016/02/09 19:10:53
In general, I don't type a field if its type would
|
+ |
+ /// Whether the user has called [close]. |
+ var _closed = false; |
+ |
+ /// The subscription to the stream passed to [addStream], if a stream is |
+ /// currently being added. |
+ StreamSubscription<T> _addStreamSubscription; |
+ |
+ /// The completer for the future returned by [addStream], if a stream is |
+ /// currently being added. |
+ Completer _addStreamCompleter; |
+ |
+ /// Whether we're currently adding a stream with [addStream]. |
+ bool get _inAddStream => _addStreamSubscription != null; |
+ |
+ _DisconnectorSink(this._inner); |
+ |
+ void add(T data) { |
+ if (_closed) throw new StateError("Cannot add event after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add event while adding stream."); |
+ } |
+ if (_isDisconnected) return; |
+ |
+ _inner.add(data); |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) { |
+ if (_closed) throw new StateError("Cannot add event after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add event while adding stream."); |
+ } |
+ if (_isDisconnected) return; |
+ |
+ _inner.addError(error, stackTrace); |
+ } |
+ |
+ Future addStream(Stream<T> stream) { |
+ if (_closed) throw new StateError("Cannot add stream after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add stream while adding stream."); |
+ } |
+ if (_isDisconnected) return new Future.value(); |
+ |
+ _addStreamCompleter = new Completer.sync(); |
+ _addStreamSubscription = stream.listen( |
+ _inner.add, |
+ onError: _inner.addError, |
+ onDone: _addStreamCompleter.complete); |
+ return _addStreamCompleter.future.then((_) { |
+ _addStreamCompleter = null; |
+ _addStreamSubscription = null; |
+ }); |
+ } |
+ |
+ Future close() { |
+ if (_inAddStream) { |
+ throw new StateError("Cannot close sink while adding stream."); |
+ } |
+ |
+ _closed = true; |
+ return _inner.close(); |
+ } |
+ |
+ /// Disconnects this sink. |
+ /// |
+ /// This closes the underlying sink and stops forwarding events. |
+ void _disconnect() { |
+ _isDisconnected = true; |
+ _inner.close(); |
+ |
+ if (!_inAddStream) return; |
+ _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
+ _addStreamCompleter = null; |
+ _addStreamSubscription = null; |
+ } |
+} |