Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(311)

Unified Diff: lib/src/server.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/peer.dart ('k') | lib/src/two_way_stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/server.dart
diff --git a/lib/src/server.dart b/lib/src/server.dart
index 06dd0b9e3dab64db35baa7779dc7c756d20dd3cb..3ef59ed0f48c9a872748feaa86393bf68a73254e 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -7,11 +7,12 @@ import 'dart:collection';
import 'dart:convert';
import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
import '../error_code.dart' as error_code;
+import 'channel_manager.dart';
import 'exception.dart';
import 'parameters.dart';
-import 'two_way_stream.dart';
import 'utils.dart';
/// A JSON-RPC 2.0 server.
@@ -25,7 +26,7 @@ import 'utils.dart';
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
- TwoWayStream _streams;
+ final ChannelManager _manager;
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -36,59 +37,53 @@ class Server {
/// [RpcException.methodNotFound] exception.
final _fallbacks = new Queue<Function>();
- /// Returns a [Future] that completes when the connection is closed.
+ /// Returns a [Future] that completes when the underlying connection is
+ /// closed.
///
- /// This is the same future that's returned by [listen].
- Future get done => _streams.done;
+ /// This is the same future that's returned by [listen] and [close]. It may
+ /// complete before [close] is called if the remote endpoint closes the
+ /// connection.
+ Future get done => _manager.done;
- /// Whether the connection is closed.
- bool get isClosed => _streams.isClosed;
-
- /// Creates a [Server] that reads requests from [requests] and writes
- /// responses to [responses].
+ /// Whether the underlying connection is closed.
///
- /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
- /// `WebSocket`), [responses] may be omitted.
+ /// Note that this will be `true` before [close] is called if the remote
+ /// endpoint closes the connection.
+ bool get isClosed => _manager.isClosed;
+
+ /// Creates a [Server] that communicates over [channel].
///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- Server(Stream<String> requests, [StreamSink<String> responses]) {
- _streams = new TwoWayStream("Server", requests, "requests",
- responses, "responses", onInvalidInput: (message, error) {
- _streams.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(message));
- });
- }
+ Server(StreamChannel<String> channel)
+ : this.withoutJson(channel
+ .transform(jsonDocument)
+ .transform(respondToFormatExceptions));
- /// Creates a [Server] that reads decoded requests from [requests] and writes
- /// decoded responses to [responses].
+ /// Creates a [Server] that communicates using decoded messages over
+ /// [channel].
///
/// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
/// reads and writes decoded maps or lists.
///
- /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
- /// omitted.
- ///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- Server.withoutJson(Stream requests, [StreamSink responses])
- : _streams = new TwoWayStream.withoutJson(
- "Server", requests, "requests", responses, "responses");
+ Server.withoutJson(StreamChannel channel)
+ : _manager = new ChannelManager("Server", channel);
/// Starts listening to the underlying stream.
///
- /// Returns a [Future] that will complete when the stream is closed or when it
- /// has an error.
+ /// Returns a [Future] that will complete when the connection is closed or
+ /// when it has an error. This is the same as [done].
///
/// [listen] may only be called once.
- Future listen() => _streams.listen(_handleRequest);
+ Future listen() => _manager.listen(_handleRequest);
- /// Closes the server's request subscription and response sink.
+ /// Closes the underlying connection.
///
/// Returns a [Future] that completes when all resources have been released.
- ///
- /// A server can't be closed before [listen] has been called.
- Future close() => _streams.close();
+ /// This is the same as [done].
+ Future close() => _manager.close();
/// Registers a method named [name] on this server.
///
@@ -129,21 +124,24 @@ class Server {
/// handling that request and returns a JSON-serializable response, or `null`
/// if no response should be sent. [callback] may send custom
/// errors by throwing an [RpcException].
- Future _handleRequest(request) {
- return syncFuture(() {
- if (request is! List) return _handleSingleRequest(request);
- if (request.isEmpty) {
- return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
- 'contain at least one request.').serialize(request);
- }
+ Future _handleRequest(request) async {
+ var response;
+ if (request is! List) {
+ response = await _handleSingleRequest(request);
+ if (response == null) return;
+ } else if (request.isEmpty) {
+ response = new RpcException(
+ error_code.INVALID_REQUEST,
+ 'A batch must contain at least one request.')
+ .serialize(request);
+ } else {
+ var results = await Future.wait(request.map(_handleSingleRequest));
+ var nonNull = results.where((result) => result != null);
+ if (nonNull.isEmpty) return;
+ response = nonNull.toList();
+ }
- return Future.wait(request.map(_handleSingleRequest)).then((results) {
- var nonNull = results.where((result) => result != null);
- return nonNull.isEmpty ? null : nonNull.toList();
- });
- }).then((response) {
- if (!_streams.isClosed && response != null) _streams.add(response);
- });
+ if (!isClosed) _manager.add(response);
}
/// Handles an individual parsed request.
« no previous file with comments | « lib/src/peer.dart ('k') | lib/src/two_way_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698