Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Unified Diff: lib/src/disconnector.dart

Issue 1679193002: Add a Disconnector class. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « CHANGELOG.md ('k') | lib/stream_channel.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/disconnector.dart
diff --git a/lib/src/disconnector.dart b/lib/src/disconnector.dart
new file mode 100644
index 0000000000000000000000000000000000000000..35ecd1cc408305e34feec605e1251fced3747c38
--- /dev/null
+++ b/lib/src/disconnector.dart
@@ -0,0 +1,139 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// Allows the caller to force a channel to disconnect.
+///
+/// When [disconnect] is called, the channel (or channels) transformed by this
+/// transformer will act as though the remote end had disconnected—the stream
+/// will emit a done event, and the sink will ignore future inputs. The inner
+/// sink will also be closed to notify the remote end of the disconnection.
+///
+/// If a channel is transformed after the [disconnect] has been called, it will
+/// be disconnected immediately.
+class Disconnector<T> implements StreamChannelTransformer<T, T> {
+ /// Whether [disconnect] has been called.
+ bool get isDisconnected => _isDisconnected;
+ var _isDisconnected = false;
+
+ /// The sinks for transformed channels.
+ ///
+ /// Note that we assume that transformed channels provide the stream channel
+ /// guarantees. This allows us to only track sinks, because we know closing
+ /// the underlying sink will cause the stream to emit a done event.
+ final _sinks = <_DisconnectorSink<T>>[];
+
+ /// Disconnects all channels that have been transformed.
+ void disconnect() {
+ _isDisconnected = true;
+ for (var sink in _sinks) {
+ sink._disconnect();
+ }
+ _sinks.clear();
+ }
+
+ StreamChannel<T> bind(StreamChannel<T> channel) {
+ return channel.changeSink((innerSink) {
+ var sink = new _DisconnectorSink(innerSink);
+
+ if (_isDisconnected) {
+ sink._disconnect();
+ } else {
+ _sinks.add(sink);
+ }
+
+ return sink;
+ });
+ }
+}
+
+/// A sink wrapper that can force a disconnection.
+class _DisconnectorSink<T> implements StreamSink<T> {
+ /// The inner sink.
+ final StreamSink<T> _inner;
+
+ Future get done => _inner.done;
+
+ /// Whether [Disconnector.disconnect] has been called.
+ var _isDisconnected = false;
tjblasi 2016/02/09 16:45:37 Out of curiosity, what is your rule for typing pro
nweiz 2016/02/09 19:10:53 In general, I don't type a field if its type would
+
+ /// Whether the user has called [close].
+ var _closed = false;
+
+ /// The subscription to the stream passed to [addStream], if a stream is
+ /// currently being added.
+ StreamSubscription<T> _addStreamSubscription;
+
+ /// The completer for the future returned by [addStream], if a stream is
+ /// currently being added.
+ Completer _addStreamCompleter;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool get _inAddStream => _addStreamSubscription != null;
+
+ _DisconnectorSink(this._inner);
+
+ void add(T data) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_isDisconnected) return;
+
+ _inner.add(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_isDisconnected) return;
+
+ _inner.addError(error, stackTrace);
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_closed) throw new StateError("Cannot add stream after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add stream while adding stream.");
+ }
+ if (_isDisconnected) return new Future.value();
+
+ _addStreamCompleter = new Completer.sync();
+ _addStreamSubscription = stream.listen(
+ _inner.add,
+ onError: _inner.addError,
+ onDone: _addStreamCompleter.complete);
+ return _addStreamCompleter.future.then((_) {
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ });
+ }
+
+ Future close() {
+ if (_inAddStream) {
+ throw new StateError("Cannot close sink while adding stream.");
+ }
+
+ _closed = true;
+ return _inner.close();
+ }
+
+ /// Disconnects this sink.
+ ///
+ /// This closes the underlying sink and stops forwarding events.
+ void _disconnect() {
+ _isDisconnected = true;
+ _inner.close();
+
+ if (!_inAddStream) return;
+ _addStreamCompleter.complete(_addStreamSubscription.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+}
« no previous file with comments | « CHANGELOG.md ('k') | lib/stream_channel.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698