Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Unified Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 309503005: Convert json_rpc.Server to take a Stream and StreamSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/json_rpc_2/CHANGELOG.md ('k') | pkg/json_rpc_2/lib/src/utils.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/json_rpc_2/lib/src/server.dart
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart
index d05c54f9ee85a82ae89d3c79174820727787987c..c7ece5b9503af15202177bcc022dc53e3d1347d4 100644
--- a/pkg/json_rpc_2/lib/src/server.dart
+++ b/pkg/json_rpc_2/lib/src/server.dart
@@ -26,6 +26,20 @@ import 'utils.dart';
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
+ /// The stream for decoded requests.
+ final Stream _requests;
+
+ /// The subscription to the decoded request stream.
+ StreamSubscription _requestSubscription;
+
+ /// The sink for decoded responses.
+ final StreamSink _responses;
+
+ /// The completer for [listen].
+ ///
+ /// This is non-`null` after [listen] has been called.
+ Completer _listenCompleter;
+
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -35,7 +49,104 @@ class Server {
/// [RpcException.methodNotFound] exception.
final _fallbacks = new Queue<Function>();
- Server();
+ /// Creates a [Server] that reads requests from [requests] and writes
+ /// responses to [responses].
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [responses] may be omitted.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ factory Server(Stream<String> requests, [StreamSink<String> responses]) {
+ if (responses == null) {
+ if (requests is! StreamSink) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ responses = requests as StreamSink;
+ }
+
+ var wrappedResponses = mapStreamSink(responses, JSON.encode);
+ return new Server.withoutJson(requests.expand((request) {
+ var decodedRequest;
+ try {
+ decodedRequest = JSON.decode(request);
+ } on FormatException catch (error) {
+ wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(request));
+ return [];
+ }
+
+ return [decodedRequest];
+ }), wrappedResponses);
+ }
+
+ /// Creates a [Server] that reads decoded requests from [requests] and writes
+ /// decoded responses to [responses].
+ ///
+ /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
+ /// reads and writes decoded maps or lists.
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
+ /// omitted.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ Server.withoutJson(Stream requests, [StreamSink responses])
+ : _requests = requests,
+ _responses = responses == null && requests is StreamSink ?
+ requests : responses {
+ if (_responses == null) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ }
+
+ /// Starts listening to the underlying stream.
+ ///
+ /// Returns a [Future] that will complete when the stream is closed or when it
+ /// has an error.
+ ///
+ /// [listen] may only be called once.
+ Future listen() {
+ if (_listenCompleter != null) {
+ throw new StateError(
+ "Can only call Server.listen once on a given server.");
+ }
+
+ _listenCompleter = new Completer();
+ _requestSubscription = _requests.listen(_handleRequest,
+ onError: (error, stackTrace) {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.completeError(error, stackTrace);
+ }, onDone: () {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.complete();
+ }, cancelOnError: true);
+
+ return _listenCompleter.future;
+ }
+
+ /// Closes the server's request subscription and response sink.
+ ///
+ /// Returns a [Future] that completes when all resources have been released.
+ ///
+ /// A server can't be closed before [listen] has been called.
+ Future close() {
+ if (_listenCompleter == null) {
+ throw new StateError("Can't call Server.close before Server.listen.");
+ }
+
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+ var subscriptionFuture = _requestSubscription.cancel();
+ // TODO(nweiz): include the response future in the return value when issue
+ // 19095 is fixed.
+ _responses.close();
+ return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
+ }
/// Registers a method named [name] on this server.
///
@@ -69,14 +180,14 @@ class Server {
_fallbacks.add(callback);
}
- /// Handle a request that's already been parsed from JSON.
+ /// Handle a request.
///
/// [request] is expected to be a JSON-serializable object representing a
/// request sent by a client. This calls the appropriate method or methods for
/// handling that request and returns a JSON-serializable response, or `null`
/// if no response should be sent. [callback] may send custom
/// errors by throwing an [RpcException].
- Future handleRequest(request) {
+ Future _handleRequest(request) {
return syncFuture(() {
if (request is! List) return _handleSingleRequest(request);
if (request.isEmpty) {
@@ -88,28 +199,7 @@ class Server {
var nonNull = results.where((result) => result != null);
return nonNull.isEmpty ? null : nonNull.toList();
});
- });
- }
-
- /// Parses and handles a JSON serialized request.
- ///
- /// This calls the appropriate method or methods for handling that request and
- /// returns a JSON string, or `null` if no response should be sent.
- Future<String> parseRequest(String request) {
- return syncFuture(() {
- var decodedRequest;
- try {
- decodedRequest = JSON.decode(request);
- } on FormatException catch (error) {
- return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
- '${error.message}').serialize(request);
- }
-
- return handleRequest(decodedRequest);
- }).then((response) {
- if (response == null) return null;
- return JSON.encode(response);
- });
+ }).then(_responses.add);
}
/// Handles an individual parsed request.
« no previous file with comments | « pkg/json_rpc_2/CHANGELOG.md ('k') | pkg/json_rpc_2/lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698