Chromium Code Reviews| Index: pkg/analysis_server/lib/src/channel.dart |
| diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart |
| index e9fc95d859b85d8d43737a17fe3ba4c2e51504c3..361ba69933dbc9989091b8983620a286f397cde4 100644 |
| --- a/pkg/analysis_server/lib/src/channel.dart |
| +++ b/pkg/analysis_server/lib/src/channel.dart |
| @@ -4,6 +4,7 @@ |
| library channel; |
| +import 'dart:async'; |
| import 'dart:convert'; |
| import 'dart:io'; |
| @@ -15,22 +16,28 @@ import 'package:analysis_server/src/protocol.dart'; |
| * receive both [Response]s and [Notification]s. |
| */ |
| abstract class ClientCommunicationChannel { |
| + |
| /** |
| - * Listen to the channel for responses and notifications. |
| - * If a response is received, invoke the [onResponse] function. |
| - * If a notification is received, invoke the [onNotification] function. |
| - * If an error is encountered while trying to read from |
| - * the socket, invoke the [onError] function. If the socket is closed by the |
| - * client, invoke the [onDone] function. |
| + * The stream of notifications from the server. |
| + */ |
| + Stream<Notification> notificationStream; |
| + |
| + /** |
| + * The stream of responses from the server. |
| + */ |
| + Stream<Response> responseStream; |
| + |
| + /** |
| + * Send the given [request] to the server |
| + * and return a future with the associated [Response]. |
| */ |
| - void listen(void onResponse(Response response), |
| - void onNotification(Notification notification), |
| - {void onError(), void onDone()}); |
| + Future<Response> sendRequest(Request request); |
| /** |
| - * Send the given [request] to the server. |
| + * Close the channel to the server. Once called, all future communication |
| + * with the server via [sendRequest] will silently be ignored. |
| */ |
| - void sendRequest(Request request); |
| + void close(); |
| } |
| /** |
| @@ -67,61 +74,42 @@ class WebSocketClientChannel implements ClientCommunicationChannel { |
| /** |
| * The socket being wrapped. |
| */ |
| - final WebSocket socket; |
| + final WebSocket _socket; |
| - final JsonEncoder jsonEncoder = const JsonEncoder(null); |
| + @override |
| + Stream<Response> responseStream; |
| - final JsonDecoder jsonDecoder = const JsonDecoder(null); |
| + @override |
| + Stream<Notification> notificationStream; |
| /** |
| - * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| + * Initialize a new [WebSocket] wrapper for the given [_socket]. |
| */ |
| - WebSocketClientChannel(this.socket); |
| - |
| - @override |
| - void listen(void onResponse(Response response), |
| - void onNotification(Notification notification), |
| - {void onError(), void onDone()}) { |
| - socket.listen((data) => _read(data, onResponse, onNotification), |
| - onError: onError, onDone: onDone); |
| + WebSocketClientChannel(this._socket) { |
| + Stream jsonStream = _socket |
|
Brian Wilkerson
2014/03/01 17:16:37
Does 'jsonStream' need to be a broadcast stream in
danrubel
2014/03/03 16:52:20
Good catch! The mock stream was incorrectly simula
|
| + .where((data) => data is String) |
| + .transform(new _JsonConverter()) |
| + .where((json) => json is Map); |
| + responseStream = jsonStream |
| + .where((json) => json[Notification.EVENT] == null) |
| + .transform(new _ResponseConverter()) |
| + .asBroadcastStream(); |
| + notificationStream = jsonStream |
| + .where((json) => json[Notification.EVENT] != null) |
| + .transform(new _NotificationConverter()) |
| + .asBroadcastStream(); |
| } |
| @override |
| - void sendRequest(Request request) { |
| - socket.add(jsonEncoder.convert(request.toJson())); |
| + Future<Response> sendRequest(Request request) { |
| + String id = request.id; |
| + _socket.add(_JSON_ENCODER.convert(request.toJson())); |
| + return responseStream.firstWhere((Response response) => response.id == id); |
| } |
| - /** |
| - * Read a request from the given [data] and use the given function to handle |
| - * the request. |
| - */ |
| - void _read(Object data, |
| - void onResponse(Response response), |
| - void onNotification(Notification notification)) { |
| - if (data is String) { |
| - // Parse the string as a JSON descriptor |
| - var json; |
| - try { |
| - json = jsonDecoder.convert(data); |
| - if (json is! Map) { |
| - return; |
| - } |
| - } catch (error) { |
| - return; |
| - } |
| - // Process the resulting structure as a response or notification. |
| - if (json[Notification.EVENT] != null) { |
| - Notification notification = new Notification.fromJson(json); |
| - if (notification != null) { |
| - onNotification(notification); |
| - } |
| - } else { |
| - Response response = new Response.fromJson(json); |
| - if (response != null) { |
| - onResponse(response); |
| - } |
| - } |
| - } |
| + @override |
| + void close() { |
| + _socket.close(); |
| } |
| } |
| @@ -136,8 +124,6 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| */ |
| final WebSocket socket; |
| - final JsonEncoder jsonEncoder = const JsonEncoder(null); |
| - |
| /** |
| * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| */ |
| @@ -151,12 +137,12 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| @override |
| void sendNotification(Notification notification) { |
| - socket.add(jsonEncoder.convert(notification.toJson())); |
| + socket.add(_JSON_ENCODER.convert(notification.toJson())); |
| } |
| @override |
| void sendResponse(Response response) { |
| - socket.add(jsonEncoder.convert(response.toJson())); |
| + socket.add(_JSON_ENCODER.convert(response.toJson())); |
| } |
| /** |
| @@ -180,3 +166,67 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| } |
| } |
| } |
| + |
| +const _JSON_DECODER = const JsonDecoder(null); |
| + |
| +const _JSON_ENCODER = const JsonEncoder(null); |
| + |
| +/** |
| + * Instances of [_JsonConverter] convert JSON strings to JSON maps |
| + */ |
| +class _JsonConverter extends Converter<String, Map> { |
| + |
| + @override |
| + Map convert(String text) => _JSON_DECODER.convert(text); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<String, Map>(this, sink); |
| +} |
| + |
| +/** |
| + * Instances of [_ResponseConverter] convert JSON maps to [Response]s. |
| + */ |
| +class _ResponseConverter extends Converter<Map, Response> { |
| + |
| + @override |
| + Response convert(Map json) => new Response.fromJson(json); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<Map, Response>(this, sink); |
| +} |
| + |
| +/** |
| + * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. |
| + */ |
| +class _NotificationConverter extends Converter<Map, Notification> { |
| + |
| + @override |
| + Notification convert(Map json) => new Notification.fromJson(json); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<Map, Notification>(this, sink); |
| +} |
| + |
| +/** |
| + * A [_ChannelChunkSink] uses a [Convter] to translate chunks. |
| + */ |
| +class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
| + final Converter<S, T> _converter; |
| + final ChunkedConversionSink _chunkedSink; |
| + |
| + _ChannelChunkSink(this._converter, this._chunkedSink); |
| + |
| + @override |
| + void add(S chunk) { |
| + var convertedChunk = _converter.convert(chunk); |
| + if (convertedChunk != null) { |
| + _chunkedSink.add(convertedChunk); |
| + } |
| + } |
| + |
| + @override |
| + void close() => _chunkedSink.close(); |
| +} |