Index: lib/src/peer.dart |
diff --git a/lib/src/peer.dart b/lib/src/peer.dart |
index a6707e255ad2e2b22d96c0c29f358f200b454886..810d173b228681ce012aeda05e5b11448229a5a6 100644 |
--- a/lib/src/peer.dart |
+++ b/lib/src/peer.dart |
@@ -4,12 +4,13 @@ |
import 'dart:async'; |
-import '../error_code.dart' as error_code; |
+import 'package:stream_channel/stream_channel.dart'; |
+ |
+import 'channel_manager.dart'; |
import 'client.dart'; |
-import 'exception.dart'; |
import 'parameters.dart'; |
import 'server.dart'; |
-import 'two_way_stream.dart'; |
+import 'utils.dart'; |
/// A JSON-RPC 2.0 client *and* server. |
/// |
@@ -17,7 +18,7 @@ import 'two_way_stream.dart'; |
/// 2.0 endpoint. It sends both requests and responses across the same |
/// communication channel and expects to connect to a peer that does the same. |
class Peer implements Client, Server { |
- TwoWayStream _streams; |
+ final ChannelManager _manager; |
/// The underlying client that handles request-sending and response-receiving |
/// logic. |
@@ -35,55 +36,31 @@ class Peer implements Client, Server { |
/// they're responses. |
final _clientIncomingForwarder = new StreamController(sync: true); |
- /// A stream controller that forwards outgoing messages from both [_server] |
- /// and [_client]. |
- final _outgoingForwarder = new StreamController(sync: true); |
- |
- Future get done => _streams.done; |
- bool get isClosed => _streams.isClosed; |
+ Future get done => _manager.done; |
+ bool get isClosed => _manager.isClosed; |
- /// Creates a [Peer] that reads incoming messages from [incoming] and writes |
- /// outgoing messages to [outgoing]. |
+ /// Creates a [Peer] that communicates over [channel]. |
/// |
- /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a |
- /// `WebSocket`), [outgoing] may be omitted. |
- /// |
- /// Note that the peer won't begin listening to [incoming] until [Peer.listen] |
+ /// Note that the peer won't begin listening to [channel] until [Peer.listen] |
/// is called. |
- Peer(Stream<String> incoming, [StreamSink<String> outgoing]) { |
- _streams = new TwoWayStream("Peer", incoming, "incoming", |
- outgoing, "outgoing", onInvalidInput: (message, error) { |
- _streams.add(new RpcException(error_code.PARSE_ERROR, |
- 'Invalid JSON: ${error.message}').serialize(message)); |
- }); |
+ Peer(StreamChannel<String> channel) |
+ : this.withoutJson(channel |
+ .transform(jsonDocument) |
+ .transform(respondToFormatExceptions)); |
- _outgoingForwarder.stream.listen(_streams.add); |
- _server = new Server.withoutJson( |
- _serverIncomingForwarder.stream, _outgoingForwarder); |
- _client = new Client.withoutJson( |
- _clientIncomingForwarder.stream, _outgoingForwarder); |
- } |
- |
- /// Creates a [Peer] that reads incoming decoded messages from [incoming] and |
- /// writes outgoing decoded messages to [outgoing]. |
+ /// Creates a [Peer] that communicates using decoded messages over [channel]. |
/// |
/// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it |
/// reads and writes decoded maps or lists. |
/// |
- /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be |
- /// omitted. |
- /// |
- /// Note that the peer won't begin listening to [incoming] until |
+ /// Note that the peer won't begin listening to [channel] until |
/// [Peer.listen] is called. |
- Peer.withoutJson(Stream incoming, [StreamSink outgoing]) { |
- _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming", |
- outgoing, "outgoing"); |
- |
- _outgoingForwarder.stream.listen(_streams.add); |
- _server = new Server.withoutJson( |
- _serverIncomingForwarder.stream, _outgoingForwarder); |
- _client = new Client.withoutJson( |
- _clientIncomingForwarder.stream, _outgoingForwarder); |
+ Peer.withoutJson(StreamChannel channel) |
+ : _manager = new ChannelManager("Peer", channel) { |
+ _server = new Server.withoutJson(new StreamChannel( |
+ _serverIncomingForwarder.stream, channel.sink)); |
+ _client = new Client.withoutJson(new StreamChannel( |
+ _clientIncomingForwarder.stream, channel.sink)); |
} |
// Client methods. |
@@ -109,7 +86,7 @@ class Peer implements Client, Server { |
Future listen() { |
_client.listen(); |
_server.listen(); |
- return _streams.listen((message) { |
+ return _manager.listen((message) { |
if (message is Map) { |
if (message.containsKey('result') || message.containsKey('error')) { |
_clientIncomingForwarder.add(message); |
@@ -133,5 +110,5 @@ class Peer implements Client, Server { |
} |
Future close() => |
- Future.wait([_client.close(), _server.close(), _streams.close()]); |
+ Future.wait([_client.close(), _server.close(), _manager.close()]); |
} |