Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(326)

Side by Side Diff: pkg/analysis_server/lib/src/channel.dart

Issue 214933002: Clean-up and progress on analysis server (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library channel; 5 library channel;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert'; 8 import 'dart:convert';
9 import 'dart:io'; 9 import 'dart:io';
10 10
11 import 'package:analysis_server/src/protocol.dart'; 11 import 'package:analysis_server/src/protocol.dart';
12 12
13 /** 13 /**
14 * The abstract class [ClientCommunicationChannel] defines the behavior of 14 * The abstract class [ClientCommunicationChannel] defines the behavior of
15 * objects that allows an object to send [Request]s to [AnalysisServer] and to 15 * objects that allow a client to send [Request]s to an [AnalysisServer] and to
16 * receive both [Response]s and [Notification]s. 16 * receive both [Response]s and [Notification]s.
17 */ 17 */
18 abstract class ClientCommunicationChannel { 18 abstract class ClientCommunicationChannel {
19
20 /** 19 /**
21 * The stream of notifications from the server. 20 * The stream of notifications from the server.
22 */ 21 */
23 Stream<Notification> notificationStream; 22 Stream<Notification> notificationStream;
24 23
25 /** 24 /**
26 * The stream of responses from the server. 25 * The stream of responses from the server.
27 */ 26 */
28 Stream<Response> responseStream; 27 Stream<Response> responseStream;
29 28
(...skipping 16 matching lines...) Expand all
46 * both [Response]s and [Notification]s. 45 * both [Response]s and [Notification]s.
47 */ 46 */
48 abstract class ServerCommunicationChannel { 47 abstract class ServerCommunicationChannel {
49 /** 48 /**
50 * Listen to the channel for requests. If a request is received, invoke the 49 * Listen to the channel for requests. If a request is received, invoke the
51 * [onRequest] function. If an error is encountered while trying to read from 50 * [onRequest] function. If an error is encountered while trying to read from
52 * the socket, invoke the [onError] function. If the socket is closed by the 51 * the socket, invoke the [onError] function. If the socket is closed by the
53 * client, invoke the [onDone] function. 52 * client, invoke the [onDone] function.
54 * Only one listener is allowed per channel. 53 * Only one listener is allowed per channel.
55 */ 54 */
56 void listen(void onRequest(Request request), {void onError(), void onDone()}); 55 void listen(void onRequest(Request request), {Function onError, void onDone()} );
57 56
58 /** 57 /**
59 * Send the given [notification] to the client. 58 * Send the given [notification] to the client.
60 */ 59 */
61 void sendNotification(Notification notification); 60 void sendNotification(Notification notification);
62 61
63 /** 62 /**
64 * Send the given [response] to the client. 63 * Send the given [response] to the client.
65 */ 64 */
66 void sendResponse(Response response); 65 void sendResponse(Response response);
67 } 66 }
68 67
69 /** 68 /**
70 * Instances of the class [WebSocketClientChannel] implement a 69 * Instances of the class [WebSocketClientChannel] implement a
71 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with
72 * servers. 71 * servers.
73 */ 72 */
74 class WebSocketClientChannel implements ClientCommunicationChannel { 73 class WebSocketClientChannel implements ClientCommunicationChannel {
75 /** 74 /**
76 * The socket being wrapped. 75 * The socket being wrapped.
77 */ 76 */
78 final WebSocket _socket; 77 final WebSocket socket;
79 78
80 @override 79 @override
81 Stream<Response> responseStream; 80 Stream<Response> responseStream;
82 81
83 @override 82 @override
84 Stream<Notification> notificationStream; 83 Stream<Notification> notificationStream;
85 84
86 /** 85 /**
87 * Initialize a new [WebSocket] wrapper for the given [_socket]. 86 * Initialize a new [WebSocket] wrapper for the given [socket].
88 */ 87 */
89 WebSocketClientChannel(this._socket) { 88 WebSocketClientChannel(this.socket) {
90 Stream jsonStream = _socket 89 Stream jsonStream = socket
91 .where((data) => data is String) 90 .where((data) => data is String)
92 .transform(new _JsonStreamDecoder()) 91 .transform(new JsonStreamDecoder())
93 .where((json) => json is Map) 92 .where((json) => json is Map)
94 .asBroadcastStream(); 93 .asBroadcastStream();
95 responseStream = jsonStream 94 responseStream = jsonStream
96 .where((json) => json[Notification.EVENT] == null) 95 .where((json) => json[Notification.EVENT] == null)
97 .transform(new _ResponseConverter()) 96 .transform(new ResponseConverter())
98 .asBroadcastStream(); 97 .asBroadcastStream();
99 notificationStream = jsonStream 98 notificationStream = jsonStream
100 .where((json) => json[Notification.EVENT] != null) 99 .where((json) => json[Notification.EVENT] != null)
101 .transform(new _NotificationConverter()) 100 .transform(new NotificationConverter())
102 .asBroadcastStream(); 101 .asBroadcastStream();
103 } 102 }
104 103
105 @override 104 @override
106 Future<Response> sendRequest(Request request) { 105 Future<Response> sendRequest(Request request) {
107 String id = request.id; 106 String id = request.id;
108 _socket.add(JSON.encode(request.toJson())); 107 socket.add(JSON.encode(request.toJson()));
109 return responseStream.firstWhere((Response response) => response.id == id); 108 return responseStream.firstWhere((Response response) => response.id == id);
110 } 109 }
111 110
112 @override 111 @override
113 Future close() { 112 Future close() {
114 return _socket.close(); 113 return socket.close();
115 } 114 }
116 } 115 }
117 116
118 /** 117 /**
119 * Instances of the class [WebSocketServerChannel] implement a 118 * Instances of the class [WebSocketServerChannel] implement a
120 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with 119 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with
121 * clients. 120 * clients.
122 */ 121 */
123 class WebSocketServerChannel implements ServerCommunicationChannel { 122 class WebSocketServerChannel implements ServerCommunicationChannel {
124 /** 123 /**
125 * The socket being wrapped. 124 * The socket being wrapped.
126 */ 125 */
127 final WebSocket socket; 126 final WebSocket socket;
128 127
129 /** 128 /**
130 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. 129 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
131 */ 130 */
132 WebSocketServerChannel(this.socket); 131 WebSocketServerChannel(this.socket);
133 132
134 @override 133 @override
135 void listen(void onRequest(Request request), {void onError(), void onDone()}) { 134 void listen(void onRequest(Request request), {void onError(), void onDone()}) {
136 socket.listen((data) => _readRequest(data, onRequest), onError: onError, 135 socket.listen((data) => readRequest(data, onRequest), onError: onError,
137 onDone: onDone); 136 onDone: onDone);
138 } 137 }
139 138
140 @override 139 @override
141 void sendNotification(Notification notification) { 140 void sendNotification(Notification notification) {
142 socket.add(JSON.encode(notification.toJson())); 141 socket.add(JSON.encode(notification.toJson()));
143 } 142 }
144 143
145 @override 144 @override
146 void sendResponse(Response response) { 145 void sendResponse(Response response) {
147 socket.add(JSON.encode(response.toJson())); 146 socket.add(JSON.encode(response.toJson()));
148 } 147 }
149 148
150 /** 149 /**
151 * Read a request from the given [data] and use the given function to handle 150 * Read a request from the given [data] and use the given function to handle
152 * the request. 151 * the request.
153 */ 152 */
154 void _readRequest(Object data, void onRequest(Request request)) { 153 void readRequest(Object data, void onRequest(Request request)) {
155 if (data is List<int>) {
156 sendResponse(new Response.invalidRequestFormat());
157 return;
158 }
159 if (data is String) { 154 if (data is String) {
160 // Parse the string as a JSON descriptor and process the resulting 155 // Parse the string as a JSON descriptor and process the resulting
161 // structure as a request. 156 // structure as a request.
162 Request request = new Request.fromString(data); 157 Request request = new Request.fromString(data);
163 if (request == null) { 158 if (request == null) {
164 sendResponse(new Response.invalidRequestFormat()); 159 sendResponse(new Response.invalidRequestFormat());
165 return; 160 return;
166 } 161 }
167 onRequest(request); 162 onRequest(request);
163 } else if (data is List<int>) {
164 // TODO(brianwilkerson) Implement a more efficient protocol.
165 sendResponse(new Response.invalidRequestFormat());
166 } else {
167 sendResponse(new Response.invalidRequestFormat());
168 } 168 }
169 } 169 }
170 } 170 }
171 171
172 /** 172 /**
173 * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps 173 * Instances of the class [JsonStreamDecoder] convert JSON strings to JSON
174 * maps.
174 */ 175 */
175 class _JsonStreamDecoder extends Converter<String, Map> { 176 class JsonStreamDecoder extends Converter<String, Map> {
176
177 @override 177 @override
178 Map convert(String text) => JSON.decode(text); 178 Map convert(String text) => JSON.decode(text);
179 179
180 @override 180 @override
181 ChunkedConversionSink startChunkedConversion(Sink sink) => 181 ChunkedConversionSink startChunkedConversion(Sink sink) =>
182 new _ChannelChunkSink<String, Map>(this, sink); 182 new ChannelChunkSink<String, Map>(this, sink);
183 } 183 }
184 184
185 /** 185 /**
186 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. 186 * Instances of the class [ResponseConverter] convert JSON maps to [Response]s.
187 */ 187 */
188 class _ResponseConverter extends Converter<Map, Response> { 188 class ResponseConverter extends Converter<Map, Response> {
189
190 @override 189 @override
191 Response convert(Map json) => new Response.fromJson(json); 190 Response convert(Map json) => new Response.fromJson(json);
192 191
193 @override 192 @override
194 ChunkedConversionSink startChunkedConversion(Sink sink) => 193 ChunkedConversionSink startChunkedConversion(Sink sink) =>
195 new _ChannelChunkSink<Map, Response>(this, sink); 194 new ChannelChunkSink<Map, Response>(this, sink);
196 } 195 }
197 196
198 /** 197 /**
199 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. 198 * Instances of the class [NotificationConverter] convert JSON maps to
199 * [Notification]s.
200 */ 200 */
201 class _NotificationConverter extends Converter<Map, Notification> { 201 class NotificationConverter extends Converter<Map, Notification> {
202
203 @override 202 @override
204 Notification convert(Map json) => new Notification.fromJson(json); 203 Notification convert(Map json) => new Notification.fromJson(json);
205 204
206 @override 205 @override
207 ChunkedConversionSink startChunkedConversion(Sink sink) => 206 ChunkedConversionSink startChunkedConversion(Sink sink) =>
208 new _ChannelChunkSink<Map, Notification>(this, sink); 207 new ChannelChunkSink<Map, Notification>(this, sink);
209 } 208 }
210 209
211 /** 210 /**
212 * A [_ChannelChunkSink] uses a [Converter] to translate chunks. 211 * Instances of the class [ChannelChunkSink] uses a [Converter] to translate
212 * chunks.
213 */ 213 */
214 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { 214 class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
215 final Converter<S, T> _converter; 215 /**
216 final Sink _sink; 216 * The converter used to translate chunks.
217 */
218 final Converter<S, T> converter;
217 219
218 _ChannelChunkSink(this._converter, this._sink); 220 /**
221 * The sink to which the converted chunks are added.
222 */
223 final Sink sink;
224
225 /**
226 * A flag indicating whether the sink has been closed.
227 */
228 bool closed = false;
229
230 /**
231 * Initialize a newly create sink to use the given [converter] to convert
232 * chunks before adding them to the given [sink].
233 */
234 ChannelChunkSink(this.converter, this.sink);
219 235
220 @override 236 @override
221 void add(S chunk) { 237 void add(S chunk) {
222 var convertedChunk = _converter.convert(chunk); 238 if (!closed) {
223 if (convertedChunk != null) { 239 T convertedChunk = converter.convert(chunk);
224 _sink.add(convertedChunk); 240 if (convertedChunk != null) {
241 sink.add(convertedChunk);
242 }
225 } 243 }
226 } 244 }
227 245
228 @override 246 @override
229 void close() => _sink.close(); 247 void close() {
248 closed = true;
249 sink.close();
250 }
230 } 251 }
OLDNEW
« no previous file with comments | « pkg/analysis_server/lib/src/analysis_server.dart ('k') | pkg/analysis_server/lib/src/domain_context.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698