Index: pkg/json_rpc_2/lib/src/server.dart |
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart |
index d05c54f9ee85a82ae89d3c79174820727787987c..20626bed12ac16d9a3c75025fa49c5ebc164b44c 100644 |
--- a/pkg/json_rpc_2/lib/src/server.dart |
+++ b/pkg/json_rpc_2/lib/src/server.dart |
@@ -26,6 +26,20 @@ import 'utils.dart'; |
/// asynchronously, it's possible for multiple methods to be invoked at the same |
/// time, or even for a single method to be invoked multiple times at once. |
class Server { |
+ /// The stream for decoded requests. |
+ final Stream _requests; |
+ |
+ /// The subscription to the decoded request stream. |
+ StreamSubscription _requestSubscription; |
+ |
+ /// The sink for decoded responses. |
+ final StreamSink _responses; |
+ |
+ /// The completer for [listen]. |
+ /// |
+ /// This is non-`null` after [listen] has been called. |
+ Completer _listenCompleter; |
+ |
/// The methods registered for this server. |
final _methods = new Map<String, Function>(); |
@@ -35,7 +49,107 @@ class Server { |
/// [RpcException.methodNotFound] exception. |
final _fallbacks = new Queue<Function>(); |
- Server(); |
+ /// Creates a [Server] that reads requests from [requests] and writes |
+ /// responses to [responses]. |
+ /// |
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
+ /// `WebSocket`), [responses] may be omitted. |
+ /// |
+ /// Note that the server won't begin listening to [requests] until |
+ /// [Server.listen] is called. |
+ factory Server(Stream<String> requests, [StreamSink<String> responses]) { |
+ if (responses == null) { |
+ if (requests is! StreamSink) { |
+ throw new ArgumentError("Either `requests` must be a StreamSink or " |
+ "`responses` must be passed."); |
+ } |
+ responses = requests as StreamSink; |
+ } |
+ |
+ var wrappedResponses = mapStreamSink(responses, JSON.encode); |
+ return new Server.withoutJson(requests.expand((request) { |
+ var decodedRequest; |
+ try { |
+ decodedRequest = JSON.decode(request); |
+ } on FormatException catch (error) { |
+ wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, |
+ 'Invalid JSON: ${error.message}').serialize(request)); |
+ return []; |
+ } |
+ |
+ return [decodedRequest]; |
+ }), wrappedResponses); |
+ } |
+ |
+ /// Creates a [Server] that reads decoded requests from [requests] and writes |
+ /// decoded responses to [responses]. |
+ /// |
+ /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
+ /// reads and writes decoded maps or lists. |
+ /// |
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
+ /// `WebSocket`), [responses] may be omitted. |
Bob Nystrom
2014/06/03 22:26:13
WebSocket is no longer a good example here since i
nweiz
2014/06/03 23:37:00
What do you mean? WebSocket sends and receives str
Bob Nystrom
2014/06/04 16:40:48
Right, but this method sends and receives decoded
nweiz
2014/06/04 20:33:51
Oh, gotcha. Changed.
|
+ /// |
+ /// Note that the server won't begin listening to [requests] until |
+ /// [Server.listen] is called. |
+ Server.withoutJson(Stream requests, [StreamSink responses]) |
+ : _requests = requests, |
+ _responses = responses == null && requests is StreamSink ? |
+ requests : responses { |
+ if (_responses == null) { |
+ throw new ArgumentError("Either `requests` must be a StreamSink or " |
+ "`responses` must be passed."); |
+ } |
+ } |
+ |
+ /// Starts listening to the underlying stream. |
+ /// |
+ /// Returns a [Future] that will complete when the stream is closed of when it |
Bob Nystrom
2014/06/03 22:26:13
"of" -> "or".
nweiz
2014/06/03 23:37:00
Done.
|
+ /// has an error. |
+ /// |
+ /// [listen] may only be called once. |
+ Future listen() { |
+ if (_listenCompleter != null) { |
+ return new Future.error(new StateError( |
+ "Can only call Server.listen once on a given server."), |
+ new Chain.current()); |
Bob Nystrom
2014/06/03 22:26:13
For programmatic errors like StateError, I think i
nweiz
2014/06/03 23:37:00
Done.
|
+ } |
+ |
+ _listenCompleter = new Completer(); |
+ _requestSubscription = _requests.listen(_handleRequest, |
+ onError: (error, stackTrace) { |
+ if (_listenCompleter.isCompleted) return; |
+ _responses.close(); |
+ _listenCompleter.completeError(error, stackTrace); |
+ }, onDone: () { |
+ if (_listenCompleter.isCompleted) return; |
+ _responses.close(); |
+ _listenCompleter.complete(); |
+ }, cancelOnError: true); |
+ |
+ return _listenCompleter.future; |
+ } |
+ |
+ /// Closes the server's request subscription and response sink. |
+ /// |
+ /// Returns a [Future] that completes when all resources have been released. |
+ /// |
+ /// A server can't be closed before [listen] has been called. |
+ Future close() { |
+ if (_listenCompleter == null) { |
+ return new Future.error(new StateError( |
+ "Can't call Server.close before Server.listen."), |
+ new Chain.current()); |
+ } |
+ |
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
+ |
+ var subscriptionFuture = _requestSubscription.cancel(); |
+ // TODO(nweiz): include the response future in the return value when issue |
+ // 19095 is fixed. |
+ _responses.close(); |
+ return subscriptionFuture == null ? new Future.value() : subscriptionFuture; |
+ } |
/// Registers a method named [name] on this server. |
/// |
@@ -69,14 +183,14 @@ class Server { |
_fallbacks.add(callback); |
} |
- /// Handle a request that's already been parsed from JSON. |
+ /// Handle a request. |
/// |
/// [request] is expected to be a JSON-serializable object representing a |
/// request sent by a client. This calls the appropriate method or methods for |
/// handling that request and returns a JSON-serializable response, or `null` |
/// if no response should be sent. [callback] may send custom |
/// errors by throwing an [RpcException]. |
- Future handleRequest(request) { |
+ Future _handleRequest(request) { |
return syncFuture(() { |
if (request is! List) return _handleSingleRequest(request); |
if (request.isEmpty) { |
@@ -88,28 +202,7 @@ class Server { |
var nonNull = results.where((result) => result != null); |
return nonNull.isEmpty ? null : nonNull.toList(); |
}); |
- }); |
- } |
- |
- /// Parses and handles a JSON serialized request. |
- /// |
- /// This calls the appropriate method or methods for handling that request and |
- /// returns a JSON string, or `null` if no response should be sent. |
- Future<String> parseRequest(String request) { |
- return syncFuture(() { |
- var decodedRequest; |
- try { |
- decodedRequest = JSON.decode(request); |
- } on FormatException catch (error) { |
- return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: ' |
- '${error.message}').serialize(request); |
- } |
- |
- return handleRequest(decodedRequest); |
- }).then((response) { |
- if (response == null) return null; |
- return JSON.encode(response); |
- }); |
+ }).then(_responses.add); |
} |
/// Handles an individual parsed request. |