Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(155)

Side by Side Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 309503005: Convert json_rpc.Server to take a Stream and StreamSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/json_rpc_2/CHANGELOG.md ('k') | pkg/json_rpc_2/lib/src/utils.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library json_rpc_2.server; 5 library json_rpc_2.server;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 import 'dart:convert'; 9 import 'dart:convert';
10 10
11 import 'package:stack_trace/stack_trace.dart'; 11 import 'package:stack_trace/stack_trace.dart';
12 12
13 import '../error_code.dart' as error_code; 13 import '../error_code.dart' as error_code;
14 import 'exception.dart'; 14 import 'exception.dart';
15 import 'parameters.dart'; 15 import 'parameters.dart';
16 import 'utils.dart'; 16 import 'utils.dart';
17 17
18 /// A JSON-RPC 2.0 server. 18 /// A JSON-RPC 2.0 server.
19 /// 19 ///
20 /// A server exposes methods that are called by requests, to which it provides 20 /// A server exposes methods that are called by requests, to which it provides
21 /// responses. Methods can be registered using [registerMethod] and 21 /// responses. Methods can be registered using [registerMethod] and
22 /// [registerFallback]. Requests can be handled using [handleRequest] and 22 /// [registerFallback]. Requests can be handled using [handleRequest] and
23 /// [parseRequest]. 23 /// [parseRequest].
24 /// 24 ///
25 /// Note that since requests can arrive asynchronously and methods can run 25 /// Note that since requests can arrive asynchronously and methods can run
26 /// asynchronously, it's possible for multiple methods to be invoked at the same 26 /// asynchronously, it's possible for multiple methods to be invoked at the same
27 /// time, or even for a single method to be invoked multiple times at once. 27 /// time, or even for a single method to be invoked multiple times at once.
28 class Server { 28 class Server {
29 /// The stream for decoded requests.
30 final Stream _requests;
31
32 /// The subscription to the decoded request stream.
33 StreamSubscription _requestSubscription;
34
35 /// The sink for decoded responses.
36 final StreamSink _responses;
37
38 /// The completer for [listen].
39 ///
40 /// This is non-`null` after [listen] has been called.
41 Completer _listenCompleter;
42
29 /// The methods registered for this server. 43 /// The methods registered for this server.
30 final _methods = new Map<String, Function>(); 44 final _methods = new Map<String, Function>();
31 45
32 /// The fallback methods for this server. 46 /// The fallback methods for this server.
33 /// 47 ///
34 /// These are tried in order until one of them doesn't throw a 48 /// These are tried in order until one of them doesn't throw a
35 /// [RpcException.methodNotFound] exception. 49 /// [RpcException.methodNotFound] exception.
36 final _fallbacks = new Queue<Function>(); 50 final _fallbacks = new Queue<Function>();
37 51
38 Server(); 52 /// Creates a [Server] that reads requests from [requests] and writes
53 /// responses to [responses].
54 ///
55 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
56 /// `WebSocket`), [responses] may be omitted.
57 ///
58 /// Note that the server won't begin listening to [requests] until
59 /// [Server.listen] is called.
60 factory Server(Stream<String> requests, [StreamSink<String> responses]) {
61 if (responses == null) {
62 if (requests is! StreamSink) {
63 throw new ArgumentError("Either `requests` must be a StreamSink or "
64 "`responses` must be passed.");
65 }
66 responses = requests as StreamSink;
67 }
68
69 var wrappedResponses = mapStreamSink(responses, JSON.encode);
70 return new Server.withoutJson(requests.expand((request) {
71 var decodedRequest;
72 try {
73 decodedRequest = JSON.decode(request);
74 } on FormatException catch (error) {
75 wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
76 'Invalid JSON: ${error.message}').serialize(request));
77 return [];
78 }
79
80 return [decodedRequest];
81 }), wrappedResponses);
82 }
83
84 /// Creates a [Server] that reads decoded requests from [requests] and writes
85 /// decoded responses to [responses].
86 ///
87 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
88 /// reads and writes decoded maps or lists.
89 ///
90 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
91 /// omitted.
92 ///
93 /// Note that the server won't begin listening to [requests] until
94 /// [Server.listen] is called.
95 Server.withoutJson(Stream requests, [StreamSink responses])
96 : _requests = requests,
97 _responses = responses == null && requests is StreamSink ?
98 requests : responses {
99 if (_responses == null) {
100 throw new ArgumentError("Either `requests` must be a StreamSink or "
101 "`responses` must be passed.");
102 }
103 }
104
105 /// Starts listening to the underlying stream.
106 ///
107 /// Returns a [Future] that will complete when the stream is closed or when it
108 /// has an error.
109 ///
110 /// [listen] may only be called once.
111 Future listen() {
112 if (_listenCompleter != null) {
113 throw new StateError(
114 "Can only call Server.listen once on a given server.");
115 }
116
117 _listenCompleter = new Completer();
118 _requestSubscription = _requests.listen(_handleRequest,
119 onError: (error, stackTrace) {
120 if (_listenCompleter.isCompleted) return;
121 _responses.close();
122 _listenCompleter.completeError(error, stackTrace);
123 }, onDone: () {
124 if (_listenCompleter.isCompleted) return;
125 _responses.close();
126 _listenCompleter.complete();
127 }, cancelOnError: true);
128
129 return _listenCompleter.future;
130 }
131
132 /// Closes the server's request subscription and response sink.
133 ///
134 /// Returns a [Future] that completes when all resources have been released.
135 ///
136 /// A server can't be closed before [listen] has been called.
137 Future close() {
138 if (_listenCompleter == null) {
139 throw new StateError("Can't call Server.close before Server.listen.");
140 }
141
142 if (!_listenCompleter.isCompleted) _listenCompleter.complete();
143
144 var subscriptionFuture = _requestSubscription.cancel();
145 // TODO(nweiz): include the response future in the return value when issue
146 // 19095 is fixed.
147 _responses.close();
148 return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
149 }
39 150
40 /// Registers a method named [name] on this server. 151 /// Registers a method named [name] on this server.
41 /// 152 ///
42 /// [callback] can take either zero or one arguments. If it takes zero, any 153 /// [callback] can take either zero or one arguments. If it takes zero, any
43 /// requests for that method that include parameters will be rejected. If it 154 /// requests for that method that include parameters will be rejected. If it
44 /// takes one, it will be passed a [Parameters] object. 155 /// takes one, it will be passed a [Parameters] object.
45 /// 156 ///
46 /// [callback] can return either a JSON-serializable object or a Future that 157 /// [callback] can return either a JSON-serializable object or a Future that
47 /// completes to a JSON-serializable object. Any errors in [callback] will be 158 /// completes to a JSON-serializable object. Any errors in [callback] will be
48 /// reported to the client as JSON-RPC 2.0 errors. 159 /// reported to the client as JSON-RPC 2.0 errors.
(...skipping 13 matching lines...) Expand all
62 /// [RpcException.methodNotFound] exception. 173 /// [RpcException.methodNotFound] exception.
63 /// 174 ///
64 /// [callback] can return either a JSON-serializable object or a Future that 175 /// [callback] can return either a JSON-serializable object or a Future that
65 /// completes to a JSON-serializable object. Any errors in [callback] will be 176 /// completes to a JSON-serializable object. Any errors in [callback] will be
66 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom 177 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom
67 /// errors by throwing an [RpcException]. 178 /// errors by throwing an [RpcException].
68 void registerFallback(callback(Parameters parameters)) { 179 void registerFallback(callback(Parameters parameters)) {
69 _fallbacks.add(callback); 180 _fallbacks.add(callback);
70 } 181 }
71 182
72 /// Handle a request that's already been parsed from JSON. 183 /// Handle a request.
73 /// 184 ///
74 /// [request] is expected to be a JSON-serializable object representing a 185 /// [request] is expected to be a JSON-serializable object representing a
75 /// request sent by a client. This calls the appropriate method or methods for 186 /// request sent by a client. This calls the appropriate method or methods for
76 /// handling that request and returns a JSON-serializable response, or `null` 187 /// handling that request and returns a JSON-serializable response, or `null`
77 /// if no response should be sent. [callback] may send custom 188 /// if no response should be sent. [callback] may send custom
78 /// errors by throwing an [RpcException]. 189 /// errors by throwing an [RpcException].
79 Future handleRequest(request) { 190 Future _handleRequest(request) {
80 return syncFuture(() { 191 return syncFuture(() {
81 if (request is! List) return _handleSingleRequest(request); 192 if (request is! List) return _handleSingleRequest(request);
82 if (request.isEmpty) { 193 if (request.isEmpty) {
83 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' 194 return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
84 'contain at least one request.').serialize(request); 195 'contain at least one request.').serialize(request);
85 } 196 }
86 197
87 return Future.wait(request.map(_handleSingleRequest)).then((results) { 198 return Future.wait(request.map(_handleSingleRequest)).then((results) {
88 var nonNull = results.where((result) => result != null); 199 var nonNull = results.where((result) => result != null);
89 return nonNull.isEmpty ? null : nonNull.toList(); 200 return nonNull.isEmpty ? null : nonNull.toList();
90 }); 201 });
91 }); 202 }).then(_responses.add);
92 }
93
94 /// Parses and handles a JSON serialized request.
95 ///
96 /// This calls the appropriate method or methods for handling that request and
97 /// returns a JSON string, or `null` if no response should be sent.
98 Future<String> parseRequest(String request) {
99 return syncFuture(() {
100 var decodedRequest;
101 try {
102 decodedRequest = JSON.decode(request);
103 } on FormatException catch (error) {
104 return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
105 '${error.message}').serialize(request);
106 }
107
108 return handleRequest(decodedRequest);
109 }).then((response) {
110 if (response == null) return null;
111 return JSON.encode(response);
112 });
113 } 203 }
114 204
115 /// Handles an individual parsed request. 205 /// Handles an individual parsed request.
116 Future _handleSingleRequest(request) { 206 Future _handleSingleRequest(request) {
117 return syncFuture(() { 207 return syncFuture(() {
118 _validateRequest(request); 208 _validateRequest(request);
119 209
120 var name = request['method']; 210 var name = request['method'];
121 var method = _methods[name]; 211 var method = _methods[name];
122 if (method == null) method = _tryFallbacks; 212 if (method == null) method = _tryFallbacks;
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 return syncFuture(() => iterator.current(params)).catchError((error) { 301 return syncFuture(() => iterator.current(params)).catchError((error) {
212 if (error is! RpcException) throw error; 302 if (error is! RpcException) throw error;
213 if (error.code != error_code.METHOD_NOT_FOUND) throw error; 303 if (error.code != error_code.METHOD_NOT_FOUND) throw error;
214 return _tryNext(); 304 return _tryNext();
215 }); 305 });
216 } 306 }
217 307
218 return _tryNext(); 308 return _tryNext();
219 } 309 }
220 } 310 }
OLDNEW
« no previous file with comments | « pkg/json_rpc_2/CHANGELOG.md ('k') | pkg/json_rpc_2/lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698