OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import '../error_code.dart' as error_code; | 7 import 'package:stream_channel/stream_channel.dart'; |
| 8 |
| 9 import 'channel_manager.dart'; |
8 import 'client.dart'; | 10 import 'client.dart'; |
9 import 'exception.dart'; | |
10 import 'parameters.dart'; | 11 import 'parameters.dart'; |
11 import 'server.dart'; | 12 import 'server.dart'; |
12 import 'two_way_stream.dart'; | 13 import 'utils.dart'; |
13 | 14 |
14 /// A JSON-RPC 2.0 client *and* server. | 15 /// A JSON-RPC 2.0 client *and* server. |
15 /// | 16 /// |
16 /// This supports bidirectional peer-to-peer communication with another JSON-RPC | 17 /// This supports bidirectional peer-to-peer communication with another JSON-RPC |
17 /// 2.0 endpoint. It sends both requests and responses across the same | 18 /// 2.0 endpoint. It sends both requests and responses across the same |
18 /// communication channel and expects to connect to a peer that does the same. | 19 /// communication channel and expects to connect to a peer that does the same. |
19 class Peer implements Client, Server { | 20 class Peer implements Client, Server { |
20 TwoWayStream _streams; | 21 final ChannelManager _manager; |
21 | 22 |
22 /// The underlying client that handles request-sending and response-receiving | 23 /// The underlying client that handles request-sending and response-receiving |
23 /// logic. | 24 /// logic. |
24 Client _client; | 25 Client _client; |
25 | 26 |
26 /// The underlying server that handles request-receiving and response-sending | 27 /// The underlying server that handles request-receiving and response-sending |
27 /// logic. | 28 /// logic. |
28 Server _server; | 29 Server _server; |
29 | 30 |
30 /// A stream controller that forwards incoming messages to [_server] if | 31 /// A stream controller that forwards incoming messages to [_server] if |
31 /// they're requests. | 32 /// they're requests. |
32 final _serverIncomingForwarder = new StreamController(sync: true); | 33 final _serverIncomingForwarder = new StreamController(sync: true); |
33 | 34 |
34 /// A stream controller that forwards incoming messages to [_client] if | 35 /// A stream controller that forwards incoming messages to [_client] if |
35 /// they're responses. | 36 /// they're responses. |
36 final _clientIncomingForwarder = new StreamController(sync: true); | 37 final _clientIncomingForwarder = new StreamController(sync: true); |
37 | 38 |
38 /// A stream controller that forwards outgoing messages from both [_server] | 39 Future get done => _manager.done; |
39 /// and [_client]. | 40 bool get isClosed => _manager.isClosed; |
40 final _outgoingForwarder = new StreamController(sync: true); | |
41 | 41 |
42 Future get done => _streams.done; | 42 /// Creates a [Peer] that communicates over [channel]. |
43 bool get isClosed => _streams.isClosed; | 43 /// |
| 44 /// Note that the peer won't begin listening to [channel] until [Peer.listen] |
| 45 /// is called. |
| 46 Peer(StreamChannel<String> channel) |
| 47 : this.withoutJson(channel |
| 48 .transform(jsonDocument) |
| 49 .transform(respondToFormatExceptions)); |
44 | 50 |
45 /// Creates a [Peer] that reads incoming messages from [incoming] and writes | 51 /// Creates a [Peer] that communicates using decoded messages over [channel]. |
46 /// outgoing messages to [outgoing]. | |
47 /// | |
48 /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a | |
49 /// `WebSocket`), [outgoing] may be omitted. | |
50 /// | |
51 /// Note that the peer won't begin listening to [incoming] until [Peer.listen] | |
52 /// is called. | |
53 Peer(Stream<String> incoming, [StreamSink<String> outgoing]) { | |
54 _streams = new TwoWayStream("Peer", incoming, "incoming", | |
55 outgoing, "outgoing", onInvalidInput: (message, error) { | |
56 _streams.add(new RpcException(error_code.PARSE_ERROR, | |
57 'Invalid JSON: ${error.message}').serialize(message)); | |
58 }); | |
59 | |
60 _outgoingForwarder.stream.listen(_streams.add); | |
61 _server = new Server.withoutJson( | |
62 _serverIncomingForwarder.stream, _outgoingForwarder); | |
63 _client = new Client.withoutJson( | |
64 _clientIncomingForwarder.stream, _outgoingForwarder); | |
65 } | |
66 | |
67 /// Creates a [Peer] that reads incoming decoded messages from [incoming] and | |
68 /// writes outgoing decoded messages to [outgoing]. | |
69 /// | 52 /// |
70 /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it | 53 /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it |
71 /// reads and writes decoded maps or lists. | 54 /// reads and writes decoded maps or lists. |
72 /// | 55 /// |
73 /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be | 56 /// Note that the peer won't begin listening to [channel] until |
74 /// omitted. | |
75 /// | |
76 /// Note that the peer won't begin listening to [incoming] until | |
77 /// [Peer.listen] is called. | 57 /// [Peer.listen] is called. |
78 Peer.withoutJson(Stream incoming, [StreamSink outgoing]) { | 58 Peer.withoutJson(StreamChannel channel) |
79 _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming", | 59 : _manager = new ChannelManager("Peer", channel) { |
80 outgoing, "outgoing"); | 60 _server = new Server.withoutJson(new StreamChannel( |
81 | 61 _serverIncomingForwarder.stream, channel.sink)); |
82 _outgoingForwarder.stream.listen(_streams.add); | 62 _client = new Client.withoutJson(new StreamChannel( |
83 _server = new Server.withoutJson( | 63 _clientIncomingForwarder.stream, channel.sink)); |
84 _serverIncomingForwarder.stream, _outgoingForwarder); | |
85 _client = new Client.withoutJson( | |
86 _clientIncomingForwarder.stream, _outgoingForwarder); | |
87 } | 64 } |
88 | 65 |
89 // Client methods. | 66 // Client methods. |
90 | 67 |
91 Future sendRequest(String method, [parameters]) => | 68 Future sendRequest(String method, [parameters]) => |
92 _client.sendRequest(method, parameters); | 69 _client.sendRequest(method, parameters); |
93 | 70 |
94 void sendNotification(String method, [parameters]) => | 71 void sendNotification(String method, [parameters]) => |
95 _client.sendNotification(method, parameters); | 72 _client.sendNotification(method, parameters); |
96 | 73 |
97 withBatch(callback()) => _client.withBatch(callback); | 74 withBatch(callback()) => _client.withBatch(callback); |
98 | 75 |
99 // Server methods. | 76 // Server methods. |
100 | 77 |
101 void registerMethod(String name, Function callback) => | 78 void registerMethod(String name, Function callback) => |
102 _server.registerMethod(name, callback); | 79 _server.registerMethod(name, callback); |
103 | 80 |
104 void registerFallback(callback(Parameters parameters)) => | 81 void registerFallback(callback(Parameters parameters)) => |
105 _server.registerFallback(callback); | 82 _server.registerFallback(callback); |
106 | 83 |
107 // Shared methods. | 84 // Shared methods. |
108 | 85 |
109 Future listen() { | 86 Future listen() { |
110 _client.listen(); | 87 _client.listen(); |
111 _server.listen(); | 88 _server.listen(); |
112 return _streams.listen((message) { | 89 return _manager.listen((message) { |
113 if (message is Map) { | 90 if (message is Map) { |
114 if (message.containsKey('result') || message.containsKey('error')) { | 91 if (message.containsKey('result') || message.containsKey('error')) { |
115 _clientIncomingForwarder.add(message); | 92 _clientIncomingForwarder.add(message); |
116 } else { | 93 } else { |
117 _serverIncomingForwarder.add(message); | 94 _serverIncomingForwarder.add(message); |
118 } | 95 } |
119 } else if (message is List && message.isNotEmpty && | 96 } else if (message is List && message.isNotEmpty && |
120 message.first is Map) { | 97 message.first is Map) { |
121 if (message.first.containsKey('result') || | 98 if (message.first.containsKey('result') || |
122 message.first.containsKey('error')) { | 99 message.first.containsKey('error')) { |
123 _clientIncomingForwarder.add(message); | 100 _clientIncomingForwarder.add(message); |
124 } else { | 101 } else { |
125 _serverIncomingForwarder.add(message); | 102 _serverIncomingForwarder.add(message); |
126 } | 103 } |
127 } else { | 104 } else { |
128 // Non-Map and -List messages are ill-formed, so we pass them to the | 105 // Non-Map and -List messages are ill-formed, so we pass them to the |
129 // server since it knows how to send error responses. | 106 // server since it knows how to send error responses. |
130 _serverIncomingForwarder.add(message); | 107 _serverIncomingForwarder.add(message); |
131 } | 108 } |
132 }); | 109 }); |
133 } | 110 } |
134 | 111 |
135 Future close() => | 112 Future close() => |
136 Future.wait([_client.close(), _server.close(), _streams.close()]); | 113 Future.wait([_client.close(), _server.close(), _manager.close()]); |
137 } | 114 } |
OLD | NEW |