OLD | NEW |
---|---|
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library json_rpc_2.server; | 5 library json_rpc_2.server; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 import 'dart:convert'; | 9 import 'dart:convert'; |
10 | 10 |
11 import 'package:stack_trace/stack_trace.dart'; | 11 import 'package:stack_trace/stack_trace.dart'; |
12 | 12 |
13 import '../error_code.dart' as error_code; | 13 import '../error_code.dart' as error_code; |
14 import 'exception.dart'; | 14 import 'exception.dart'; |
15 import 'parameters.dart'; | 15 import 'parameters.dart'; |
16 import 'utils.dart'; | 16 import 'utils.dart'; |
17 | 17 |
18 /// A JSON-RPC 2.0 server. | 18 /// A JSON-RPC 2.0 server. |
19 /// | 19 /// |
20 /// A server exposes methods that are called by requests, to which it provides | 20 /// A server exposes methods that are called by requests, to which it provides |
21 /// responses. Methods can be registered using [registerMethod] and | 21 /// responses. Methods can be registered using [registerMethod] and |
22 /// [registerFallback]. Requests can be handled using [handleRequest] and | 22 /// [registerFallback]. Requests can be handled using [handleRequest] and |
23 /// [parseRequest]. | 23 /// [parseRequest]. |
24 /// | 24 /// |
25 /// Note that since requests can arrive asynchronously and methods can run | 25 /// Note that since requests can arrive asynchronously and methods can run |
26 /// asynchronously, it's possible for multiple methods to be invoked at the same | 26 /// asynchronously, it's possible for multiple methods to be invoked at the same |
27 /// time, or even for a single method to be invoked multiple times at once. | 27 /// time, or even for a single method to be invoked multiple times at once. |
28 class Server { | 28 class Server { |
29 /// The stream for decoded requests. | |
30 final Stream _requests; | |
31 | |
32 /// The subscription to the decoded request stream. | |
33 StreamSubscription _requestSubscription; | |
34 | |
35 /// The sink for decoded responses. | |
36 final StreamSink _responses; | |
37 | |
38 /// The completer for [listen]. | |
39 /// | |
40 /// This is non-`null` after [listen] has been called. | |
41 Completer _listenCompleter; | |
42 | |
29 /// The methods registered for this server. | 43 /// The methods registered for this server. |
30 final _methods = new Map<String, Function>(); | 44 final _methods = new Map<String, Function>(); |
31 | 45 |
32 /// The fallback methods for this server. | 46 /// The fallback methods for this server. |
33 /// | 47 /// |
34 /// These are tried in order until one of them doesn't throw a | 48 /// These are tried in order until one of them doesn't throw a |
35 /// [RpcException.methodNotFound] exception. | 49 /// [RpcException.methodNotFound] exception. |
36 final _fallbacks = new Queue<Function>(); | 50 final _fallbacks = new Queue<Function>(); |
37 | 51 |
38 Server(); | 52 /// Creates a [Server] that reads requests from [requests] and writes |
53 /// responses to [responses]. | |
54 /// | |
55 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a | |
56 /// `WebSocket`), [responses] may be omitted. | |
57 /// | |
58 /// Note that the server won't begin listening to [requests] until | |
59 /// [Server.listen] is called. | |
60 factory Server(Stream<String> requests, [StreamSink<String> responses]) { | |
61 if (responses == null) { | |
62 if (requests is! StreamSink) { | |
63 throw new ArgumentError("Either `requests` must be a StreamSink or " | |
64 "`responses` must be passed."); | |
65 } | |
66 responses = requests as StreamSink; | |
67 } | |
68 | |
69 var wrappedResponses = mapStreamSink(responses, JSON.encode); | |
70 return new Server.withoutJson(requests.expand((request) { | |
71 var decodedRequest; | |
72 try { | |
73 decodedRequest = JSON.decode(request); | |
74 } on FormatException catch (error) { | |
75 wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, | |
76 'Invalid JSON: ${error.message}').serialize(request)); | |
77 return []; | |
78 } | |
79 | |
80 return [decodedRequest]; | |
81 }), wrappedResponses); | |
82 } | |
83 | |
84 /// Creates a [Server] that reads decoded requests from [requests] and writes | |
85 /// decoded responses to [responses]. | |
86 /// | |
87 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it | |
88 /// reads and writes decoded maps or lists. | |
89 /// | |
90 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a | |
91 /// `WebSocket`), [responses] may be omitted. | |
Bob Nystrom
2014/06/03 22:26:13
WebSocket is no longer a good example here since i
nweiz
2014/06/03 23:37:00
What do you mean? WebSocket sends and receives str
Bob Nystrom
2014/06/04 16:40:48
Right, but this method sends and receives decoded
nweiz
2014/06/04 20:33:51
Oh, gotcha. Changed.
| |
92 /// | |
93 /// Note that the server won't begin listening to [requests] until | |
94 /// [Server.listen] is called. | |
95 Server.withoutJson(Stream requests, [StreamSink responses]) | |
96 : _requests = requests, | |
97 _responses = responses == null && requests is StreamSink ? | |
98 requests : responses { | |
99 if (_responses == null) { | |
100 throw new ArgumentError("Either `requests` must be a StreamSink or " | |
101 "`responses` must be passed."); | |
102 } | |
103 } | |
104 | |
105 /// Starts listening to the underlying stream. | |
106 /// | |
107 /// Returns a [Future] that will complete when the stream is closed of when it | |
Bob Nystrom
2014/06/03 22:26:13
"of" -> "or".
nweiz
2014/06/03 23:37:00
Done.
| |
108 /// has an error. | |
109 /// | |
110 /// [listen] may only be called once. | |
111 Future listen() { | |
112 if (_listenCompleter != null) { | |
113 return new Future.error(new StateError( | |
114 "Can only call Server.listen once on a given server."), | |
115 new Chain.current()); | |
Bob Nystrom
2014/06/03 22:26:13
For programmatic errors like StateError, I think i
nweiz
2014/06/03 23:37:00
Done.
| |
116 } | |
117 | |
118 _listenCompleter = new Completer(); | |
119 _requestSubscription = _requests.listen(_handleRequest, | |
120 onError: (error, stackTrace) { | |
121 if (_listenCompleter.isCompleted) return; | |
122 _responses.close(); | |
123 _listenCompleter.completeError(error, stackTrace); | |
124 }, onDone: () { | |
125 if (_listenCompleter.isCompleted) return; | |
126 _responses.close(); | |
127 _listenCompleter.complete(); | |
128 }, cancelOnError: true); | |
129 | |
130 return _listenCompleter.future; | |
131 } | |
132 | |
133 /// Closes the server's request subscription and response sink. | |
134 /// | |
135 /// Returns a [Future] that completes when all resources have been released. | |
136 /// | |
137 /// A server can't be closed before [listen] has been called. | |
138 Future close() { | |
139 if (_listenCompleter == null) { | |
140 return new Future.error(new StateError( | |
141 "Can't call Server.close before Server.listen."), | |
142 new Chain.current()); | |
143 } | |
144 | |
145 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); | |
146 | |
147 var subscriptionFuture = _requestSubscription.cancel(); | |
148 // TODO(nweiz): include the response future in the return value when issue | |
149 // 19095 is fixed. | |
150 _responses.close(); | |
151 return subscriptionFuture == null ? new Future.value() : subscriptionFuture; | |
152 } | |
39 | 153 |
40 /// Registers a method named [name] on this server. | 154 /// Registers a method named [name] on this server. |
41 /// | 155 /// |
42 /// [callback] can take either zero or one arguments. If it takes zero, any | 156 /// [callback] can take either zero or one arguments. If it takes zero, any |
43 /// requests for that method that include parameters will be rejected. If it | 157 /// requests for that method that include parameters will be rejected. If it |
44 /// takes one, it will be passed a [Parameters] object. | 158 /// takes one, it will be passed a [Parameters] object. |
45 /// | 159 /// |
46 /// [callback] can return either a JSON-serializable object or a Future that | 160 /// [callback] can return either a JSON-serializable object or a Future that |
47 /// completes to a JSON-serializable object. Any errors in [callback] will be | 161 /// completes to a JSON-serializable object. Any errors in [callback] will be |
48 /// reported to the client as JSON-RPC 2.0 errors. | 162 /// reported to the client as JSON-RPC 2.0 errors. |
(...skipping 13 matching lines...) Expand all Loading... | |
62 /// [RpcException.methodNotFound] exception. | 176 /// [RpcException.methodNotFound] exception. |
63 /// | 177 /// |
64 /// [callback] can return either a JSON-serializable object or a Future that | 178 /// [callback] can return either a JSON-serializable object or a Future that |
65 /// completes to a JSON-serializable object. Any errors in [callback] will be | 179 /// completes to a JSON-serializable object. Any errors in [callback] will be |
66 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom | 180 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom |
67 /// errors by throwing an [RpcException]. | 181 /// errors by throwing an [RpcException]. |
68 void registerFallback(callback(Parameters parameters)) { | 182 void registerFallback(callback(Parameters parameters)) { |
69 _fallbacks.add(callback); | 183 _fallbacks.add(callback); |
70 } | 184 } |
71 | 185 |
72 /// Handle a request that's already been parsed from JSON. | 186 /// Handle a request. |
73 /// | 187 /// |
74 /// [request] is expected to be a JSON-serializable object representing a | 188 /// [request] is expected to be a JSON-serializable object representing a |
75 /// request sent by a client. This calls the appropriate method or methods for | 189 /// request sent by a client. This calls the appropriate method or methods for |
76 /// handling that request and returns a JSON-serializable response, or `null` | 190 /// handling that request and returns a JSON-serializable response, or `null` |
77 /// if no response should be sent. [callback] may send custom | 191 /// if no response should be sent. [callback] may send custom |
78 /// errors by throwing an [RpcException]. | 192 /// errors by throwing an [RpcException]. |
79 Future handleRequest(request) { | 193 Future _handleRequest(request) { |
80 return syncFuture(() { | 194 return syncFuture(() { |
81 if (request is! List) return _handleSingleRequest(request); | 195 if (request is! List) return _handleSingleRequest(request); |
82 if (request.isEmpty) { | 196 if (request.isEmpty) { |
83 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' | 197 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' |
84 'contain at least one request.').serialize(request); | 198 'contain at least one request.').serialize(request); |
85 } | 199 } |
86 | 200 |
87 return Future.wait(request.map(_handleSingleRequest)).then((results) { | 201 return Future.wait(request.map(_handleSingleRequest)).then((results) { |
88 var nonNull = results.where((result) => result != null); | 202 var nonNull = results.where((result) => result != null); |
89 return nonNull.isEmpty ? null : nonNull.toList(); | 203 return nonNull.isEmpty ? null : nonNull.toList(); |
90 }); | 204 }); |
91 }); | 205 }).then(_responses.add); |
92 } | |
93 | |
94 /// Parses and handles a JSON serialized request. | |
95 /// | |
96 /// This calls the appropriate method or methods for handling that request and | |
97 /// returns a JSON string, or `null` if no response should be sent. | |
98 Future<String> parseRequest(String request) { | |
99 return syncFuture(() { | |
100 var decodedRequest; | |
101 try { | |
102 decodedRequest = JSON.decode(request); | |
103 } on FormatException catch (error) { | |
104 return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: ' | |
105 '${error.message}').serialize(request); | |
106 } | |
107 | |
108 return handleRequest(decodedRequest); | |
109 }).then((response) { | |
110 if (response == null) return null; | |
111 return JSON.encode(response); | |
112 }); | |
113 } | 206 } |
114 | 207 |
115 /// Handles an individual parsed request. | 208 /// Handles an individual parsed request. |
116 Future _handleSingleRequest(request) { | 209 Future _handleSingleRequest(request) { |
117 return syncFuture(() { | 210 return syncFuture(() { |
118 _validateRequest(request); | 211 _validateRequest(request); |
119 | 212 |
120 var name = request['method']; | 213 var name = request['method']; |
121 var method = _methods[name]; | 214 var method = _methods[name]; |
122 if (method == null) method = _tryFallbacks; | 215 if (method == null) method = _tryFallbacks; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
211 return syncFuture(() => iterator.current(params)).catchError((error) { | 304 return syncFuture(() => iterator.current(params)).catchError((error) { |
212 if (error is! RpcException) throw error; | 305 if (error is! RpcException) throw error; |
213 if (error.code != error_code.METHOD_NOT_FOUND) throw error; | 306 if (error.code != error_code.METHOD_NOT_FOUND) throw error; |
214 return _tryNext(); | 307 return _tryNext(); |
215 }); | 308 }); |
216 } | 309 } |
217 | 310 |
218 return _tryNext(); | 311 return _tryNext(); |
219 } | 312 } |
220 } | 313 } |
OLD | NEW |