Index: pkg/analysis_server/lib/src/channel.dart |
diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart |
index 7fcaa12c0e571b7aaf6da40bfd0fee1c8753eb29..6d60abda994f03cd79fb160437023509c103e269 100644 |
--- a/pkg/analysis_server/lib/src/channel.dart |
+++ b/pkg/analysis_server/lib/src/channel.dart |
@@ -12,11 +12,10 @@ import 'package:analysis_server/src/protocol.dart'; |
/** |
* The abstract class [ClientCommunicationChannel] defines the behavior of |
- * objects that allows an object to send [Request]s to [AnalysisServer] and to |
+ * objects that allow a client to send [Request]s to an [AnalysisServer] and to |
* receive both [Response]s and [Notification]s. |
*/ |
abstract class ClientCommunicationChannel { |
- |
/** |
* The stream of notifications from the server. |
*/ |
@@ -53,7 +52,7 @@ abstract class ServerCommunicationChannel { |
* client, invoke the [onDone] function. |
* Only one listener is allowed per channel. |
*/ |
- void listen(void onRequest(Request request), {void onError(), void onDone()}); |
+ void listen(void onRequest(Request request), {Function onError, void onDone()}); |
/** |
* Send the given [notification] to the client. |
@@ -75,7 +74,7 @@ class WebSocketClientChannel implements ClientCommunicationChannel { |
/** |
* The socket being wrapped. |
*/ |
- final WebSocket _socket; |
+ final WebSocket socket; |
@override |
Stream<Response> responseStream; |
@@ -84,34 +83,34 @@ class WebSocketClientChannel implements ClientCommunicationChannel { |
Stream<Notification> notificationStream; |
/** |
- * Initialize a new [WebSocket] wrapper for the given [_socket]. |
+ * Initialize a new [WebSocket] wrapper for the given [socket]. |
*/ |
- WebSocketClientChannel(this._socket) { |
- Stream jsonStream = _socket |
+ WebSocketClientChannel(this.socket) { |
+ Stream jsonStream = socket |
.where((data) => data is String) |
- .transform(new _JsonStreamDecoder()) |
+ .transform(new JsonStreamDecoder()) |
.where((json) => json is Map) |
.asBroadcastStream(); |
responseStream = jsonStream |
.where((json) => json[Notification.EVENT] == null) |
- .transform(new _ResponseConverter()) |
+ .transform(new ResponseConverter()) |
.asBroadcastStream(); |
notificationStream = jsonStream |
.where((json) => json[Notification.EVENT] != null) |
- .transform(new _NotificationConverter()) |
+ .transform(new NotificationConverter()) |
.asBroadcastStream(); |
} |
@override |
Future<Response> sendRequest(Request request) { |
String id = request.id; |
- _socket.add(JSON.encode(request.toJson())); |
+ socket.add(JSON.encode(request.toJson())); |
return responseStream.firstWhere((Response response) => response.id == id); |
} |
@override |
Future close() { |
- return _socket.close(); |
+ return socket.close(); |
} |
} |
@@ -133,7 +132,7 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
@override |
void listen(void onRequest(Request request), {void onError(), void onDone()}) { |
- socket.listen((data) => _readRequest(data, onRequest), onError: onError, |
+ socket.listen((data) => readRequest(data, onRequest), onError: onError, |
onDone: onDone); |
} |
@@ -151,11 +150,7 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
* Read a request from the given [data] and use the given function to handle |
* the request. |
*/ |
- void _readRequest(Object data, void onRequest(Request request)) { |
- if (data is List<int>) { |
- sendResponse(new Response.invalidRequestFormat()); |
- return; |
- } |
+ void readRequest(Object data, void onRequest(Request request)) { |
if (data is String) { |
// Parse the string as a JSON descriptor and process the resulting |
// structure as a request. |
@@ -165,66 +160,92 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
return; |
} |
onRequest(request); |
+ } else if (data is List<int>) { |
+ // TODO(brianwilkerson) Implement a more efficient protocol. |
+ sendResponse(new Response.invalidRequestFormat()); |
+ } else { |
+ sendResponse(new Response.invalidRequestFormat()); |
} |
} |
} |
/** |
- * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps |
+ * Instances of the class [JsonStreamDecoder] convert JSON strings to JSON |
+ * maps. |
*/ |
-class _JsonStreamDecoder extends Converter<String, Map> { |
- |
+class JsonStreamDecoder extends Converter<String, Map> { |
@override |
Map convert(String text) => JSON.decode(text); |
@override |
ChunkedConversionSink startChunkedConversion(Sink sink) => |
- new _ChannelChunkSink<String, Map>(this, sink); |
+ new ChannelChunkSink<String, Map>(this, sink); |
} |
/** |
- * Instances of [_ResponseConverter] convert JSON maps to [Response]s. |
+ * Instances of the class [ResponseConverter] convert JSON maps to [Response]s. |
*/ |
-class _ResponseConverter extends Converter<Map, Response> { |
- |
+class ResponseConverter extends Converter<Map, Response> { |
@override |
Response convert(Map json) => new Response.fromJson(json); |
@override |
ChunkedConversionSink startChunkedConversion(Sink sink) => |
- new _ChannelChunkSink<Map, Response>(this, sink); |
+ new ChannelChunkSink<Map, Response>(this, sink); |
} |
/** |
- * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. |
+ * Instances of the class [NotificationConverter] convert JSON maps to |
+ * [Notification]s. |
*/ |
-class _NotificationConverter extends Converter<Map, Notification> { |
- |
+class NotificationConverter extends Converter<Map, Notification> { |
@override |
Notification convert(Map json) => new Notification.fromJson(json); |
@override |
ChunkedConversionSink startChunkedConversion(Sink sink) => |
- new _ChannelChunkSink<Map, Notification>(this, sink); |
+ new ChannelChunkSink<Map, Notification>(this, sink); |
} |
/** |
- * A [_ChannelChunkSink] uses a [Converter] to translate chunks. |
+ * Instances of the class [ChannelChunkSink] uses a [Converter] to translate |
+ * chunks. |
*/ |
-class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
- final Converter<S, T> _converter; |
- final Sink _sink; |
+class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
+ /** |
+ * The converter used to translate chunks. |
+ */ |
+ final Converter<S, T> converter; |
- _ChannelChunkSink(this._converter, this._sink); |
+ /** |
+ * The sink to which the converted chunks are added. |
+ */ |
+ final Sink sink; |
+ |
+ /** |
+ * A flag indicating whether the sink has been closed. |
+ */ |
+ bool closed = false; |
+ |
+ /** |
+ * Initialize a newly create sink to use the given [converter] to convert |
+ * chunks before adding them to the given [sink]. |
+ */ |
+ ChannelChunkSink(this.converter, this.sink); |
@override |
void add(S chunk) { |
- var convertedChunk = _converter.convert(chunk); |
- if (convertedChunk != null) { |
- _sink.add(convertedChunk); |
+ if (!closed) { |
+ T convertedChunk = converter.convert(chunk); |
+ if (convertedChunk != null) { |
+ sink.add(convertedChunk); |
+ } |
} |
} |
@override |
- void close() => _sink.close(); |
+ void close() { |
+ closed = true; |
+ sink.close(); |
+ } |
} |