| OLD | NEW |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:collection'; | 6 import 'dart:collection'; |
| 7 import 'dart:convert'; | 7 import 'dart:convert'; |
| 8 | 8 |
| 9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
| 10 import 'package:stream_channel/stream_channel.dart'; |
| 10 | 11 |
| 11 import '../error_code.dart' as error_code; | 12 import '../error_code.dart' as error_code; |
| 13 import 'channel_manager.dart'; |
| 12 import 'exception.dart'; | 14 import 'exception.dart'; |
| 13 import 'parameters.dart'; | 15 import 'parameters.dart'; |
| 14 import 'two_way_stream.dart'; | |
| 15 import 'utils.dart'; | 16 import 'utils.dart'; |
| 16 | 17 |
| 17 /// A JSON-RPC 2.0 server. | 18 /// A JSON-RPC 2.0 server. |
| 18 /// | 19 /// |
| 19 /// A server exposes methods that are called by requests, to which it provides | 20 /// A server exposes methods that are called by requests, to which it provides |
| 20 /// responses. Methods can be registered using [registerMethod] and | 21 /// responses. Methods can be registered using [registerMethod] and |
| 21 /// [registerFallback]. Requests can be handled using [handleRequest] and | 22 /// [registerFallback]. Requests can be handled using [handleRequest] and |
| 22 /// [parseRequest]. | 23 /// [parseRequest]. |
| 23 /// | 24 /// |
| 24 /// Note that since requests can arrive asynchronously and methods can run | 25 /// Note that since requests can arrive asynchronously and methods can run |
| 25 /// asynchronously, it's possible for multiple methods to be invoked at the same | 26 /// asynchronously, it's possible for multiple methods to be invoked at the same |
| 26 /// time, or even for a single method to be invoked multiple times at once. | 27 /// time, or even for a single method to be invoked multiple times at once. |
| 27 class Server { | 28 class Server { |
| 28 TwoWayStream _streams; | 29 final ChannelManager _manager; |
| 29 | 30 |
| 30 /// The methods registered for this server. | 31 /// The methods registered for this server. |
| 31 final _methods = new Map<String, Function>(); | 32 final _methods = new Map<String, Function>(); |
| 32 | 33 |
| 33 /// The fallback methods for this server. | 34 /// The fallback methods for this server. |
| 34 /// | 35 /// |
| 35 /// These are tried in order until one of them doesn't throw a | 36 /// These are tried in order until one of them doesn't throw a |
| 36 /// [RpcException.methodNotFound] exception. | 37 /// [RpcException.methodNotFound] exception. |
| 37 final _fallbacks = new Queue<Function>(); | 38 final _fallbacks = new Queue<Function>(); |
| 38 | 39 |
| 39 /// Returns a [Future] that completes when the connection is closed. | 40 /// Returns a [Future] that completes when the underlying connection is |
| 41 /// closed. |
| 40 /// | 42 /// |
| 41 /// This is the same future that's returned by [listen]. | 43 /// This is the same future that's returned by [listen] and [close]. It may |
| 42 Future get done => _streams.done; | 44 /// complete before [close] is called if the remote endpoint closes the |
| 45 /// connection. |
| 46 Future get done => _manager.done; |
| 43 | 47 |
| 44 /// Whether the connection is closed. | 48 /// Whether the underlying connection is closed. |
| 45 bool get isClosed => _streams.isClosed; | 49 /// |
| 50 /// Note that this will be `true` before [close] is called if the remote |
| 51 /// endpoint closes the connection. |
| 52 bool get isClosed => _manager.isClosed; |
| 46 | 53 |
| 47 /// Creates a [Server] that reads requests from [requests] and writes | 54 /// Creates a [Server] that communicates over [channel]. |
| 48 /// responses to [responses]. | |
| 49 /// | |
| 50 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a | |
| 51 /// `WebSocket`), [responses] may be omitted. | |
| 52 /// | 55 /// |
| 53 /// Note that the server won't begin listening to [requests] until | 56 /// Note that the server won't begin listening to [requests] until |
| 54 /// [Server.listen] is called. | 57 /// [Server.listen] is called. |
| 55 Server(Stream<String> requests, [StreamSink<String> responses]) { | 58 Server(StreamChannel<String> channel) |
| 56 _streams = new TwoWayStream("Server", requests, "requests", | 59 : this.withoutJson(channel |
| 57 responses, "responses", onInvalidInput: (message, error) { | 60 .transform(jsonDocument) |
| 58 _streams.add(new RpcException(error_code.PARSE_ERROR, | 61 .transform(respondToFormatExceptions)); |
| 59 'Invalid JSON: ${error.message}').serialize(message)); | |
| 60 }); | |
| 61 } | |
| 62 | 62 |
| 63 /// Creates a [Server] that reads decoded requests from [requests] and writes | 63 /// Creates a [Server] that communicates using decoded messages over |
| 64 /// decoded responses to [responses]. | 64 /// [channel]. |
| 65 /// | 65 /// |
| 66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it | 66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
| 67 /// reads and writes decoded maps or lists. | 67 /// reads and writes decoded maps or lists. |
| 68 /// | 68 /// |
| 69 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be | |
| 70 /// omitted. | |
| 71 /// | |
| 72 /// Note that the server won't begin listening to [requests] until | 69 /// Note that the server won't begin listening to [requests] until |
| 73 /// [Server.listen] is called. | 70 /// [Server.listen] is called. |
| 74 Server.withoutJson(Stream requests, [StreamSink responses]) | 71 Server.withoutJson(StreamChannel channel) |
| 75 : _streams = new TwoWayStream.withoutJson( | 72 : _manager = new ChannelManager("Server", channel); |
| 76 "Server", requests, "requests", responses, "responses"); | |
| 77 | 73 |
| 78 /// Starts listening to the underlying stream. | 74 /// Starts listening to the underlying stream. |
| 79 /// | 75 /// |
| 80 /// Returns a [Future] that will complete when the stream is closed or when it | 76 /// Returns a [Future] that will complete when the connection is closed or |
| 81 /// has an error. | 77 /// when it has an error. This is the same as [done]. |
| 82 /// | 78 /// |
| 83 /// [listen] may only be called once. | 79 /// [listen] may only be called once. |
| 84 Future listen() => _streams.listen(_handleRequest); | 80 Future listen() => _manager.listen(_handleRequest); |
| 85 | 81 |
| 86 /// Closes the server's request subscription and response sink. | 82 /// Closes the underlying connection. |
| 87 /// | 83 /// |
| 88 /// Returns a [Future] that completes when all resources have been released. | 84 /// Returns a [Future] that completes when all resources have been released. |
| 89 /// | 85 /// This is the same as [done]. |
| 90 /// A server can't be closed before [listen] has been called. | 86 Future close() => _manager.close(); |
| 91 Future close() => _streams.close(); | |
| 92 | 87 |
| 93 /// Registers a method named [name] on this server. | 88 /// Registers a method named [name] on this server. |
| 94 /// | 89 /// |
| 95 /// [callback] can take either zero or one arguments. If it takes zero, any | 90 /// [callback] can take either zero or one arguments. If it takes zero, any |
| 96 /// requests for that method that include parameters will be rejected. If it | 91 /// requests for that method that include parameters will be rejected. If it |
| 97 /// takes one, it will be passed a [Parameters] object. | 92 /// takes one, it will be passed a [Parameters] object. |
| 98 /// | 93 /// |
| 99 /// [callback] can return either a JSON-serializable object or a Future that | 94 /// [callback] can return either a JSON-serializable object or a Future that |
| 100 /// completes to a JSON-serializable object. Any errors in [callback] will be | 95 /// completes to a JSON-serializable object. Any errors in [callback] will be |
| 101 /// reported to the client as JSON-RPC 2.0 errors. | 96 /// reported to the client as JSON-RPC 2.0 errors. |
| (...skipping 20 matching lines...) Expand all Loading... |
| 122 _fallbacks.add(callback); | 117 _fallbacks.add(callback); |
| 123 } | 118 } |
| 124 | 119 |
| 125 /// Handle a request. | 120 /// Handle a request. |
| 126 /// | 121 /// |
| 127 /// [request] is expected to be a JSON-serializable object representing a | 122 /// [request] is expected to be a JSON-serializable object representing a |
| 128 /// request sent by a client. This calls the appropriate method or methods for | 123 /// request sent by a client. This calls the appropriate method or methods for |
| 129 /// handling that request and returns a JSON-serializable response, or `null` | 124 /// handling that request and returns a JSON-serializable response, or `null` |
| 130 /// if no response should be sent. [callback] may send custom | 125 /// if no response should be sent. [callback] may send custom |
| 131 /// errors by throwing an [RpcException]. | 126 /// errors by throwing an [RpcException]. |
| 132 Future _handleRequest(request) { | 127 Future _handleRequest(request) async { |
| 133 return syncFuture(() { | 128 var response; |
| 134 if (request is! List) return _handleSingleRequest(request); | 129 if (request is! List) { |
| 135 if (request.isEmpty) { | 130 response = await _handleSingleRequest(request); |
| 136 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' | 131 if (response == null) return; |
| 137 'contain at least one request.').serialize(request); | 132 } else if (request.isEmpty) { |
| 138 } | 133 response = new RpcException( |
| 134 error_code.INVALID_REQUEST, |
| 135 'A batch must contain at least one request.') |
| 136 .serialize(request); |
| 137 } else { |
| 138 var results = await Future.wait(request.map(_handleSingleRequest)); |
| 139 var nonNull = results.where((result) => result != null); |
| 140 if (nonNull.isEmpty) return; |
| 141 response = nonNull.toList(); |
| 142 } |
| 139 | 143 |
| 140 return Future.wait(request.map(_handleSingleRequest)).then((results) { | 144 if (!isClosed) _manager.add(response); |
| 141 var nonNull = results.where((result) => result != null); | |
| 142 return nonNull.isEmpty ? null : nonNull.toList(); | |
| 143 }); | |
| 144 }).then((response) { | |
| 145 if (!_streams.isClosed && response != null) _streams.add(response); | |
| 146 }); | |
| 147 } | 145 } |
| 148 | 146 |
| 149 /// Handles an individual parsed request. | 147 /// Handles an individual parsed request. |
| 150 Future _handleSingleRequest(request) { | 148 Future _handleSingleRequest(request) { |
| 151 return syncFuture(() { | 149 return syncFuture(() { |
| 152 _validateRequest(request); | 150 _validateRequest(request); |
| 153 | 151 |
| 154 var name = request['method']; | 152 var name = request['method']; |
| 155 var method = _methods[name]; | 153 var method = _methods[name]; |
| 156 if (method == null) method = _tryFallbacks; | 154 if (method == null) method = _tryFallbacks; |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 245 return syncFuture(() => iterator.current(params)).catchError((error) { | 243 return syncFuture(() => iterator.current(params)).catchError((error) { |
| 246 if (error is! RpcException) throw error; | 244 if (error is! RpcException) throw error; |
| 247 if (error.code != error_code.METHOD_NOT_FOUND) throw error; | 245 if (error.code != error_code.METHOD_NOT_FOUND) throw error; |
| 248 return _tryNext(); | 246 return _tryNext(); |
| 249 }); | 247 }); |
| 250 } | 248 } |
| 251 | 249 |
| 252 return _tryNext(); | 250 return _tryNext(); |
| 253 } | 251 } |
| 254 } | 252 } |
| OLD | NEW |