Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library channel; | 5 library channel; |
| 6 | 6 |
| 7 import 'dart:async'; | |
| 7 import 'dart:convert'; | 8 import 'dart:convert'; |
| 8 import 'dart:io'; | 9 import 'dart:io'; |
| 9 | 10 |
| 10 import 'package:analysis_server/src/protocol.dart'; | 11 import 'package:analysis_server/src/protocol.dart'; |
| 11 | 12 |
| 12 /** | 13 /** |
| 13 * The abstract class [ClientCommunicationChannel] defines the behavior of | 14 * The abstract class [ClientCommunicationChannel] defines the behavior of |
| 14 * objects that allows an object to send [Request]s to [AnalysisServer] and to | 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to |
| 15 * receive both [Response]s and [Notification]s. | 16 * receive both [Response]s and [Notification]s. |
| 16 */ | 17 */ |
| 17 abstract class ClientCommunicationChannel { | 18 abstract class ClientCommunicationChannel { |
| 18 /** | |
| 19 * Listen to the channel for responses and notifications. | |
| 20 * If a response is received, invoke the [onResponse] function. | |
| 21 * If a notification is received, invoke the [onNotification] function. | |
| 22 * If an error is encountered while trying to read from | |
| 23 * the socket, invoke the [onError] function. If the socket is closed by the | |
| 24 * client, invoke the [onDone] function. | |
| 25 */ | |
| 26 void listen(void onResponse(Response response), | |
| 27 void onNotification(Notification notification), | |
| 28 {void onError(), void onDone()}); | |
| 29 | 19 |
| 30 /** | 20 /** |
| 31 * Send the given [request] to the server. | 21 * The stream of notifications from the server. |
| 32 */ | 22 */ |
| 33 void sendRequest(Request request); | 23 Stream<Notification> notificationStream; |
| 24 | |
| 25 /** | |
| 26 * The stream of responses from the server. | |
| 27 */ | |
| 28 Stream<Response> responseStream; | |
| 29 | |
| 30 /** | |
| 31 * Send the given [request] to the server | |
| 32 * and return a future with the associated [Response]. | |
| 33 */ | |
| 34 Future<Response> sendRequest(Request request); | |
| 35 | |
| 36 /** | |
| 37 * Close the channel to the server. Once called, all future communication | |
| 38 * with the server via [sendRequest] will silently be ignored. | |
| 39 */ | |
| 40 void close(); | |
| 34 } | 41 } |
| 35 | 42 |
| 36 /** | 43 /** |
| 37 * The abstract class [ServerCommunicationChannel] defines the behavior of | 44 * The abstract class [ServerCommunicationChannel] defines the behavior of |
| 38 * objects that allow an [AnalysisServer] to receive [Request]s and to return | 45 * objects that allow an [AnalysisServer] to receive [Request]s and to return |
| 39 * both [Response]s and [Notification]s. | 46 * both [Response]s and [Notification]s. |
| 40 */ | 47 */ |
| 41 abstract class ServerCommunicationChannel { | 48 abstract class ServerCommunicationChannel { |
| 42 /** | 49 /** |
| 43 * Listen to the channel for requests. If a request is received, invoke the | 50 * Listen to the channel for requests. If a request is received, invoke the |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 60 | 67 |
| 61 /** | 68 /** |
| 62 * Instances of the class [WebSocketClientChannel] implement a | 69 * Instances of the class [WebSocketClientChannel] implement a |
| 63 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with | 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with |
| 64 * servers. | 71 * servers. |
| 65 */ | 72 */ |
| 66 class WebSocketClientChannel implements ClientCommunicationChannel { | 73 class WebSocketClientChannel implements ClientCommunicationChannel { |
| 67 /** | 74 /** |
| 68 * The socket being wrapped. | 75 * The socket being wrapped. |
| 69 */ | 76 */ |
| 70 final WebSocket socket; | 77 final WebSocket _socket; |
| 71 | 78 |
| 72 final JsonEncoder jsonEncoder = const JsonEncoder(null); | 79 @override |
| 80 Stream<Response> responseStream; | |
| 73 | 81 |
| 74 final JsonDecoder jsonDecoder = const JsonDecoder(null); | 82 @override |
| 83 Stream<Notification> notificationStream; | |
| 75 | 84 |
| 76 /** | 85 /** |
| 77 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 86 * Initialize a new [WebSocket] wrapper for the given [_socket]. |
| 78 */ | 87 */ |
| 79 WebSocketClientChannel(this.socket); | 88 WebSocketClientChannel(this._socket) { |
| 80 | 89 Stream jsonStream = _socket |
|
Brian Wilkerson
2014/03/01 17:16:37
Does 'jsonStream' need to be a broadcast stream in
danrubel
2014/03/03 16:52:20
Good catch! The mock stream was incorrectly simula
| |
| 81 @override | 90 .where((data) => data is String) |
| 82 void listen(void onResponse(Response response), | 91 .transform(new _JsonConverter()) |
| 83 void onNotification(Notification notification), | 92 .where((json) => json is Map); |
| 84 {void onError(), void onDone()}) { | 93 responseStream = jsonStream |
| 85 socket.listen((data) => _read(data, onResponse, onNotification), | 94 .where((json) => json[Notification.EVENT] == null) |
| 86 onError: onError, onDone: onDone); | 95 .transform(new _ResponseConverter()) |
| 96 .asBroadcastStream(); | |
| 97 notificationStream = jsonStream | |
| 98 .where((json) => json[Notification.EVENT] != null) | |
| 99 .transform(new _NotificationConverter()) | |
| 100 .asBroadcastStream(); | |
| 87 } | 101 } |
| 88 | 102 |
| 89 @override | 103 @override |
| 90 void sendRequest(Request request) { | 104 Future<Response> sendRequest(Request request) { |
| 91 socket.add(jsonEncoder.convert(request.toJson())); | 105 String id = request.id; |
| 106 _socket.add(_JSON_ENCODER.convert(request.toJson())); | |
| 107 return responseStream.firstWhere((Response response) => response.id == id); | |
| 92 } | 108 } |
| 93 | 109 |
| 94 /** | 110 @override |
| 95 * Read a request from the given [data] and use the given function to handle | 111 void close() { |
| 96 * the request. | 112 _socket.close(); |
| 97 */ | |
| 98 void _read(Object data, | |
| 99 void onResponse(Response response), | |
| 100 void onNotification(Notification notification)) { | |
| 101 if (data is String) { | |
| 102 // Parse the string as a JSON descriptor | |
| 103 var json; | |
| 104 try { | |
| 105 json = jsonDecoder.convert(data); | |
| 106 if (json is! Map) { | |
| 107 return; | |
| 108 } | |
| 109 } catch (error) { | |
| 110 return; | |
| 111 } | |
| 112 // Process the resulting structure as a response or notification. | |
| 113 if (json[Notification.EVENT] != null) { | |
| 114 Notification notification = new Notification.fromJson(json); | |
| 115 if (notification != null) { | |
| 116 onNotification(notification); | |
| 117 } | |
| 118 } else { | |
| 119 Response response = new Response.fromJson(json); | |
| 120 if (response != null) { | |
| 121 onResponse(response); | |
| 122 } | |
| 123 } | |
| 124 } | |
| 125 } | 113 } |
| 126 } | 114 } |
| 127 | 115 |
| 128 /** | 116 /** |
| 129 * Instances of the class [WebSocketServerChannel] implement a | 117 * Instances of the class [WebSocketServerChannel] implement a |
| 130 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with | 118 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with |
| 131 * clients. | 119 * clients. |
| 132 */ | 120 */ |
| 133 class WebSocketServerChannel implements ServerCommunicationChannel { | 121 class WebSocketServerChannel implements ServerCommunicationChannel { |
| 134 /** | 122 /** |
| 135 * The socket being wrapped. | 123 * The socket being wrapped. |
| 136 */ | 124 */ |
| 137 final WebSocket socket; | 125 final WebSocket socket; |
| 138 | 126 |
| 139 final JsonEncoder jsonEncoder = const JsonEncoder(null); | |
| 140 | |
| 141 /** | 127 /** |
| 142 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. | 128 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| 143 */ | 129 */ |
| 144 WebSocketServerChannel(this.socket); | 130 WebSocketServerChannel(this.socket); |
| 145 | 131 |
| 146 @override | 132 @override |
| 147 void listen(void onRequest(Request request), {void onError(), void onDone()}) { | 133 void listen(void onRequest(Request request), {void onError(), void onDone()}) { |
| 148 socket.listen((data) => _readRequest(data, onRequest), onError: onError, | 134 socket.listen((data) => _readRequest(data, onRequest), onError: onError, |
| 149 onDone: onDone); | 135 onDone: onDone); |
| 150 } | 136 } |
| 151 | 137 |
| 152 @override | 138 @override |
| 153 void sendNotification(Notification notification) { | 139 void sendNotification(Notification notification) { |
| 154 socket.add(jsonEncoder.convert(notification.toJson())); | 140 socket.add(_JSON_ENCODER.convert(notification.toJson())); |
| 155 } | 141 } |
| 156 | 142 |
| 157 @override | 143 @override |
| 158 void sendResponse(Response response) { | 144 void sendResponse(Response response) { |
| 159 socket.add(jsonEncoder.convert(response.toJson())); | 145 socket.add(_JSON_ENCODER.convert(response.toJson())); |
| 160 } | 146 } |
| 161 | 147 |
| 162 /** | 148 /** |
| 163 * Read a request from the given [data] and use the given function to handle | 149 * Read a request from the given [data] and use the given function to handle |
| 164 * the request. | 150 * the request. |
| 165 */ | 151 */ |
| 166 void _readRequest(Object data, void onRequest(Request request)) { | 152 void _readRequest(Object data, void onRequest(Request request)) { |
| 167 if (data is List<int>) { | 153 if (data is List<int>) { |
| 168 sendResponse(new Response.invalidRequestFormat()); | 154 sendResponse(new Response.invalidRequestFormat()); |
| 169 return; | 155 return; |
| 170 } | 156 } |
| 171 if (data is String) { | 157 if (data is String) { |
| 172 // Parse the string as a JSON descriptor and process the resulting | 158 // Parse the string as a JSON descriptor and process the resulting |
| 173 // structure as a request. | 159 // structure as a request. |
| 174 Request request = new Request.fromString(data); | 160 Request request = new Request.fromString(data); |
| 175 if (request == null) { | 161 if (request == null) { |
| 176 sendResponse(new Response.invalidRequestFormat()); | 162 sendResponse(new Response.invalidRequestFormat()); |
| 177 return; | 163 return; |
| 178 } | 164 } |
| 179 onRequest(request); | 165 onRequest(request); |
| 180 } | 166 } |
| 181 } | 167 } |
| 182 } | 168 } |
| 169 | |
| 170 const _JSON_DECODER = const JsonDecoder(null); | |
| 171 | |
| 172 const _JSON_ENCODER = const JsonEncoder(null); | |
| 173 | |
| 174 /** | |
| 175 * Instances of [_JsonConverter] convert JSON strings to JSON maps | |
| 176 */ | |
| 177 class _JsonConverter extends Converter<String, Map> { | |
| 178 | |
| 179 @override | |
| 180 Map convert(String text) => _JSON_DECODER.convert(text); | |
| 181 | |
| 182 @override | |
| 183 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
| 184 new _ChannelChunkSink<String, Map>(this, sink); | |
| 185 } | |
| 186 | |
| 187 /** | |
| 188 * Instances of [_ResponseConverter] convert JSON maps to [Response]s. | |
| 189 */ | |
| 190 class _ResponseConverter extends Converter<Map, Response> { | |
| 191 | |
| 192 @override | |
| 193 Response convert(Map json) => new Response.fromJson(json); | |
| 194 | |
| 195 @override | |
| 196 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
| 197 new _ChannelChunkSink<Map, Response>(this, sink); | |
| 198 } | |
| 199 | |
| 200 /** | |
| 201 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. | |
| 202 */ | |
| 203 class _NotificationConverter extends Converter<Map, Notification> { | |
| 204 | |
| 205 @override | |
| 206 Notification convert(Map json) => new Notification.fromJson(json); | |
| 207 | |
| 208 @override | |
| 209 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => | |
| 210 new _ChannelChunkSink<Map, Notification>(this, sink); | |
| 211 } | |
| 212 | |
| 213 /** | |
| 214 * A [_ChannelChunkSink] uses a [Convter] to translate chunks. | |
| 215 */ | |
| 216 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { | |
| 217 final Converter<S, T> _converter; | |
| 218 final ChunkedConversionSink _chunkedSink; | |
| 219 | |
| 220 _ChannelChunkSink(this._converter, this._chunkedSink); | |
| 221 | |
| 222 @override | |
| 223 void add(S chunk) { | |
| 224 var convertedChunk = _converter.convert(chunk); | |
| 225 if (convertedChunk != null) { | |
| 226 _chunkedSink.add(convertedChunk); | |
| 227 } | |
| 228 } | |
| 229 | |
| 230 @override | |
| 231 void close() => _chunkedSink.close(); | |
| 232 } | |
| OLD | NEW |