| Index: lib/src/peer.dart
|
| diff --git a/lib/src/peer.dart b/lib/src/peer.dart
|
| index a6707e255ad2e2b22d96c0c29f358f200b454886..810d173b228681ce012aeda05e5b11448229a5a6 100644
|
| --- a/lib/src/peer.dart
|
| +++ b/lib/src/peer.dart
|
| @@ -4,12 +4,13 @@
|
|
|
| import 'dart:async';
|
|
|
| -import '../error_code.dart' as error_code;
|
| +import 'package:stream_channel/stream_channel.dart';
|
| +
|
| +import 'channel_manager.dart';
|
| import 'client.dart';
|
| -import 'exception.dart';
|
| import 'parameters.dart';
|
| import 'server.dart';
|
| -import 'two_way_stream.dart';
|
| +import 'utils.dart';
|
|
|
| /// A JSON-RPC 2.0 client *and* server.
|
| ///
|
| @@ -17,7 +18,7 @@ import 'two_way_stream.dart';
|
| /// 2.0 endpoint. It sends both requests and responses across the same
|
| /// communication channel and expects to connect to a peer that does the same.
|
| class Peer implements Client, Server {
|
| - TwoWayStream _streams;
|
| + final ChannelManager _manager;
|
|
|
| /// The underlying client that handles request-sending and response-receiving
|
| /// logic.
|
| @@ -35,55 +36,31 @@ class Peer implements Client, Server {
|
| /// they're responses.
|
| final _clientIncomingForwarder = new StreamController(sync: true);
|
|
|
| - /// A stream controller that forwards outgoing messages from both [_server]
|
| - /// and [_client].
|
| - final _outgoingForwarder = new StreamController(sync: true);
|
| -
|
| - Future get done => _streams.done;
|
| - bool get isClosed => _streams.isClosed;
|
| + Future get done => _manager.done;
|
| + bool get isClosed => _manager.isClosed;
|
|
|
| - /// Creates a [Peer] that reads incoming messages from [incoming] and writes
|
| - /// outgoing messages to [outgoing].
|
| + /// Creates a [Peer] that communicates over [channel].
|
| ///
|
| - /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
|
| - /// `WebSocket`), [outgoing] may be omitted.
|
| - ///
|
| - /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
|
| + /// Note that the peer won't begin listening to [channel] until [Peer.listen]
|
| /// is called.
|
| - Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
|
| - _streams = new TwoWayStream("Peer", incoming, "incoming",
|
| - outgoing, "outgoing", onInvalidInput: (message, error) {
|
| - _streams.add(new RpcException(error_code.PARSE_ERROR,
|
| - 'Invalid JSON: ${error.message}').serialize(message));
|
| - });
|
| + Peer(StreamChannel<String> channel)
|
| + : this.withoutJson(channel
|
| + .transform(jsonDocument)
|
| + .transform(respondToFormatExceptions));
|
|
|
| - _outgoingForwarder.stream.listen(_streams.add);
|
| - _server = new Server.withoutJson(
|
| - _serverIncomingForwarder.stream, _outgoingForwarder);
|
| - _client = new Client.withoutJson(
|
| - _clientIncomingForwarder.stream, _outgoingForwarder);
|
| - }
|
| -
|
| - /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
|
| - /// writes outgoing decoded messages to [outgoing].
|
| + /// Creates a [Peer] that communicates using decoded messages over [channel].
|
| ///
|
| /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
|
| /// reads and writes decoded maps or lists.
|
| ///
|
| - /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be
|
| - /// omitted.
|
| - ///
|
| - /// Note that the peer won't begin listening to [incoming] until
|
| + /// Note that the peer won't begin listening to [channel] until
|
| /// [Peer.listen] is called.
|
| - Peer.withoutJson(Stream incoming, [StreamSink outgoing]) {
|
| - _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming",
|
| - outgoing, "outgoing");
|
| -
|
| - _outgoingForwarder.stream.listen(_streams.add);
|
| - _server = new Server.withoutJson(
|
| - _serverIncomingForwarder.stream, _outgoingForwarder);
|
| - _client = new Client.withoutJson(
|
| - _clientIncomingForwarder.stream, _outgoingForwarder);
|
| + Peer.withoutJson(StreamChannel channel)
|
| + : _manager = new ChannelManager("Peer", channel) {
|
| + _server = new Server.withoutJson(new StreamChannel(
|
| + _serverIncomingForwarder.stream, channel.sink));
|
| + _client = new Client.withoutJson(new StreamChannel(
|
| + _clientIncomingForwarder.stream, channel.sink));
|
| }
|
|
|
| // Client methods.
|
| @@ -109,7 +86,7 @@ class Peer implements Client, Server {
|
| Future listen() {
|
| _client.listen();
|
| _server.listen();
|
| - return _streams.listen((message) {
|
| + return _manager.listen((message) {
|
| if (message is Map) {
|
| if (message.containsKey('result') || message.containsKey('error')) {
|
| _clientIncomingForwarder.add(message);
|
| @@ -133,5 +110,5 @@ class Peer implements Client, Server {
|
| }
|
|
|
| Future close() =>
|
| - Future.wait([_client.close(), _server.close(), _streams.close()]);
|
| + Future.wait([_client.close(), _server.close(), _manager.close()]);
|
| }
|
|
|