Index: pkg/json_rpc_2/lib/src/server.dart |
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart |
index d05c54f9ee85a82ae89d3c79174820727787987c..c7ece5b9503af15202177bcc022dc53e3d1347d4 100644 |
--- a/pkg/json_rpc_2/lib/src/server.dart |
+++ b/pkg/json_rpc_2/lib/src/server.dart |
@@ -26,6 +26,20 @@ import 'utils.dart'; |
/// asynchronously, it's possible for multiple methods to be invoked at the same |
/// time, or even for a single method to be invoked multiple times at once. |
class Server { |
+ /// The stream for decoded requests. |
+ final Stream _requests; |
+ |
+ /// The subscription to the decoded request stream. |
+ StreamSubscription _requestSubscription; |
+ |
+ /// The sink for decoded responses. |
+ final StreamSink _responses; |
+ |
+ /// The completer for [listen]. |
+ /// |
+ /// This is non-`null` after [listen] has been called. |
+ Completer _listenCompleter; |
+ |
/// The methods registered for this server. |
final _methods = new Map<String, Function>(); |
@@ -35,7 +49,104 @@ class Server { |
/// [RpcException.methodNotFound] exception. |
final _fallbacks = new Queue<Function>(); |
- Server(); |
+ /// Creates a [Server] that reads requests from [requests] and writes |
+ /// responses to [responses]. |
+ /// |
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
+ /// `WebSocket`), [responses] may be omitted. |
+ /// |
+ /// Note that the server won't begin listening to [requests] until |
+ /// [Server.listen] is called. |
+ factory Server(Stream<String> requests, [StreamSink<String> responses]) { |
+ if (responses == null) { |
+ if (requests is! StreamSink) { |
+ throw new ArgumentError("Either `requests` must be a StreamSink or " |
+ "`responses` must be passed."); |
+ } |
+ responses = requests as StreamSink; |
+ } |
+ |
+ var wrappedResponses = mapStreamSink(responses, JSON.encode); |
+ return new Server.withoutJson(requests.expand((request) { |
+ var decodedRequest; |
+ try { |
+ decodedRequest = JSON.decode(request); |
+ } on FormatException catch (error) { |
+ wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, |
+ 'Invalid JSON: ${error.message}').serialize(request)); |
+ return []; |
+ } |
+ |
+ return [decodedRequest]; |
+ }), wrappedResponses); |
+ } |
+ |
+ /// Creates a [Server] that reads decoded requests from [requests] and writes |
+ /// decoded responses to [responses]. |
+ /// |
+ /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
+ /// reads and writes decoded maps or lists. |
+ /// |
+ /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be |
+ /// omitted. |
+ /// |
+ /// Note that the server won't begin listening to [requests] until |
+ /// [Server.listen] is called. |
+ Server.withoutJson(Stream requests, [StreamSink responses]) |
+ : _requests = requests, |
+ _responses = responses == null && requests is StreamSink ? |
+ requests : responses { |
+ if (_responses == null) { |
+ throw new ArgumentError("Either `requests` must be a StreamSink or " |
+ "`responses` must be passed."); |
+ } |
+ } |
+ |
+ /// Starts listening to the underlying stream. |
+ /// |
+ /// Returns a [Future] that will complete when the stream is closed or when it |
+ /// has an error. |
+ /// |
+ /// [listen] may only be called once. |
+ Future listen() { |
+ if (_listenCompleter != null) { |
+ throw new StateError( |
+ "Can only call Server.listen once on a given server."); |
+ } |
+ |
+ _listenCompleter = new Completer(); |
+ _requestSubscription = _requests.listen(_handleRequest, |
+ onError: (error, stackTrace) { |
+ if (_listenCompleter.isCompleted) return; |
+ _responses.close(); |
+ _listenCompleter.completeError(error, stackTrace); |
+ }, onDone: () { |
+ if (_listenCompleter.isCompleted) return; |
+ _responses.close(); |
+ _listenCompleter.complete(); |
+ }, cancelOnError: true); |
+ |
+ return _listenCompleter.future; |
+ } |
+ |
+ /// Closes the server's request subscription and response sink. |
+ /// |
+ /// Returns a [Future] that completes when all resources have been released. |
+ /// |
+ /// A server can't be closed before [listen] has been called. |
+ Future close() { |
+ if (_listenCompleter == null) { |
+ throw new StateError("Can't call Server.close before Server.listen."); |
+ } |
+ |
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
+ |
+ var subscriptionFuture = _requestSubscription.cancel(); |
+ // TODO(nweiz): include the response future in the return value when issue |
+ // 19095 is fixed. |
+ _responses.close(); |
+ return subscriptionFuture == null ? new Future.value() : subscriptionFuture; |
+ } |
/// Registers a method named [name] on this server. |
/// |
@@ -69,14 +180,14 @@ class Server { |
_fallbacks.add(callback); |
} |
- /// Handle a request that's already been parsed from JSON. |
+ /// Handle a request. |
/// |
/// [request] is expected to be a JSON-serializable object representing a |
/// request sent by a client. This calls the appropriate method or methods for |
/// handling that request and returns a JSON-serializable response, or `null` |
/// if no response should be sent. [callback] may send custom |
/// errors by throwing an [RpcException]. |
- Future handleRequest(request) { |
+ Future _handleRequest(request) { |
return syncFuture(() { |
if (request is! List) return _handleSingleRequest(request); |
if (request.isEmpty) { |
@@ -88,28 +199,7 @@ class Server { |
var nonNull = results.where((result) => result != null); |
return nonNull.isEmpty ? null : nonNull.toList(); |
}); |
- }); |
- } |
- |
- /// Parses and handles a JSON serialized request. |
- /// |
- /// This calls the appropriate method or methods for handling that request and |
- /// returns a JSON string, or `null` if no response should be sent. |
- Future<String> parseRequest(String request) { |
- return syncFuture(() { |
- var decodedRequest; |
- try { |
- decodedRequest = JSON.decode(request); |
- } on FormatException catch (error) { |
- return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: ' |
- '${error.message}').serialize(request); |
- } |
- |
- return handleRequest(decodedRequest); |
- }).then((response) { |
- if (response == null) return null; |
- return JSON.encode(response); |
- }); |
+ }).then(_responses.add); |
} |
/// Handles an individual parsed request. |