Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(255)

Side by Side Diff: lib/src/peer.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/client.dart ('k') | lib/src/server.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import '../error_code.dart' as error_code; 7 import 'package:stream_channel/stream_channel.dart';
8
9 import 'channel_manager.dart';
8 import 'client.dart'; 10 import 'client.dart';
9 import 'exception.dart';
10 import 'parameters.dart'; 11 import 'parameters.dart';
11 import 'server.dart'; 12 import 'server.dart';
12 import 'two_way_stream.dart'; 13 import 'utils.dart';
13 14
14 /// A JSON-RPC 2.0 client *and* server. 15 /// A JSON-RPC 2.0 client *and* server.
15 /// 16 ///
16 /// This supports bidirectional peer-to-peer communication with another JSON-RPC 17 /// This supports bidirectional peer-to-peer communication with another JSON-RPC
17 /// 2.0 endpoint. It sends both requests and responses across the same 18 /// 2.0 endpoint. It sends both requests and responses across the same
18 /// communication channel and expects to connect to a peer that does the same. 19 /// communication channel and expects to connect to a peer that does the same.
19 class Peer implements Client, Server { 20 class Peer implements Client, Server {
20 TwoWayStream _streams; 21 final ChannelManager _manager;
21 22
22 /// The underlying client that handles request-sending and response-receiving 23 /// The underlying client that handles request-sending and response-receiving
23 /// logic. 24 /// logic.
24 Client _client; 25 Client _client;
25 26
26 /// The underlying server that handles request-receiving and response-sending 27 /// The underlying server that handles request-receiving and response-sending
27 /// logic. 28 /// logic.
28 Server _server; 29 Server _server;
29 30
30 /// A stream controller that forwards incoming messages to [_server] if 31 /// A stream controller that forwards incoming messages to [_server] if
31 /// they're requests. 32 /// they're requests.
32 final _serverIncomingForwarder = new StreamController(sync: true); 33 final _serverIncomingForwarder = new StreamController(sync: true);
33 34
34 /// A stream controller that forwards incoming messages to [_client] if 35 /// A stream controller that forwards incoming messages to [_client] if
35 /// they're responses. 36 /// they're responses.
36 final _clientIncomingForwarder = new StreamController(sync: true); 37 final _clientIncomingForwarder = new StreamController(sync: true);
37 38
38 /// A stream controller that forwards outgoing messages from both [_server] 39 Future get done => _manager.done;
39 /// and [_client]. 40 bool get isClosed => _manager.isClosed;
40 final _outgoingForwarder = new StreamController(sync: true);
41 41
42 Future get done => _streams.done; 42 /// Creates a [Peer] that communicates over [channel].
43 bool get isClosed => _streams.isClosed; 43 ///
44 /// Note that the peer won't begin listening to [channel] until [Peer.listen]
45 /// is called.
46 Peer(StreamChannel<String> channel)
47 : this.withoutJson(channel
48 .transform(jsonDocument)
49 .transform(respondToFormatExceptions));
44 50
45 /// Creates a [Peer] that reads incoming messages from [incoming] and writes 51 /// Creates a [Peer] that communicates using decoded messages over [channel].
46 /// outgoing messages to [outgoing].
47 ///
48 /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
49 /// `WebSocket`), [outgoing] may be omitted.
50 ///
51 /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
52 /// is called.
53 Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
54 _streams = new TwoWayStream("Peer", incoming, "incoming",
55 outgoing, "outgoing", onInvalidInput: (message, error) {
56 _streams.add(new RpcException(error_code.PARSE_ERROR,
57 'Invalid JSON: ${error.message}').serialize(message));
58 });
59
60 _outgoingForwarder.stream.listen(_streams.add);
61 _server = new Server.withoutJson(
62 _serverIncomingForwarder.stream, _outgoingForwarder);
63 _client = new Client.withoutJson(
64 _clientIncomingForwarder.stream, _outgoingForwarder);
65 }
66
67 /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
68 /// writes outgoing decoded messages to [outgoing].
69 /// 52 ///
70 /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it 53 /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
71 /// reads and writes decoded maps or lists. 54 /// reads and writes decoded maps or lists.
72 /// 55 ///
73 /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be 56 /// Note that the peer won't begin listening to [channel] until
74 /// omitted.
75 ///
76 /// Note that the peer won't begin listening to [incoming] until
77 /// [Peer.listen] is called. 57 /// [Peer.listen] is called.
78 Peer.withoutJson(Stream incoming, [StreamSink outgoing]) { 58 Peer.withoutJson(StreamChannel channel)
79 _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming", 59 : _manager = new ChannelManager("Peer", channel) {
80 outgoing, "outgoing"); 60 _server = new Server.withoutJson(new StreamChannel(
81 61 _serverIncomingForwarder.stream, channel.sink));
82 _outgoingForwarder.stream.listen(_streams.add); 62 _client = new Client.withoutJson(new StreamChannel(
83 _server = new Server.withoutJson( 63 _clientIncomingForwarder.stream, channel.sink));
84 _serverIncomingForwarder.stream, _outgoingForwarder);
85 _client = new Client.withoutJson(
86 _clientIncomingForwarder.stream, _outgoingForwarder);
87 } 64 }
88 65
89 // Client methods. 66 // Client methods.
90 67
91 Future sendRequest(String method, [parameters]) => 68 Future sendRequest(String method, [parameters]) =>
92 _client.sendRequest(method, parameters); 69 _client.sendRequest(method, parameters);
93 70
94 void sendNotification(String method, [parameters]) => 71 void sendNotification(String method, [parameters]) =>
95 _client.sendNotification(method, parameters); 72 _client.sendNotification(method, parameters);
96 73
97 withBatch(callback()) => _client.withBatch(callback); 74 withBatch(callback()) => _client.withBatch(callback);
98 75
99 // Server methods. 76 // Server methods.
100 77
101 void registerMethod(String name, Function callback) => 78 void registerMethod(String name, Function callback) =>
102 _server.registerMethod(name, callback); 79 _server.registerMethod(name, callback);
103 80
104 void registerFallback(callback(Parameters parameters)) => 81 void registerFallback(callback(Parameters parameters)) =>
105 _server.registerFallback(callback); 82 _server.registerFallback(callback);
106 83
107 // Shared methods. 84 // Shared methods.
108 85
109 Future listen() { 86 Future listen() {
110 _client.listen(); 87 _client.listen();
111 _server.listen(); 88 _server.listen();
112 return _streams.listen((message) { 89 return _manager.listen((message) {
113 if (message is Map) { 90 if (message is Map) {
114 if (message.containsKey('result') || message.containsKey('error')) { 91 if (message.containsKey('result') || message.containsKey('error')) {
115 _clientIncomingForwarder.add(message); 92 _clientIncomingForwarder.add(message);
116 } else { 93 } else {
117 _serverIncomingForwarder.add(message); 94 _serverIncomingForwarder.add(message);
118 } 95 }
119 } else if (message is List && message.isNotEmpty && 96 } else if (message is List && message.isNotEmpty &&
120 message.first is Map) { 97 message.first is Map) {
121 if (message.first.containsKey('result') || 98 if (message.first.containsKey('result') ||
122 message.first.containsKey('error')) { 99 message.first.containsKey('error')) {
123 _clientIncomingForwarder.add(message); 100 _clientIncomingForwarder.add(message);
124 } else { 101 } else {
125 _serverIncomingForwarder.add(message); 102 _serverIncomingForwarder.add(message);
126 } 103 }
127 } else { 104 } else {
128 // Non-Map and -List messages are ill-formed, so we pass them to the 105 // Non-Map and -List messages are ill-formed, so we pass them to the
129 // server since it knows how to send error responses. 106 // server since it knows how to send error responses.
130 _serverIncomingForwarder.add(message); 107 _serverIncomingForwarder.add(message);
131 } 108 }
132 }); 109 });
133 } 110 }
134 111
135 Future close() => 112 Future close() =>
136 Future.wait([_client.close(), _server.close(), _streams.close()]); 113 Future.wait([_client.close(), _server.close(), _manager.close()]);
137 } 114 }
OLDNEW
« no previous file with comments | « lib/src/client.dart ('k') | lib/src/server.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698