Index: lib/src/channel_manager.dart |
diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..35d75a60814bc80197a4c015c7da6593fdd26505 |
--- /dev/null |
+++ b/lib/src/channel_manager.dart |
@@ -0,0 +1,79 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:stream_channel/stream_channel.dart'; |
+ |
+/// Wraps a [StreamChannel] and handles logic that's shared between [Server], |
+/// [Client], and [Peer]. |
+/// |
+/// These classes don't provide the user direct access to a |
+/// [StreamSubscription]. Instead, they use the future returned by [listen] to |
+/// notify the user of the remote endpoint closing or producing an error. |
+class ChannelManager { |
+ /// The name of the component whose channel is wrapped (e.g. "Server"). |
+ /// |
+ /// Used for error reporting. |
+ final String _name; |
+ |
+ /// The underlying channel. |
+ final StreamChannel _channel; |
+ |
+ /// Returns a [Future] that completes when the connection is closed. |
+ /// |
+ /// This is the same future that's returned by [listen]. |
+ Future get done => _doneCompleter.future; |
+ final _doneCompleter = new Completer(); |
+ |
+ /// Whether the underlying communication channel is closed. |
+ bool get isClosed => _doneCompleter.isCompleted; |
+ |
+ /// Whether [listen] has been called. |
+ bool _listenCalled = false; |
+ |
+ /// Whether [close] has been called. |
+ /// |
+ /// Note that [isClosed] tracks whether the underlying connection is closed, |
+ /// whereas this tracks only whether it was explicitly closed from this end. |
+ bool _closeCalled = false; |
+ |
+ ChannelManager(this._name, this._channel); |
+ |
+ /// Starts listening to the channel. |
+ /// |
+ /// The returned Future will complete when the input stream is closed. If the |
+ /// input stream emits an error, that will be piped to the returned Future. |
+ Future listen(void handleInput(input)) { |
+ if (_listenCalled) { |
+ throw new StateError("Can only call $_name.listen() once."); |
+ } |
+ _listenCalled = true; |
+ |
+ _channel.stream.listen(handleInput, |
+ onError: (error, stackTrace) { |
+ _doneCompleter.completeError(error, stackTrace); |
+ _channel.sink.close(); |
+ }, |
+ onDone: _doneCompleter.complete, |
+ cancelOnError: true); |
+ |
+ return done; |
+ } |
+ |
+ /// Emit [event]. |
+ void add(event) { |
+ if (isClosed && !_closeCalled) return; |
+ _channel.sink.add(event); |
+ } |
+ |
+ /// Closes the channel. |
+ Future close() { |
+ _closeCalled = true; |
+ if (!_doneCompleter.isCompleted) { |
+ _doneCompleter.complete(_channel.sink.close()); |
+ } |
+ return done; |
+ } |
+} |