| Index: pkg/json_rpc_2/lib/src/server.dart
|
| diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart
|
| index d05c54f9ee85a82ae89d3c79174820727787987c..c7ece5b9503af15202177bcc022dc53e3d1347d4 100644
|
| --- a/pkg/json_rpc_2/lib/src/server.dart
|
| +++ b/pkg/json_rpc_2/lib/src/server.dart
|
| @@ -26,6 +26,20 @@ import 'utils.dart';
|
| /// asynchronously, it's possible for multiple methods to be invoked at the same
|
| /// time, or even for a single method to be invoked multiple times at once.
|
| class Server {
|
| + /// The stream for decoded requests.
|
| + final Stream _requests;
|
| +
|
| + /// The subscription to the decoded request stream.
|
| + StreamSubscription _requestSubscription;
|
| +
|
| + /// The sink for decoded responses.
|
| + final StreamSink _responses;
|
| +
|
| + /// The completer for [listen].
|
| + ///
|
| + /// This is non-`null` after [listen] has been called.
|
| + Completer _listenCompleter;
|
| +
|
| /// The methods registered for this server.
|
| final _methods = new Map<String, Function>();
|
|
|
| @@ -35,7 +49,104 @@ class Server {
|
| /// [RpcException.methodNotFound] exception.
|
| final _fallbacks = new Queue<Function>();
|
|
|
| - Server();
|
| + /// Creates a [Server] that reads requests from [requests] and writes
|
| + /// responses to [responses].
|
| + ///
|
| + /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
|
| + /// `WebSocket`), [responses] may be omitted.
|
| + ///
|
| + /// Note that the server won't begin listening to [requests] until
|
| + /// [Server.listen] is called.
|
| + factory Server(Stream<String> requests, [StreamSink<String> responses]) {
|
| + if (responses == null) {
|
| + if (requests is! StreamSink) {
|
| + throw new ArgumentError("Either `requests` must be a StreamSink or "
|
| + "`responses` must be passed.");
|
| + }
|
| + responses = requests as StreamSink;
|
| + }
|
| +
|
| + var wrappedResponses = mapStreamSink(responses, JSON.encode);
|
| + return new Server.withoutJson(requests.expand((request) {
|
| + var decodedRequest;
|
| + try {
|
| + decodedRequest = JSON.decode(request);
|
| + } on FormatException catch (error) {
|
| + wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
|
| + 'Invalid JSON: ${error.message}').serialize(request));
|
| + return [];
|
| + }
|
| +
|
| + return [decodedRequest];
|
| + }), wrappedResponses);
|
| + }
|
| +
|
| + /// Creates a [Server] that reads decoded requests from [requests] and writes
|
| + /// decoded responses to [responses].
|
| + ///
|
| + /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
|
| + /// reads and writes decoded maps or lists.
|
| + ///
|
| + /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
|
| + /// omitted.
|
| + ///
|
| + /// Note that the server won't begin listening to [requests] until
|
| + /// [Server.listen] is called.
|
| + Server.withoutJson(Stream requests, [StreamSink responses])
|
| + : _requests = requests,
|
| + _responses = responses == null && requests is StreamSink ?
|
| + requests : responses {
|
| + if (_responses == null) {
|
| + throw new ArgumentError("Either `requests` must be a StreamSink or "
|
| + "`responses` must be passed.");
|
| + }
|
| + }
|
| +
|
| + /// Starts listening to the underlying stream.
|
| + ///
|
| + /// Returns a [Future] that will complete when the stream is closed or when it
|
| + /// has an error.
|
| + ///
|
| + /// [listen] may only be called once.
|
| + Future listen() {
|
| + if (_listenCompleter != null) {
|
| + throw new StateError(
|
| + "Can only call Server.listen once on a given server.");
|
| + }
|
| +
|
| + _listenCompleter = new Completer();
|
| + _requestSubscription = _requests.listen(_handleRequest,
|
| + onError: (error, stackTrace) {
|
| + if (_listenCompleter.isCompleted) return;
|
| + _responses.close();
|
| + _listenCompleter.completeError(error, stackTrace);
|
| + }, onDone: () {
|
| + if (_listenCompleter.isCompleted) return;
|
| + _responses.close();
|
| + _listenCompleter.complete();
|
| + }, cancelOnError: true);
|
| +
|
| + return _listenCompleter.future;
|
| + }
|
| +
|
| + /// Closes the server's request subscription and response sink.
|
| + ///
|
| + /// Returns a [Future] that completes when all resources have been released.
|
| + ///
|
| + /// A server can't be closed before [listen] has been called.
|
| + Future close() {
|
| + if (_listenCompleter == null) {
|
| + throw new StateError("Can't call Server.close before Server.listen.");
|
| + }
|
| +
|
| + if (!_listenCompleter.isCompleted) _listenCompleter.complete();
|
| +
|
| + var subscriptionFuture = _requestSubscription.cancel();
|
| + // TODO(nweiz): include the response future in the return value when issue
|
| + // 19095 is fixed.
|
| + _responses.close();
|
| + return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
|
| + }
|
|
|
| /// Registers a method named [name] on this server.
|
| ///
|
| @@ -69,14 +180,14 @@ class Server {
|
| _fallbacks.add(callback);
|
| }
|
|
|
| - /// Handle a request that's already been parsed from JSON.
|
| + /// Handle a request.
|
| ///
|
| /// [request] is expected to be a JSON-serializable object representing a
|
| /// request sent by a client. This calls the appropriate method or methods for
|
| /// handling that request and returns a JSON-serializable response, or `null`
|
| /// if no response should be sent. [callback] may send custom
|
| /// errors by throwing an [RpcException].
|
| - Future handleRequest(request) {
|
| + Future _handleRequest(request) {
|
| return syncFuture(() {
|
| if (request is! List) return _handleSingleRequest(request);
|
| if (request.isEmpty) {
|
| @@ -88,28 +199,7 @@ class Server {
|
| var nonNull = results.where((result) => result != null);
|
| return nonNull.isEmpty ? null : nonNull.toList();
|
| });
|
| - });
|
| - }
|
| -
|
| - /// Parses and handles a JSON serialized request.
|
| - ///
|
| - /// This calls the appropriate method or methods for handling that request and
|
| - /// returns a JSON string, or `null` if no response should be sent.
|
| - Future<String> parseRequest(String request) {
|
| - return syncFuture(() {
|
| - var decodedRequest;
|
| - try {
|
| - decodedRequest = JSON.decode(request);
|
| - } on FormatException catch (error) {
|
| - return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
|
| - '${error.message}').serialize(request);
|
| - }
|
| -
|
| - return handleRequest(decodedRequest);
|
| - }).then((response) {
|
| - if (response == null) return null;
|
| - return JSON.encode(response);
|
| - });
|
| + }).then(_responses.add);
|
| }
|
|
|
| /// Handles an individual parsed request.
|
|
|