Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(296)

Unified Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 333683003: Extract out a StreamManager class from json_rpc.Server. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | pkg/json_rpc_2/lib/src/two_way_stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/json_rpc_2/lib/src/server.dart
diff --git a/pkg/json_rpc_2/lib/src/server.dart b/pkg/json_rpc_2/lib/src/server.dart
index c7ece5b9503af15202177bcc022dc53e3d1347d4..bd243de73230f35d5db3d281b553c74a409995e6 100644
--- a/pkg/json_rpc_2/lib/src/server.dart
+++ b/pkg/json_rpc_2/lib/src/server.dart
@@ -13,6 +13,7 @@ import 'package:stack_trace/stack_trace.dart';
import '../error_code.dart' as error_code;
import 'exception.dart';
import 'parameters.dart';
+import 'two_way_stream.dart';
import 'utils.dart';
/// A JSON-RPC 2.0 server.
@@ -26,19 +27,7 @@ import 'utils.dart';
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
- /// The stream for decoded requests.
- final Stream _requests;
-
- /// The subscription to the decoded request stream.
- StreamSubscription _requestSubscription;
-
- /// The sink for decoded responses.
- final StreamSink _responses;
-
- /// The completer for [listen].
- ///
- /// This is non-`null` after [listen] has been called.
- Completer _listenCompleter;
+ TwoWayStream _streams;
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -57,28 +46,12 @@ class Server {
///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- factory Server(Stream<String> requests, [StreamSink<String> responses]) {
- if (responses == null) {
- if (requests is! StreamSink) {
- throw new ArgumentError("Either `requests` must be a StreamSink or "
- "`responses` must be passed.");
- }
- responses = requests as StreamSink;
- }
-
- var wrappedResponses = mapStreamSink(responses, JSON.encode);
- return new Server.withoutJson(requests.expand((request) {
- var decodedRequest;
- try {
- decodedRequest = JSON.decode(request);
- } on FormatException catch (error) {
- wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(request));
- return [];
- }
-
- return [decodedRequest];
- }), wrappedResponses);
+ Server(Stream<String> requests, [StreamSink<String> responses]) {
+ _streams = new TwoWayStream("Server", requests, "requests",
+ responses, "responses", onInvalidInput: (message, error) {
+ _streams.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(message));
+ });
}
/// Creates a [Server] that reads decoded requests from [requests] and writes
@@ -93,14 +66,8 @@ class Server {
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
Server.withoutJson(Stream requests, [StreamSink responses])
- : _requests = requests,
- _responses = responses == null && requests is StreamSink ?
- requests : responses {
- if (_responses == null) {
- throw new ArgumentError("Either `requests` must be a StreamSink or "
- "`responses` must be passed.");
- }
- }
+ : _streams = new TwoWayStream.withoutJson(
+ "Server", requests, "requests", responses, "responses");
/// Starts listening to the underlying stream.
///
@@ -108,45 +75,14 @@ class Server {
/// has an error.
///
/// [listen] may only be called once.
- Future listen() {
- if (_listenCompleter != null) {
- throw new StateError(
- "Can only call Server.listen once on a given server.");
- }
-
- _listenCompleter = new Completer();
- _requestSubscription = _requests.listen(_handleRequest,
- onError: (error, stackTrace) {
- if (_listenCompleter.isCompleted) return;
- _responses.close();
- _listenCompleter.completeError(error, stackTrace);
- }, onDone: () {
- if (_listenCompleter.isCompleted) return;
- _responses.close();
- _listenCompleter.complete();
- }, cancelOnError: true);
-
- return _listenCompleter.future;
- }
+ Future listen() => _streams.listen(_handleRequest);
/// Closes the server's request subscription and response sink.
///
/// Returns a [Future] that completes when all resources have been released.
///
/// A server can't be closed before [listen] has been called.
- Future close() {
- if (_listenCompleter == null) {
- throw new StateError("Can't call Server.close before Server.listen.");
- }
-
- if (!_listenCompleter.isCompleted) _listenCompleter.complete();
-
- var subscriptionFuture = _requestSubscription.cancel();
- // TODO(nweiz): include the response future in the return value when issue
- // 19095 is fixed.
- _responses.close();
- return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
- }
+ Future close() => _streams.close();
/// Registers a method named [name] on this server.
///
@@ -199,7 +135,7 @@ class Server {
var nonNull = results.where((result) => result != null);
return nonNull.isEmpty ? null : nonNull.toList();
});
- }).then(_responses.add);
+ }).then(_streams.add);
}
/// Handles an individual parsed request.
« no previous file with comments | « no previous file | pkg/json_rpc_2/lib/src/two_way_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698