Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(400)

Side by Side Diff: pkg/json_rpc_2/lib/src/server.dart

Issue 333683003: Extract out a StreamManager class from json_rpc.Server. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | pkg/json_rpc_2/lib/src/two_way_stream.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library json_rpc_2.server; 5 library json_rpc_2.server;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 import 'dart:convert'; 9 import 'dart:convert';
10 10
11 import 'package:stack_trace/stack_trace.dart'; 11 import 'package:stack_trace/stack_trace.dart';
12 12
13 import '../error_code.dart' as error_code; 13 import '../error_code.dart' as error_code;
14 import 'exception.dart'; 14 import 'exception.dart';
15 import 'parameters.dart'; 15 import 'parameters.dart';
16 import 'two_way_stream.dart';
16 import 'utils.dart'; 17 import 'utils.dart';
17 18
18 /// A JSON-RPC 2.0 server. 19 /// A JSON-RPC 2.0 server.
19 /// 20 ///
20 /// A server exposes methods that are called by requests, to which it provides 21 /// A server exposes methods that are called by requests, to which it provides
21 /// responses. Methods can be registered using [registerMethod] and 22 /// responses. Methods can be registered using [registerMethod] and
22 /// [registerFallback]. Requests can be handled using [handleRequest] and 23 /// [registerFallback]. Requests can be handled using [handleRequest] and
23 /// [parseRequest]. 24 /// [parseRequest].
24 /// 25 ///
25 /// Note that since requests can arrive asynchronously and methods can run 26 /// Note that since requests can arrive asynchronously and methods can run
26 /// asynchronously, it's possible for multiple methods to be invoked at the same 27 /// asynchronously, it's possible for multiple methods to be invoked at the same
27 /// time, or even for a single method to be invoked multiple times at once. 28 /// time, or even for a single method to be invoked multiple times at once.
28 class Server { 29 class Server {
29 /// The stream for decoded requests. 30 TwoWayStream _streams;
30 final Stream _requests;
31
32 /// The subscription to the decoded request stream.
33 StreamSubscription _requestSubscription;
34
35 /// The sink for decoded responses.
36 final StreamSink _responses;
37
38 /// The completer for [listen].
39 ///
40 /// This is non-`null` after [listen] has been called.
41 Completer _listenCompleter;
42 31
43 /// The methods registered for this server. 32 /// The methods registered for this server.
44 final _methods = new Map<String, Function>(); 33 final _methods = new Map<String, Function>();
45 34
46 /// The fallback methods for this server. 35 /// The fallback methods for this server.
47 /// 36 ///
48 /// These are tried in order until one of them doesn't throw a 37 /// These are tried in order until one of them doesn't throw a
49 /// [RpcException.methodNotFound] exception. 38 /// [RpcException.methodNotFound] exception.
50 final _fallbacks = new Queue<Function>(); 39 final _fallbacks = new Queue<Function>();
51 40
52 /// Creates a [Server] that reads requests from [requests] and writes 41 /// Creates a [Server] that reads requests from [requests] and writes
53 /// responses to [responses]. 42 /// responses to [responses].
54 /// 43 ///
55 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a 44 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
56 /// `WebSocket`), [responses] may be omitted. 45 /// `WebSocket`), [responses] may be omitted.
57 /// 46 ///
58 /// Note that the server won't begin listening to [requests] until 47 /// Note that the server won't begin listening to [requests] until
59 /// [Server.listen] is called. 48 /// [Server.listen] is called.
60 factory Server(Stream<String> requests, [StreamSink<String> responses]) { 49 Server(Stream<String> requests, [StreamSink<String> responses]) {
61 if (responses == null) { 50 _streams = new TwoWayStream("Server", requests, "requests",
62 if (requests is! StreamSink) { 51 responses, "responses", onInvalidInput: (message, error) {
63 throw new ArgumentError("Either `requests` must be a StreamSink or " 52 _streams.add(new RpcException(error_code.PARSE_ERROR,
64 "`responses` must be passed."); 53 'Invalid JSON: ${error.message}').serialize(message));
65 } 54 });
66 responses = requests as StreamSink;
67 }
68
69 var wrappedResponses = mapStreamSink(responses, JSON.encode);
70 return new Server.withoutJson(requests.expand((request) {
71 var decodedRequest;
72 try {
73 decodedRequest = JSON.decode(request);
74 } on FormatException catch (error) {
75 wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
76 'Invalid JSON: ${error.message}').serialize(request));
77 return [];
78 }
79
80 return [decodedRequest];
81 }), wrappedResponses);
82 } 55 }
83 56
84 /// Creates a [Server] that reads decoded requests from [requests] and writes 57 /// Creates a [Server] that reads decoded requests from [requests] and writes
85 /// decoded responses to [responses]. 58 /// decoded responses to [responses].
86 /// 59 ///
87 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it 60 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
88 /// reads and writes decoded maps or lists. 61 /// reads and writes decoded maps or lists.
89 /// 62 ///
90 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be 63 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
91 /// omitted. 64 /// omitted.
92 /// 65 ///
93 /// Note that the server won't begin listening to [requests] until 66 /// Note that the server won't begin listening to [requests] until
94 /// [Server.listen] is called. 67 /// [Server.listen] is called.
95 Server.withoutJson(Stream requests, [StreamSink responses]) 68 Server.withoutJson(Stream requests, [StreamSink responses])
96 : _requests = requests, 69 : _streams = new TwoWayStream.withoutJson(
97 _responses = responses == null && requests is StreamSink ? 70 "Server", requests, "requests", responses, "responses");
98 requests : responses {
99 if (_responses == null) {
100 throw new ArgumentError("Either `requests` must be a StreamSink or "
101 "`responses` must be passed.");
102 }
103 }
104 71
105 /// Starts listening to the underlying stream. 72 /// Starts listening to the underlying stream.
106 /// 73 ///
107 /// Returns a [Future] that will complete when the stream is closed or when it 74 /// Returns a [Future] that will complete when the stream is closed or when it
108 /// has an error. 75 /// has an error.
109 /// 76 ///
110 /// [listen] may only be called once. 77 /// [listen] may only be called once.
111 Future listen() { 78 Future listen() => _streams.listen(_handleRequest);
112 if (_listenCompleter != null) {
113 throw new StateError(
114 "Can only call Server.listen once on a given server.");
115 }
116
117 _listenCompleter = new Completer();
118 _requestSubscription = _requests.listen(_handleRequest,
119 onError: (error, stackTrace) {
120 if (_listenCompleter.isCompleted) return;
121 _responses.close();
122 _listenCompleter.completeError(error, stackTrace);
123 }, onDone: () {
124 if (_listenCompleter.isCompleted) return;
125 _responses.close();
126 _listenCompleter.complete();
127 }, cancelOnError: true);
128
129 return _listenCompleter.future;
130 }
131 79
132 /// Closes the server's request subscription and response sink. 80 /// Closes the server's request subscription and response sink.
133 /// 81 ///
134 /// Returns a [Future] that completes when all resources have been released. 82 /// Returns a [Future] that completes when all resources have been released.
135 /// 83 ///
136 /// A server can't be closed before [listen] has been called. 84 /// A server can't be closed before [listen] has been called.
137 Future close() { 85 Future close() => _streams.close();
138 if (_listenCompleter == null) {
139 throw new StateError("Can't call Server.close before Server.listen.");
140 }
141
142 if (!_listenCompleter.isCompleted) _listenCompleter.complete();
143
144 var subscriptionFuture = _requestSubscription.cancel();
145 // TODO(nweiz): include the response future in the return value when issue
146 // 19095 is fixed.
147 _responses.close();
148 return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
149 }
150 86
151 /// Registers a method named [name] on this server. 87 /// Registers a method named [name] on this server.
152 /// 88 ///
153 /// [callback] can take either zero or one arguments. If it takes zero, any 89 /// [callback] can take either zero or one arguments. If it takes zero, any
154 /// requests for that method that include parameters will be rejected. If it 90 /// requests for that method that include parameters will be rejected. If it
155 /// takes one, it will be passed a [Parameters] object. 91 /// takes one, it will be passed a [Parameters] object.
156 /// 92 ///
157 /// [callback] can return either a JSON-serializable object or a Future that 93 /// [callback] can return either a JSON-serializable object or a Future that
158 /// completes to a JSON-serializable object. Any errors in [callback] will be 94 /// completes to a JSON-serializable object. Any errors in [callback] will be
159 /// reported to the client as JSON-RPC 2.0 errors. 95 /// reported to the client as JSON-RPC 2.0 errors.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
192 if (request is! List) return _handleSingleRequest(request); 128 if (request is! List) return _handleSingleRequest(request);
193 if (request.isEmpty) { 129 if (request.isEmpty) {
194 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' 130 return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
195 'contain at least one request.').serialize(request); 131 'contain at least one request.').serialize(request);
196 } 132 }
197 133
198 return Future.wait(request.map(_handleSingleRequest)).then((results) { 134 return Future.wait(request.map(_handleSingleRequest)).then((results) {
199 var nonNull = results.where((result) => result != null); 135 var nonNull = results.where((result) => result != null);
200 return nonNull.isEmpty ? null : nonNull.toList(); 136 return nonNull.isEmpty ? null : nonNull.toList();
201 }); 137 });
202 }).then(_responses.add); 138 }).then(_streams.add);
203 } 139 }
204 140
205 /// Handles an individual parsed request. 141 /// Handles an individual parsed request.
206 Future _handleSingleRequest(request) { 142 Future _handleSingleRequest(request) {
207 return syncFuture(() { 143 return syncFuture(() {
208 _validateRequest(request); 144 _validateRequest(request);
209 145
210 var name = request['method']; 146 var name = request['method'];
211 var method = _methods[name]; 147 var method = _methods[name];
212 if (method == null) method = _tryFallbacks; 148 if (method == null) method = _tryFallbacks;
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 return syncFuture(() => iterator.current(params)).catchError((error) { 237 return syncFuture(() => iterator.current(params)).catchError((error) {
302 if (error is! RpcException) throw error; 238 if (error is! RpcException) throw error;
303 if (error.code != error_code.METHOD_NOT_FOUND) throw error; 239 if (error.code != error_code.METHOD_NOT_FOUND) throw error;
304 return _tryNext(); 240 return _tryNext();
305 }); 241 });
306 } 242 }
307 243
308 return _tryNext(); 244 return _tryNext();
309 } 245 }
310 } 246 }
OLDNEW
« no previous file with comments | « no previous file | pkg/json_rpc_2/lib/src/two_way_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698