Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(497)

Side by Side Diff: pkg/analysis_server/lib/src/channel.dart

Issue 185313002: restructure client api to use streams (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: address comments Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library channel; 5 library channel;
6 6
7 import 'dart:async';
7 import 'dart:convert'; 8 import 'dart:convert';
8 import 'dart:io'; 9 import 'dart:io';
9 10
10 import 'package:analysis_server/src/protocol.dart'; 11 import 'package:analysis_server/src/protocol.dart';
11 12
12 /** 13 /**
13 * The abstract class [ClientCommunicationChannel] defines the behavior of 14 * The abstract class [ClientCommunicationChannel] defines the behavior of
14 * objects that allows an object to send [Request]s to [AnalysisServer] and to 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to
15 * receive both [Response]s and [Notification]s. 16 * receive both [Response]s and [Notification]s.
16 */ 17 */
17 abstract class ClientCommunicationChannel { 18 abstract class ClientCommunicationChannel {
18 /**
19 * Listen to the channel for responses and notifications.
20 * If a response is received, invoke the [onResponse] function.
21 * If a notification is received, invoke the [onNotification] function.
22 * If an error is encountered while trying to read from
23 * the socket, invoke the [onError] function. If the socket is closed by the
24 * client, invoke the [onDone] function.
25 */
26 void listen(void onResponse(Response response),
27 void onNotification(Notification notification),
28 {void onError(), void onDone()});
29 19
30 /** 20 /**
31 * Send the given [request] to the server. 21 * The stream of notifications from the server.
32 */ 22 */
33 void sendRequest(Request request); 23 Stream<Notification> notificationStream;
24
25 /**
26 * The stream of responses from the server.
27 */
28 Stream<Response> responseStream;
29
30 /**
31 * Send the given [request] to the server
32 * and return a future with the associated [Response].
33 */
34 Future<Response> sendRequest(Request request);
35
36 /**
37 * Close the channel to the server. Once called, all future communication
38 * with the server via [sendRequest] will silently be ignored.
39 */
40 Future close();
34 } 41 }
35 42
36 /** 43 /**
37 * The abstract class [ServerCommunicationChannel] defines the behavior of 44 * The abstract class [ServerCommunicationChannel] defines the behavior of
38 * objects that allow an [AnalysisServer] to receive [Request]s and to return 45 * objects that allow an [AnalysisServer] to receive [Request]s and to return
39 * both [Response]s and [Notification]s. 46 * both [Response]s and [Notification]s.
40 */ 47 */
41 abstract class ServerCommunicationChannel { 48 abstract class ServerCommunicationChannel {
42 /** 49 /**
43 * Listen to the channel for requests. If a request is received, invoke the 50 * Listen to the channel for requests. If a request is received, invoke the
44 * [onRequest] function. If an error is encountered while trying to read from 51 * [onRequest] function. If an error is encountered while trying to read from
45 * the socket, invoke the [onError] function. If the socket is closed by the 52 * the socket, invoke the [onError] function. If the socket is closed by the
46 * client, invoke the [onDone] function. 53 * client, invoke the [onDone] function.
54 * Only one listener is allowed per channel.
47 */ 55 */
48 void listen(void onRequest(Request request), {void onError(), void onDone()}); 56 void listen(void onRequest(Request request), {void onError(), void onDone()});
49 57
50 /** 58 /**
51 * Send the given [notification] to the client. 59 * Send the given [notification] to the client.
52 */ 60 */
53 void sendNotification(Notification notification); 61 void sendNotification(Notification notification);
54 62
55 /** 63 /**
56 * Send the given [response] to the client. 64 * Send the given [response] to the client.
57 */ 65 */
58 void sendResponse(Response response); 66 void sendResponse(Response response);
59 } 67 }
60 68
61 /** 69 /**
62 * Instances of the class [WebSocketClientChannel] implement a 70 * Instances of the class [WebSocketClientChannel] implement a
63 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with 71 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with
64 * servers. 72 * servers.
65 */ 73 */
66 class WebSocketClientChannel implements ClientCommunicationChannel { 74 class WebSocketClientChannel implements ClientCommunicationChannel {
67 /** 75 /**
68 * The socket being wrapped. 76 * The socket being wrapped.
69 */ 77 */
70 final WebSocket socket; 78 final WebSocket _socket;
71 79
72 final JsonEncoder jsonEncoder = const JsonEncoder(null); 80 @override
81 Stream<Response> responseStream;
73 82
74 final JsonDecoder jsonDecoder = const JsonDecoder(null); 83 @override
84 Stream<Notification> notificationStream;
75 85
76 /** 86 /**
77 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. 87 * Initialize a new [WebSocket] wrapper for the given [_socket].
78 */ 88 */
79 WebSocketClientChannel(this.socket); 89 WebSocketClientChannel(this._socket) {
80 90 Stream jsonStream = _socket
81 @override 91 .where((data) => data is String)
82 void listen(void onResponse(Response response), 92 .transform(new _JsonStreamDecoder())
83 void onNotification(Notification notification), 93 .where((json) => json is Map)
84 {void onError(), void onDone()}) { 94 .asBroadcastStream();
85 socket.listen((data) => _read(data, onResponse, onNotification), 95 responseStream = jsonStream
86 onError: onError, onDone: onDone); 96 .where((json) => json[Notification.EVENT] == null)
97 .transform(new _ResponseConverter())
98 .asBroadcastStream();
99 notificationStream = jsonStream
100 .where((json) => json[Notification.EVENT] != null)
101 .transform(new _NotificationConverter())
102 .asBroadcastStream();
87 } 103 }
88 104
89 @override 105 @override
90 void sendRequest(Request request) { 106 Future<Response> sendRequest(Request request) {
91 socket.add(jsonEncoder.convert(request.toJson())); 107 String id = request.id;
108 _socket.add(JSON.encode(request.toJson()));
109 return responseStream.firstWhere((Response response) => response.id == id);
92 } 110 }
93 111
94 /** 112 @override
95 * Read a request from the given [data] and use the given function to handle 113 Future close() {
96 * the request. 114 return _socket.close();
97 */
98 void _read(Object data,
99 void onResponse(Response response),
100 void onNotification(Notification notification)) {
101 if (data is String) {
102 // Parse the string as a JSON descriptor
103 var json;
104 try {
105 json = jsonDecoder.convert(data);
106 if (json is! Map) {
107 return;
108 }
109 } catch (error) {
110 return;
111 }
112 // Process the resulting structure as a response or notification.
113 if (json[Notification.EVENT] != null) {
114 Notification notification = new Notification.fromJson(json);
115 if (notification != null) {
116 onNotification(notification);
117 }
118 } else {
119 Response response = new Response.fromJson(json);
120 if (response != null) {
121 onResponse(response);
122 }
123 }
124 }
125 } 115 }
126 } 116 }
127 117
128 /** 118 /**
129 * Instances of the class [WebSocketServerChannel] implement a 119 * Instances of the class [WebSocketServerChannel] implement a
130 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with 120 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with
131 * clients. 121 * clients.
132 */ 122 */
133 class WebSocketServerChannel implements ServerCommunicationChannel { 123 class WebSocketServerChannel implements ServerCommunicationChannel {
134 /** 124 /**
135 * The socket being wrapped. 125 * The socket being wrapped.
136 */ 126 */
137 final WebSocket socket; 127 final WebSocket socket;
138 128
139 final JsonEncoder jsonEncoder = const JsonEncoder(null);
140
141 /** 129 /**
142 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. 130 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
143 */ 131 */
144 WebSocketServerChannel(this.socket); 132 WebSocketServerChannel(this.socket);
145 133
146 @override 134 @override
147 void listen(void onRequest(Request request), {void onError(), void onDone()}) { 135 void listen(void onRequest(Request request), {void onError(), void onDone()}) {
148 socket.listen((data) => _readRequest(data, onRequest), onError: onError, 136 socket.listen((data) => _readRequest(data, onRequest), onError: onError,
149 onDone: onDone); 137 onDone: onDone);
150 } 138 }
151 139
152 @override 140 @override
153 void sendNotification(Notification notification) { 141 void sendNotification(Notification notification) {
154 socket.add(jsonEncoder.convert(notification.toJson())); 142 socket.add(JSON.encode(notification.toJson()));
155 } 143 }
156 144
157 @override 145 @override
158 void sendResponse(Response response) { 146 void sendResponse(Response response) {
159 socket.add(jsonEncoder.convert(response.toJson())); 147 socket.add(JSON.encode(response.toJson()));
160 } 148 }
161 149
162 /** 150 /**
163 * Read a request from the given [data] and use the given function to handle 151 * Read a request from the given [data] and use the given function to handle
164 * the request. 152 * the request.
165 */ 153 */
166 void _readRequest(Object data, void onRequest(Request request)) { 154 void _readRequest(Object data, void onRequest(Request request)) {
167 if (data is List<int>) { 155 if (data is List<int>) {
168 sendResponse(new Response.invalidRequestFormat()); 156 sendResponse(new Response.invalidRequestFormat());
169 return; 157 return;
170 } 158 }
171 if (data is String) { 159 if (data is String) {
172 // Parse the string as a JSON descriptor and process the resulting 160 // Parse the string as a JSON descriptor and process the resulting
173 // structure as a request. 161 // structure as a request.
174 Request request = new Request.fromString(data); 162 Request request = new Request.fromString(data);
175 if (request == null) { 163 if (request == null) {
176 sendResponse(new Response.invalidRequestFormat()); 164 sendResponse(new Response.invalidRequestFormat());
177 return; 165 return;
178 } 166 }
179 onRequest(request); 167 onRequest(request);
180 } 168 }
181 } 169 }
182 } 170 }
171
172 /**
173 * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps
174 */
175 class _JsonStreamDecoder extends Converter<String, Map> {
176
177 @override
178 Map convert(String text) => JSON.decode(text);
179
180 @override
181 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
182 new _ChannelChunkSink<String, Map>(this, sink);
183 }
184
185 /**
186 * Instances of [_ResponseConverter] convert JSON maps to [Response]s.
187 */
188 class _ResponseConverter extends Converter<Map, Response> {
189
190 @override
191 Response convert(Map json) => new Response.fromJson(json);
192
193 @override
194 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
195 new _ChannelChunkSink<Map, Response>(this, sink);
196 }
197
198 /**
199 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s.
200 */
201 class _NotificationConverter extends Converter<Map, Notification> {
202
203 @override
204 Notification convert(Map json) => new Notification.fromJson(json);
205
206 @override
207 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
208 new _ChannelChunkSink<Map, Notification>(this, sink);
209 }
210
211 /**
212 * A [_ChannelChunkSink] uses a [Convter] to translate chunks.
213 */
214 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
215 final Converter<S, T> _converter;
216 final ChunkedConversionSink _chunkedSink;
217
218 _ChannelChunkSink(this._converter, this._chunkedSink);
219
220 @override
221 void add(S chunk) {
222 var convertedChunk = _converter.convert(chunk);
223 if (convertedChunk != null) {
224 _chunkedSink.add(convertedChunk);
225 }
226 }
227
228 @override
229 void close() => _chunkedSink.close();
230 }
OLDNEW
« no previous file with comments | « pkg/analysis_server/lib/src/analysis_manager.dart ('k') | pkg/analysis_server/lib/src/protocol.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698