Chromium Code Reviews| Index: lib/src/channel_manager.dart |
| diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..40a3c58a9c0f9b7b7d0550c5f814cba003bd4423 |
| --- /dev/null |
| +++ b/lib/src/channel_manager.dart |
| @@ -0,0 +1,79 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
|
Bob Nystrom
2016/02/02 17:54:15
2016! :)
|
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import 'dart:async'; |
| + |
| +import 'package:stream_channel/stream_channel.dart'; |
| + |
| +/// Wraps a [StreamChannel] and handles logic that's shared between [Server], |
| +/// [Client], and [Peer]. |
| +/// |
| +/// These classes don't provide the user direct access to a |
| +/// [StreamSubscription]. Instead, they use the future returned by [listen] to |
| +/// notify the user of the remote endpoint closing or producing an error. |
| +class ChannelManager { |
| + /// The name of the component whose channel is wrapped (e.g. "Server"). |
| + /// |
| + /// Used for error reporting. |
| + final String _name; |
| + |
| + /// The underlying channel. |
| + final StreamChannel _channel; |
| + |
| + /// Returns a [Future] that completes when the connection is closed. |
| + /// |
| + /// This is the same future that's returned by [listen]. |
| + Future get done => _doneCompleter.future; |
| + final _doneCompleter = new Completer(); |
| + |
| + /// Whether the underlying communication channel is closed. |
| + bool get isClosed => _doneCompleter.isCompleted; |
| + |
| + /// Whether [listen] has been called. |
| + bool _listenCalled = false; |
| + |
| + /// Whether [close] has been called. |
| + /// |
| + /// Note that [isClosed] tracks whether the underlying connection is closed, |
| + /// whereas this tracks only whether it was explicitly closed from this end. |
| + bool _closeCalled = false; |
| + |
| + ChannelManager(this._name, this._channel); |
| + |
| + /// Starts listening to the channel. |
| + /// |
| + /// The returned Future will complete when the input stream is closed. If the |
| + /// input stream emits an error, that will be piped to the returned Future. |
| + Future listen(void handleInput(input)) { |
| + if (_listenCalled) { |
| + throw new StateError("Can only call $_name.listen() once."); |
| + } |
| + _listenCalled = true; |
| + |
| + _channel.stream.listen(handleInput, |
| + onError: (error, stackTrace) { |
| + _doneCompleter.completeError(error, stackTrace); |
| + _channel.sink.close(); |
| + }, |
| + onDone: _doneCompleter.complete, |
| + cancelOnError: true); |
| + |
| + return done; |
| + } |
| + |
| + /// Emit [event]. |
| + void add(event) { |
| + if (isClosed && !_closeCalled) return; |
| + _channel.sink.add(event); |
| + } |
| + |
| + /// Closes the channel. |
| + Future close() { |
| + _closeCalled = true; |
| + if (!_doneCompleter.isCompleted) { |
| + _doneCompleter.complete(_channel.sink.close()); |
| + } |
| + return done; |
| + } |
| +} |