| OLD | NEW |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library channel; | 5 library channel; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:convert'; | 8 import 'dart:convert'; |
| 9 import 'dart:io'; | 9 import 'dart:io'; |
| 10 | 10 |
| 11 import 'package:analysis_server/src/protocol.dart'; | 11 import 'package:analysis_server/src/protocol.dart'; |
| 12 | 12 |
| 13 /** | 13 /** |
| 14 * The abstract class [ClientCommunicationChannel] defines the behavior of | 14 * The abstract class [ClientCommunicationChannel] defines the behavior of |
| 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to | 15 * objects that allow a client to send [Request]s to an [AnalysisServer] and to |
| 16 * receive both [Response]s and [Notification]s. | 16 * receive both [Response]s and [Notification]s. |
| 17 */ | 17 */ |
| 18 abstract class ClientCommunicationChannel { | 18 abstract class ClientCommunicationChannel { |
| 19 | |
| 20 /** | 19 /** |
| 21 * The stream of notifications from the server. | 20 * The stream of notifications from the server. |
| 22 */ | 21 */ |
| 23 Stream<Notification> notificationStream; | 22 Stream<Notification> notificationStream; |
| 24 | 23 |
| 25 /** | 24 /** |
| 26 * The stream of responses from the server. | 25 * The stream of responses from the server. |
| 27 */ | 26 */ |
| 28 Stream<Response> responseStream; | 27 Stream<Response> responseStream; |
| 29 | 28 |
| (...skipping 16 matching lines...) Expand all Loading... |
| 46 * both [Response]s and [Notification]s. | 45 * both [Response]s and [Notification]s. |
| 47 */ | 46 */ |
| 48 abstract class ServerCommunicationChannel { | 47 abstract class ServerCommunicationChannel { |
| 49 /** | 48 /** |
| 50 * Listen to the channel for requests. If a request is received, invoke the | 49 * Listen to the channel for requests. If a request is received, invoke the |
| 51 * [onRequest] function. If an error is encountered while trying to read from | 50 * [onRequest] function. If an error is encountered while trying to read from |
| 52 * the socket, invoke the [onError] function. If the socket is closed by the | 51 * the socket, invoke the [onError] function. If the socket is closed by the |
| 53 * client, invoke the [onDone] function. | 52 * client, invoke the [onDone] function. |
| 54 * Only one listener is allowed per channel. | 53 * Only one listener is allowed per channel. |
| 55 */ | 54 */ |
| 56 void listen(void onRequest(Request request), {void onError(), void onDone()}); | 55 void listen(void onRequest(Request request), {Function onError, void onDone()}
); |
| 57 | 56 |
| 58 /** | 57 /** |
| 59 * Send the given [notification] to the client. | 58 * Send the given [notification] to the client. |
| 60 */ | 59 */ |
| 61 void sendNotification(Notification notification); | 60 void sendNotification(Notification notification); |
| 62 | 61 |
| 63 /** | 62 /** |
| 64 * Send the given [response] to the client. | 63 * Send the given [response] to the client. |
| 65 */ | 64 */ |
| 66 void sendResponse(Response response); | 65 void sendResponse(Response response); |
| 67 } | 66 } |
| 68 | 67 |
| 69 /** | 68 /** |
| 70 * Instances of the class [WebSocketClientChannel] implement a | 69 * Instances of the class [WebSocketClientChannel] implement a |
| 71 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with | 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with |
| 72 * servers. | 71 * servers. |
| 73 */ | 72 */ |
| 74 class WebSocketClientChannel implements ClientCommunicationChannel { | 73 class WebSocketClientChannel implements ClientCommunicationChannel { |
| 75 /** | 74 /** |
| 76 * The socket being wrapped. | 75 * The socket being wrapped. |
| 77 */ | 76 */ |
| 78 final WebSocket _socket; | 77 final WebSocket socket; |
| 79 | 78 |
| 80 @override | 79 @override |
| 81 Stream<Response> responseStream; | 80 Stream<Response> responseStream; |
| 82 | 81 |
| 83 @override | 82 @override |
| 84 Stream<Notification> notificationStream; | 83 Stream<Notification> notificationStream; |
| 85 | 84 |
| 86 /** | 85 /** |
| 87 * Initialize a new [WebSocket] wrapper for the given [_socket]. | 86 * Initialize a new [WebSocket] wrapper for the given [socket]. |
| 88 */ | 87 */ |
| 89 WebSocketClientChannel(this._socket) { | 88 WebSocketClientChannel(this.socket) { |
| 90 Stream jsonStream = _socket | 89 Stream jsonStream = socket |
| 91 .where((data) => data is String) | 90 .where((data) => data is String) |
| 92 .transform(new _JsonStreamDecoder()) | 91 .transform(new JsonStreamDecoder()) |
| 93 .where((json) => json is Map) | 92 .where((json) => json is Map) |
| 94 .asBroadcastStream(); | 93 .asBroadcastStream(); |
| 95 responseStream = jsonStream | 94 responseStream = jsonStream |
| 96 .where((json) => json[Notification.EVENT] == null) | 95 .where((json) => json[Notification.EVENT] == null) |
| 97 .transform(new _ResponseConverter()) | 96 .transform(new ResponseConverter()) |
| 98 .asBroadcastStream(); | 97 .asBroadcastStream(); |
| 99 notificationStream = jsonStream | 98 notificationStream = jsonStream |
| 100 .where((json) => json[Notification.EVENT] != null) | 99 .where((json) => json[Notification.EVENT] != null) |
| 101 .transform(new _NotificationConverter()) | 100 .transform(new NotificationConverter()) |
| 102 .asBroadcastStream(); | 101 .asBroadcastStream(); |
| 103 } | 102 } |
| 104 | 103 |
| 105 @override | 104 @override |
| 106 Future<Response> sendRequest(Request request) { | 105 Future<Response> sendRequest(Request request) { |
| 107 String id = request.id; | 106 String id = request.id; |
| 108 _socket.add(JSON.encode(request.toJson())); | 107 socket.add(JSON.encode(request.toJson())); |
| 109 return responseStream.firstWhere((Response response) => response.id == id); | 108 return responseStream.firstWhere((Response response) => response.id == id); |
| 110 } | 109 } |
| 111 | 110 |
| 112 @override | 111 @override |
| 113 Future close() { | 112 Future close() { |
| 114 return _socket.close(); | 113 return socket.close(); |
| 115 } | 114 } |
| 116 } | 115 } |
| 117 | 116 |
| 118 /** | 117 /** |
| 119 * Instances of the class [WebSocketServerChannel] implement a | 118 * Instances of the class [WebSocketServerChannel] implement a |
| 120 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with | 119 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with |
| 121 * clients. | 120 * clients. |
| 122 */ | 121 */ |
| 123 class WebSocketServerChannel implements ServerCommunicationChannel { | 122 class WebSocketServerChannel implements ServerCommunicationChannel { |
| 124 /** | 123 /** |
| 125 * The socket being wrapped. | 124 * The socket being wrapped. |
| 126 */ | 125 */ |
| 127 final WebSocket socket; | 126 final WebSocket socket; |
| 128 | 127 |
| 129 /** | 128 /** |
| 130 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 129 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| 131 */ | 130 */ |
| 132 WebSocketServerChannel(this.socket); | 131 WebSocketServerChannel(this.socket); |
| 133 | 132 |
| 134 @override | 133 @override |
| 135 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ | 134 void listen(void onRequest(Request request), {void onError(), void onDone()})
{ |
| 136 socket.listen((data) => _readRequest(data, onRequest), onError: onError, | 135 socket.listen((data) => readRequest(data, onRequest), onError: onError, |
| 137 onDone: onDone); | 136 onDone: onDone); |
| 138 } | 137 } |
| 139 | 138 |
| 140 @override | 139 @override |
| 141 void sendNotification(Notification notification) { | 140 void sendNotification(Notification notification) { |
| 142 socket.add(JSON.encode(notification.toJson())); | 141 socket.add(JSON.encode(notification.toJson())); |
| 143 } | 142 } |
| 144 | 143 |
| 145 @override | 144 @override |
| 146 void sendResponse(Response response) { | 145 void sendResponse(Response response) { |
| 147 socket.add(JSON.encode(response.toJson())); | 146 socket.add(JSON.encode(response.toJson())); |
| 148 } | 147 } |
| 149 | 148 |
| 150 /** | 149 /** |
| 151 * Read a request from the given [data] and use the given function to handle | 150 * Read a request from the given [data] and use the given function to handle |
| 152 * the request. | 151 * the request. |
| 153 */ | 152 */ |
| 154 void _readRequest(Object data, void onRequest(Request request)) { | 153 void readRequest(Object data, void onRequest(Request request)) { |
| 155 if (data is List<int>) { | |
| 156 sendResponse(new Response.invalidRequestFormat()); | |
| 157 return; | |
| 158 } | |
| 159 if (data is String) { | 154 if (data is String) { |
| 160 // Parse the string as a JSON descriptor and process the resulting | 155 // Parse the string as a JSON descriptor and process the resulting |
| 161 // structure as a request. | 156 // structure as a request. |
| 162 Request request = new Request.fromString(data); | 157 Request request = new Request.fromString(data); |
| 163 if (request == null) { | 158 if (request == null) { |
| 164 sendResponse(new Response.invalidRequestFormat()); | 159 sendResponse(new Response.invalidRequestFormat()); |
| 165 return; | 160 return; |
| 166 } | 161 } |
| 167 onRequest(request); | 162 onRequest(request); |
| 163 } else if (data is List<int>) { |
| 164 // TODO(brianwilkerson) Implement a more efficient protocol. |
| 165 sendResponse(new Response.invalidRequestFormat()); |
| 166 } else { |
| 167 sendResponse(new Response.invalidRequestFormat()); |
| 168 } | 168 } |
| 169 } | 169 } |
| 170 } | 170 } |
| 171 | 171 |
| 172 /** | 172 /** |
| 173 * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps | 173 * Instances of the class [JsonStreamDecoder] convert JSON strings to JSON |
| 174 * maps. |
| 174 */ | 175 */ |
| 175 class _JsonStreamDecoder extends Converter<String, Map> { | 176 class JsonStreamDecoder extends Converter<String, Map> { |
| 176 | |
| 177 @override | 177 @override |
| 178 Map convert(String text) => JSON.decode(text); | 178 Map convert(String text) => JSON.decode(text); |
| 179 | 179 |
| 180 @override | 180 @override |
| 181 ChunkedConversionSink startChunkedConversion(Sink sink) => | 181 ChunkedConversionSink startChunkedConversion(Sink sink) => |
| 182 new _ChannelChunkSink<String, Map>(this, sink); | 182 new ChannelChunkSink<String, Map>(this, sink); |
| 183 } | 183 } |
| 184 | 184 |
| 185 /** | 185 /** |
| 186 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. | 186 * Instances of the class [ResponseConverter] convert JSON maps to [Response]s. |
| 187 */ | 187 */ |
| 188 class _ResponseConverter extends Converter<Map, Response> { | 188 class ResponseConverter extends Converter<Map, Response> { |
| 189 | |
| 190 @override | 189 @override |
| 191 Response convert(Map json) => new Response.fromJson(json); | 190 Response convert(Map json) => new Response.fromJson(json); |
| 192 | 191 |
| 193 @override | 192 @override |
| 194 ChunkedConversionSink startChunkedConversion(Sink sink) => | 193 ChunkedConversionSink startChunkedConversion(Sink sink) => |
| 195 new _ChannelChunkSink<Map, Response>(this, sink); | 194 new ChannelChunkSink<Map, Response>(this, sink); |
| 196 } | 195 } |
| 197 | 196 |
| 198 /** | 197 /** |
| 199 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. | 198 * Instances of the class [NotificationConverter] convert JSON maps to |
| 199 * [Notification]s. |
| 200 */ | 200 */ |
| 201 class _NotificationConverter extends Converter<Map, Notification> { | 201 class NotificationConverter extends Converter<Map, Notification> { |
| 202 | |
| 203 @override | 202 @override |
| 204 Notification convert(Map json) => new Notification.fromJson(json); | 203 Notification convert(Map json) => new Notification.fromJson(json); |
| 205 | 204 |
| 206 @override | 205 @override |
| 207 ChunkedConversionSink startChunkedConversion(Sink sink) => | 206 ChunkedConversionSink startChunkedConversion(Sink sink) => |
| 208 new _ChannelChunkSink<Map, Notification>(this, sink); | 207 new ChannelChunkSink<Map, Notification>(this, sink); |
| 209 } | 208 } |
| 210 | 209 |
| 211 /** | 210 /** |
| 212 * A [_ChannelChunkSink] uses a [Converter] to translate chunks. | 211 * Instances of the class [ChannelChunkSink] uses a [Converter] to translate |
| 212 * chunks. |
| 213 */ | 213 */ |
| 214 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { | 214 class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
| 215 final Converter<S, T> _converter; | 215 /** |
| 216 final Sink _sink; | 216 * The converter used to translate chunks. |
| 217 */ |
| 218 final Converter<S, T> converter; |
| 217 | 219 |
| 218 _ChannelChunkSink(this._converter, this._sink); | 220 /** |
| 221 * The sink to which the converted chunks are added. |
| 222 */ |
| 223 final Sink sink; |
| 224 |
| 225 /** |
| 226 * A flag indicating whether the sink has been closed. |
| 227 */ |
| 228 bool closed = false; |
| 229 |
| 230 /** |
| 231 * Initialize a newly create sink to use the given [converter] to convert |
| 232 * chunks before adding them to the given [sink]. |
| 233 */ |
| 234 ChannelChunkSink(this.converter, this.sink); |
| 219 | 235 |
| 220 @override | 236 @override |
| 221 void add(S chunk) { | 237 void add(S chunk) { |
| 222 var convertedChunk = _converter.convert(chunk); | 238 if (!closed) { |
| 223 if (convertedChunk != null) { | 239 T convertedChunk = converter.convert(chunk); |
| 224 _sink.add(convertedChunk); | 240 if (convertedChunk != null) { |
| 241 sink.add(convertedChunk); |
| 242 } |
| 225 } | 243 } |
| 226 } | 244 } |
| 227 | 245 |
| 228 @override | 246 @override |
| 229 void close() => _sink.close(); | 247 void close() { |
| 248 closed = true; |
| 249 sink.close(); |
| 250 } |
| 230 } | 251 } |
| OLD | NEW |