Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1284)

Unified Diff: lib/src/peer.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/client.dart ('k') | lib/src/server.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/peer.dart
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index a6707e255ad2e2b22d96c0c29f358f200b454886..810d173b228681ce012aeda05e5b11448229a5a6 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -4,12 +4,13 @@
import 'dart:async';
-import '../error_code.dart' as error_code;
+import 'package:stream_channel/stream_channel.dart';
+
+import 'channel_manager.dart';
import 'client.dart';
-import 'exception.dart';
import 'parameters.dart';
import 'server.dart';
-import 'two_way_stream.dart';
+import 'utils.dart';
/// A JSON-RPC 2.0 client *and* server.
///
@@ -17,7 +18,7 @@ import 'two_way_stream.dart';
/// 2.0 endpoint. It sends both requests and responses across the same
/// communication channel and expects to connect to a peer that does the same.
class Peer implements Client, Server {
- TwoWayStream _streams;
+ final ChannelManager _manager;
/// The underlying client that handles request-sending and response-receiving
/// logic.
@@ -35,55 +36,31 @@ class Peer implements Client, Server {
/// they're responses.
final _clientIncomingForwarder = new StreamController(sync: true);
- /// A stream controller that forwards outgoing messages from both [_server]
- /// and [_client].
- final _outgoingForwarder = new StreamController(sync: true);
-
- Future get done => _streams.done;
- bool get isClosed => _streams.isClosed;
+ Future get done => _manager.done;
+ bool get isClosed => _manager.isClosed;
- /// Creates a [Peer] that reads incoming messages from [incoming] and writes
- /// outgoing messages to [outgoing].
+ /// Creates a [Peer] that communicates over [channel].
///
- /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
- /// `WebSocket`), [outgoing] may be omitted.
- ///
- /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
+ /// Note that the peer won't begin listening to [channel] until [Peer.listen]
/// is called.
- Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
- _streams = new TwoWayStream("Peer", incoming, "incoming",
- outgoing, "outgoing", onInvalidInput: (message, error) {
- _streams.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(message));
- });
+ Peer(StreamChannel<String> channel)
+ : this.withoutJson(channel
+ .transform(jsonDocument)
+ .transform(respondToFormatExceptions));
- _outgoingForwarder.stream.listen(_streams.add);
- _server = new Server.withoutJson(
- _serverIncomingForwarder.stream, _outgoingForwarder);
- _client = new Client.withoutJson(
- _clientIncomingForwarder.stream, _outgoingForwarder);
- }
-
- /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
- /// writes outgoing decoded messages to [outgoing].
+ /// Creates a [Peer] that communicates using decoded messages over [channel].
///
/// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
/// reads and writes decoded maps or lists.
///
- /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be
- /// omitted.
- ///
- /// Note that the peer won't begin listening to [incoming] until
+ /// Note that the peer won't begin listening to [channel] until
/// [Peer.listen] is called.
- Peer.withoutJson(Stream incoming, [StreamSink outgoing]) {
- _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming",
- outgoing, "outgoing");
-
- _outgoingForwarder.stream.listen(_streams.add);
- _server = new Server.withoutJson(
- _serverIncomingForwarder.stream, _outgoingForwarder);
- _client = new Client.withoutJson(
- _clientIncomingForwarder.stream, _outgoingForwarder);
+ Peer.withoutJson(StreamChannel channel)
+ : _manager = new ChannelManager("Peer", channel) {
+ _server = new Server.withoutJson(new StreamChannel(
+ _serverIncomingForwarder.stream, channel.sink));
+ _client = new Client.withoutJson(new StreamChannel(
+ _clientIncomingForwarder.stream, channel.sink));
}
// Client methods.
@@ -109,7 +86,7 @@ class Peer implements Client, Server {
Future listen() {
_client.listen();
_server.listen();
- return _streams.listen((message) {
+ return _manager.listen((message) {
if (message is Map) {
if (message.containsKey('result') || message.containsKey('error')) {
_clientIncomingForwarder.add(message);
@@ -133,5 +110,5 @@ class Peer implements Client, Server {
}
Future close() =>
- Future.wait([_client.close(), _server.close(), _streams.close()]);
+ Future.wait([_client.close(), _server.close(), _manager.close()]);
}
« no previous file with comments | « lib/src/client.dart ('k') | lib/src/server.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698