OLD | NEW |
---|---|
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library channel; | 5 library channel; |
6 | 6 |
7 import 'dart:async'; | |
7 import 'dart:convert'; | 8 import 'dart:convert'; |
8 import 'dart:io'; | 9 import 'dart:io'; |
9 | 10 |
10 import 'package:analysis_server/src/protocol.dart'; | 11 import 'package:analysis_server/src/protocol.dart'; |
11 | 12 |
12 /** | 13 /** |
13 * The abstract class [ClientCommunicationChannel] defines the behavior of | 14 * The abstract class [ClientCommunicationChannel] defines the behavior of |
14 * objects that allows an object to send [Request]s to [AnalysisServer] and to | 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to |
15 * receive both [Response]s and [Notification]s. | 16 * receive both [Response]s and [Notification]s. |
16 */ | 17 */ |
17 abstract class ClientCommunicationChannel { | 18 abstract class ClientCommunicationChannel { |
18 /** | |
19 * Listen to the channel for responses and notifications. | |
20 * If a response is received, invoke the [onResponse] function. | |
21 * If a notification is received, invoke the [onNotification] function. | |
22 * If an error is encountered while trying to read from | |
23 * the socket, invoke the [onError] function. If the socket is closed by the | |
24 * client, invoke the [onDone] function. | |
25 */ | |
26 void listen(void onResponse(Response response), | |
27 void onNotification(Notification notification), | |
28 {void onError(), void onDone()}); | |
29 | 19 |
30 /** | 20 /** |
31 * Send the given [request] to the server. | 21 * The stream of notifications from the server. |
32 */ | 22 */ |
33 void sendRequest(Request request); | 23 Stream<Notification> notificationStream; |
24 | |
25 /** | |
26 * The stream of responses from the server. | |
27 */ | |
28 Stream<Response> responseStream; | |
29 | |
30 /** | |
31 * Send the given [request] to the server | |
32 * and return a future with the associated [Response]. | |
33 */ | |
34 Future<Response> sendRequest(Request request); | |
35 | |
36 /** | |
37 * Close the channel to the server. Once called, all future communication | |
38 * with the server via [sendRequest] will silently be ignored. | |
39 */ | |
40 void close(); | |
34 } | 41 } |
35 | 42 |
36 /** | 43 /** |
37 * The abstract class [ServerCommunicationChannel] defines the behavior of | 44 * The abstract class [ServerCommunicationChannel] defines the behavior of |
38 * objects that allow an [AnalysisServer] to receive [Request]s and to return | 45 * objects that allow an [AnalysisServer] to receive [Request]s and to return |
39 * both [Response]s and [Notification]s. | 46 * both [Response]s and [Notification]s. |
40 */ | 47 */ |
41 abstract class ServerCommunicationChannel { | 48 abstract class ServerCommunicationChannel { |
42 /** | 49 /** |
43 * Listen to the channel for requests. If a request is received, invoke the | 50 * Listen to the channel for requests. If a request is received, invoke the |
(...skipping 16 matching lines...) Expand all Loading... | |
60 | 67 |
61 /** | 68 /** |
62 * Instances of the class [WebSocketClientChannel] implement a | 69 * Instances of the class [WebSocketClientChannel] implement a |
63 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with | 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with |
64 * servers. | 71 * servers. |
65 */ | 72 */ |
66 class WebSocketClientChannel implements ClientCommunicationChannel { | 73 class WebSocketClientChannel implements ClientCommunicationChannel { |
67 /** | 74 /** |
68 * The socket being wrapped. | 75 * The socket being wrapped. |
69 */ | 76 */ |
70 final WebSocket socket; | 77 final WebSocket _socket; |
71 | 78 |
72 final JsonEncoder jsonEncoder = const JsonEncoder(null); | 79 @override |
80 Stream<Response> responseStream; | |
73 | 81 |
74 final JsonDecoder jsonDecoder = const JsonDecoder(null); | 82 @override |
83 Stream<Notification> notificationStream; | |
75 | 84 |
76 /** | 85 /** |
77 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 86 * Initialize a new [WebSocket] wrapper for the given [_socket]. |
78 */ | 87 */ |
79 WebSocketClientChannel(this.socket); | 88 WebSocketClientChannel(this._socket) { |
80 | 89 Stream jsonStream = _socket |
Brian Wilkerson
2014/03/01 17:16:37
Does 'jsonStream' need to be a broadcast stream in
danrubel
2014/03/03 16:52:20
Good catch! The mock stream was incorrectly simula
| |
81 @override | 90 .where((data) => data is String) |
82 void listen(void onResponse(Response response), | 91 .transform(new _JsonConverter()) |
83 void onNotification(Notification notification), | 92 .where((json) => json is Map); |
84 {void onError(), void onDone()}) { | 93 responseStream = jsonStream |
85 socket.listen((data) => _read(data, onResponse, onNotification), | 94 .where((json) => json[Notification.EVENT] == null) |
86 onError: onError, onDone: onDone); | 95 .transform(new _ResponseConverter()) |
96 .asBroadcastStream(); | |
97 notificationStream = jsonStream | |
98 .where((json) => json[Notification.EVENT] != null) | |
99 .transform(new _NotificationConverter()) | |
100 .asBroadcastStream(); | |
87 } | 101 } |
88 | 102 |
89 @override | 103 @override |
90 void sendRequest(Request request) { | 104 Future<Response> sendRequest(Request request) { |
91 socket.add(jsonEncoder.convert(request.toJson())); | 105 String id = request.id; |
106 _socket.add(_JSON_ENCODER.convert(request.toJson())); | |
107 return responseStream.firstWhere((Response response) => response.id == id); | |
92 } | 108 } |
93 | 109 |
94 /** | 110 @override |
95 * Read a request from the given [data] and use the given function to handle | 111 void close() { |
96 * the request. | 112 _socket.close(); |
97 */ | |
98 void _read(Object data, | |
99 void onResponse(Response response), | |
100 void onNotification(Notification notification)) { | |
101 if (data is String) { | |
102 // Parse the string as a JSON descriptor | |
103 var json; | |
104 try { | |
105 json = jsonDecoder.convert(data); | |
106 if (json is! Map) { | |
107 return; | |
108 } | |
109 } catch (error) { | |
110 return; | |
111 } | |
112 // Process the resulting structure as a response or notification. | |
113 if (json[Notification.EVENT] != null) { | |
114 Notification notification = new Notification.fromJson(json); | |
115 if (notification != null) { | |
116 onNotification(notification); | |
117 } | |
118 } else { | |
119 Response response = new Response.fromJson(json); | |
120 if (response != null) { | |
121 onResponse(response); | |
122 } | |
123 } | |
124 } | |
125 } | 113 } |
126 } | 114 } |
127 | 115 |
128 /** | 116 /** |
129 * Instances of the class [WebSocketServerChannel] implement a | 117 * Instances of the class [WebSocketServerChannel] implement a |
130 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with | 118 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with |
131 * clients. | 119 * clients. |
132 */ | 120 */ |
133 class WebSocketServerChannel implements ServerCommunicationChannel { | 121 class WebSocketServerChannel implements ServerCommunicationChannel { |
134 /** | 122 /** |
135 * The socket being wrapped. | 123 * The socket being wrapped. |
136 */ | 124 */ |
137 final WebSocket socket; | 125 final WebSocket socket; |
138 | 126 |
139 final JsonEncoder jsonEncoder = const JsonEncoder(null); | |
140 | |
141 /** | 127 /** |
142 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 128 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
143 */ | 129 */ |
144 WebSocketServerChannel(this.socket); | 130 WebSocketServerChannel(this.socket); |
145 | 131 |
146 @override | 132 @override |
147 void listen(void onRequest(Request request), {void onError(), void onDone()}) { | 133 void listen(void onRequest(Request request), {void onError(), void onDone()}) { |
148 socket.listen((data) => _readRequest(data, onRequest), onError: onError, | 134 socket.listen((data) => _readRequest(data, onRequest), onError: onError, |
149 onDone: onDone); | 135 onDone: onDone); |
150 } | 136 } |
151 | 137 |
152 @override | 138 @override |
153 void sendNotification(Notification notification) { | 139 void sendNotification(Notification notification) { |
154 socket.add(jsonEncoder.convert(notification.toJson())); | 140 socket.add(_JSON_ENCODER.convert(notification.toJson())); |
155 } | 141 } |
156 | 142 |
157 @override | 143 @override |
158 void sendResponse(Response response) { | 144 void sendResponse(Response response) { |
159 socket.add(jsonEncoder.convert(response.toJson())); | 145 socket.add(_JSON_ENCODER.convert(response.toJson())); |
160 } | 146 } |
161 | 147 |
162 /** | 148 /** |
163 * Read a request from the given [data] and use the given function to handle | 149 * Read a request from the given [data] and use the given function to handle |
164 * the request. | 150 * the request. |
165 */ | 151 */ |
166 void _readRequest(Object data, void onRequest(Request request)) { | 152 void _readRequest(Object data, void onRequest(Request request)) { |
167 if (data is List<int>) { | 153 if (data is List<int>) { |
168 sendResponse(new Response.invalidRequestFormat()); | 154 sendResponse(new Response.invalidRequestFormat()); |
169 return; | 155 return; |
170 } | 156 } |
171 if (data is String) { | 157 if (data is String) { |
172 // Parse the string as a JSON descriptor and process the resulting | 158 // Parse the string as a JSON descriptor and process the resulting |
173 // structure as a request. | 159 // structure as a request. |
174 Request request = new Request.fromString(data); | 160 Request request = new Request.fromString(data); |
175 if (request == null) { | 161 if (request == null) { |
176 sendResponse(new Response.invalidRequestFormat()); | 162 sendResponse(new Response.invalidRequestFormat()); |
177 return; | 163 return; |
178 } | 164 } |
179 onRequest(request); | 165 onRequest(request); |
180 } | 166 } |
181 } | 167 } |
182 } | 168 } |
169 | |
170 const _JSON_DECODER = const JsonDecoder(null); | |
171 | |
172 const _JSON_ENCODER = const JsonEncoder(null); | |
173 | |
174 /** | |
175 * Instances of [_JsonConverter] convert JSON strings to JSON maps | |
176 */ | |
177 class _JsonConverter extends Converter<String, Map> { | |
178 | |
179 @override | |
180 Map convert(String text) => _JSON_DECODER.convert(text); | |
181 | |
182 @override | |
183 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
184 new _ChannelChunkSink<String, Map>(this, sink); | |
185 } | |
186 | |
187 /** | |
188 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. | |
189 */ | |
190 class _ResponseConverter extends Converter<Map, Response> { | |
191 | |
192 @override | |
193 Response convert(Map json) => new Response.fromJson(json); | |
194 | |
195 @override | |
196 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
197 new _ChannelChunkSink<Map, Response>(this, sink); | |
198 } | |
199 | |
200 /** | |
201 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. | |
202 */ | |
203 class _NotificationConverter extends Converter<Map, Notification> { | |
204 | |
205 @override | |
206 Notification convert(Map json) => new Notification.fromJson(json); | |
207 | |
208 @override | |
209 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
210 new _ChannelChunkSink<Map, Notification>(this, sink); | |
211 } | |
212 | |
213 /** | |
214 * A [_ChannelChunkSink] uses a [Convter] to translate chunks. | |
215 */ | |
216 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { | |
217 final Converter<S, T> _converter; | |
218 final ChunkedConversionSink _chunkedSink; | |
219 | |
220 _ChannelChunkSink(this._converter, this._chunkedSink); | |
221 | |
222 @override | |
223 void add(S chunk) { | |
224 var convertedChunk = _converter.convert(chunk); | |
225 if (convertedChunk != null) { | |
226 _chunkedSink.add(convertedChunk); | |
227 } | |
228 } | |
229 | |
230 @override | |
231 void close() => _chunkedSink.close(); | |
232 } | |
OLD | NEW |