Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: pkg/analysis_server/lib/src/channel.dart

Issue 185313002: restructure client api to use streams (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library channel; 5 library channel;
6 6
7 import 'dart:async';
7 import 'dart:convert'; 8 import 'dart:convert';
8 import 'dart:io'; 9 import 'dart:io';
9 10
10 import 'package:analysis_server/src/protocol.dart'; 11 import 'package:analysis_server/src/protocol.dart';
11 12
12 /** 13 /**
13 * The abstract class [ClientCommunicationChannel] defines the behavior of 14 * The abstract class [ClientCommunicationChannel] defines the behavior of
14 * objects that allows an object to send [Request]s to [AnalysisServer] and to 15 * objects that allows an object to send [Request]s to [AnalysisServer] and to
15 * receive both [Response]s and [Notification]s. 16 * receive both [Response]s and [Notification]s.
16 */ 17 */
17 abstract class ClientCommunicationChannel { 18 abstract class ClientCommunicationChannel {
18 /**
19 * Listen to the channel for responses and notifications.
20 * If a response is received, invoke the [onResponse] function.
21 * If a notification is received, invoke the [onNotification] function.
22 * If an error is encountered while trying to read from
23 * the socket, invoke the [onError] function. If the socket is closed by the
24 * client, invoke the [onDone] function.
25 */
26 void listen(void onResponse(Response response),
27 void onNotification(Notification notification),
28 {void onError(), void onDone()});
29 19
30 /** 20 /**
31 * Send the given [request] to the server. 21 * The stream of notifications from the server.
32 */ 22 */
33 void sendRequest(Request request); 23 Stream<Notification> notificationStream;
24
25 /**
26 * The stream of responses from the server.
27 */
28 Stream<Response> responseStream;
29
30 /**
31 * Send the given [request] to the server
32 * and return a future with the associated [Response].
33 */
34 Future<Response> sendRequest(Request request);
35
36 /**
37 * Close the channel to the server. Once called, all future communication
38 * with the server via [sendRequest] will silently be ignored.
39 */
40 void close();
34 } 41 }
35 42
36 /** 43 /**
37 * The abstract class [ServerCommunicationChannel] defines the behavior of 44 * The abstract class [ServerCommunicationChannel] defines the behavior of
38 * objects that allow an [AnalysisServer] to receive [Request]s and to return 45 * objects that allow an [AnalysisServer] to receive [Request]s and to return
39 * both [Response]s and [Notification]s. 46 * both [Response]s and [Notification]s.
40 */ 47 */
41 abstract class ServerCommunicationChannel { 48 abstract class ServerCommunicationChannel {
42 /** 49 /**
43 * Listen to the channel for requests. If a request is received, invoke the 50 * Listen to the channel for requests. If a request is received, invoke the
(...skipping 16 matching lines...) Expand all
60 67
61 /** 68 /**
62 * Instances of the class [WebSocketClientChannel] implement a 69 * Instances of the class [WebSocketClientChannel] implement a
63 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with 70 * [ClientCommunicationChannel] that uses a [WebSocket] to communicate with
64 * servers. 71 * servers.
65 */ 72 */
66 class WebSocketClientChannel implements ClientCommunicationChannel { 73 class WebSocketClientChannel implements ClientCommunicationChannel {
67 /** 74 /**
68 * The socket being wrapped. 75 * The socket being wrapped.
69 */ 76 */
70 final WebSocket socket; 77 final WebSocket _socket;
71 78
72 final JsonEncoder jsonEncoder = const JsonEncoder(null); 79 @override
80 Stream<Response> responseStream;
73 81
74 final JsonDecoder jsonDecoder = const JsonDecoder(null); 82 @override
83 Stream<Notification> notificationStream;
75 84
76 /** 85 /**
77 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. 86 * Initialize a new [WebSocket] wrapper for the given [_socket].
78 */ 87 */
79 WebSocketClientChannel(this.socket); 88 WebSocketClientChannel(this._socket) {
80 89 Stream jsonStream = _socket
Brian Wilkerson 2014/03/01 17:16:37 Does 'jsonStream' need to be a broadcast stream in
danrubel 2014/03/03 16:52:20 Good catch! The mock stream was incorrectly simula
81 @override 90 .where((data) => data is String)
82 void listen(void onResponse(Response response), 91 .transform(new _JsonConverter())
83 void onNotification(Notification notification), 92 .where((json) => json is Map);
84 {void onError(), void onDone()}) { 93 responseStream = jsonStream
85 socket.listen((data) => _read(data, onResponse, onNotification), 94 .where((json) => json[Notification.EVENT] == null)
86 onError: onError, onDone: onDone); 95 .transform(new _ResponseConverter())
96 .asBroadcastStream();
97 notificationStream = jsonStream
98 .where((json) => json[Notification.EVENT] != null)
99 .transform(new _NotificationConverter())
100 .asBroadcastStream();
87 } 101 }
88 102
89 @override 103 @override
90 void sendRequest(Request request) { 104 Future<Response> sendRequest(Request request) {
91 socket.add(jsonEncoder.convert(request.toJson())); 105 String id = request.id;
106 _socket.add(_JSON_ENCODER.convert(request.toJson()));
107 return responseStream.firstWhere((Response response) => response.id == id);
92 } 108 }
93 109
94 /** 110 @override
95 * Read a request from the given [data] and use the given function to handle 111 void close() {
96 * the request. 112 _socket.close();
97 */
98 void _read(Object data,
99 void onResponse(Response response),
100 void onNotification(Notification notification)) {
101 if (data is String) {
102 // Parse the string as a JSON descriptor
103 var json;
104 try {
105 json = jsonDecoder.convert(data);
106 if (json is! Map) {
107 return;
108 }
109 } catch (error) {
110 return;
111 }
112 // Process the resulting structure as a response or notification.
113 if (json[Notification.EVENT] != null) {
114 Notification notification = new Notification.fromJson(json);
115 if (notification != null) {
116 onNotification(notification);
117 }
118 } else {
119 Response response = new Response.fromJson(json);
120 if (response != null) {
121 onResponse(response);
122 }
123 }
124 }
125 } 113 }
126 } 114 }
127 115
128 /** 116 /**
129 * Instances of the class [WebSocketServerChannel] implement a 117 * Instances of the class [WebSocketServerChannel] implement a
130 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with 118 * [ServerCommunicationChannel] that uses a [WebSocket] to communicate with
131 * clients. 119 * clients.
132 */ 120 */
133 class WebSocketServerChannel implements ServerCommunicationChannel { 121 class WebSocketServerChannel implements ServerCommunicationChannel {
134 /** 122 /**
135 * The socket being wrapped. 123 * The socket being wrapped.
136 */ 124 */
137 final WebSocket socket; 125 final WebSocket socket;
138 126
139 final JsonEncoder jsonEncoder = const JsonEncoder(null);
140
141 /** 127 /**
142 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket]. 128 * Initialize a newly create [WebSocket] wrapper to wrap the given [socket].
143 */ 129 */
144 WebSocketServerChannel(this.socket); 130 WebSocketServerChannel(this.socket);
145 131
146 @override 132 @override
147 void listen(void onRequest(Request request), {void onError(), void onDone()}) { 133 void listen(void onRequest(Request request), {void onError(), void onDone()}) {
148 socket.listen((data) => _readRequest(data, onRequest), onError: onError, 134 socket.listen((data) => _readRequest(data, onRequest), onError: onError,
149 onDone: onDone); 135 onDone: onDone);
150 } 136 }
151 137
152 @override 138 @override
153 void sendNotification(Notification notification) { 139 void sendNotification(Notification notification) {
154 socket.add(jsonEncoder.convert(notification.toJson())); 140 socket.add(_JSON_ENCODER.convert(notification.toJson()));
155 } 141 }
156 142
157 @override 143 @override
158 void sendResponse(Response response) { 144 void sendResponse(Response response) {
159 socket.add(jsonEncoder.convert(response.toJson())); 145 socket.add(_JSON_ENCODER.convert(response.toJson()));
160 } 146 }
161 147
162 /** 148 /**
163 * Read a request from the given [data] and use the given function to handle 149 * Read a request from the given [data] and use the given function to handle
164 * the request. 150 * the request.
165 */ 151 */
166 void _readRequest(Object data, void onRequest(Request request)) { 152 void _readRequest(Object data, void onRequest(Request request)) {
167 if (data is List<int>) { 153 if (data is List<int>) {
168 sendResponse(new Response.invalidRequestFormat()); 154 sendResponse(new Response.invalidRequestFormat());
169 return; 155 return;
170 } 156 }
171 if (data is String) { 157 if (data is String) {
172 // Parse the string as a JSON descriptor and process the resulting 158 // Parse the string as a JSON descriptor and process the resulting
173 // structure as a request. 159 // structure as a request.
174 Request request = new Request.fromString(data); 160 Request request = new Request.fromString(data);
175 if (request == null) { 161 if (request == null) {
176 sendResponse(new Response.invalidRequestFormat()); 162 sendResponse(new Response.invalidRequestFormat());
177 return; 163 return;
178 } 164 }
179 onRequest(request); 165 onRequest(request);
180 } 166 }
181 } 167 }
182 } 168 }
169
170 const _JSON_DECODER = const JsonDecoder(null);
171
172 const _JSON_ENCODER = const JsonEncoder(null);
173
174 /**
175 * Instances of [_JsonConverter] convert JSON strings to JSON maps
176 */
177 class _JsonConverter extends Converter<String, Map> {
178
179 @override
180 Map convert(String text) => _JSON_DECODER.convert(text);
181
182 @override
183 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
184 new _ChannelChunkSink<String, Map>(this, sink);
185 }
186
187 /**
188 * Instances of [_ResponseConverter] convert JSON maps to [Response]s.
189 */
190 class _ResponseConverter extends Converter<Map, Response> {
191
192 @override
193 Response convert(Map json) => new Response.fromJson(json);
194
195 @override
196 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
197 new _ChannelChunkSink<Map, Response>(this, sink);
198 }
199
200 /**
201 * Instances of [_NotificationConverter] convert JSON maps to [Notification]s.
202 */
203 class _NotificationConverter extends Converter<Map, Notification> {
204
205 @override
206 Notification convert(Map json) => new Notification.fromJson(json);
207
208 @override
209 ChunkedConversionSink startChunkedConversion(ChunkedConversionSink sink) =>
210 new _ChannelChunkSink<Map, Notification>(this, sink);
211 }
212
213 /**
214 * A [_ChannelChunkSink] uses a [Convter] to translate chunks.
215 */
216 class _ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
217 final Converter<S, T> _converter;
218 final ChunkedConversionSink _chunkedSink;
219
220 _ChannelChunkSink(this._converter, this._chunkedSink);
221
222 @override
223 void add(S chunk) {
224 var convertedChunk = _converter.convert(chunk);
225 if (convertedChunk != null) {
226 _chunkedSink.add(convertedChunk);
227 }
228 }
229
230 @override
231 void close() => _chunkedSink.close();
232 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698