Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(544)

Side by Side Diff: lib/src/server.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/peer.dart ('k') | lib/src/two_way_stream.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 import 'dart:convert'; 7 import 'dart:convert';
8 8
9 import 'package:stack_trace/stack_trace.dart'; 9 import 'package:stack_trace/stack_trace.dart';
10 import 'package:stream_channel/stream_channel.dart';
10 11
11 import '../error_code.dart' as error_code; 12 import '../error_code.dart' as error_code;
13 import 'channel_manager.dart';
12 import 'exception.dart'; 14 import 'exception.dart';
13 import 'parameters.dart'; 15 import 'parameters.dart';
14 import 'two_way_stream.dart';
15 import 'utils.dart'; 16 import 'utils.dart';
16 17
17 /// A JSON-RPC 2.0 server. 18 /// A JSON-RPC 2.0 server.
18 /// 19 ///
19 /// A server exposes methods that are called by requests, to which it provides 20 /// A server exposes methods that are called by requests, to which it provides
20 /// responses. Methods can be registered using [registerMethod] and 21 /// responses. Methods can be registered using [registerMethod] and
21 /// [registerFallback]. Requests can be handled using [handleRequest] and 22 /// [registerFallback]. Requests can be handled using [handleRequest] and
22 /// [parseRequest]. 23 /// [parseRequest].
23 /// 24 ///
24 /// Note that since requests can arrive asynchronously and methods can run 25 /// Note that since requests can arrive asynchronously and methods can run
25 /// asynchronously, it's possible for multiple methods to be invoked at the same 26 /// asynchronously, it's possible for multiple methods to be invoked at the same
26 /// time, or even for a single method to be invoked multiple times at once. 27 /// time, or even for a single method to be invoked multiple times at once.
27 class Server { 28 class Server {
28 TwoWayStream _streams; 29 final ChannelManager _manager;
29 30
30 /// The methods registered for this server. 31 /// The methods registered for this server.
31 final _methods = new Map<String, Function>(); 32 final _methods = new Map<String, Function>();
32 33
33 /// The fallback methods for this server. 34 /// The fallback methods for this server.
34 /// 35 ///
35 /// These are tried in order until one of them doesn't throw a 36 /// These are tried in order until one of them doesn't throw a
36 /// [RpcException.methodNotFound] exception. 37 /// [RpcException.methodNotFound] exception.
37 final _fallbacks = new Queue<Function>(); 38 final _fallbacks = new Queue<Function>();
38 39
39 /// Returns a [Future] that completes when the connection is closed. 40 /// Returns a [Future] that completes when the underlying connection is
41 /// closed.
40 /// 42 ///
41 /// This is the same future that's returned by [listen]. 43 /// This is the same future that's returned by [listen] and [close]. It may
42 Future get done => _streams.done; 44 /// complete before [close] is called if the remote endpoint closes the
45 /// connection.
46 Future get done => _manager.done;
43 47
44 /// Whether the connection is closed. 48 /// Whether the underlying connection is closed.
45 bool get isClosed => _streams.isClosed; 49 ///
50 /// Note that this will be `true` before [close] is called if the remote
51 /// endpoint closes the connection.
52 bool get isClosed => _manager.isClosed;
46 53
47 /// Creates a [Server] that reads requests from [requests] and writes 54 /// Creates a [Server] that communicates over [channel].
48 /// responses to [responses].
49 ///
50 /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
51 /// `WebSocket`), [responses] may be omitted.
52 /// 55 ///
53 /// Note that the server won't begin listening to [requests] until 56 /// Note that the server won't begin listening to [requests] until
54 /// [Server.listen] is called. 57 /// [Server.listen] is called.
55 Server(Stream<String> requests, [StreamSink<String> responses]) { 58 Server(StreamChannel<String> channel)
56 _streams = new TwoWayStream("Server", requests, "requests", 59 : this.withoutJson(channel
57 responses, "responses", onInvalidInput: (message, error) { 60 .transform(jsonDocument)
58 _streams.add(new RpcException(error_code.PARSE_ERROR, 61 .transform(respondToFormatExceptions));
59 'Invalid JSON: ${error.message}').serialize(message));
60 });
61 }
62 62
63 /// Creates a [Server] that reads decoded requests from [requests] and writes 63 /// Creates a [Server] that communicates using decoded messages over
64 /// decoded responses to [responses]. 64 /// [channel].
65 /// 65 ///
66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it 66 /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
67 /// reads and writes decoded maps or lists. 67 /// reads and writes decoded maps or lists.
68 /// 68 ///
69 /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
70 /// omitted.
71 ///
72 /// Note that the server won't begin listening to [requests] until 69 /// Note that the server won't begin listening to [requests] until
73 /// [Server.listen] is called. 70 /// [Server.listen] is called.
74 Server.withoutJson(Stream requests, [StreamSink responses]) 71 Server.withoutJson(StreamChannel channel)
75 : _streams = new TwoWayStream.withoutJson( 72 : _manager = new ChannelManager("Server", channel);
76 "Server", requests, "requests", responses, "responses");
77 73
78 /// Starts listening to the underlying stream. 74 /// Starts listening to the underlying stream.
79 /// 75 ///
80 /// Returns a [Future] that will complete when the stream is closed or when it 76 /// Returns a [Future] that will complete when the connection is closed or
81 /// has an error. 77 /// when it has an error. This is the same as [done].
82 /// 78 ///
83 /// [listen] may only be called once. 79 /// [listen] may only be called once.
84 Future listen() => _streams.listen(_handleRequest); 80 Future listen() => _manager.listen(_handleRequest);
85 81
86 /// Closes the server's request subscription and response sink. 82 /// Closes the underlying connection.
87 /// 83 ///
88 /// Returns a [Future] that completes when all resources have been released. 84 /// Returns a [Future] that completes when all resources have been released.
89 /// 85 /// This is the same as [done].
90 /// A server can't be closed before [listen] has been called. 86 Future close() => _manager.close();
91 Future close() => _streams.close();
92 87
93 /// Registers a method named [name] on this server. 88 /// Registers a method named [name] on this server.
94 /// 89 ///
95 /// [callback] can take either zero or one arguments. If it takes zero, any 90 /// [callback] can take either zero or one arguments. If it takes zero, any
96 /// requests for that method that include parameters will be rejected. If it 91 /// requests for that method that include parameters will be rejected. If it
97 /// takes one, it will be passed a [Parameters] object. 92 /// takes one, it will be passed a [Parameters] object.
98 /// 93 ///
99 /// [callback] can return either a JSON-serializable object or a Future that 94 /// [callback] can return either a JSON-serializable object or a Future that
100 /// completes to a JSON-serializable object. Any errors in [callback] will be 95 /// completes to a JSON-serializable object. Any errors in [callback] will be
101 /// reported to the client as JSON-RPC 2.0 errors. 96 /// reported to the client as JSON-RPC 2.0 errors.
(...skipping 20 matching lines...) Expand all
122 _fallbacks.add(callback); 117 _fallbacks.add(callback);
123 } 118 }
124 119
125 /// Handle a request. 120 /// Handle a request.
126 /// 121 ///
127 /// [request] is expected to be a JSON-serializable object representing a 122 /// [request] is expected to be a JSON-serializable object representing a
128 /// request sent by a client. This calls the appropriate method or methods for 123 /// request sent by a client. This calls the appropriate method or methods for
129 /// handling that request and returns a JSON-serializable response, or `null` 124 /// handling that request and returns a JSON-serializable response, or `null`
130 /// if no response should be sent. [callback] may send custom 125 /// if no response should be sent. [callback] may send custom
131 /// errors by throwing an [RpcException]. 126 /// errors by throwing an [RpcException].
132 Future _handleRequest(request) { 127 Future _handleRequest(request) async {
133 return syncFuture(() { 128 var response;
134 if (request is! List) return _handleSingleRequest(request); 129 if (request is! List) {
135 if (request.isEmpty) { 130 response = await _handleSingleRequest(request);
136 return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' 131 if (response == null) return;
137 'contain at least one request.').serialize(request); 132 } else if (request.isEmpty) {
138 } 133 response = new RpcException(
134 error_code.INVALID_REQUEST,
135 'A batch must contain at least one request.')
136 .serialize(request);
137 } else {
138 var results = await Future.wait(request.map(_handleSingleRequest));
139 var nonNull = results.where((result) => result != null);
140 if (nonNull.isEmpty) return;
141 response = nonNull.toList();
142 }
139 143
140 return Future.wait(request.map(_handleSingleRequest)).then((results) { 144 if (!isClosed) _manager.add(response);
141 var nonNull = results.where((result) => result != null);
142 return nonNull.isEmpty ? null : nonNull.toList();
143 });
144 }).then((response) {
145 if (!_streams.isClosed && response != null) _streams.add(response);
146 });
147 } 145 }
148 146
149 /// Handles an individual parsed request. 147 /// Handles an individual parsed request.
150 Future _handleSingleRequest(request) { 148 Future _handleSingleRequest(request) {
151 return syncFuture(() { 149 return syncFuture(() {
152 _validateRequest(request); 150 _validateRequest(request);
153 151
154 var name = request['method']; 152 var name = request['method'];
155 var method = _methods[name]; 153 var method = _methods[name];
156 if (method == null) method = _tryFallbacks; 154 if (method == null) method = _tryFallbacks;
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
245 return syncFuture(() => iterator.current(params)).catchError((error) { 243 return syncFuture(() => iterator.current(params)).catchError((error) {
246 if (error is! RpcException) throw error; 244 if (error is! RpcException) throw error;
247 if (error.code != error_code.METHOD_NOT_FOUND) throw error; 245 if (error.code != error_code.METHOD_NOT_FOUND) throw error;
248 return _tryNext(); 246 return _tryNext();
249 }); 247 });
250 } 248 }
251 249
252 return _tryNext(); 250 return _tryNext();
253 } 251 }
254 } 252 }
OLDNEW
« no previous file with comments | « lib/src/peer.dart ('k') | lib/src/two_way_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698