Index: pkg/json_rpc_2/lib/src/server.dart |
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart |
index c7ece5b9503af15202177bcc022dc53e3d1347d4..23e24067bbae6e14b491f096abd0fd60455e337d 100644 |
--- a/pkg/json_rpc_2/lib/src/server.dart |
+++ b/pkg/json_rpc_2/lib/src/server.dart |
@@ -13,6 +13,7 @@ import 'package:stack_trace/stack_trace.dart'; |
import '../error_code.dart' as error_code; |
import 'exception.dart'; |
import 'parameters.dart'; |
+import 'stream_manager.dart'; |
import 'utils.dart'; |
/// A JSON-RPC 2.0 server. |
@@ -26,19 +27,7 @@ import 'utils.dart'; |
/// asynchronously, it's possible for multiple methods to be invoked at the same |
/// time, or even for a single method to be invoked multiple times at once. |
class Server { |
- /// The stream for decoded requests. |
- final Stream _requests; |
- |
- /// The subscription to the decoded request stream. |
- StreamSubscription _requestSubscription; |
- |
- /// The sink for decoded responses. |
- final StreamSink _responses; |
- |
- /// The completer for [listen]. |
- /// |
- /// This is non-`null` after [listen] has been called. |
- Completer _listenCompleter; |
+ StreamManager _streams; |
/// The methods registered for this server. |
final _methods = new Map<String, Function>(); |
@@ -57,28 +46,12 @@ class Server { |
/// |
/// Note that the server won't begin listening to [requests] until |
/// [Server.listen] is called. |
- factory Server(Stream<String> requests, [StreamSink<String> responses]) { |
- if (responses == null) { |
- if (requests is! StreamSink) { |
- throw new ArgumentError("Either `requests` must be a StreamSink or " |
- "`responses` must be passed."); |
- } |
- responses = requests as StreamSink; |
- } |
- |
- var wrappedResponses = mapStreamSink(responses, JSON.encode); |
- return new Server.withoutJson(requests.expand((request) { |
- var decodedRequest; |
- try { |
- decodedRequest = JSON.decode(request); |
- } on FormatException catch (error) { |
- wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, |
- 'Invalid JSON: ${error.message}').serialize(request)); |
- return []; |
- } |
- |
- return [decodedRequest]; |
- }), wrappedResponses); |
+ Server(Stream<String> requests, [StreamSink<String> responses]) { |
+ _streams = new StreamManager("Server", requests, "requests", |
+ responses, "responses", onInvalidInput: (message, error) { |
+ _streams.add(new RpcException(error_code.PARSE_ERROR, |
+ 'Invalid JSON: ${error.message}').serialize(message)); |
+ }); |
} |
/// Creates a [Server] that reads decoded requests from [requests] and writes |
@@ -93,14 +66,8 @@ class Server { |
/// Note that the server won't begin listening to [requests] until |
/// [Server.listen] is called. |
Server.withoutJson(Stream requests, [StreamSink responses]) |
- : _requests = requests, |
- _responses = responses == null && requests is StreamSink ? |
- requests : responses { |
- if (_responses == null) { |
- throw new ArgumentError("Either `requests` must be a StreamSink or " |
- "`responses` must be passed."); |
- } |
- } |
+ : _streams = new StreamManager.withoutJson( |
+ "Server", requests, "requests", responses, "responses"); |
/// Starts listening to the underlying stream. |
/// |
@@ -108,45 +75,14 @@ class Server { |
/// has an error. |
/// |
/// [listen] may only be called once. |
- Future listen() { |
- if (_listenCompleter != null) { |
- throw new StateError( |
- "Can only call Server.listen once on a given server."); |
- } |
- |
- _listenCompleter = new Completer(); |
- _requestSubscription = _requests.listen(_handleRequest, |
- onError: (error, stackTrace) { |
- if (_listenCompleter.isCompleted) return; |
- _responses.close(); |
- _listenCompleter.completeError(error, stackTrace); |
- }, onDone: () { |
- if (_listenCompleter.isCompleted) return; |
- _responses.close(); |
- _listenCompleter.complete(); |
- }, cancelOnError: true); |
- |
- return _listenCompleter.future; |
- } |
+ Future listen() => _streams.listen(_handleRequest); |
/// Closes the server's request subscription and response sink. |
/// |
/// Returns a [Future] that completes when all resources have been released. |
/// |
/// A server can't be closed before [listen] has been called. |
- Future close() { |
- if (_listenCompleter == null) { |
- throw new StateError("Can't call Server.close before Server.listen."); |
- } |
- |
- if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
- |
- var subscriptionFuture = _requestSubscription.cancel(); |
- // TODO(nweiz): include the response future in the return value when issue |
- // 19095 is fixed. |
- _responses.close(); |
- return subscriptionFuture == null ? new Future.value() : subscriptionFuture; |
- } |
+ Future close() => _streams.close(); |
/// Registers a method named [name] on this server. |
/// |
@@ -199,7 +135,7 @@ class Server { |
var nonNull = results.where((result) => result != null); |
return nonNull.isEmpty ? null : nonNull.toList(); |
}); |
- }).then(_responses.add); |
+ }).then(_streams.add); |
} |
/// Handles an individual parsed request. |