OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 import 'dart:convert'; | 7 import 'dart:convert'; |
8 | 8 |
9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
| 10 import 'package:stream_channel/stream_channel.dart'; |
10 | 11 |
11 import '../error_code.dart' as error_code; | 12 import '../error_code.dart' as error_code; |
| 13 import 'channel_manager.dart'; |
12 import 'exception.dart'; | 14 import 'exception.dart'; |
13 import 'parameters.dart'; | 15 import 'parameters.dart'; |
14 import 'two_way_stream.dart'; | |
15 import 'utils.dart'; | 16 import 'utils.dart'; |
16 | 17 |
17 /// A JSON-RPC 2.0 server. | 18 /// A JSON-RPC 2.0 server. |
18 /// | 19 /// |
19 /// A server exposes methods that are called by requests, to which it provides | 20 /// A server exposes methods that are called by requests, to which it provides |
20 /// responses. Methods can be registered using [registerMethod] and | 21 /// responses. Methods can be registered using [registerMethod] and |
21 /// [registerFallback]. Requests can be handled using [handleRequest] and | 22 /// [registerFallback]. Requests can be handled using [handleRequest] and |
22 /// [parseRequest]. | 23 /// [parseRequest]. |
23 /// | 24 /// |
24 /// Note that since requests can arrive asynchronously and methods can run | 25 /// Note that since requests can arrive asynchronously and methods can run |
25 /// asynchronously, it's possible for multiple methods to be invoked at the same | 26 /// asynchronously, it's possible for multiple methods to be invoked at the same |
26 /// time, or even for a single method to be invoked multiple times at once. | 27 /// time, or even for a single method to be invoked multiple times at once. |
27 class Server { | 28 class Server { |
28 TwoWayStream _streams; | 29 final ChannelManager _manager; |
29 | 30 |
30 /// The methods registered for this server. | 31 /// The methods registered for this server. |
31 final _methods = new Map<String, Function>(); | 32 final _methods = new Map<String, Function>(); |
32 | 33 |
33 /// The fallback methods for this server. | 34 /// The fallback methods for this server. |
34 /// | 35 /// |
35 /// These are tried in order until one of them doesn't throw a | 36 /// These are tried in order until one of them doesn't throw a |
36 /// [RpcException.methodNotFound] exception. | 37 /// [RpcException.methodNotFound] exception. |
37 final _fallbacks = new Queue<Function>(); | 38 final _fallbacks = new Queue<Function>(); |
38 | 39 |
39 /// Returns a [Future] that completes when the connection is closed. | 40 /// Returns a [Future] that completes when the underlying connection is |
| 41 /// closed. |
40 /// | 42 /// |
41 /// This is the same future that's returned by [listen]. | 43 /// This is the same future that's returned by [listen] and [close]. It may |
42 Future get done => _streams.done; | 44 /// complete before [close] is called if the remote endpoint closes the |
| 45 /// connection. |
| 46 Future get done => _manager.done; |
43 | 47 |
44 /// Whether the connection is closed. | 48 /// Whether the underlying connection is closed. |
45 bool get isClosed => _streams.isClosed; | 49 /// |
| 50 /// Note that this will be `true` before [close] is called if the remote |
| 51 /// endpoint closes the connection. |
| 52 bool get isClosed => _manager.isClosed; |
46 | 53 |
47 /// Creates a [Server] that reads requests from [requests] and writes | 54 /// Creates a [Server] that communicates over [channel]. |
48 /// responses to [responses]. | |
49 /// | |
50 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a | |
51 /// `WebSocket`), [responses] may be omitted. | |
52 /// | 55 /// |
53 /// Note that the server won't begin listening to [requests] until | 56 /// Note that the server won't begin listening to [requests] until |
54 /// [Server.listen] is called. | 57 /// [Server.listen] is called. |
55 Server(Stream<String> requests, [StreamSink<String> responses]) { | 58 Server(StreamChannel<String> channel) |
56 _streams = new TwoWayStream("Server", requests, "requests", | 59 : this.withoutJson(channel |
57 responses, "responses", onInvalidInput: (message, error) { | 60 .transform(jsonDocument) |
58 _streams.add(new RpcException(error_code.PARSE_ERROR, | 61 .transform(respondToFormatExceptions)); |
59 'Invalid JSON: ${error.message}').serialize(message)); | |
60 }); | |
61 } | |
62 | 62 |
63 /// Creates a [Server] that reads decoded requests from [requests] and writes | 63 /// Creates a [Server] that communicates using decoded messages over |
64 /// decoded responses to [responses]. | 64 /// [channel]. |
65 /// | 65 /// |
66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it | 66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
67 /// reads and writes decoded maps or lists. | 67 /// reads and writes decoded maps or lists. |
68 /// | 68 /// |
69 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be | |
70 /// omitted. | |
71 /// | |
72 /// Note that the server won't begin listening to [requests] until | 69 /// Note that the server won't begin listening to [requests] until |
73 /// [Server.listen] is called. | 70 /// [Server.listen] is called. |
74 Server.withoutJson(Stream requests, [StreamSink responses]) | 71 Server.withoutJson(StreamChannel channel) |
75 : _streams = new TwoWayStream.withoutJson( | 72 : _manager = new ChannelManager("Server", channel); |
76 "Server", requests, "requests", responses, "responses"); | |
77 | 73 |
78 /// Starts listening to the underlying stream. | 74 /// Starts listening to the underlying stream. |
79 /// | 75 /// |
80 /// Returns a [Future] that will complete when the stream is closed or when it | 76 /// Returns a [Future] that will complete when the connection is closed or |
81 /// has an error. | 77 /// when it has an error. This is the same as [done]. |
82 /// | 78 /// |
83 /// [listen] may only be called once. | 79 /// [listen] may only be called once. |
84 Future listen() => _streams.listen(_handleRequest); | 80 Future listen() => _manager.listen(_handleRequest); |
85 | 81 |
86 /// Closes the server's request subscription and response sink. | 82 /// Closes the underlying connection. |
87 /// | 83 /// |
88 /// Returns a [Future] that completes when all resources have been released. | 84 /// Returns a [Future] that completes when all resources have been released. |
89 /// | 85 /// This is the same as [done]. |
90 /// A server can't be closed before [listen] has been called. | 86 Future close() => _manager.close(); |
91 Future close() => _streams.close(); | |
92 | 87 |
93 /// Registers a method named [name] on this server. | 88 /// Registers a method named [name] on this server. |
94 /// | 89 /// |
95 /// [callback] can take either zero or one arguments. If it takes zero, any | 90 /// [callback] can take either zero or one arguments. If it takes zero, any |
96 /// requests for that method that include parameters will be rejected. If it | 91 /// requests for that method that include parameters will be rejected. If it |
97 /// takes one, it will be passed a [Parameters] object. | 92 /// takes one, it will be passed a [Parameters] object. |
98 /// | 93 /// |
99 /// [callback] can return either a JSON-serializable object or a Future that | 94 /// [callback] can return either a JSON-serializable object or a Future that |
100 /// completes to a JSON-serializable object. Any errors in [callback] will be | 95 /// completes to a JSON-serializable object. Any errors in [callback] will be |
101 /// reported to the client as JSON-RPC 2.0 errors. | 96 /// reported to the client as JSON-RPC 2.0 errors. |
(...skipping 20 matching lines...) Expand all Loading... |
122 _fallbacks.add(callback); | 117 _fallbacks.add(callback); |
123 } | 118 } |
124 | 119 |
125 /// Handle a request. | 120 /// Handle a request. |
126 /// | 121 /// |
127 /// [request] is expected to be a JSON-serializable object representing a | 122 /// [request] is expected to be a JSON-serializable object representing a |
128 /// request sent by a client. This calls the appropriate method or methods for | 123 /// request sent by a client. This calls the appropriate method or methods for |
129 /// handling that request and returns a JSON-serializable response, or `null` | 124 /// handling that request and returns a JSON-serializable response, or `null` |
130 /// if no response should be sent. [callback] may send custom | 125 /// if no response should be sent. [callback] may send custom |
131 /// errors by throwing an [RpcException]. | 126 /// errors by throwing an [RpcException]. |
132 Future _handleRequest(request) { | 127 Future _handleRequest(request) async { |
133 return syncFuture(() { | 128 var response; |
134 if (request is! List) return _handleSingleRequest(request); | 129 if (request is! List) { |
135 if (request.isEmpty) { | 130 response = await _handleSingleRequest(request); |
136 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' | 131 if (response == null) return; |
137 'contain at least one request.').serialize(request); | 132 } else if (request.isEmpty) { |
138 } | 133 response = new RpcException( |
| 134 error_code.INVALID_REQUEST, |
| 135 'A batch must contain at least one request.') |
| 136 .serialize(request); |
| 137 } else { |
| 138 var results = await Future.wait(request.map(_handleSingleRequest)); |
| 139 var nonNull = results.where((result) => result != null); |
| 140 if (nonNull.isEmpty) return; |
| 141 response = nonNull.toList(); |
| 142 } |
139 | 143 |
140 return Future.wait(request.map(_handleSingleRequest)).then((results) { | 144 if (!isClosed) _manager.add(response); |
141 var nonNull = results.where((result) => result != null); | |
142 return nonNull.isEmpty ? null : nonNull.toList(); | |
143 }); | |
144 }).then((response) { | |
145 if (!_streams.isClosed && response != null) _streams.add(response); | |
146 }); | |
147 } | 145 } |
148 | 146 |
149 /// Handles an individual parsed request. | 147 /// Handles an individual parsed request. |
150 Future _handleSingleRequest(request) { | 148 Future _handleSingleRequest(request) { |
151 return syncFuture(() { | 149 return syncFuture(() { |
152 _validateRequest(request); | 150 _validateRequest(request); |
153 | 151 |
154 var name = request['method']; | 152 var name = request['method']; |
155 var method = _methods[name]; | 153 var method = _methods[name]; |
156 if (method == null) method = _tryFallbacks; | 154 if (method == null) method = _tryFallbacks; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
245 return syncFuture(() => iterator.current(params)).catchError((error) { | 243 return syncFuture(() => iterator.current(params)).catchError((error) { |
246 if (error is! RpcException) throw error; | 244 if (error is! RpcException) throw error; |
247 if (error.code != error_code.METHOD_NOT_FOUND) throw error; | 245 if (error.code != error_code.METHOD_NOT_FOUND) throw error; |
248 return _tryNext(); | 246 return _tryNext(); |
249 }); | 247 }); |
250 } | 248 } |
251 | 249 |
252 return _tryNext(); | 250 return _tryNext(); |
253 } | 251 } |
254 } | 252 } |
OLD | NEW |