Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(560)

Unified Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 309503005: Convert json_rpc.Server to take a Stream and StreamSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: pkg/json_rpc_2/lib/src/server.dart
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart
index d05c54f9ee85a82ae89d3c79174820727787987c..20626bed12ac16d9a3c75025fa49c5ebc164b44c 100644
--- a/pkg/json_rpc_2/lib/src/server.dart
+++ b/pkg/json_rpc_2/lib/src/server.dart
@@ -26,6 +26,20 @@ import 'utils.dart';
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
+ /// The stream for decoded requests.
+ final Stream _requests;
+
+ /// The subscription to the decoded request stream.
+ StreamSubscription _requestSubscription;
+
+ /// The sink for decoded responses.
+ final StreamSink _responses;
+
+ /// The completer for [listen].
+ ///
+ /// This is non-`null` after [listen] has been called.
+ Completer _listenCompleter;
+
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -35,7 +49,107 @@ class Server {
/// [RpcException.methodNotFound] exception.
final _fallbacks = new Queue<Function>();
- Server();
+ /// Creates a [Server] that reads requests from [requests] and writes
+ /// responses to [responses].
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [responses] may be omitted.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ factory Server(Stream<String> requests, [StreamSink<String> responses]) {
+ if (responses == null) {
+ if (requests is! StreamSink) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ responses = requests as StreamSink;
+ }
+
+ var wrappedResponses = mapStreamSink(responses, JSON.encode);
+ return new Server.withoutJson(requests.expand((request) {
+ var decodedRequest;
+ try {
+ decodedRequest = JSON.decode(request);
+ } on FormatException catch (error) {
+ wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(request));
+ return [];
+ }
+
+ return [decodedRequest];
+ }), wrappedResponses);
+ }
+
+ /// Creates a [Server] that reads decoded requests from [requests] and writes
+ /// decoded responses to [responses].
+ ///
+ /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
+ /// reads and writes decoded maps or lists.
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [responses] may be omitted.
Bob Nystrom 2014/06/03 22:26:13 WebSocket is no longer a good example here since i
nweiz 2014/06/03 23:37:00 What do you mean? WebSocket sends and receives str
Bob Nystrom 2014/06/04 16:40:48 Right, but this method sends and receives decoded
nweiz 2014/06/04 20:33:51 Oh, gotcha. Changed.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ Server.withoutJson(Stream requests, [StreamSink responses])
+ : _requests = requests,
+ _responses = responses == null && requests is StreamSink ?
+ requests : responses {
+ if (_responses == null) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ }
+
+ /// Starts listening to the underlying stream.
+ ///
+ /// Returns a [Future] that will complete when the stream is closed of when it
Bob Nystrom 2014/06/03 22:26:13 "of" -> "or".
nweiz 2014/06/03 23:37:00 Done.
+ /// has an error.
+ ///
+ /// [listen] may only be called once.
+ Future listen() {
+ if (_listenCompleter != null) {
+ return new Future.error(new StateError(
+ "Can only call Server.listen once on a given server."),
+ new Chain.current());
Bob Nystrom 2014/06/03 22:26:13 For programmatic errors like StateError, I think i
nweiz 2014/06/03 23:37:00 Done.
+ }
+
+ _listenCompleter = new Completer();
+ _requestSubscription = _requests.listen(_handleRequest,
+ onError: (error, stackTrace) {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.completeError(error, stackTrace);
+ }, onDone: () {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.complete();
+ }, cancelOnError: true);
+
+ return _listenCompleter.future;
+ }
+
+ /// Closes the server's request subscription and response sink.
+ ///
+ /// Returns a [Future] that completes when all resources have been released.
+ ///
+ /// A server can't be closed before [listen] has been called.
+ Future close() {
+ if (_listenCompleter == null) {
+ return new Future.error(new StateError(
+ "Can't call Server.close before Server.listen."),
+ new Chain.current());
+ }
+
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+ var subscriptionFuture = _requestSubscription.cancel();
+ // TODO(nweiz): include the response future in the return value when issue
+ // 19095 is fixed.
+ _responses.close();
+ return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
+ }
/// Registers a method named [name] on this server.
///
@@ -69,14 +183,14 @@ class Server {
_fallbacks.add(callback);
}
- /// Handle a request that's already been parsed from JSON.
+ /// Handle a request.
///
/// [request] is expected to be a JSON-serializable object representing a
/// request sent by a client. This calls the appropriate method or methods for
/// handling that request and returns a JSON-serializable response, or `null`
/// if no response should be sent. [callback] may send custom
/// errors by throwing an [RpcException].
- Future handleRequest(request) {
+ Future _handleRequest(request) {
return syncFuture(() {
if (request is! List) return _handleSingleRequest(request);
if (request.isEmpty) {
@@ -88,28 +202,7 @@ class Server {
var nonNull = results.where((result) => result != null);
return nonNull.isEmpty ? null : nonNull.toList();
});
- });
- }
-
- /// Parses and handles a JSON serialized request.
- ///
- /// This calls the appropriate method or methods for handling that request and
- /// returns a JSON string, or `null` if no response should be sent.
- Future<String> parseRequest(String request) {
- return syncFuture(() {
- var decodedRequest;
- try {
- decodedRequest = JSON.decode(request);
- } on FormatException catch (error) {
- return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
- '${error.message}').serialize(request);
- }
-
- return handleRequest(decodedRequest);
- }).then((response) {
- if (response == null) return null;
- return JSON.encode(response);
- });
+ }).then(_responses.add);
}
/// Handles an individual parsed request.

Powered by Google App Engine
This is Rietveld 408576698