| Index: pkg/analysis_server/lib/src/channel.dart
|
| diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart
|
| index e9fc95d859b85d8d43737a17fe3ba4c2e51504c3..85203b0bdab74a0c8dcca61d6d644c892d57344b 100644
|
| --- a/pkg/analysis_server/lib/src/channel.dart
|
| +++ b/pkg/analysis_server/lib/src/channel.dart
|
| @@ -4,6 +4,7 @@
|
|
|
| library channel;
|
|
|
| +import 'dart:async';
|
| import 'dart:convert';
|
| import 'dart:io';
|
|
|
| @@ -15,22 +16,28 @@ import 'package:analysis_server/src/protocol.dart';
|
| * receive both [Response]s and [Notification]s.
|
| */
|
| abstract class ClientCommunicationChannel {
|
| +
|
| /**
|
| - * Listen to the channel for responses and notifications.
|
| - * If a response is received, invoke the [onResponse] function.
|
| - * If a notification is received, invoke the [onNotification] function.
|
| - * If an error is encountered while trying to read from
|
| - * the socket, invoke the [onError] function. If the socket is closed by the
|
| - * client, invoke the [onDone] function.
|
| + * The stream of notifications from the server.
|
| + */
|
| + Stream<Notification> notificationStream;
|
| +
|
| + /**
|
| + * The stream of responses from the server.
|
| + */
|
| + Stream<Response> responseStream;
|
| +
|
| + /**
|
| + * Send the given [request] to the server
|
| + * and return a future with the associated [Response].
|
| */
|
| - void listen(void onResponse(Response response),
|
| - void onNotification(Notification notification),
|
| - {void onError(), void onDone()});
|
| + Future<Response> sendRequest(Request request);
|
|
|
| /**
|
| - * Send the given [request] to the server.
|
| + * Close the channel to the server. Once called, all future communication
|
| + * with the server via [sendRequest] will silently be ignored.
|
| */
|
| - void sendRequest(Request request);
|
| + Future close();
|
| }
|
|
|
| /**
|
| @@ -44,6 +51,7 @@ abstract class ServerCommunicationChannel {
|
| * [onRequest] function. If an error is encountered while trying to read from
|
| * the socket, invoke the [onError] function. If the socket is closed by the
|
| * client, invoke the [onDone] function.
|
| + * Only one listener is allowed per channel.
|
| */
|
| void listen(void onRequest(Request request), {void onError(), void onDone()});
|
|
|
| @@ -67,61 +75,43 @@ class WebSocketClientChannel implements ClientCommunicationChannel {
|
| /**
|
| * The socket being wrapped.
|
| */
|
| - final WebSocket socket;
|
| + final WebSocket _socket;
|
|
|
| - final JsonEncoder jsonEncoder = const JsonEncoder(null);
|
| + @override
|
| + Stream<Response> responseStream;
|
|
|
| - final JsonDecoder jsonDecoder = const JsonDecoder(null);
|
| + @override
|
| + Stream<Notification> notificationStream;
|
|
|
| /**
|
| - * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
|
| + * Initialize a new [WebSocket] wrapper for the given [_socket].
|
| */
|
| - WebSocketClientChannel(this.socket);
|
| -
|
| - @override
|
| - void listen(void onResponse(Response response),
|
| - void onNotification(Notification notification),
|
| - {void onError(), void onDone()}) {
|
| - socket.listen((data) => _read(data, onResponse, onNotification),
|
| - onError: onError, onDone: onDone);
|
| + WebSocketClientChannel(this._socket) {
|
| + Stream jsonStream = _socket
|
| + .where((data) => data is String)
|
| + .transform(new _JsonStreamDecoder())
|
| + .where((json) => json is Map)
|
| + .asBroadcastStream();
|
| + responseStream = jsonStream
|
| + .where((json) => json[Notification.EVENT] == null)
|
| + .transform(new _ResponseConverter())
|
| + .asBroadcastStream();
|
| + notificationStream = jsonStream
|
| + .where((json) => json[Notification.EVENT] != null)
|
| + .transform(new _NotificationConverter())
|
| + .asBroadcastStream();
|
| }
|
|
|
| @override
|
| - void sendRequest(Request request) {
|
| - socket.add(jsonEncoder.convert(request.toJson()));
|
| + Future<Response> sendRequest(Request request) {
|
| + String id = request.id;
|
| + _socket.add(JSON.encode(request.toJson()));
|
| + return responseStream.firstWhere((Response response) => response.id == id);
|
| }
|
|
|
| - /**
|
| - * Read a request from the given [data] and use the given function to handle
|
| - * the request.
|
| - */
|
| - void _read(Object data,
|
| - void onResponse(Response response),
|
| - void onNotification(Notification notification)) {
|
| - if (data is String) {
|
| - // Parse the string as a JSON descriptor
|
| - var json;
|
| - try {
|
| - json = jsonDecoder.convert(data);
|
| - if (json is! Map) {
|
| - return;
|
| - }
|
| - } catch (error) {
|
| - return;
|
| - }
|
| - // Process the resulting structure as a response or notification.
|
| - if (json[Notification.EVENT] != null) {
|
| - Notification notification = new Notification.fromJson(json);
|
| - if (notification != null) {
|
| - onNotification(notification);
|
| - }
|
| - } else {
|
| - Response response = new Response.fromJson(json);
|
| - if (response != null) {
|
| - onResponse(response);
|
| - }
|
| - }
|
| - }
|
| + @override
|
| + Future close() {
|
| + return _socket.close();
|
| }
|
| }
|
|
|
| @@ -136,8 +126,6 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
| */
|
| final WebSocket socket;
|
|
|
| - final JsonEncoder jsonEncoder = const JsonEncoder(null);
|
| -
|
| /**
|
| * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
|
| */
|
| @@ -151,12 +139,12 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
|
|
| @override
|
| void sendNotification(Notification notification) {
|
| - socket.add(jsonEncoder.convert(notification.toJson()));
|
| + socket.add(JSON.encode(notification.toJson()));
|
| }
|
|
|
| @override
|
| void sendResponse(Response response) {
|
| - socket.add(jsonEncoder.convert(response.toJson()));
|
| + socket.add(JSON.encode(response.toJson()));
|
| }
|
|
|
| /**
|
| @@ -180,3 +168,63 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
|
| }
|
| }
|
| }
|
| +
|
| +/**
|
| + * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps
|
| + */
|
| +class _JsonStreamDecoder extends Converter<String, Map> {
|
| +
|
| + @override
|
| + Map convert(String text) => JSON.decode(text);
|
| +
|
| + @override
|
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
|
| + new _ChannelChunkSink<String, Map>(this, sink);
|
| +}
|
| +
|
| +/**
|
| + * Instances of [_ResponseConverter] convert JSON maps to [Response]s.
|
| + */
|
| +class _ResponseConverter extends Converter<Map, Response> {
|
| +
|
| + @override
|
| + Response convert(Map json) => new Response.fromJson(json);
|
| +
|
| + @override
|
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
|
| + new _ChannelChunkSink<Map, Response>(this, sink);
|
| +}
|
| +
|
| +/**
|
| + * Instances of [_NotificationConverter] convert JSON maps to [Notification]s.
|
| + */
|
| +class _NotificationConverter extends Converter<Map, Notification> {
|
| +
|
| + @override
|
| + Notification convert(Map json) => new Notification.fromJson(json);
|
| +
|
| + @override
|
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
|
| + new _ChannelChunkSink<Map, Notification>(this, sink);
|
| +}
|
| +
|
| +/**
|
| + * A [_ChannelChunkSink] uses a [Convter] to translate chunks.
|
| + */
|
| +class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
|
| + final Converter<S, T> _converter;
|
| + final ChunkedConversionSink _chunkedSink;
|
| +
|
| + _ChannelChunkSink(this._converter, this._chunkedSink);
|
| +
|
| + @override
|
| + void add(S chunk) {
|
| + var convertedChunk = _converter.convert(chunk);
|
| + if (convertedChunk != null) {
|
| + _chunkedSink.add(convertedChunk);
|
| + }
|
| + }
|
| +
|
| + @override
|
| + void close() => _chunkedSink.close();
|
| +}
|
|
|