Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Unified Diff: pkg/analysis_server/lib/src/channel.dart

Issue 185313002: restructure client api to use streams (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: address comments Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/analysis_server/lib/src/analysis_manager.dart ('k') | pkg/analysis_server/lib/src/protocol.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/analysis_server/lib/src/channel.dart
diff --git a/pkg/analysis_server/lib/src/channel.dart b/pkg/analysis_server/lib/src/channel.dart
index e9fc95d859b85d8d43737a17fe3ba4c2e51504c3..85203b0bdab74a0c8dcca61d6d644c892d57344b 100644
--- a/pkg/analysis_server/lib/src/channel.dart
+++ b/pkg/analysis_server/lib/src/channel.dart
@@ -4,6 +4,7 @@
library channel;
+import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -15,22 +16,28 @@ import 'package:analysis_server/src/protocol.dart';
* receive both [Response]s and [Notification]s.
*/
abstract class ClientCommunicationChannel {
+
/**
- * Listen to the channel for responses and notifications.
- * If a response is received, invoke the [onResponse] function.
- * If a notification is received, invoke the [onNotification] function.
- * If an error is encountered while trying to read from
- * the socket, invoke the [onError] function. If the socket is closed by the
- * client, invoke the [onDone] function.
+ * The stream of notifications from the server.
+ */
+ Stream<Notification> notificationStream;
+
+ /**
+ * The stream of responses from the server.
+ */
+ Stream<Response> responseStream;
+
+ /**
+ * Send the given [request] to the server
+ * and return a future with the associated [Response].
*/
- void listen(void onResponse(Response response),
- void onNotification(Notification notification),
- {void onError(), void onDone()});
+ Future<Response> sendRequest(Request request);
/**
- * Send the given [request] to the server.
+ * Close the channel to the server. Once called, all future communication
+ * with the server via [sendRequest] will silently be ignored.
*/
- void sendRequest(Request request);
+ Future close();
}
/**
@@ -44,6 +51,7 @@ abstract class ServerCommunicationChannel {
* [onRequest] function. If an error is encountered while trying to read from
* the socket, invoke the [onError] function. If the socket is closed by the
* client, invoke the [onDone] function.
+ * Only one listener is allowed per channel.
*/
void listen(void onRequest(Request request), {void onError(), void onDone()});
@@ -67,61 +75,43 @@ class WebSocketClientChannel implements ClientCommunicationChannel {
/**
* The socket being wrapped.
*/
- final WebSocket socket;
+ final WebSocket _socket;
- final JsonEncoder jsonEncoder = const JsonEncoder(null);
+ @override
+ Stream<Response> responseStream;
- final JsonDecoder jsonDecoder = const JsonDecoder(null);
+ @override
+ Stream<Notification> notificationStream;
/**
- * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
+ * Initialize a new [WebSocket] wrapper for the given [_socket].
*/
- WebSocketClientChannel(this.socket);
-
- @override
- void listen(void onResponse(Response response),
- void onNotification(Notification notification),
- {void onError(), void onDone()}) {
- socket.listen((data) => _read(data, onResponse, onNotification),
- onError: onError, onDone: onDone);
+ WebSocketClientChannel(this._socket) {
+ Stream jsonStream = _socket
+ .where((data) => data is String)
+ .transform(new _JsonStreamDecoder())
+ .where((json) => json is Map)
+ .asBroadcastStream();
+ responseStream = jsonStream
+ .where((json) => json[Notification.EVENT] == null)
+ .transform(new _ResponseConverter())
+ .asBroadcastStream();
+ notificationStream = jsonStream
+ .where((json) => json[Notification.EVENT] != null)
+ .transform(new _NotificationConverter())
+ .asBroadcastStream();
}
@override
- void sendRequest(Request request) {
- socket.add(jsonEncoder.convert(request.toJson()));
+ Future<Response> sendRequest(Request request) {
+ String id = request.id;
+ _socket.add(JSON.encode(request.toJson()));
+ return responseStream.firstWhere((Response response) => response.id == id);
}
- /**
- * Read a request from the given [data] and use the given function to handle
- * the request.
- */
- void _read(Object data,
- void onResponse(Response response),
- void onNotification(Notification notification)) {
- if (data is String) {
- // Parse the string as a JSON descriptor
- var json;
- try {
- json = jsonDecoder.convert(data);
- if (json is! Map) {
- return;
- }
- } catch (error) {
- return;
- }
- // Process the resulting structure as a response or notification.
- if (json[Notification.EVENT] != null) {
- Notification notification = new Notification.fromJson(json);
- if (notification != null) {
- onNotification(notification);
- }
- } else {
- Response response = new Response.fromJson(json);
- if (response != null) {
- onResponse(response);
- }
- }
- }
+ @override
+ Future close() {
+ return _socket.close();
}
}
@@ -136,8 +126,6 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
*/
final WebSocket socket;
- final JsonEncoder jsonEncoder = const JsonEncoder(null);
-
/**
* Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
*/
@@ -151,12 +139,12 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
@override
void sendNotification(Notification notification) {
- socket.add(jsonEncoder.convert(notification.toJson()));
+ socket.add(JSON.encode(notification.toJson()));
}
@override
void sendResponse(Response response) {
- socket.add(jsonEncoder.convert(response.toJson()));
+ socket.add(JSON.encode(response.toJson()));
}
/**
@@ -180,3 +168,63 @@ class WebSocketServerChannel implements ServerCommunicationChannel {
}
}
}
+
+/**
+ * Instances of [_JsonStreamDecoder] convert JSON strings to JSON maps
+ */
+class _JsonStreamDecoder extends Converter<String, Map> {
+
+ @override
+ Map convert(String text) => JSON.decode(text);
+
+ @override
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
+ new _ChannelChunkSink<String, Map>(this, sink);
+}
+
+/**
+ * Instances of [_ResponseConverter] convert JSON maps to [Response]s.
+ */
+class _ResponseConverter extends Converter<Map, Response> {
+
+ @override
+ Response convert(Map json) => new Response.fromJson(json);
+
+ @override
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
+ new _ChannelChunkSink<Map, Response>(this, sink);
+}
+
+/**
+ * Instances of [_NotificationConverter] convert JSON maps to [Notification]s.
+ */
+class _NotificationConverter extends Converter<Map, Notification> {
+
+ @override
+ Notification convert(Map json) => new Notification.fromJson(json);
+
+ @override
+ ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
+ new _ChannelChunkSink<Map, Notification>(this, sink);
+}
+
+/**
+ * A [_ChannelChunkSink] uses a [Convter] to translate chunks.
+ */
+class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
+ final Converter<S, T> _converter;
+ final ChunkedConversionSink _chunkedSink;
+
+ _ChannelChunkSink(this._converter, this._chunkedSink);
+
+ @override
+ void add(S chunk) {
+ var convertedChunk = _converter.convert(chunk);
+ if (convertedChunk != null) {
+ _chunkedSink.add(convertedChunk);
+ }
+ }
+
+ @override
+ void close() => _chunkedSink.close();
+}
« no previous file with comments | « pkg/analysis_server/lib/src/analysis_manager.dart ('k') | pkg/analysis_server/lib/src/protocol.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698