| Index: pkg/json_rpc_2/lib/src/server.dart
|
| diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart
|
| index c7ece5b9503af15202177bcc022dc53e3d1347d4..bd243de73230f35d5db3d281b553c74a409995e6 100644
|
| --- a/pkg/json_rpc_2/lib/src/server.dart
|
| +++ b/pkg/json_rpc_2/lib/src/server.dart
|
| @@ -13,6 +13,7 @@ import 'package:stack_trace/stack_trace.dart';
|
| import '../error_code.dart' as error_code;
|
| import 'exception.dart';
|
| import 'parameters.dart';
|
| +import 'two_way_stream.dart';
|
| import 'utils.dart';
|
|
|
| /// A JSON-RPC 2.0 server.
|
| @@ -26,19 +27,7 @@ import 'utils.dart';
|
| /// asynchronously, it's possible for multiple methods to be invoked at the same
|
| /// time, or even for a single method to be invoked multiple times at once.
|
| class Server {
|
| - /// The stream for decoded requests.
|
| - final Stream _requests;
|
| -
|
| - /// The subscription to the decoded request stream.
|
| - StreamSubscription _requestSubscription;
|
| -
|
| - /// The sink for decoded responses.
|
| - final StreamSink _responses;
|
| -
|
| - /// The completer for [listen].
|
| - ///
|
| - /// This is non-`null` after [listen] has been called.
|
| - Completer _listenCompleter;
|
| + TwoWayStream _streams;
|
|
|
| /// The methods registered for this server.
|
| final _methods = new Map<String, Function>();
|
| @@ -57,28 +46,12 @@ class Server {
|
| ///
|
| /// Note that the server won't begin listening to [requests] until
|
| /// [Server.listen] is called.
|
| - factory Server(Stream<String> requests, [StreamSink<String> responses]) {
|
| - if (responses == null) {
|
| - if (requests is! StreamSink) {
|
| - throw new ArgumentError("Either `requests` must be a StreamSink or "
|
| - "`responses` must be passed.");
|
| - }
|
| - responses = requests as StreamSink;
|
| - }
|
| -
|
| - var wrappedResponses = mapStreamSink(responses, JSON.encode);
|
| - return new Server.withoutJson(requests.expand((request) {
|
| - var decodedRequest;
|
| - try {
|
| - decodedRequest = JSON.decode(request);
|
| - } on FormatException catch (error) {
|
| - wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
|
| - 'Invalid JSON: ${error.message}').serialize(request));
|
| - return [];
|
| - }
|
| -
|
| - return [decodedRequest];
|
| - }), wrappedResponses);
|
| + Server(Stream<String> requests, [StreamSink<String> responses]) {
|
| + _streams = new TwoWayStream("Server", requests, "requests",
|
| + responses, "responses", onInvalidInput: (message, error) {
|
| + _streams.add(new RpcException(error_code.PARSE_ERROR,
|
| + 'Invalid JSON: ${error.message}').serialize(message));
|
| + });
|
| }
|
|
|
| /// Creates a [Server] that reads decoded requests from [requests] and writes
|
| @@ -93,14 +66,8 @@ class Server {
|
| /// Note that the server won't begin listening to [requests] until
|
| /// [Server.listen] is called.
|
| Server.withoutJson(Stream requests, [StreamSink responses])
|
| - : _requests = requests,
|
| - _responses = responses == null && requests is StreamSink ?
|
| - requests : responses {
|
| - if (_responses == null) {
|
| - throw new ArgumentError("Either `requests` must be a StreamSink or "
|
| - "`responses` must be passed.");
|
| - }
|
| - }
|
| + : _streams = new TwoWayStream.withoutJson(
|
| + "Server", requests, "requests", responses, "responses");
|
|
|
| /// Starts listening to the underlying stream.
|
| ///
|
| @@ -108,45 +75,14 @@ class Server {
|
| /// has an error.
|
| ///
|
| /// [listen] may only be called once.
|
| - Future listen() {
|
| - if (_listenCompleter != null) {
|
| - throw new StateError(
|
| - "Can only call Server.listen once on a given server.");
|
| - }
|
| -
|
| - _listenCompleter = new Completer();
|
| - _requestSubscription = _requests.listen(_handleRequest,
|
| - onError: (error, stackTrace) {
|
| - if (_listenCompleter.isCompleted) return;
|
| - _responses.close();
|
| - _listenCompleter.completeError(error, stackTrace);
|
| - }, onDone: () {
|
| - if (_listenCompleter.isCompleted) return;
|
| - _responses.close();
|
| - _listenCompleter.complete();
|
| - }, cancelOnError: true);
|
| -
|
| - return _listenCompleter.future;
|
| - }
|
| + Future listen() => _streams.listen(_handleRequest);
|
|
|
| /// Closes the server's request subscription and response sink.
|
| ///
|
| /// Returns a [Future] that completes when all resources have been released.
|
| ///
|
| /// A server can't be closed before [listen] has been called.
|
| - Future close() {
|
| - if (_listenCompleter == null) {
|
| - throw new StateError("Can't call Server.close before Server.listen.");
|
| - }
|
| -
|
| - if (!_listenCompleter.isCompleted) _listenCompleter.complete();
|
| -
|
| - var subscriptionFuture = _requestSubscription.cancel();
|
| - // TODO(nweiz): include the response future in the return value when issue
|
| - // 19095 is fixed.
|
| - _responses.close();
|
| - return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
|
| - }
|
| + Future close() => _streams.close();
|
|
|
| /// Registers a method named [name] on this server.
|
| ///
|
| @@ -199,7 +135,7 @@ class Server {
|
| var nonNull = results.where((result) => result != null);
|
| return nonNull.isEmpty ? null : nonNull.toList();
|
| });
|
| - }).then(_responses.add);
|
| + }).then(_streams.add);
|
| }
|
|
|
| /// Handles an individual parsed request.
|
|
|