Index: lib/src/server.dart |
diff --git a/lib/src/server.dart b/lib/src/server.dart |
index 06dd0b9e3dab64db35baa7779dc7c756d20dd3cb..3ef59ed0f48c9a872748feaa86393bf68a73254e 100644 |
--- a/lib/src/server.dart |
+++ b/lib/src/server.dart |
@@ -7,11 +7,12 @@ import 'dart:collection'; |
import 'dart:convert'; |
import 'package:stack_trace/stack_trace.dart'; |
+import 'package:stream_channel/stream_channel.dart'; |
import '../error_code.dart' as error_code; |
+import 'channel_manager.dart'; |
import 'exception.dart'; |
import 'parameters.dart'; |
-import 'two_way_stream.dart'; |
import 'utils.dart'; |
/// A JSON-RPC 2.0 server. |
@@ -25,7 +26,7 @@ import 'utils.dart'; |
/// asynchronously, it's possible for multiple methods to be invoked at the same |
/// time, or even for a single method to be invoked multiple times at once. |
class Server { |
- TwoWayStream _streams; |
+ final ChannelManager _manager; |
/// The methods registered for this server. |
final _methods = new Map<String, Function>(); |
@@ -36,59 +37,53 @@ class Server { |
/// [RpcException.methodNotFound] exception. |
final _fallbacks = new Queue<Function>(); |
- /// Returns a [Future] that completes when the connection is closed. |
+ /// Returns a [Future] that completes when the underlying connection is |
+ /// closed. |
/// |
- /// This is the same future that's returned by [listen]. |
- Future get done => _streams.done; |
+ /// This is the same future that's returned by [listen] and [close]. It may |
+ /// complete before [close] is called if the remote endpoint closes the |
+ /// connection. |
+ Future get done => _manager.done; |
- /// Whether the connection is closed. |
- bool get isClosed => _streams.isClosed; |
- |
- /// Creates a [Server] that reads requests from [requests] and writes |
- /// responses to [responses]. |
+ /// Whether the underlying connection is closed. |
/// |
- /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a |
- /// `WebSocket`), [responses] may be omitted. |
+ /// Note that this will be `true` before [close] is called if the remote |
+ /// endpoint closes the connection. |
+ bool get isClosed => _manager.isClosed; |
+ |
+ /// Creates a [Server] that communicates over [channel]. |
/// |
/// Note that the server won't begin listening to [requests] until |
/// [Server.listen] is called. |
- Server(Stream<String> requests, [StreamSink<String> responses]) { |
- _streams = new TwoWayStream("Server", requests, "requests", |
- responses, "responses", onInvalidInput: (message, error) { |
- _streams.add(new RpcException(error_code.PARSE_ERROR, |
- 'Invalid JSON: ${error.message}').serialize(message)); |
- }); |
- } |
+ Server(StreamChannel<String> channel) |
+ : this.withoutJson(channel |
+ .transform(jsonDocument) |
+ .transform(respondToFormatExceptions)); |
- /// Creates a [Server] that reads decoded requests from [requests] and writes |
- /// decoded responses to [responses]. |
+ /// Creates a [Server] that communicates using decoded messages over |
+ /// [channel]. |
/// |
/// Unlike [new Server], this doesn't read or write JSON strings. Instead, it |
/// reads and writes decoded maps or lists. |
/// |
- /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be |
- /// omitted. |
- /// |
/// Note that the server won't begin listening to [requests] until |
/// [Server.listen] is called. |
- Server.withoutJson(Stream requests, [StreamSink responses]) |
- : _streams = new TwoWayStream.withoutJson( |
- "Server", requests, "requests", responses, "responses"); |
+ Server.withoutJson(StreamChannel channel) |
+ : _manager = new ChannelManager("Server", channel); |
/// Starts listening to the underlying stream. |
/// |
- /// Returns a [Future] that will complete when the stream is closed or when it |
- /// has an error. |
+ /// Returns a [Future] that will complete when the connection is closed or |
+ /// when it has an error. This is the same as [done]. |
/// |
/// [listen] may only be called once. |
- Future listen() => _streams.listen(_handleRequest); |
+ Future listen() => _manager.listen(_handleRequest); |
- /// Closes the server's request subscription and response sink. |
+ /// Closes the underlying connection. |
/// |
/// Returns a [Future] that completes when all resources have been released. |
- /// |
- /// A server can't be closed before [listen] has been called. |
- Future close() => _streams.close(); |
+ /// This is the same as [done]. |
+ Future close() => _manager.close(); |
/// Registers a method named [name] on this server. |
/// |
@@ -129,21 +124,24 @@ class Server { |
/// handling that request and returns a JSON-serializable response, or `null` |
/// if no response should be sent. [callback] may send custom |
/// errors by throwing an [RpcException]. |
- Future _handleRequest(request) { |
- return syncFuture(() { |
- if (request is! List) return _handleSingleRequest(request); |
- if (request.isEmpty) { |
- return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' |
- 'contain at least one request.').serialize(request); |
- } |
+ Future _handleRequest(request) async { |
+ var response; |
+ if (request is! List) { |
+ response = await _handleSingleRequest(request); |
+ if (response == null) return; |
+ } else if (request.isEmpty) { |
+ response = new RpcException( |
+ error_code.INVALID_REQUEST, |
+ 'A batch must contain at least one request.') |
+ .serialize(request); |
+ } else { |
+ var results = await Future.wait(request.map(_handleSingleRequest)); |
+ var nonNull = results.where((result) => result != null); |
+ if (nonNull.isEmpty) return; |
+ response = nonNull.toList(); |
+ } |
- return Future.wait(request.map(_handleSingleRequest)).then((results) { |
- var nonNull = results.where((result) => result != null); |
- return nonNull.isEmpty ? null : nonNull.toList(); |
- }); |
- }).then((response) { |
- if (!_streams.isClosed && response != null) _streams.add(response); |
- }); |
+ if (!isClosed) _manager.add(response); |
} |
/// Handles an individual parsed request. |