| Index: pkg/analysis_server/lib/src/channel.dart
|
| diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart
|
| index 7fcaa12c0e571b7aaf6da40bfd0fee1c8753eb29..6d60abda994f03cd79fb160437023509c103e269 100644
|
| --- a/pkg/analysis_server/lib/src/channel.dart
|
| +++ b/pkg/analysis_server/lib/src/channel.dart
|
| @@ -12,11 +12,10 @@ import 'package:analysis_server/src/protocol.dart';
|
|
|
| /**
|
| * The abstract class [ClientCommunicationChannel] defines the behavior of
|
| - * objects that allows an object to send [Request]s to [AnalysisServer] and to
|
| + * objects that allow a client to send [Request]s to an [AnalysisServer] and to
|
| * receive both [Response]s and [Notification]s.
|
| */
|
| abstract class ClientCommunicationChannel {
|
| -
|
| /**
|
| * The stream of notifications from the server.
|
| */
|
| @@ -53,7 +52,7 @@ abstract class ServerCommunicationChannel {
|
| * client, invoke the [onDone] function.
|
| * Only one listener is allowed per channel.
|
| */
|
| - void listen(void onRequest(Request request), {void onError(), void onDone()});
|
| + void listen(void onRequest(Request request), {Function onError, void onDone()});
|
|
|
| /**
|
| * Send the given [notification] to the client.
|
| @@ -75,7 +74,7 @@ class WebSocketClientChannel implements ClientCommunicationChannel {
|
| /**
|
| * The socket being wrapped.
|
| */
|
| - final WebSocket _socket;
|
| + final WebSocket socket;
|
|
|
| @override
|
| Stream<Response> responseStream;
|
| @@ -84,34 +83,34 @@ class WebSocketClientChannel implements ClientCommunicationChannel {
|
| Stream<Notification> notificationStream;
|
|
|
| /**
|
| - * Initialize a new [WebSocket] wrapper for the given [_socket].
|
| + * Initialize a new [WebSocket] wrapper for the given [socket].
|
| */
|
| - WebSocketClientChannel(this._socket) {
|
| - Stream jsonStream = _socket
|
| + WebSocketClientChannel(this.socket) {
|
| + Stream jsonStream = socket
|
| .where((data) => data is String)
|
| - .transform(new _JsonStreamDecoder())
|
| + .transform(new JsonStreamDecoder())
|
| .where((json) => json is Map)
|
| .asBroadcastStream();
|
| responseStream = jsonStream
|
| .where((json) => json[Notification.EVENT] == null)
|
| - .transform(new _ResponseConverter())
|
| + .transform(new ResponseConverter())
|
| .asBroadcastStream();
|
| notificationStream = jsonStream
|
| .where((json) => json[Notification.EVENT] != null)
|
| - .transform(new _NotificationConverter())
|
| + .transform(new NotificationConverter())
|
| .asBroadcastStream();
|
| }
|
|
|
| @override
|
| Future<Response> sendRequest(Request request) {
|
| String id = request.id;
|
| - _socket.add(JSON.encode(request.toJson()));
|
| + socket.add(JSON.encode(request.toJson()));
|
| return responseStream.firstWhere((Response response) => response.id == id);
|
| }
|
|
|
| @override
|
| Future close() {
|
| - return _socket.close();
|
| + return socket.close();
|
| }
|
| }
|
|
|
| @@ -133,7 +132,7 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
|
|
| @override
|
| void listen(void onRequest(Request request), {void onError(), void onDone()}) {
|
| - socket.listen((data) => _readRequest(data, onRequest), onError: onError,
|
| + socket.listen((data) => readRequest(data, onRequest), onError: onError,
|
| onDone: onDone);
|
| }
|
|
|
| @@ -151,11 +150,7 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
| * Read a request from the given [data] and use the given function to handle
|
| * the request.
|
| */
|
| - void _readRequest(Object data, void onRequest(Request request)) {
|
| - if (data is List<int>) {
|
| - sendResponse(new Response.invalidRequestFormat());
|
| - return;
|
| - }
|
| + void readRequest(Object data, void onRequest(Request request)) {
|
| if (data is String) {
|
| // Parse the string as a JSON descriptor and process the resulting
|
| // structure as a request.
|
| @@ -165,66 +160,92 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
| return;
|
| }
|
| onRequest(request);
|
| + } else if (data is List<int>) {
|
| + // TODO(brianwilkerson) Implement a more efficient protocol.
|
| + sendResponse(new Response.invalidRequestFormat());
|
| + } else {
|
| + sendResponse(new Response.invalidRequestFormat());
|
| }
|
| }
|
| }
|
|
|
| /**
|
| - * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps
|
| + * Instances of the class [JsonStreamDecoder] convert JSON strings to JSON
|
| + * maps.
|
| */
|
| -class _JsonStreamDecoder extends Converter<String, Map> {
|
| -
|
| +class JsonStreamDecoder extends Converter<String, Map> {
|
| @override
|
| Map convert(String text) => JSON.decode(text);
|
|
|
| @override
|
| ChunkedConversionSink startChunkedConversion(Sink sink) =>
|
| - new _ChannelChunkSink<String, Map>(this, sink);
|
| + new ChannelChunkSink<String, Map>(this, sink);
|
| }
|
|
|
| /**
|
| - * Instances of [_ResponseConverter] convert JSON maps to [Response]s.
|
| + * Instances of the class [ResponseConverter] convert JSON maps to [Response]s.
|
| */
|
| -class _ResponseConverter extends Converter<Map, Response> {
|
| -
|
| +class ResponseConverter extends Converter<Map, Response> {
|
| @override
|
| Response convert(Map json) => new Response.fromJson(json);
|
|
|
| @override
|
| ChunkedConversionSink startChunkedConversion(Sink sink) =>
|
| - new _ChannelChunkSink<Map, Response>(this, sink);
|
| + new ChannelChunkSink<Map, Response>(this, sink);
|
| }
|
|
|
| /**
|
| - * Instances of [_NotificationConverter] convert JSON maps to [Notification]s.
|
| + * Instances of the class [NotificationConverter] convert JSON maps to
|
| + * [Notification]s.
|
| */
|
| -class _NotificationConverter extends Converter<Map, Notification> {
|
| -
|
| +class NotificationConverter extends Converter<Map, Notification> {
|
| @override
|
| Notification convert(Map json) => new Notification.fromJson(json);
|
|
|
| @override
|
| ChunkedConversionSink startChunkedConversion(Sink sink) =>
|
| - new _ChannelChunkSink<Map, Notification>(this, sink);
|
| + new ChannelChunkSink<Map, Notification>(this, sink);
|
| }
|
|
|
| /**
|
| - * A [_ChannelChunkSink] uses a [Converter] to translate chunks.
|
| + * Instances of the class [ChannelChunkSink] uses a [Converter] to translate
|
| + * chunks.
|
| */
|
| -class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
|
| - final Converter<S, T> _converter;
|
| - final Sink _sink;
|
| +class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
|
| + /**
|
| + * The converter used to translate chunks.
|
| + */
|
| + final Converter<S, T> converter;
|
|
|
| - _ChannelChunkSink(this._converter, this._sink);
|
| + /**
|
| + * The sink to which the converted chunks are added.
|
| + */
|
| + final Sink sink;
|
| +
|
| + /**
|
| + * A flag indicating whether the sink has been closed.
|
| + */
|
| + bool closed = false;
|
| +
|
| + /**
|
| + * Initialize a newly create sink to use the given [converter] to convert
|
| + * chunks before adding them to the given [sink].
|
| + */
|
| + ChannelChunkSink(this.converter, this.sink);
|
|
|
| @override
|
| void add(S chunk) {
|
| - var convertedChunk = _converter.convert(chunk);
|
| - if (convertedChunk != null) {
|
| - _sink.add(convertedChunk);
|
| + if (!closed) {
|
| + T convertedChunk = converter.convert(chunk);
|
| + if (convertedChunk != null) {
|
| + sink.add(convertedChunk);
|
| + }
|
| }
|
| }
|
|
|
| @override
|
| - void close() => _sink.close();
|
| + void close() {
|
| + closed = true;
|
| + sink.close();
|
| + }
|
| }
|
|
|