OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library channel; | 5 library channel; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:convert'; | 8 import 'dart:convert'; |
9 import 'dart:io'; | 9 import 'dart:io'; |
10 | 10 |
11 import 'package:analysis_server/src/protocol.dart'; | 11 import 'package:analysis_server/src/protocol.dart'; |
12 | 12 |
13 /** | 13 /** |
14 * The abstract class [ClientCommunicationChannel] defines the behavior of | 14 * The abstract class [ClientCommunicationChannel] defines the behavior of |
15 * objects that allows an object to send [Request]s to [AnalysisServer] and to | 15 * objects that allow a client to send [Request]s to an [AnalysisServer] and to |
16 * receive both [Response]s and [Notification]s. | 16 * receive both [Response]s and [Notification]s. |
17 */ | 17 */ |
18 abstract class ClientCommunicationChannel { | 18 abstract class ClientCommunicationChannel { |
19 | |
20 /** | 19 /** |
21 * The stream of notifications from the server. | 20 * The stream of notifications from the server. |
22 */ | 21 */ |
23 Stream<Notification> notificationStream; | 22 Stream<Notification> notificationStream; |
24 | 23 |
25 /** | 24 /** |
26 * The stream of responses from the server. | 25 * The stream of responses from the server. |
27 */ | 26 */ |
28 Stream<Response> responseStream; | 27 Stream<Response> responseStream; |
29 | 28 |
(...skipping 16 matching lines...) Expand all Loading... |
46 * both [Response]s and [Notification]s. | 45 * both [Response]s and [Notification]s. |
47 */ | 46 */ |
48 abstract class ServerCommunicationChannel { | 47 abstract class ServerCommunicationChannel { |
49 /** | 48 /** |
50 * Listen to the channel for requests. If a request is received, invoke the | 49 * Listen to the channel for requests. If a request is received, invoke the |
51 * [onRequest] function. If an error is encountered while trying to read from | 50 * [onRequest] function. If an error is encountered while trying to read from |
52 * the socket, invoke the [onError] function. If the socket is closed by the | 51 * the socket, invoke the [onError] function. If the socket is closed by the |
53 * client, invoke the [onDone] function. | 52 * client, invoke the [onDone] function. |
54 * Only one listener is allowed per channel. | 53 * Only one listener is allowed per channel. |
55 */ | 54 */ |
56 void listen(void onRequest(Request request), {void onError(), void onDone()}); | 55 void listen(void onRequest(Request request), {Function onError, void onDone()}
); |
57 | 56 |
58 /** | 57 /** |
59 * Send the given [notification] to the client. | 58 * Send the given [notification] to the client. |
60 */ | 59 */ |
61 void sendNotification(Notification notification); | 60 void sendNotification(Notification notification); |
62 | 61 |
63 /** | 62 /** |
64 * Send the given [response] to the client. | 63 * Send the given [response] to the client. |
65 */ | 64 */ |
66 void sendResponse(Response response); | 65 void sendResponse(Response response); |
67 } | 66 } |
68 | 67 |
69 /** | 68 /** |
70 * Instances of the class [WebSocketClientChannel] implement a | 69 * Instances of the class [WebSocketClientChannel] implement a |
71 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with | 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with |
72 * servers. | 71 * servers. |
73 */ | 72 */ |
74 class WebSocketClientChannel implements ClientCommunicationChannel { | 73 class WebSocketClientChannel implements ClientCommunicationChannel { |
75 /** | 74 /** |
76 * The socket being wrapped. | 75 * The socket being wrapped. |
77 */ | 76 */ |
78 final WebSocket _socket; | 77 final WebSocket socket; |
79 | 78 |
80 @override | 79 @override |
81 Stream<Response> responseStream; | 80 Stream<Response> responseStream; |
82 | 81 |
83 @override | 82 @override |
84 Stream<Notification> notificationStream; | 83 Stream<Notification> notificationStream; |
85 | 84 |
86 /** | 85 /** |
87 * Initialize a new [WebSocket] wrapper for the given [_socket]. | 86 * Initialize a new [WebSocket] wrapper for the given [socket]. |
88 */ | 87 */ |
89 WebSocketClientChannel(this._socket) { | 88 WebSocketClientChannel(this.socket) { |
90 Stream jsonStream = _socket | 89 Stream jsonStream = socket |
91 .where((data) => data is String) | 90 .where((data) => data is String) |
92 .transform(new _JsonStreamDecoder()) | 91 .transform(new JsonStreamDecoder()) |
93 .where((json) => json is Map) | 92 .where((json) => json is Map) |
94 .asBroadcastStream(); | 93 .asBroadcastStream(); |
95 responseStream = jsonStream | 94 responseStream = jsonStream |
96 .where((json) => json[Notification.EVENT] == null) | 95 .where((json) => json[Notification.EVENT] == null) |
97 .transform(new _ResponseConverter()) | 96 .transform(new ResponseConverter()) |
98 .asBroadcastStream(); | 97 .asBroadcastStream(); |
99 notificationStream = jsonStream | 98 notificationStream = jsonStream |
100 .where((json) => json[Notification.EVENT] != null) | 99 .where((json) => json[Notification.EVENT] != null) |
101 .transform(new _NotificationConverter()) | 100 .transform(new NotificationConverter()) |
102 .asBroadcastStream(); | 101 .asBroadcastStream(); |
103 } | 102 } |
104 | 103 |
105 @override | 104 @override |
106 Future<Response> sendRequest(Request request) { | 105 Future<Response> sendRequest(Request request) { |
107 String id = request.id; | 106 String id = request.id; |
108 _socket.add(JSON.encode(request.toJson())); | 107 socket.add(JSON.encode(request.toJson())); |
109 return responseStream.firstWhere((Response response) => response.id == id); | 108 return responseStream.firstWhere((Response response) => response.id == id); |
110 } | 109 } |
111 | 110 |
112 @override | 111 @override |
113 Future close() { | 112 Future close() { |
114 return _socket.close(); | 113 return socket.close(); |
115 } | 114 } |
116 } | 115 } |
117 | 116 |
118 /** | 117 /** |
119 * Instances of the class [WebSocketServerChannel] implement a | 118 * Instances of the class [WebSocketServerChannel] implement a |
120 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with | 119 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with |
121 * clients. | 120 * clients. |
122 */ | 121 */ |
123 class WebSocketServerChannel implements ServerCommunicationChannel { | 122 class WebSocketServerChannel implements ServerCommunicationChannel { |
124 /** | 123 /** |
125 * The socket being wrapped. | 124 * The socket being wrapped. |
126 */ | 125 */ |
127 final WebSocket socket; | 126 final WebSocket socket; |
128 | 127 |
129 /** | 128 /** |
130 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 129 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
131 */ | 130 */ |
132 WebSocketServerChannel(this.socket); | 131 WebSocketServerChannel(this.socket); |
133 | 132 |
134 @override | 133 @override |
135 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ | 134 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ |
136 socket.listen((data) => _readRequest(data, onRequest), onError: onError, | 135 socket.listen((data) => readRequest(data, onRequest), onError: onError, |
137 onDone: onDone); | 136 onDone: onDone); |
138 } | 137 } |
139 | 138 |
140 @override | 139 @override |
141 void sendNotification(Notification notification) { | 140 void sendNotification(Notification notification) { |
142 socket.add(JSON.encode(notification.toJson())); | 141 socket.add(JSON.encode(notification.toJson())); |
143 } | 142 } |
144 | 143 |
145 @override | 144 @override |
146 void sendResponse(Response response) { | 145 void sendResponse(Response response) { |
147 socket.add(JSON.encode(response.toJson())); | 146 socket.add(JSON.encode(response.toJson())); |
148 } | 147 } |
149 | 148 |
150 /** | 149 /** |
151 * Read a request from the given [data] and use the given function to handle | 150 * Read a request from the given [data] and use the given function to handle |
152 * the request. | 151 * the request. |
153 */ | 152 */ |
154 void _readRequest(Object data, void onRequest(Request request)) { | 153 void readRequest(Object data, void onRequest(Request request)) { |
155 if (data is List<int>) { | |
156 sendResponse(new Response.invalidRequestFormat()); | |
157 return; | |
158 } | |
159 if (data is String) { | 154 if (data is String) { |
160 // Parse the string as a JSON descriptor and process the resulting | 155 // Parse the string as a JSON descriptor and process the resulting |
161 // structure as a request. | 156 // structure as a request. |
162 Request request = new Request.fromString(data); | 157 Request request = new Request.fromString(data); |
163 if (request == null) { | 158 if (request == null) { |
164 sendResponse(new Response.invalidRequestFormat()); | 159 sendResponse(new Response.invalidRequestFormat()); |
165 return; | 160 return; |
166 } | 161 } |
167 onRequest(request); | 162 onRequest(request); |
| 163 } else if (data is List<int>) { |
| 164 // TODO(brianwilkerson) Implement a more efficient protocol. |
| 165 sendResponse(new Response.invalidRequestFormat()); |
| 166 } else { |
| 167 sendResponse(new Response.invalidRequestFormat()); |
168 } | 168 } |
169 } | 169 } |
170 } | 170 } |
171 | 171 |
172 /** | 172 /** |
173 * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps | 173 * Instances of the class [JsonStreamDecoder] convert JSON strings to JSON |
| 174 * maps. |
174 */ | 175 */ |
175 class _JsonStreamDecoder extends Converter<String, Map> { | 176 class JsonStreamDecoder extends Converter<String, Map> { |
176 | |
177 @override | 177 @override |
178 Map convert(String text) => JSON.decode(text); | 178 Map convert(String text) => JSON.decode(text); |
179 | 179 |
180 @override | 180 @override |
181 ChunkedConversionSink startChunkedConversion(Sink sink) => | 181 ChunkedConversionSink startChunkedConversion(Sink sink) => |
182 new _ChannelChunkSink<String, Map>(this, sink); | 182 new ChannelChunkSink<String, Map>(this, sink); |
183 } | 183 } |
184 | 184 |
185 /** | 185 /** |
186 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. | 186 * Instances of the class [ResponseConverter] convert JSON maps to [Response]s. |
187 */ | 187 */ |
188 class _ResponseConverter extends Converter<Map, Response> { | 188 class ResponseConverter extends Converter<Map, Response> { |
189 | |
190 @override | 189 @override |
191 Response convert(Map json) => new Response.fromJson(json); | 190 Response convert(Map json) => new Response.fromJson(json); |
192 | 191 |
193 @override | 192 @override |
194 ChunkedConversionSink startChunkedConversion(Sink sink) => | 193 ChunkedConversionSink startChunkedConversion(Sink sink) => |
195 new _ChannelChunkSink<Map, Response>(this, sink); | 194 new ChannelChunkSink<Map, Response>(this, sink); |
196 } | 195 } |
197 | 196 |
198 /** | 197 /** |
199 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. | 198 * Instances of the class [NotificationConverter] convert JSON maps to |
| 199 * [Notification]s. |
200 */ | 200 */ |
201 class _NotificationConverter extends Converter<Map, Notification> { | 201 class NotificationConverter extends Converter<Map, Notification> { |
202 | |
203 @override | 202 @override |
204 Notification convert(Map json) => new Notification.fromJson(json); | 203 Notification convert(Map json) => new Notification.fromJson(json); |
205 | 204 |
206 @override | 205 @override |
207 ChunkedConversionSink startChunkedConversion(Sink sink) => | 206 ChunkedConversionSink startChunkedConversion(Sink sink) => |
208 new _ChannelChunkSink<Map, Notification>(this, sink); | 207 new ChannelChunkSink<Map, Notification>(this, sink); |
209 } | 208 } |
210 | 209 |
211 /** | 210 /** |
212 * A [_ChannelChunkSink] uses a [Converter] to translate chunks. | 211 * Instances of the class [ChannelChunkSink] uses a [Converter] to translate |
| 212 * chunks. |
213 */ | 213 */ |
214 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { | 214 class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
215 final Converter<S, T> _converter; | 215 /** |
216 final Sink _sink; | 216 * The converter used to translate chunks. |
| 217 */ |
| 218 final Converter<S, T> converter; |
217 | 219 |
218 _ChannelChunkSink(this._converter, this._sink); | 220 /** |
| 221 * The sink to which the converted chunks are added. |
| 222 */ |
| 223 final Sink sink; |
| 224 |
| 225 /** |
| 226 * A flag indicating whether the sink has been closed. |
| 227 */ |
| 228 bool closed = false; |
| 229 |
| 230 /** |
| 231 * Initialize a newly create sink to use the given [converter] to convert |
| 232 * chunks before adding them to the given [sink]. |
| 233 */ |
| 234 ChannelChunkSink(this.converter, this.sink); |
219 | 235 |
220 @override | 236 @override |
221 void add(S chunk) { | 237 void add(S chunk) { |
222 var convertedChunk = _converter.convert(chunk); | 238 if (!closed) { |
223 if (convertedChunk != null) { | 239 T convertedChunk = converter.convert(chunk); |
224 _sink.add(convertedChunk); | 240 if (convertedChunk != null) { |
| 241 sink.add(convertedChunk); |
| 242 } |
225 } | 243 } |
226 } | 244 } |
227 | 245 |
228 @override | 246 @override |
229 void close() => _sink.close(); | 247 void close() { |
| 248 closed = true; |
| 249 sink.close(); |
| 250 } |
230 } | 251 } |
OLD | NEW |