| Index: lib/src/server.dart
|
| diff --git a/lib/src/server.dart b/lib/src/server.dart
|
| index 06dd0b9e3dab64db35baa7779dc7c756d20dd3cb..3ef59ed0f48c9a872748feaa86393bf68a73254e 100644
|
| --- a/lib/src/server.dart
|
| +++ b/lib/src/server.dart
|
| @@ -7,11 +7,12 @@ import 'dart:collection';
|
| import 'dart:convert';
|
|
|
| import 'package:stack_trace/stack_trace.dart';
|
| +import 'package:stream_channel/stream_channel.dart';
|
|
|
| import '../error_code.dart' as error_code;
|
| +import 'channel_manager.dart';
|
| import 'exception.dart';
|
| import 'parameters.dart';
|
| -import 'two_way_stream.dart';
|
| import 'utils.dart';
|
|
|
| /// A JSON-RPC 2.0 server.
|
| @@ -25,7 +26,7 @@ import 'utils.dart';
|
| /// asynchronously, it's possible for multiple methods to be invoked at the same
|
| /// time, or even for a single method to be invoked multiple times at once.
|
| class Server {
|
| - TwoWayStream _streams;
|
| + final ChannelManager _manager;
|
|
|
| /// The methods registered for this server.
|
| final _methods = new Map<String, Function>();
|
| @@ -36,59 +37,53 @@ class Server {
|
| /// [RpcException.methodNotFound] exception.
|
| final _fallbacks = new Queue<Function>();
|
|
|
| - /// Returns a [Future] that completes when the connection is closed.
|
| + /// Returns a [Future] that completes when the underlying connection is
|
| + /// closed.
|
| ///
|
| - /// This is the same future that's returned by [listen].
|
| - Future get done => _streams.done;
|
| + /// This is the same future that's returned by [listen] and [close]. It may
|
| + /// complete before [close] is called if the remote endpoint closes the
|
| + /// connection.
|
| + Future get done => _manager.done;
|
|
|
| - /// Whether the connection is closed.
|
| - bool get isClosed => _streams.isClosed;
|
| -
|
| - /// Creates a [Server] that reads requests from [requests] and writes
|
| - /// responses to [responses].
|
| + /// Whether the underlying connection is closed.
|
| ///
|
| - /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
|
| - /// `WebSocket`), [responses] may be omitted.
|
| + /// Note that this will be `true` before [close] is called if the remote
|
| + /// endpoint closes the connection.
|
| + bool get isClosed => _manager.isClosed;
|
| +
|
| + /// Creates a [Server] that communicates over [channel].
|
| ///
|
| /// Note that the server won't begin listening to [requests] until
|
| /// [Server.listen] is called.
|
| - Server(Stream<String> requests, [StreamSink<String> responses]) {
|
| - _streams = new TwoWayStream("Server", requests, "requests",
|
| - responses, "responses", onInvalidInput: (message, error) {
|
| - _streams.add(new RpcException(error_code.PARSE_ERROR,
|
| - 'Invalid JSON: ${error.message}').serialize(message));
|
| - });
|
| - }
|
| + Server(StreamChannel<String> channel)
|
| + : this.withoutJson(channel
|
| + .transform(jsonDocument)
|
| + .transform(respondToFormatExceptions));
|
|
|
| - /// Creates a [Server] that reads decoded requests from [requests] and writes
|
| - /// decoded responses to [responses].
|
| + /// Creates a [Server] that communicates using decoded messages over
|
| + /// [channel].
|
| ///
|
| /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
|
| /// reads and writes decoded maps or lists.
|
| ///
|
| - /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
|
| - /// omitted.
|
| - ///
|
| /// Note that the server won't begin listening to [requests] until
|
| /// [Server.listen] is called.
|
| - Server.withoutJson(Stream requests, [StreamSink responses])
|
| - : _streams = new TwoWayStream.withoutJson(
|
| - "Server", requests, "requests", responses, "responses");
|
| + Server.withoutJson(StreamChannel channel)
|
| + : _manager = new ChannelManager("Server", channel);
|
|
|
| /// Starts listening to the underlying stream.
|
| ///
|
| - /// Returns a [Future] that will complete when the stream is closed or when it
|
| - /// has an error.
|
| + /// Returns a [Future] that will complete when the connection is closed or
|
| + /// when it has an error. This is the same as [done].
|
| ///
|
| /// [listen] may only be called once.
|
| - Future listen() => _streams.listen(_handleRequest);
|
| + Future listen() => _manager.listen(_handleRequest);
|
|
|
| - /// Closes the server's request subscription and response sink.
|
| + /// Closes the underlying connection.
|
| ///
|
| /// Returns a [Future] that completes when all resources have been released.
|
| - ///
|
| - /// A server can't be closed before [listen] has been called.
|
| - Future close() => _streams.close();
|
| + /// This is the same as [done].
|
| + Future close() => _manager.close();
|
|
|
| /// Registers a method named [name] on this server.
|
| ///
|
| @@ -129,21 +124,24 @@ class Server {
|
| /// handling that request and returns a JSON-serializable response, or `null`
|
| /// if no response should be sent. [callback] may send custom
|
| /// errors by throwing an [RpcException].
|
| - Future _handleRequest(request) {
|
| - return syncFuture(() {
|
| - if (request is! List) return _handleSingleRequest(request);
|
| - if (request.isEmpty) {
|
| - return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
|
| - 'contain at least one request.').serialize(request);
|
| - }
|
| + Future _handleRequest(request) async {
|
| + var response;
|
| + if (request is! List) {
|
| + response = await _handleSingleRequest(request);
|
| + if (response == null) return;
|
| + } else if (request.isEmpty) {
|
| + response = new RpcException(
|
| + error_code.INVALID_REQUEST,
|
| + 'A batch must contain at least one request.')
|
| + .serialize(request);
|
| + } else {
|
| + var results = await Future.wait(request.map(_handleSingleRequest));
|
| + var nonNull = results.where((result) => result != null);
|
| + if (nonNull.isEmpty) return;
|
| + response = nonNull.toList();
|
| + }
|
|
|
| - return Future.wait(request.map(_handleSingleRequest)).then((results) {
|
| - var nonNull = results.where((result) => result != null);
|
| - return nonNull.isEmpty ? null : nonNull.toList();
|
| - });
|
| - }).then((response) {
|
| - if (!_streams.isClosed && response != null) _streams.add(response);
|
| - });
|
| + if (!isClosed) _manager.add(response);
|
| }
|
|
|
| /// Handles an individual parsed request.
|
|
|