OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library json_rpc_2.server; | 5 library json_rpc_2.server; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 import 'dart:convert'; | 9 import 'dart:convert'; |
10 | 10 |
11 import 'package:stack_trace/stack_trace.dart'; | 11 import 'package:stack_trace/stack_trace.dart'; |
12 | 12 |
13 import '../error_code.dart' as error_code; | 13 import '../error_code.dart' as error_code; |
14 import 'exception.dart'; | 14 import 'exception.dart'; |
15 import 'parameters.dart'; | 15 import 'parameters.dart'; |
| 16 import 'two_way_stream.dart'; |
16 import 'utils.dart'; | 17 import 'utils.dart'; |
17 | 18 |
18 /// A JSON-RPC 2.0 server. | 19 /// A JSON-RPC 2.0 server. |
19 /// | 20 /// |
20 /// A server exposes methods that are called by requests, to which it provides | 21 /// A server exposes methods that are called by requests, to which it provides |
21 /// responses. Methods can be registered using [registerMethod] and | 22 /// responses. Methods can be registered using [registerMethod] and |
22 /// [registerFallback]. Requests can be handled using [handleRequest] and | 23 /// [registerFallback]. Requests can be handled using [handleRequest] and |
23 /// [parseRequest]. | 24 /// [parseRequest]. |
24 /// | 25 /// |
25 /// Note that since requests can arrive asynchronously and methods can run | 26 /// Note that since requests can arrive asynchronously and methods can run |
26 /// asynchronously, it's possible for multiple methods to be invoked at the same | 27 /// asynchronously, it's possible for multiple methods to be invoked at the same |
27 /// time, or even for a single method to be invoked multiple times at once. | 28 /// time, or even for a single method to be invoked multiple times at once. |
28 class Server { | 29 class Server { |
29 /// The stream for decoded requests. | 30 TwoWayStream _streams; |
30 final Stream _requests; | |
31 | |
32 /// The subscription to the decoded request stream. | |
33 StreamSubscription _requestSubscription; | |
34 | |
35 /// The sink for decoded responses. | |
36 final StreamSink _responses; | |
37 | |
38 /// The completer for [listen]. | |
39 /// | |
40 /// This is non-`null` after [listen] has been called. | |
41 Completer _listenCompleter; | |
42 | 31 |
43 /// The methods registered for this server. | 32 /// The methods registered for this server. |
44 final _methods = new Map<String, Function>(); | 33 final _methods = new Map<String, Function>(); |
45 | 34 |
46 /// The fallback methods for this server. | 35 /// The fallback methods for this server. |
47 /// | 36 /// |
48 /// These are tried in order until one of them doesn't throw a | 37 /// These are tried in order until one of them doesn't throw a |
49 /// [RpcException.methodNotFound] exception. | 38 /// [RpcException.methodNotFound] exception. |
50 final _fallbacks = new Queue<Function>(); | 39 final _fallbacks = new Queue<Function>(); |
51 | 40 |
52 /// Creates a [Server] that reads requests from [requests] and writes | 41 /// Creates a [Server] that reads requests from [requests] and writes |
53 /// responses to [responses]. | 42 /// responses to [responses]. |
54 /// | 43 /// |
55 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a | 44 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
56 /// `WebSocket`), [responses] may be omitted. | 45 /// `WebSocket`), [responses] may be omitted. |
57 /// | 46 /// |
58 /// Note that the server won't begin listening to [requests] until | 47 /// Note that the server won't begin listening to [requests] until |
59 /// [Server.listen] is called. | 48 /// [Server.listen] is called. |
60 factory Server(Stream<String> requests, [StreamSink<String> responses]) { | 49 Server(Stream<String> requests, [StreamSink<String> responses]) { |
61 if (responses == null) { | 50 _streams = new TwoWayStream("Server", requests, "requests", |
62 if (requests is! StreamSink) { | 51 responses, "responses", onInvalidInput: (message, error) { |
63 throw new ArgumentError("Either `requests` must be a StreamSink or " | 52 _streams.add(new RpcException(error_code.PARSE_ERROR, |
64 "`responses` must be passed."); | 53 'Invalid JSON: ${error.message}').serialize(message)); |
65 } | 54 }); |
66 responses = requests as StreamSink; | |
67 } | |
68 | |
69 var wrappedResponses = mapStreamSink(responses, JSON.encode); | |
70 return new Server.withoutJson(requests.expand((request) { | |
71 var decodedRequest; | |
72 try { | |
73 decodedRequest = JSON.decode(request); | |
74 } on FormatException catch (error) { | |
75 wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, | |
76 'Invalid JSON: ${error.message}').serialize(request)); | |
77 return []; | |
78 } | |
79 | |
80 return [decodedRequest]; | |
81 }), wrappedResponses); | |
82 } | 55 } |
83 | 56 |
84 /// Creates a [Server] that reads decoded requests from [requests] and writes | 57 /// Creates a [Server] that reads decoded requests from [requests] and writes |
85 /// decoded responses to [responses]. | 58 /// decoded responses to [responses]. |
86 /// | 59 /// |
87 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it | 60 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
88 /// reads and writes decoded maps or lists. | 61 /// reads and writes decoded maps or lists. |
89 /// | 62 /// |
90 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be | 63 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be |
91 /// omitted. | 64 /// omitted. |
92 /// | 65 /// |
93 /// Note that the server won't begin listening to [requests] until | 66 /// Note that the server won't begin listening to [requests] until |
94 /// [Server.listen] is called. | 67 /// [Server.listen] is called. |
95 Server.withoutJson(Stream requests, [StreamSink responses]) | 68 Server.withoutJson(Stream requests, [StreamSink responses]) |
96 : _requests = requests, | 69 : _streams = new TwoWayStream.withoutJson( |
97 _responses = responses == null && requests is StreamSink ? | 70 "Server", requests, "requests", responses, "responses"); |
98 requests : responses { | |
99 if (_responses == null) { | |
100 throw new ArgumentError("Either `requests` must be a StreamSink or " | |
101 "`responses` must be passed."); | |
102 } | |
103 } | |
104 | 71 |
105 /// Starts listening to the underlying stream. | 72 /// Starts listening to the underlying stream. |
106 /// | 73 /// |
107 /// Returns a [Future] that will complete when the stream is closed or when it | 74 /// Returns a [Future] that will complete when the stream is closed or when it |
108 /// has an error. | 75 /// has an error. |
109 /// | 76 /// |
110 /// [listen] may only be called once. | 77 /// [listen] may only be called once. |
111 Future listen() { | 78 Future listen() => _streams.listen(_handleRequest); |
112 if (_listenCompleter != null) { | |
113 throw new StateError( | |
114 "Can only call Server.listen once on a given server."); | |
115 } | |
116 | |
117 _listenCompleter = new Completer(); | |
118 _requestSubscription = _requests.listen(_handleRequest, | |
119 onError: (error, stackTrace) { | |
120 if (_listenCompleter.isCompleted) return; | |
121 _responses.close(); | |
122 _listenCompleter.completeError(error, stackTrace); | |
123 }, onDone: () { | |
124 if (_listenCompleter.isCompleted) return; | |
125 _responses.close(); | |
126 _listenCompleter.complete(); | |
127 }, cancelOnError: true); | |
128 | |
129 return _listenCompleter.future; | |
130 } | |
131 | 79 |
132 /// Closes the server's request subscription and response sink. | 80 /// Closes the server's request subscription and response sink. |
133 /// | 81 /// |
134 /// Returns a [Future] that completes when all resources have been released. | 82 /// Returns a [Future] that completes when all resources have been released. |
135 /// | 83 /// |
136 /// A server can't be closed before [listen] has been called. | 84 /// A server can't be closed before [listen] has been called. |
137 Future close() { | 85 Future close() => _streams.close(); |
138 if (_listenCompleter == null) { | |
139 throw new StateError("Can't call Server.close before Server.listen."); | |
140 } | |
141 | |
142 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); | |
143 | |
144 var subscriptionFuture = _requestSubscription.cancel(); | |
145 // TODO(nweiz): include the response future in the return value when issue | |
146 // 19095 is fixed. | |
147 _responses.close(); | |
148 return subscriptionFuture == null ? new Future.value() : subscriptionFuture; | |
149 } | |
150 | 86 |
151 /// Registers a method named [name] on this server. | 87 /// Registers a method named [name] on this server. |
152 /// | 88 /// |
153 /// [callback] can take either zero or one arguments. If it takes zero, any | 89 /// [callback] can take either zero or one arguments. If it takes zero, any |
154 /// requests for that method that include parameters will be rejected. If it | 90 /// requests for that method that include parameters will be rejected. If it |
155 /// takes one, it will be passed a [Parameters] object. | 91 /// takes one, it will be passed a [Parameters] object. |
156 /// | 92 /// |
157 /// [callback] can return either a JSON-serializable object or a Future that | 93 /// [callback] can return either a JSON-serializable object or a Future that |
158 /// completes to a JSON-serializable object. Any errors in [callback] will be | 94 /// completes to a JSON-serializable object. Any errors in [callback] will be |
159 /// reported to the client as JSON-RPC 2.0 errors. | 95 /// reported to the client as JSON-RPC 2.0 errors. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
192 if (request is! List) return _handleSingleRequest(request); | 128 if (request is! List) return _handleSingleRequest(request); |
193 if (request.isEmpty) { | 129 if (request.isEmpty) { |
194 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' | 130 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' |
195 'contain at least one request.').serialize(request); | 131 'contain at least one request.').serialize(request); |
196 } | 132 } |
197 | 133 |
198 return Future.wait(request.map(_handleSingleRequest)).then((results) { | 134 return Future.wait(request.map(_handleSingleRequest)).then((results) { |
199 var nonNull = results.where((result) => result != null); | 135 var nonNull = results.where((result) => result != null); |
200 return nonNull.isEmpty ? null : nonNull.toList(); | 136 return nonNull.isEmpty ? null : nonNull.toList(); |
201 }); | 137 }); |
202 }).then(_responses.add); | 138 }).then(_streams.add); |
203 } | 139 } |
204 | 140 |
205 /// Handles an individual parsed request. | 141 /// Handles an individual parsed request. |
206 Future _handleSingleRequest(request) { | 142 Future _handleSingleRequest(request) { |
207 return syncFuture(() { | 143 return syncFuture(() { |
208 _validateRequest(request); | 144 _validateRequest(request); |
209 | 145 |
210 var name = request['method']; | 146 var name = request['method']; |
211 var method = _methods[name]; | 147 var method = _methods[name]; |
212 if (method == null) method = _tryFallbacks; | 148 if (method == null) method = _tryFallbacks; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
301 return syncFuture(() => iterator.current(params)).catchError((error) { | 237 return syncFuture(() => iterator.current(params)).catchError((error) { |
302 if (error is! RpcException) throw error; | 238 if (error is! RpcException) throw error; |
303 if (error.code != error_code.METHOD_NOT_FOUND) throw error; | 239 if (error.code != error_code.METHOD_NOT_FOUND) throw error; |
304 return _tryNext(); | 240 return _tryNext(); |
305 }); | 241 }); |
306 } | 242 } |
307 | 243 |
308 return _tryNext(); | 244 return _tryNext(); |
309 } | 245 } |
310 } | 246 } |
OLD | NEW |