Index: pkg/analysis_server/lib/src/channel.dart |
diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart |
index e9fc95d859b85d8d43737a17fe3ba4c2e51504c3..85203b0bdab74a0c8dcca61d6d644c892d57344b 100644 |
--- a/pkg/analysis_server/lib/src/channel.dart |
+++ b/pkg/analysis_server/lib/src/channel.dart |
@@ -4,6 +4,7 @@ |
library channel; |
+import 'dart:async'; |
import 'dart:convert'; |
import 'dart:io'; |
@@ -15,22 +16,28 @@ import 'package:analysis_server/src/protocol.dart'; |
* receive both [Response]s and [Notification]s. |
*/ |
abstract class ClientCommunicationChannel { |
+ |
/** |
- * Listen to the channel for responses and notifications. |
- * If a response is received, invoke the [onResponse] function. |
- * If a notification is received, invoke the [onNotification] function. |
- * If an error is encountered while trying to read from |
- * the socket, invoke the [onError] function. If the socket is closed by the |
- * client, invoke the [onDone] function. |
+ * The stream of notifications from the server. |
+ */ |
+ Stream<Notification> notificationStream; |
+ |
+ /** |
+ * The stream of responses from the server. |
+ */ |
+ Stream<Response> responseStream; |
+ |
+ /** |
+ * Send the given [request] to the server |
+ * and return a future with the associated [Response]. |
*/ |
- void listen(void onResponse(Response response), |
- void onNotification(Notification notification), |
- {void onError(), void onDone()}); |
+ Future<Response> sendRequest(Request request); |
/** |
- * Send the given [request] to the server. |
+ * Close the channel to the server. Once called, all future communication |
+ * with the server via [sendRequest] will silently be ignored. |
*/ |
- void sendRequest(Request request); |
+ Future close(); |
} |
/** |
@@ -44,6 +51,7 @@ abstract class ServerCommunicationChannel { |
* [onRequest] function. If an error is encountered while trying to read from |
* the socket, invoke the [onError] function. If the socket is closed by the |
* client, invoke the [onDone] function. |
+ * Only one listener is allowed per channel. |
*/ |
void listen(void onRequest(Request request), {void onError(), void onDone()}); |
@@ -67,61 +75,43 @@ class WebSocketClientChannel implements ClientCommunicationChannel { |
/** |
* The socket being wrapped. |
*/ |
- final WebSocket socket; |
+ final WebSocket _socket; |
- final JsonEncoder jsonEncoder = const JsonEncoder(null); |
+ @override |
+ Stream<Response> responseStream; |
- final JsonDecoder jsonDecoder = const JsonDecoder(null); |
+ @override |
+ Stream<Notification> notificationStream; |
/** |
- * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
+ * Initialize a new [WebSocket] wrapper for the given [_socket]. |
*/ |
- WebSocketClientChannel(this.socket); |
- |
- @override |
- void listen(void onResponse(Response response), |
- void onNotification(Notification notification), |
- {void onError(), void onDone()}) { |
- socket.listen((data) => _read(data, onResponse, onNotification), |
- onError: onError, onDone: onDone); |
+ WebSocketClientChannel(this._socket) { |
+ Stream jsonStream = _socket |
+ .where((data) => data is String) |
+ .transform(new _JsonStreamDecoder()) |
+ .where((json) => json is Map) |
+ .asBroadcastStream(); |
+ responseStream = jsonStream |
+ .where((json) => json[Notification.EVENT] == null) |
+ .transform(new _ResponseConverter()) |
+ .asBroadcastStream(); |
+ notificationStream = jsonStream |
+ .where((json) => json[Notification.EVENT] != null) |
+ .transform(new _NotificationConverter()) |
+ .asBroadcastStream(); |
} |
@override |
- void sendRequest(Request request) { |
- socket.add(jsonEncoder.convert(request.toJson())); |
+ Future<Response> sendRequest(Request request) { |
+ String id = request.id; |
+ _socket.add(JSON.encode(request.toJson())); |
+ return responseStream.firstWhere((Response response) => response.id == id); |
} |
- /** |
- * Read a request from the given [data] and use the given function to handle |
- * the request. |
- */ |
- void _read(Object data, |
- void onResponse(Response response), |
- void onNotification(Notification notification)) { |
- if (data is String) { |
- // Parse the string as a JSON descriptor |
- var json; |
- try { |
- json = jsonDecoder.convert(data); |
- if (json is! Map) { |
- return; |
- } |
- } catch (error) { |
- return; |
- } |
- // Process the resulting structure as a response or notification. |
- if (json[Notification.EVENT] != null) { |
- Notification notification = new Notification.fromJson(json); |
- if (notification != null) { |
- onNotification(notification); |
- } |
- } else { |
- Response response = new Response.fromJson(json); |
- if (response != null) { |
- onResponse(response); |
- } |
- } |
- } |
+ @override |
+ Future close() { |
+ return _socket.close(); |
} |
} |
@@ -136,8 +126,6 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
*/ |
final WebSocket socket; |
- final JsonEncoder jsonEncoder = const JsonEncoder(null); |
- |
/** |
* Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
*/ |
@@ -151,12 +139,12 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
@override |
void sendNotification(Notification notification) { |
- socket.add(jsonEncoder.convert(notification.toJson())); |
+ socket.add(JSON.encode(notification.toJson())); |
} |
@override |
void sendResponse(Response response) { |
- socket.add(jsonEncoder.convert(response.toJson())); |
+ socket.add(JSON.encode(response.toJson())); |
} |
/** |
@@ -180,3 +168,63 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
} |
} |
} |
+ |
+/** |
+ * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps |
+ */ |
+class _JsonStreamDecoder extends Converter<String, Map> { |
+ |
+ @override |
+ Map convert(String text) => JSON.decode(text); |
+ |
+ @override |
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
+ new _ChannelChunkSink<String, Map>(this, sink); |
+} |
+ |
+/** |
+ * Instances of [_ResponseConverter] convert JSON maps to [Response]s. |
+ */ |
+class _ResponseConverter extends Converter<Map, Response> { |
+ |
+ @override |
+ Response convert(Map json) => new Response.fromJson(json); |
+ |
+ @override |
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
+ new _ChannelChunkSink<Map, Response>(this, sink); |
+} |
+ |
+/** |
+ * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. |
+ */ |
+class _NotificationConverter extends Converter<Map, Notification> { |
+ |
+ @override |
+ Notification convert(Map json) => new Notification.fromJson(json); |
+ |
+ @override |
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
+ new _ChannelChunkSink<Map, Notification>(this, sink); |
+} |
+ |
+/** |
+ * A [_ChannelChunkSink] uses a [Convter] to translate chunks. |
+ */ |
+class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
+ final Converter<S, T> _converter; |
+ final ChunkedConversionSink _chunkedSink; |
+ |
+ _ChannelChunkSink(this._converter, this._chunkedSink); |
+ |
+ @override |
+ void add(S chunk) { |
+ var convertedChunk = _converter.convert(chunk); |
+ if (convertedChunk != null) { |
+ _chunkedSink.add(convertedChunk); |
+ } |
+ } |
+ |
+ @override |
+ void close() => _chunkedSink.close(); |
+} |