OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library channel; | 5 library channel; |
6 | 6 |
| 7 import 'dart:async'; |
7 import 'dart:convert'; | 8 import 'dart:convert'; |
8 import 'dart:io'; | 9 import 'dart:io'; |
9 | 10 |
10 import 'package:analysis_server/src/protocol.dart'; | 11 import 'package:analysis_server/src/protocol.dart'; |
11 | 12 |
12 /** | 13 /** |
13 * The abstract class [ClientCommunicationChannel] defines the behavior of | 14 * The abstract class [ClientCommunicationChannel] defines the behavior of |
14 * objects that allows an object to send [Request]s to [AnalysisServer] and to | 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to |
15 * receive both [Response]s and [Notification]s. | 16 * receive both [Response]s and [Notification]s. |
16 */ | 17 */ |
17 abstract class ClientCommunicationChannel { | 18 abstract class ClientCommunicationChannel { |
18 /** | |
19 * Listen to the channel for responses and notifications. | |
20 * If a response is received, invoke the [onResponse] function. | |
21 * If a notification is received, invoke the [onNotification] function. | |
22 * If an error is encountered while trying to read from | |
23 * the socket, invoke the [onError] function. If the socket is closed by the | |
24 * client, invoke the [onDone] function. | |
25 */ | |
26 void listen(void onResponse(Response response), | |
27 void onNotification(Notification notification), | |
28 {void onError(), void onDone()}); | |
29 | 19 |
30 /** | 20 /** |
31 * Send the given [request] to the server. | 21 * The stream of notifications from the server. |
32 */ | 22 */ |
33 void sendRequest(Request request); | 23 Stream<Notification> notificationStream; |
| 24 |
| 25 /** |
| 26 * The stream of responses from the server. |
| 27 */ |
| 28 Stream<Response> responseStream; |
| 29 |
| 30 /** |
| 31 * Send the given [request] to the server |
| 32 * and return a future with the associated [Response]. |
| 33 */ |
| 34 Future<Response> sendRequest(Request request); |
| 35 |
| 36 /** |
| 37 * Close the channel to the server. Once called, all future communication |
| 38 * with the server via [sendRequest] will silently be ignored. |
| 39 */ |
| 40 Future close(); |
34 } | 41 } |
35 | 42 |
36 /** | 43 /** |
37 * The abstract class [ServerCommunicationChannel] defines the behavior of | 44 * The abstract class [ServerCommunicationChannel] defines the behavior of |
38 * objects that allow an [AnalysisServer] to receive [Request]s and to return | 45 * objects that allow an [AnalysisServer] to receive [Request]s and to return |
39 * both [Response]s and [Notification]s. | 46 * both [Response]s and [Notification]s. |
40 */ | 47 */ |
41 abstract class ServerCommunicationChannel { | 48 abstract class ServerCommunicationChannel { |
42 /** | 49 /** |
43 * Listen to the channel for requests. If a request is received, invoke the | 50 * Listen to the channel for requests. If a request is received, invoke the |
44 * [onRequest] function. If an error is encountered while trying to read from | 51 * [onRequest] function. If an error is encountered while trying to read from |
45 * the socket, invoke the [onError] function. If the socket is closed by the | 52 * the socket, invoke the [onError] function. If the socket is closed by the |
46 * client, invoke the [onDone] function. | 53 * client, invoke the [onDone] function. |
| 54 * Only one listener is allowed per channel. |
47 */ | 55 */ |
48 void listen(void onRequest(Request request), {void onError(), void onDone()}); | 56 void listen(void onRequest(Request request), {void onError(), void onDone()}); |
49 | 57 |
50 /** | 58 /** |
51 * Send the given [notification] to the client. | 59 * Send the given [notification] to the client. |
52 */ | 60 */ |
53 void sendNotification(Notification notification); | 61 void sendNotification(Notification notification); |
54 | 62 |
55 /** | 63 /** |
56 * Send the given [response] to the client. | 64 * Send the given [response] to the client. |
57 */ | 65 */ |
58 void sendResponse(Response response); | 66 void sendResponse(Response response); |
59 } | 67 } |
60 | 68 |
61 /** | 69 /** |
62 * Instances of the class [WebSocketClientChannel] implement a | 70 * Instances of the class [WebSocketClientChannel] implement a |
63 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with | 71 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with |
64 * servers. | 72 * servers. |
65 */ | 73 */ |
66 class WebSocketClientChannel implements ClientCommunicationChannel { | 74 class WebSocketClientChannel implements ClientCommunicationChannel { |
67 /** | 75 /** |
68 * The socket being wrapped. | 76 * The socket being wrapped. |
69 */ | 77 */ |
70 final WebSocket socket; | 78 final WebSocket _socket; |
71 | 79 |
72 final JsonEncoder jsonEncoder = const JsonEncoder(null); | 80 @override |
| 81 Stream<Response> responseStream; |
73 | 82 |
74 final JsonDecoder jsonDecoder = const JsonDecoder(null); | 83 @override |
| 84 Stream<Notification> notificationStream; |
75 | 85 |
76 /** | 86 /** |
77 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 87 * Initialize a new [WebSocket] wrapper for the given [_socket]. |
78 */ | 88 */ |
79 WebSocketClientChannel(this.socket); | 89 WebSocketClientChannel(this._socket) { |
80 | 90 Stream jsonStream = _socket |
81 @override | 91 .where((data) => data is String) |
82 void listen(void onResponse(Response response), | 92 .transform(new _JsonStreamDecoder()) |
83 void onNotification(Notification notification), | 93 .where((json) => json is Map) |
84 {void onError(), void onDone()}) { | 94 .asBroadcastStream(); |
85 socket.listen((data) => _read(data, onResponse, onNotification), | 95 responseStream = jsonStream |
86 onError: onError, onDone: onDone); | 96 .where((json) => json[Notification.EVENT] == null) |
| 97 .transform(new _ResponseConverter()) |
| 98 .asBroadcastStream(); |
| 99 notificationStream = jsonStream |
| 100 .where((json) => json[Notification.EVENT] != null) |
| 101 .transform(new _NotificationConverter()) |
| 102 .asBroadcastStream(); |
87 } | 103 } |
88 | 104 |
89 @override | 105 @override |
90 void sendRequest(Request request) { | 106 Future<Response> sendRequest(Request request) { |
91 socket.add(jsonEncoder.convert(request.toJson())); | 107 String id = request.id; |
| 108 _socket.add(JSON.encode(request.toJson())); |
| 109 return responseStream.firstWhere((Response response) => response.id == id); |
92 } | 110 } |
93 | 111 |
94 /** | 112 @override |
95 * Read a request from the given [data] and use the given function to handle | 113 Future close() { |
96 * the request. | 114 return _socket.close(); |
97 */ | |
98 void _read(Object data, | |
99 void onResponse(Response response), | |
100 void onNotification(Notification notification)) { | |
101 if (data is String) { | |
102 // Parse the string as a JSON descriptor | |
103 var json; | |
104 try { | |
105 json = jsonDecoder.convert(data); | |
106 if (json is! Map) { | |
107 return; | |
108 } | |
109 } catch (error) { | |
110 return; | |
111 } | |
112 // Process the resulting structure as a response or notification. | |
113 if (json[Notification.EVENT] != null) { | |
114 Notification notification = new Notification.fromJson(json); | |
115 if (notification != null) { | |
116 onNotification(notification); | |
117 } | |
118 } else { | |
119 Response response = new Response.fromJson(json); | |
120 if (response != null) { | |
121 onResponse(response); | |
122 } | |
123 } | |
124 } | |
125 } | 115 } |
126 } | 116 } |
127 | 117 |
128 /** | 118 /** |
129 * Instances of the class [WebSocketServerChannel] implement a | 119 * Instances of the class [WebSocketServerChannel] implement a |
130 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with | 120 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with |
131 * clients. | 121 * clients. |
132 */ | 122 */ |
133 class WebSocketServerChannel implements ServerCommunicationChannel { | 123 class WebSocketServerChannel implements ServerCommunicationChannel { |
134 /** | 124 /** |
135 * The socket being wrapped. | 125 * The socket being wrapped. |
136 */ | 126 */ |
137 final WebSocket socket; | 127 final WebSocket socket; |
138 | 128 |
139 final JsonEncoder jsonEncoder = const JsonEncoder(null); | |
140 | |
141 /** | 129 /** |
142 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 130 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
143 */ | 131 */ |
144 WebSocketServerChannel(this.socket); | 132 WebSocketServerChannel(this.socket); |
145 | 133 |
146 @override | 134 @override |
147 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ | 135 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ |
148 socket.listen((data) => _readRequest(data, onRequest), onError: onError, | 136 socket.listen((data) => _readRequest(data, onRequest), onError: onError, |
149 onDone: onDone); | 137 onDone: onDone); |
150 } | 138 } |
151 | 139 |
152 @override | 140 @override |
153 void sendNotification(Notification notification) { | 141 void sendNotification(Notification notification) { |
154 socket.add(jsonEncoder.convert(notification.toJson())); | 142 socket.add(JSON.encode(notification.toJson())); |
155 } | 143 } |
156 | 144 |
157 @override | 145 @override |
158 void sendResponse(Response response) { | 146 void sendResponse(Response response) { |
159 socket.add(jsonEncoder.convert(response.toJson())); | 147 socket.add(JSON.encode(response.toJson())); |
160 } | 148 } |
161 | 149 |
162 /** | 150 /** |
163 * Read a request from the given [data] and use the given function to handle | 151 * Read a request from the given [data] and use the given function to handle |
164 * the request. | 152 * the request. |
165 */ | 153 */ |
166 void _readRequest(Object data, void onRequest(Request request)) { | 154 void _readRequest(Object data, void onRequest(Request request)) { |
167 if (data is List<int>) { | 155 if (data is List<int>) { |
168 sendResponse(new Response.invalidRequestFormat()); | 156 sendResponse(new Response.invalidRequestFormat()); |
169 return; | 157 return; |
170 } | 158 } |
171 if (data is String) { | 159 if (data is String) { |
172 // Parse the string as a JSON descriptor and process the resulting | 160 // Parse the string as a JSON descriptor and process the resulting |
173 // structure as a request. | 161 // structure as a request. |
174 Request request = new Request.fromString(data); | 162 Request request = new Request.fromString(data); |
175 if (request == null) { | 163 if (request == null) { |
176 sendResponse(new Response.invalidRequestFormat()); | 164 sendResponse(new Response.invalidRequestFormat()); |
177 return; | 165 return; |
178 } | 166 } |
179 onRequest(request); | 167 onRequest(request); |
180 } | 168 } |
181 } | 169 } |
182 } | 170 } |
| 171 |
| 172 /** |
| 173 * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps |
| 174 */ |
| 175 class _JsonStreamDecoder extends Converter<String, Map> { |
| 176 |
| 177 @override |
| 178 Map convert(String text) => JSON.decode(text); |
| 179 |
| 180 @override |
| 181 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| 182 new _ChannelChunkSink<String, Map>(this, sink); |
| 183 } |
| 184 |
| 185 /** |
| 186 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. |
| 187 */ |
| 188 class _ResponseConverter extends Converter<Map, Response> { |
| 189 |
| 190 @override |
| 191 Response convert(Map json) => new Response.fromJson(json); |
| 192 |
| 193 @override |
| 194 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| 195 new _ChannelChunkSink<Map, Response>(this, sink); |
| 196 } |
| 197 |
| 198 /** |
| 199 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. |
| 200 */ |
| 201 class _NotificationConverter extends Converter<Map, Notification> { |
| 202 |
| 203 @override |
| 204 Notification convert(Map json) => new Notification.fromJson(json); |
| 205 |
| 206 @override |
| 207 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| 208 new _ChannelChunkSink<Map, Notification>(this, sink); |
| 209 } |
| 210 |
| 211 /** |
| 212 * A [_ChannelChunkSink] uses a [Convter] to translate chunks. |
| 213 */ |
| 214 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
| 215 final Converter<S, T> _converter; |
| 216 final ChunkedConversionSink _chunkedSink; |
| 217 |
| 218 _ChannelChunkSink(this._converter, this._chunkedSink); |
| 219 |
| 220 @override |
| 221 void add(S chunk) { |
| 222 var convertedChunk = _converter.convert(chunk); |
| 223 if (convertedChunk != null) { |
| 224 _chunkedSink.add(convertedChunk); |
| 225 } |
| 226 } |
| 227 |
| 228 @override |
| 229 void close() => _chunkedSink.close(); |
| 230 } |
OLD | NEW |