Chromium Code Reviews| Index: pkg/json_rpc_2/lib/src/server.dart |
| diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart |
| index d05c54f9ee85a82ae89d3c79174820727787987c..20626bed12ac16d9a3c75025fa49c5ebc164b44c 100644 |
| --- a/pkg/json_rpc_2/lib/src/server.dart |
| +++ b/pkg/json_rpc_2/lib/src/server.dart |
| @@ -26,6 +26,20 @@ import 'utils.dart'; |
| /// asynchronously, it's possible for multiple methods to be invoked at the same |
| /// time, or even for a single method to be invoked multiple times at once. |
| class Server { |
| + /// The stream for decoded requests. |
| + final Stream _requests; |
| + |
| + /// The subscription to the decoded request stream. |
| + StreamSubscription _requestSubscription; |
| + |
| + /// The sink for decoded responses. |
| + final StreamSink _responses; |
| + |
| + /// The completer for [listen]. |
| + /// |
| + /// This is non-`null` after [listen] has been called. |
| + Completer _listenCompleter; |
| + |
| /// The methods registered for this server. |
| final _methods = new Map<String, Function>(); |
| @@ -35,7 +49,107 @@ class Server { |
| /// [RpcException.methodNotFound] exception. |
| final _fallbacks = new Queue<Function>(); |
| - Server(); |
| + /// Creates a [Server] that reads requests from [requests] and writes |
| + /// responses to [responses]. |
| + /// |
| + /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
| + /// `WebSocket`), [responses] may be omitted. |
| + /// |
| + /// Note that the server won't begin listening to [requests] until |
| + /// [Server.listen] is called. |
| + factory Server(Stream<String> requests, [StreamSink<String> responses]) { |
| + if (responses == null) { |
| + if (requests is! StreamSink) { |
| + throw new ArgumentError("Either `requests` must be a StreamSink or " |
| + "`responses` must be passed."); |
| + } |
| + responses = requests as StreamSink; |
| + } |
| + |
| + var wrappedResponses = mapStreamSink(responses, JSON.encode); |
| + return new Server.withoutJson(requests.expand((request) { |
| + var decodedRequest; |
| + try { |
| + decodedRequest = JSON.decode(request); |
| + } on FormatException catch (error) { |
| + wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, |
| + 'Invalid JSON: ${error.message}').serialize(request)); |
| + return []; |
| + } |
| + |
| + return [decodedRequest]; |
| + }), wrappedResponses); |
| + } |
| + |
| + /// Creates a [Server] that reads decoded requests from [requests] and writes |
| + /// decoded responses to [responses]. |
| + /// |
| + /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
| + /// reads and writes decoded maps or lists. |
| + /// |
| + /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
| + /// `WebSocket`), [responses] may be omitted. |
|
Bob Nystrom
2014/06/03 22:26:13
WebSocket is no longer a good example here since i
nweiz
2014/06/03 23:37:00
What do you mean? WebSocket sends and receives str
Bob Nystrom
2014/06/04 16:40:48
Right, but this method sends and receives decoded
nweiz
2014/06/04 20:33:51
Oh, gotcha. Changed.
|
| + /// |
| + /// Note that the server won't begin listening to [requests] until |
| + /// [Server.listen] is called. |
| + Server.withoutJson(Stream requests, [StreamSink responses]) |
| + : _requests = requests, |
| + _responses = responses == null && requests is StreamSink ? |
| + requests : responses { |
| + if (_responses == null) { |
| + throw new ArgumentError("Either `requests` must be a StreamSink or " |
| + "`responses` must be passed."); |
| + } |
| + } |
| + |
| + /// Starts listening to the underlying stream. |
| + /// |
| + /// Returns a [Future] that will complete when the stream is closed of when it |
|
Bob Nystrom
2014/06/03 22:26:13
"of" -> "or".
nweiz
2014/06/03 23:37:00
Done.
|
| + /// has an error. |
| + /// |
| + /// [listen] may only be called once. |
| + Future listen() { |
| + if (_listenCompleter != null) { |
| + return new Future.error(new StateError( |
| + "Can only call Server.listen once on a given server."), |
| + new Chain.current()); |
|
Bob Nystrom
2014/06/03 22:26:13
For programmatic errors like StateError, I think i
nweiz
2014/06/03 23:37:00
Done.
|
| + } |
| + |
| + _listenCompleter = new Completer(); |
| + _requestSubscription = _requests.listen(_handleRequest, |
| + onError: (error, stackTrace) { |
| + if (_listenCompleter.isCompleted) return; |
| + _responses.close(); |
| + _listenCompleter.completeError(error, stackTrace); |
| + }, onDone: () { |
| + if (_listenCompleter.isCompleted) return; |
| + _responses.close(); |
| + _listenCompleter.complete(); |
| + }, cancelOnError: true); |
| + |
| + return _listenCompleter.future; |
| + } |
| + |
| + /// Closes the server's request subscription and response sink. |
| + /// |
| + /// Returns a [Future] that completes when all resources have been released. |
| + /// |
| + /// A server can't be closed before [listen] has been called. |
| + Future close() { |
| + if (_listenCompleter == null) { |
| + return new Future.error(new StateError( |
| + "Can't call Server.close before Server.listen."), |
| + new Chain.current()); |
| + } |
| + |
| + if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
| + |
| + var subscriptionFuture = _requestSubscription.cancel(); |
| + // TODO(nweiz): include the response future in the return value when issue |
| + // 19095 is fixed. |
| + _responses.close(); |
| + return subscriptionFuture == null ? new Future.value() : subscriptionFuture; |
| + } |
| /// Registers a method named [name] on this server. |
| /// |
| @@ -69,14 +183,14 @@ class Server { |
| _fallbacks.add(callback); |
| } |
| - /// Handle a request that's already been parsed from JSON. |
| + /// Handle a request. |
| /// |
| /// [request] is expected to be a JSON-serializable object representing a |
| /// request sent by a client. This calls the appropriate method or methods for |
| /// handling that request and returns a JSON-serializable response, or `null` |
| /// if no response should be sent. [callback] may send custom |
| /// errors by throwing an [RpcException]. |
| - Future handleRequest(request) { |
| + Future _handleRequest(request) { |
| return syncFuture(() { |
| if (request is! List) return _handleSingleRequest(request); |
| if (request.isEmpty) { |
| @@ -88,28 +202,7 @@ class Server { |
| var nonNull = results.where((result) => result != null); |
| return nonNull.isEmpty ? null : nonNull.toList(); |
| }); |
| - }); |
| - } |
| - |
| - /// Parses and handles a JSON serialized request. |
| - /// |
| - /// This calls the appropriate method or methods for handling that request and |
| - /// returns a JSON string, or `null` if no response should be sent. |
| - Future<String> parseRequest(String request) { |
| - return syncFuture(() { |
| - var decodedRequest; |
| - try { |
| - decodedRequest = JSON.decode(request); |
| - } on FormatException catch (error) { |
| - return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: ' |
| - '${error.message}').serialize(request); |
| - } |
| - |
| - return handleRequest(decodedRequest); |
| - }).then((response) { |
| - if (response == null) return null; |
| - return JSON.encode(response); |
| - }); |
| + }).then(_responses.add); |
| } |
| /// Handles an individual parsed request. |