OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'package:stream_channel/stream_channel.dart'; |
| 8 |
| 9 /// Wraps a [StreamChannel] and handles logic that's shared between [Server], |
| 10 /// [Client], and [Peer]. |
| 11 /// |
| 12 /// These classes don't provide the user direct access to a |
| 13 /// [StreamSubscription]. Instead, they use the future returned by [listen] to |
| 14 /// notify the user of the remote endpoint closing or producing an error. |
| 15 class ChannelManager { |
| 16 /// The name of the component whose channel is wrapped (e.g. "Server"). |
| 17 /// |
| 18 /// Used for error reporting. |
| 19 final String _name; |
| 20 |
| 21 /// The underlying channel. |
| 22 final StreamChannel _channel; |
| 23 |
| 24 /// Returns a [Future] that completes when the connection is closed. |
| 25 /// |
| 26 /// This is the same future that's returned by [listen]. |
| 27 Future get done => _doneCompleter.future; |
| 28 final _doneCompleter = new Completer(); |
| 29 |
| 30 /// Whether the underlying communication channel is closed. |
| 31 bool get isClosed => _doneCompleter.isCompleted; |
| 32 |
| 33 /// Whether [listen] has been called. |
| 34 bool _listenCalled = false; |
| 35 |
| 36 /// Whether [close] has been called. |
| 37 /// |
| 38 /// Note that [isClosed] tracks whether the underlying connection is closed, |
| 39 /// whereas this tracks only whether it was explicitly closed from this end. |
| 40 bool _closeCalled = false; |
| 41 |
| 42 ChannelManager(this._name, this._channel); |
| 43 |
| 44 /// Starts listening to the channel. |
| 45 /// |
| 46 /// The returned Future will complete when the input stream is closed. If the |
| 47 /// input stream emits an error, that will be piped to the returned Future. |
| 48 Future listen(void handleInput(input)) { |
| 49 if (_listenCalled) { |
| 50 throw new StateError("Can only call $_name.listen() once."); |
| 51 } |
| 52 _listenCalled = true; |
| 53 |
| 54 _channel.stream.listen(handleInput, |
| 55 onError: (error, stackTrace) { |
| 56 _doneCompleter.completeError(error, stackTrace); |
| 57 _channel.sink.close(); |
| 58 }, |
| 59 onDone: _doneCompleter.complete, |
| 60 cancelOnError: true); |
| 61 |
| 62 return done; |
| 63 } |
| 64 |
| 65 /// Emit [event]. |
| 66 void add(event) { |
| 67 if (isClosed && !_closeCalled) return; |
| 68 _channel.sink.add(event); |
| 69 } |
| 70 |
| 71 /// Closes the channel. |
| 72 Future close() { |
| 73 _closeCalled = true; |
| 74 if (!_doneCompleter.isCompleted) { |
| 75 _doneCompleter.complete(_channel.sink.close()); |
| 76 } |
| 77 return done; |
| 78 } |
| 79 } |
OLD | NEW |