Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(212)

Unified Diff: pkg/json_rpc_2/lib/src/peer.dart

Issue 707063002: Add a Peer class to json_rpc_2. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Code review changes Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/json_rpc_2/lib/src/client.dart ('k') | pkg/json_rpc_2/lib/src/server.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/json_rpc_2/lib/src/peer.dart
diff --git a/pkg/json_rpc_2/lib/src/peer.dart b/pkg/json_rpc_2/lib/src/peer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..c83724d0605509ecb86ab13848d0185ce08edcd0
--- /dev/null
+++ b/pkg/json_rpc_2/lib/src/peer.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.peer;
+
+import 'dart:async';
+
+import '../error_code.dart' as error_code;
+import 'client.dart';
+import 'exception.dart';
+import 'parameters.dart';
+import 'server.dart';
+import 'two_way_stream.dart';
+
+/// A JSON-RPC 2.0 client *and* server.
+///
+/// This supports bidirectional peer-to-peer communication with another JSON-RPC
+/// 2.0 endpoint. It sends both requests and responses across the same
+/// communication channel and expects to connect to a peer that does the same.
+class Peer implements Client, Server {
+ TwoWayStream _streams;
+
+ /// The underlying client that handles request-sending and response-receiving
+ /// logic.
+ Client _client;
+
+ /// The underlying server that handles request-receiving and response-sending
+ /// logic.
+ Server _server;
+
+ /// A stream controller that forwards incoming messages to [_server] if
+ /// they're requests.
+ final _serverIncomingForwarder = new StreamController(sync: true);
+
+ /// A stream controller that forwards incoming messages to [_client] if
+ /// they're responses.
+ final _clientIncomingForwarder = new StreamController(sync: true);
+
+ /// A stream controller that forwards outgoing messages from both [_server]
+ /// and [_client].
+ final _outgoingForwarder = new StreamController(sync: true);
+
+ /// Creates a [Peer] that reads incoming messages from [incoming] and writes
+ /// outgoing messages to [outgoing].
+ ///
+ /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [outgoing] may be omitted.
+ ///
+ /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
+ /// is called.
+ Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
+ _streams = new TwoWayStream("Peer", incoming, "incoming",
+ outgoing, "outgoing", onInvalidInput: (message, error) {
+ _streams.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(message));
+ });
+
+ _outgoingForwarder.stream.listen(_streams.add);
+ _server = new Server.withoutJson(
+ _serverIncomingForwarder.stream, _outgoingForwarder);
+ _client = new Client.withoutJson(
+ _clientIncomingForwarder.stream, _outgoingForwarder);
+ }
+
+ /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
+ /// writes outgoing decoded messages to [outgoing].
+ ///
+ /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
+ /// reads and writes decoded maps or lists.
+ ///
+ /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be
+ /// omitted.
+ ///
+ /// Note that the peer won't begin listening to [incoming] until
+ /// [Peer.listen] is called.
+ Peer.withoutJson(Stream incoming, [StreamSink outgoing]) {
+ _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming",
+ outgoing, "outgoing");
+
+ _outgoingForwarder.stream.listen(_streams.add);
+ _server = new Server.withoutJson(
+ _serverIncomingForwarder.stream, _outgoingForwarder);
+ _client = new Client.withoutJson(
+ _clientIncomingForwarder.stream, _outgoingForwarder);
+ }
+
+ // Client methods.
+
+ Future sendRequest(String method, [parameters]) =>
+ _client.sendRequest(method, parameters);
+
+ void sendNotification(String method, [parameters]) =>
+ _client.sendNotification(method, parameters);
+
+ withBatch(callback()) => _client.withBatch(callback);
+
+ // Server methods.
+
+ void registerMethod(String name, Function callback) =>
+ _server.registerMethod(name, callback);
+
+ void registerFallback(callback(Parameters parameters)) =>
+ _server.registerFallback(callback);
+
+ // Shared methods.
+
+ Future listen() {
+ _client.listen();
+ _server.listen();
+ return _streams.listen((message) {
+ if (message is Map) {
+ if (message.containsKey('result') || message.containsKey('error')) {
+ _clientIncomingForwarder.add(message);
+ } else {
+ _serverIncomingForwarder.add(message);
+ }
+ } else if (message is List && message.isNotEmpty &&
+ message.first is Map) {
+ if (message.first.containsKey('result') ||
+ message.first.containsKey('error')) {
+ _clientIncomingForwarder.add(message);
+ } else {
+ _serverIncomingForwarder.add(message);
+ }
+ } else {
+ // Non-Map and -List messages are ill-formed, so we pass them to the
+ // server since it knows how to send error responses.
+ _serverIncomingForwarder.add(message);
+ }
+ });
+ }
+
+ Future close() =>
+ Future.wait([_client.close(), _server.close(), _streams.close()]);
+}
« no previous file with comments | « pkg/json_rpc_2/lib/src/client.dart ('k') | pkg/json_rpc_2/lib/src/server.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698