Chromium Code Reviews| Index: pkg/analysis_server/lib/src/channel.dart |
| diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart |
| index e9fc95d859b85d8d43737a17fe3ba4c2e51504c3..94bbb127c5659588ea2a1ea75c1bcdb2434eceb6 100644 |
| --- a/pkg/analysis_server/lib/src/channel.dart |
| +++ b/pkg/analysis_server/lib/src/channel.dart |
| @@ -4,6 +4,7 @@ |
| library channel; |
| +import 'dart:async'; |
| import 'dart:convert'; |
| import 'dart:io'; |
| @@ -15,22 +16,28 @@ import 'package:analysis_server/src/protocol.dart'; |
| * receive both [Response]s and [Notification]s. |
| */ |
| abstract class ClientCommunicationChannel { |
| + |
| /** |
| - * Listen to the channel for responses and notifications. |
| - * If a response is received, invoke the [onResponse] function. |
| - * If a notification is received, invoke the [onNotification] function. |
| - * If an error is encountered while trying to read from |
| - * the socket, invoke the [onError] function. If the socket is closed by the |
| - * client, invoke the [onDone] function. |
| + * The stream of notifications from the server. |
| + */ |
| + Stream<Notification> notificationStream; |
| + |
| + /** |
| + * The stream of responses from the server. |
| + */ |
| + Stream<Response> responseStream; |
| + |
| + /** |
| + * Send the given [request] to the server |
| + * and return a future with the associated [Response]. |
| */ |
| - void listen(void onResponse(Response response), |
| - void onNotification(Notification notification), |
| - {void onError(), void onDone()}); |
| + Future<Response> sendRequest(Request request); |
| /** |
| - * Send the given [request] to the server. |
| + * Close the channel to the server. Once called, all future communication |
| + * with the server via [sendRequest] will silently be ignored. |
| */ |
| - void sendRequest(Request request); |
| + void close(); |
|
Bob Nystrom
2014/03/04 01:03:45
This should return a Future.
danrubel
2014/03/05 15:32:04
Good point. Done!
|
| } |
| /** |
| @@ -44,6 +51,7 @@ abstract class ServerCommunicationChannel { |
| * [onRequest] function. If an error is encountered while trying to read from |
| * the socket, invoke the [onError] function. If the socket is closed by the |
| * client, invoke the [onDone] function. |
| + * Only one listener is allowed per channel. |
| */ |
| void listen(void onRequest(Request request), {void onError(), void onDone()}); |
| @@ -67,61 +75,43 @@ class WebSocketClientChannel implements ClientCommunicationChannel { |
| /** |
| * The socket being wrapped. |
| */ |
| - final WebSocket socket; |
| + final WebSocket _socket; |
| - final JsonEncoder jsonEncoder = const JsonEncoder(null); |
| + @override |
| + Stream<Response> responseStream; |
| - final JsonDecoder jsonDecoder = const JsonDecoder(null); |
| + @override |
| + Stream<Notification> notificationStream; |
| /** |
| - * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| + * Initialize a new [WebSocket] wrapper for the given [_socket]. |
| */ |
| - WebSocketClientChannel(this.socket); |
| - |
| - @override |
| - void listen(void onResponse(Response response), |
| - void onNotification(Notification notification), |
| - {void onError(), void onDone()}) { |
| - socket.listen((data) => _read(data, onResponse, onNotification), |
| - onError: onError, onDone: onDone); |
| + WebSocketClientChannel(this._socket) { |
| + Stream jsonStream = _socket |
| + .where((data) => data is String) |
| + .transform(new _JsonConverter()) |
|
Bob Nystrom
2014/03/04 01:03:45
Interesting. Is this so you can handle JSON object
danrubel
2014/03/05 15:32:04
It allows the stream to decide what characters in
nweiz
2014/03/05 20:16:39
This seems strange to me. I believe web sockets gu
danrubel
2014/03/11 19:06:09
Good to know. Perhaps misunderstanding on my part.
|
| + .where((json) => json is Map) |
| + .asBroadcastStream(); |
|
Bob Nystrom
2014/03/04 01:03:45
Fancy! :)
|
| + responseStream = jsonStream |
| + .where((json) => json[Notification.EVENT] == null) |
| + .transform(new _ResponseConverter()) |
| + .asBroadcastStream(); |
| + notificationStream = jsonStream |
| + .where((json) => json[Notification.EVENT] != null) |
| + .transform(new _NotificationConverter()) |
| + .asBroadcastStream(); |
| } |
| @override |
| - void sendRequest(Request request) { |
| - socket.add(jsonEncoder.convert(request.toJson())); |
| + Future<Response> sendRequest(Request request) { |
| + String id = request.id; |
| + _socket.add(_JSON_ENCODER.convert(request.toJson())); |
| + return responseStream.firstWhere((Response response) => response.id == id); |
| } |
| - /** |
| - * Read a request from the given [data] and use the given function to handle |
| - * the request. |
| - */ |
| - void _read(Object data, |
| - void onResponse(Response response), |
| - void onNotification(Notification notification)) { |
| - if (data is String) { |
| - // Parse the string as a JSON descriptor |
| - var json; |
| - try { |
| - json = jsonDecoder.convert(data); |
| - if (json is! Map) { |
| - return; |
| - } |
| - } catch (error) { |
| - return; |
| - } |
| - // Process the resulting structure as a response or notification. |
| - if (json[Notification.EVENT] != null) { |
| - Notification notification = new Notification.fromJson(json); |
| - if (notification != null) { |
| - onNotification(notification); |
| - } |
| - } else { |
| - Response response = new Response.fromJson(json); |
| - if (response != null) { |
| - onResponse(response); |
| - } |
| - } |
| - } |
| + @override |
| + void close() { |
| + _socket.close(); |
|
Bob Nystrom
2014/03/04 01:03:45
Future close() => _socket.close();
I'm not sure,
danrubel
2014/03/05 15:32:04
Good point. Done.
|
| } |
| } |
| @@ -136,8 +126,6 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| */ |
| final WebSocket socket; |
| - final JsonEncoder jsonEncoder = const JsonEncoder(null); |
| - |
| /** |
| * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. |
| */ |
| @@ -151,12 +139,12 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| @override |
| void sendNotification(Notification notification) { |
| - socket.add(jsonEncoder.convert(notification.toJson())); |
| + socket.add(_JSON_ENCODER.convert(notification.toJson())); |
|
Bob Nystrom
2014/03/04 01:03:45
JSON.encode(notification.toJson())
danrubel
2014/03/05 15:32:04
Done.
|
| } |
| @override |
| void sendResponse(Response response) { |
| - socket.add(jsonEncoder.convert(response.toJson())); |
| + socket.add(_JSON_ENCODER.convert(response.toJson())); |
|
Bob Nystrom
2014/03/04 01:03:45
Ditto.
danrubel
2014/03/05 15:32:04
Done.
|
| } |
| /** |
| @@ -180,3 +168,67 @@ class WebSocketServerChannel implements ServerCommunicationChannel { |
| } |
| } |
| } |
| + |
| +const _JSON_DECODER = const JsonDecoder(null); |
| + |
| +const _JSON_ENCODER = const JsonEncoder(null); |
|
Bob Nystrom
2014/03/04 01:03:45
You can ditch these and just use JSON.
danrubel
2014/03/05 15:32:04
Done.
|
| + |
| +/** |
| + * Instances of [_JsonConverter] convert JSON strings to JSON maps |
| + */ |
| +class _JsonConverter extends Converter<String, Map> { |
| + |
| + @override |
| + Map convert(String text) => _JSON_DECODER.convert(text); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<String, Map>(this, sink); |
| +} |
| + |
| +/** |
| + * Instances of [_ResponseConverter] convert JSON maps to [Response]s. |
| + */ |
| +class _ResponseConverter extends Converter<Map, Response> { |
| + |
| + @override |
| + Response convert(Map json) => new Response.fromJson(json); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<Map, Response>(this, sink); |
| +} |
| + |
| +/** |
| + * Instances of [_NotificationConverter] convert JSON maps to [Notification]s. |
| + */ |
| +class _NotificationConverter extends Converter<Map, Notification> { |
| + |
| + @override |
| + Notification convert(Map json) => new Notification.fromJson(json); |
| + |
| + @override |
| + ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) => |
| + new _ChannelChunkSink<Map, Notification>(this, sink); |
| +} |
| + |
| +/** |
| + * A [_ChannelChunkSink] uses a [Convter] to translate chunks. |
| + */ |
| +class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> { |
| + final Converter<S, T> _converter; |
| + final ChunkedConversionSink _chunkedSink; |
| + |
| + _ChannelChunkSink(this._converter, this._chunkedSink); |
| + |
| + @override |
| + void add(S chunk) { |
| + var convertedChunk = _converter.convert(chunk); |
| + if (convertedChunk != null) { |
| + _chunkedSink.add(convertedChunk); |
| + } |
| + } |
| + |
| + @override |
| + void close() => _chunkedSink.close(); |
| +} |