Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Side by Side Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 309503005: Convert json_rpc.Server to take a Stream and StreamSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library json_rpc_2.server; 5 library json_rpc_2.server;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 import 'dart:convert'; 9 import 'dart:convert';
10 10
11 import 'package:stack_trace/stack_trace.dart'; 11 import 'package:stack_trace/stack_trace.dart';
12 12
13 import '../error_code.dart' as error_code; 13 import '../error_code.dart' as error_code;
14 import 'exception.dart'; 14 import 'exception.dart';
15 import 'parameters.dart'; 15 import 'parameters.dart';
16 import 'utils.dart'; 16 import 'utils.dart';
17 17
18 /// A JSON-RPC 2.0 server. 18 /// A JSON-RPC 2.0 server.
19 /// 19 ///
20 /// A server exposes methods that are called by requests, to which it provides 20 /// A server exposes methods that are called by requests, to which it provides
21 /// responses. Methods can be registered using [registerMethod] and 21 /// responses. Methods can be registered using [registerMethod] and
22 /// [registerFallback]. Requests can be handled using [handleRequest] and 22 /// [registerFallback]. Requests can be handled using [handleRequest] and
23 /// [parseRequest]. 23 /// [parseRequest].
24 /// 24 ///
25 /// Note that since requests can arrive asynchronously and methods can run 25 /// Note that since requests can arrive asynchronously and methods can run
26 /// asynchronously, it's possible for multiple methods to be invoked at the same 26 /// asynchronously, it's possible for multiple methods to be invoked at the same
27 /// time, or even for a single method to be invoked multiple times at once. 27 /// time, or even for a single method to be invoked multiple times at once.
28 class Server { 28 class Server {
29 /// The stream for decoded requests.
30 final Stream _requests;
31
32 /// The subscription to the decoded request stream.
33 StreamSubscription _requestSubscription;
34
35 /// The sink for decoded responses.
36 final StreamSink _responses;
37
38 /// The completer for [listen].
39 ///
40 /// This is non-`null` after [listen] has been called.
41 Completer _listenCompleter;
42
29 /// The methods registered for this server. 43 /// The methods registered for this server.
30 final _methods = new Map<String, Function>(); 44 final _methods = new Map<String, Function>();
31 45
32 /// The fallback methods for this server. 46 /// The fallback methods for this server.
33 /// 47 ///
34 /// These are tried in order until one of them doesn't throw a 48 /// These are tried in order until one of them doesn't throw a
35 /// [RpcException.methodNotFound] exception. 49 /// [RpcException.methodNotFound] exception.
36 final _fallbacks = new Queue<Function>(); 50 final _fallbacks = new Queue<Function>();
37 51
38 Server(); 52 /// Creates a [Server] that reads requests from [requests] and writes
53 /// responses to [responses].
54 ///
55 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
56 /// `WebSocket`), [responses] may be omitted.
57 ///
58 /// Note that the server won't begin listening to [requests] until
59 /// [Server.listen] is called.
60 factory Server(Stream<String> requests, [StreamSink<String> responses]) {
61 if (responses == null) {
62 if (requests is! StreamSink) {
63 throw new ArgumentError("Either `requests` must be a StreamSink or "
64 "`responses` must be passed.");
65 }
66 responses = requests as StreamSink;
67 }
68
69 var wrappedResponses = mapStreamSink(responses, JSON.encode);
70 return new Server.withoutJson(requests.expand((request) {
71 var decodedRequest;
72 try {
73 decodedRequest = JSON.decode(request);
74 } on FormatException catch (error) {
75 wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
76 'Invalid JSON: ${error.message}').serialize(request));
77 return [];
78 }
79
80 return [decodedRequest];
81 }), wrappedResponses);
82 }
83
84 /// Creates a [Server] that reads decoded requests from [requests] and writes
85 /// decoded responses to [responses].
86 ///
87 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
88 /// reads and writes decoded maps or lists.
89 ///
90 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
91 /// `WebSocket`), [responses] may be omitted.
Bob Nystrom 2014/06/03 22:26:13 WebSocket is no longer a good example here since i
nweiz 2014/06/03 23:37:00 What do you mean? WebSocket sends and receives str
Bob Nystrom 2014/06/04 16:40:48 Right, but this method sends and receives decoded
nweiz 2014/06/04 20:33:51 Oh, gotcha. Changed.
92 ///
93 /// Note that the server won't begin listening to [requests] until
94 /// [Server.listen] is called.
95 Server.withoutJson(Stream requests, [StreamSink responses])
96 : _requests = requests,
97 _responses = responses == null && requests is StreamSink ?
98 requests : responses {
99 if (_responses == null) {
100 throw new ArgumentError("Either `requests` must be a StreamSink or "
101 "`responses` must be passed.");
102 }
103 }
104
105 /// Starts listening to the underlying stream.
106 ///
107 /// Returns a [Future] that will complete when the stream is closed of when it
Bob Nystrom 2014/06/03 22:26:13 "of" -> "or".
nweiz 2014/06/03 23:37:00 Done.
108 /// has an error.
109 ///
110 /// [listen] may only be called once.
111 Future listen() {
112 if (_listenCompleter != null) {
113 return new Future.error(new StateError(
114 "Can only call Server.listen once on a given server."),
115 new Chain.current());
Bob Nystrom 2014/06/03 22:26:13 For programmatic errors like StateError, I think i
nweiz 2014/06/03 23:37:00 Done.
116 }
117
118 _listenCompleter = new Completer();
119 _requestSubscription = _requests.listen(_handleRequest,
120 onError: (error, stackTrace) {
121 if (_listenCompleter.isCompleted) return;
122 _responses.close();
123 _listenCompleter.completeError(error, stackTrace);
124 }, onDone: () {
125 if (_listenCompleter.isCompleted) return;
126 _responses.close();
127 _listenCompleter.complete();
128 }, cancelOnError: true);
129
130 return _listenCompleter.future;
131 }
132
133 /// Closes the server's request subscription and response sink.
134 ///
135 /// Returns a [Future] that completes when all resources have been released.
136 ///
137 /// A server can't be closed before [listen] has been called.
138 Future close() {
139 if (_listenCompleter == null) {
140 return new Future.error(new StateError(
141 "Can't call Server.close before Server.listen."),
142 new Chain.current());
143 }
144
145 if (!_listenCompleter.isCompleted) _listenCompleter.complete();
146
147 var subscriptionFuture = _requestSubscription.cancel();
148 // TODO(nweiz): include the response future in the return value when issue
149 // 19095 is fixed.
150 _responses.close();
151 return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
152 }
39 153
40 /// Registers a method named [name] on this server. 154 /// Registers a method named [name] on this server.
41 /// 155 ///
42 /// [callback] can take either zero or one arguments. If it takes zero, any 156 /// [callback] can take either zero or one arguments. If it takes zero, any
43 /// requests for that method that include parameters will be rejected. If it 157 /// requests for that method that include parameters will be rejected. If it
44 /// takes one, it will be passed a [Parameters] object. 158 /// takes one, it will be passed a [Parameters] object.
45 /// 159 ///
46 /// [callback] can return either a JSON-serializable object or a Future that 160 /// [callback] can return either a JSON-serializable object or a Future that
47 /// completes to a JSON-serializable object. Any errors in [callback] will be 161 /// completes to a JSON-serializable object. Any errors in [callback] will be
48 /// reported to the client as JSON-RPC 2.0 errors. 162 /// reported to the client as JSON-RPC 2.0 errors.
(...skipping 13 matching lines...) Expand all
62 /// [RpcException.methodNotFound] exception. 176 /// [RpcException.methodNotFound] exception.
63 /// 177 ///
64 /// [callback] can return either a JSON-serializable object or a Future that 178 /// [callback] can return either a JSON-serializable object or a Future that
65 /// completes to a JSON-serializable object. Any errors in [callback] will be 179 /// completes to a JSON-serializable object. Any errors in [callback] will be
66 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom 180 /// reported to the client as JSON-RPC 2.0 errors. [callback] may send custom
67 /// errors by throwing an [RpcException]. 181 /// errors by throwing an [RpcException].
68 void registerFallback(callback(Parameters parameters)) { 182 void registerFallback(callback(Parameters parameters)) {
69 _fallbacks.add(callback); 183 _fallbacks.add(callback);
70 } 184 }
71 185
72 /// Handle a request that's already been parsed from JSON. 186 /// Handle a request.
73 /// 187 ///
74 /// [request] is expected to be a JSON-serializable object representing a 188 /// [request] is expected to be a JSON-serializable object representing a
75 /// request sent by a client. This calls the appropriate method or methods for 189 /// request sent by a client. This calls the appropriate method or methods for
76 /// handling that request and returns a JSON-serializable response, or `null` 190 /// handling that request and returns a JSON-serializable response, or `null`
77 /// if no response should be sent. [callback] may send custom 191 /// if no response should be sent. [callback] may send custom
78 /// errors by throwing an [RpcException]. 192 /// errors by throwing an [RpcException].
79 Future handleRequest(request) { 193 Future _handleRequest(request) {
80 return syncFuture(() { 194 return syncFuture(() {
81 if (request is! List) return _handleSingleRequest(request); 195 if (request is! List) return _handleSingleRequest(request);
82 if (request.isEmpty) { 196 if (request.isEmpty) {
83 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' 197 return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
84 'contain at least one request.').serialize(request); 198 'contain at least one request.').serialize(request);
85 } 199 }
86 200
87 return Future.wait(request.map(_handleSingleRequest)).then((results) { 201 return Future.wait(request.map(_handleSingleRequest)).then((results) {
88 var nonNull = results.where((result) => result != null); 202 var nonNull = results.where((result) => result != null);
89 return nonNull.isEmpty ? null : nonNull.toList(); 203 return nonNull.isEmpty ? null : nonNull.toList();
90 }); 204 });
91 }); 205 }).then(_responses.add);
92 }
93
94 /// Parses and handles a JSON serialized request.
95 ///
96 /// This calls the appropriate method or methods for handling that request and
97 /// returns a JSON string, or `null` if no response should be sent.
98 Future<String> parseRequest(String request) {
99 return syncFuture(() {
100 var decodedRequest;
101 try {
102 decodedRequest = JSON.decode(request);
103 } on FormatException catch (error) {
104 return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
105 '${error.message}').serialize(request);
106 }
107
108 return handleRequest(decodedRequest);
109 }).then((response) {
110 if (response == null) return null;
111 return JSON.encode(response);
112 });
113 } 206 }
114 207
115 /// Handles an individual parsed request. 208 /// Handles an individual parsed request.
116 Future _handleSingleRequest(request) { 209 Future _handleSingleRequest(request) {
117 return syncFuture(() { 210 return syncFuture(() {
118 _validateRequest(request); 211 _validateRequest(request);
119 212
120 var name = request['method']; 213 var name = request['method'];
121 var method = _methods[name]; 214 var method = _methods[name];
122 if (method == null) method = _tryFallbacks; 215 if (method == null) method = _tryFallbacks;
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 return syncFuture(() => iterator.current(params)).catchError((error) { 304 return syncFuture(() => iterator.current(params)).catchError((error) {
212 if (error is! RpcException) throw error; 305 if (error is! RpcException) throw error;
213 if (error.code != error_code.METHOD_NOT_FOUND) throw error; 306 if (error.code != error_code.METHOD_NOT_FOUND) throw error;
214 return _tryNext(); 307 return _tryNext();
215 }); 308 });
216 } 309 }
217 310
218 return _tryNext(); 311 return _tryNext();
219 } 312 }
220 } 313 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698