Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(256)

Side by Side Diff: runtime/vm/service/vmservice.dart

Issue 1166433008: 2nd attempt at adding streamListen/streamCancel to the service protocol. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: fix context objects Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/vm/service/service.md ('k') | runtime/vm/service_event.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library vmservice; 5 library vmservice;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert'; 8 import 'dart:convert';
9 import 'dart:isolate'; 9 import 'dart:isolate';
10 import 'dart:typed_data'; 10 import 'dart:typed_data';
(...skipping 18 matching lines...) Expand all
29 29
30 /// Collection of currently running isolates. 30 /// Collection of currently running isolates.
31 RunningIsolates runningIsolates = new RunningIsolates(); 31 RunningIsolates runningIsolates = new RunningIsolates();
32 32
33 /// A port used to receive events from the VM. 33 /// A port used to receive events from the VM.
34 final RawReceivePort eventPort; 34 final RawReceivePort eventPort;
35 35
36 ShutdownCallback onShutdown; 36 ShutdownCallback onShutdown;
37 37
38 void _addClient(Client client) { 38 void _addClient(Client client) {
39 assert(client.streams.isEmpty);
39 clients.add(client); 40 clients.add(client);
40 } 41 }
41 42
42 void _removeClient(Client client) { 43 void _removeClient(Client client) {
43 clients.remove(client); 44 clients.remove(client);
44 } 45 for (var streamId in client.streams) {
45 46 if (!_isAnyClientSubscribed(streamId)) {
46 void _eventMessageHandler(dynamic eventMessage) { 47 _vmCancelStream(streamId);
47 for (var client in clients) {
48 if (client.sendEvents) {
49 client.post(eventMessage);
50 } 48 }
51 } 49 }
52 } 50 }
51
52 void _eventMessageHandler(List eventMessage) {
53 var streamId = eventMessage[0];
54 var event = eventMessage[1];
55 for (var client in clients) {
56 if (client.sendEvents && client.streams.contains(streamId)) {
57 client.post(event);
58 }
59 }
60 }
53 61
54 void _controlMessageHandler(int code, 62 void _controlMessageHandler(int code,
55 int portId, 63 int portId,
56 SendPort sp, 64 SendPort sp,
57 String name) { 65 String name) {
58 switch (code) { 66 switch (code) {
59 case Constants.ISOLATE_STARTUP_MESSAGE_ID: 67 case Constants.ISOLATE_STARTUP_MESSAGE_ID:
60 runningIsolates.isolateStartup(portId, sp, name); 68 runningIsolates.isolateStartup(portId, sp, name);
61 break; 69 break;
62 case Constants.ISOLATE_SHUTDOWN_MESSAGE_ID: 70 case Constants.ISOLATE_SHUTDOWN_MESSAGE_ID:
(...skipping 11 matching lines...) Expand all
74 client.close(); 82 client.close();
75 } 83 }
76 // Call embedder shutdown hook after the internal shutdown. 84 // Call embedder shutdown hook after the internal shutdown.
77 if (onShutdown != null) { 85 if (onShutdown != null) {
78 onShutdown(); 86 onShutdown();
79 } 87 }
80 _onExit(); 88 _onExit();
81 } 89 }
82 90
83 void messageHandler(message) { 91 void messageHandler(message) {
84 if (message is String) {
85 // This is an event intended for all clients.
86 _eventMessageHandler(message);
87 return;
88 }
89 if (message is Uint8List) {
90 // This is "raw" data intended for a specific client.
91 //
92 // TODO(turnidge): Do not broadcast this data to all clients.
93 _eventMessageHandler(message);
94 return;
95 }
96 if (message is List) { 92 if (message is List) {
97 // This is an internal vm service event. 93 if (message.length == 2) {
94 // This is an event.
95 assert(message[0] is String);
96 assert(message[1] is String || message[1] is Uint8List);
97 _eventMessageHandler(message);
98 return;
99 }
98 if (message.length == 1) { 100 if (message.length == 1) {
99 // This is a control message directing the vm service to exit. 101 // This is a control message directing the vm service to exit.
100 assert(message[0] == Constants.SERVICE_EXIT_MESSAGE_ID); 102 assert(message[0] == Constants.SERVICE_EXIT_MESSAGE_ID);
101 _exit(); 103 _exit();
102 return; 104 return;
103 } 105 }
104 if (message.length == 4) { 106 if (message.length == 4) {
105 // This is a message informing us of the birth or death of an 107 // This is a message informing us of the birth or death of an
106 // isolate. 108 // isolate.
107 _controlMessageHandler(message[0], message[1], message[2], message[3]); 109 _controlMessageHandler(message[0], message[1], message[2], message[3]);
108 return; 110 return;
109 } 111 }
110 } 112 }
111
112 Logger.root.severe( 113 Logger.root.severe(
113 'Internal vm-service error: ignoring illegal message: $message'); 114 'Internal vm-service error: ignoring illegal message: $message');
114 } 115 }
115 116
116 void _notSupported(_) { 117 void _notSupported(_) {
117 throw new UnimplementedError('Service script loading not supported.'); 118 throw new UnimplementedError('Service script loading not supported.');
118 } 119 }
119 120
120 VMService._internal() 121 VMService._internal()
121 : eventPort = isolateLifecyclePort { 122 : eventPort = isolateLifecyclePort {
(...skipping 13 matching lines...) Expand all
135 var members = []; 136 var members = [];
136 var result = {}; 137 var result = {};
137 clients.forEach((client) { 138 clients.forEach((client) {
138 members.add(client.toJson()); 139 members.add(client.toJson());
139 }); 140 });
140 result['type'] = 'ClientList'; 141 result['type'] = 'ClientList';
141 result['members'] = members; 142 result['members'] = members;
142 message.setResponse(JSON.encode(result)); 143 message.setResponse(JSON.encode(result));
143 } 144 }
144 145
146 // These must be kept in sync with the declarations in vm/json_stream.h.
147 static const _kInvalidParams = -32602;
148 static const _kStreamAlreadySubscribed = 103;
149 static const _kStreamNotSubscribed = 104;
150
151 var _errorMessages = {
152 _kInvalidParams: 'Invalid params"',
153 _kStreamAlreadySubscribed: 'Stream already subscribed',
154 _kStreamNotSubscribed: 'Stream not subscribed',
155 };
156
157 String _encodeError(Message message, int code, {String details}) {
158 var response = {
159 'id' : message.serial,
160 'error' : {
161 'code': code,
162 'message': _errorMessages[code],
163 },
164 };
165 if (details != null) {
166 response['error']['data'] = {
167 'details': details,
168 };
169 }
170 return JSON.encode(response);
171 }
172
173 String _encodeResult(Message message, Map result) {
174 var response = {
175 'id' : message.serial,
176 'result' : result,
177 };
178 return JSON.encode(response);
179 }
180
181 bool _isValidStream(String streamId) {
182 final validStreams = [ 'Isolate', 'Debug', 'GC', '_Echo', '_Graph' ];
183 return validStreams.contains(streamId);
184 }
185
186 bool _isAnyClientSubscribed(String streamId) {
187 for (var client in clients) {
188 if (client.streams.contains(streamId)) {
189 return true;
190 }
191 }
192 return false;
193 }
194
195 Future<String> _streamListen(Message message) async {
196 var client = message.client;
197 var streamId = message.params['streamId'];
198
199 if (!_isValidStream(streamId)) {
200 return _encodeError(
201 message, _kInvalidParams,
202 details:"streamListen: invalid 'streamId' parameter: ${streamId}");
203 }
204 if (client.streams.contains(streamId)) {
205 return _encodeError(message, _kStreamAlreadySubscribed);
206 }
207 if (!_isAnyClientSubscribed(streamId)) {
208 _vmListenStream(streamId);
209 }
210 client.streams.add(streamId);
211
212 var result = { 'type' : 'Success' };
213 return _encodeResult(message, result);
214 }
215
216 Future<String> _streamCancel(Message message) async {
217 var client = message.client;
218 var streamId = message.params['streamId'];
219
220 if (!_isValidStream(streamId)) {
221 return _encodeError(
222 message, _kInvalidParams,
223 details:"streamCancel: invalid 'streamId' parameter: ${streamId}");
224 }
225 if (!client.streams.contains(streamId)) {
226 return _encodeError(message, _kStreamNotSubscribed);
227 }
228 client.streams.remove(streamId);
229 if (!_isAnyClientSubscribed(streamId)) {
230 _vmCancelStream(streamId);
231 }
232
233 var result = { 'type' : 'Success' };
234 return _encodeResult(message, result);
235 }
236
145 // TODO(johnmccutchan): Turn this into a command line tool that uses the 237 // TODO(johnmccutchan): Turn this into a command line tool that uses the
146 // service library. 238 // service library.
147 Future<String> _getCrashDump() async { 239 Future<String> _getCrashDump(Message message) async {
240 var client = message.client;
148 final perIsolateRequests = [ 241 final perIsolateRequests = [
149 // ?isolateId=<isolate id> will be appended to each of these requests. 242 // ?isolateId=<isolate id> will be appended to each of these requests.
150 // Isolate information. 243 // Isolate information.
151 Uri.parse('getIsolate'), 244 Uri.parse('getIsolate'),
152 // State of heap. 245 // State of heap.
153 Uri.parse('_getAllocationProfile'), 246 Uri.parse('_getAllocationProfile'),
154 // Call stack + local variables. 247 // Call stack + local variables.
155 Uri.parse('getStack?_full=true'), 248 Uri.parse('getStack?_full=true'),
156 ]; 249 ];
157 250
158 // Snapshot of running isolates. 251 // Snapshot of running isolates.
159 var isolates = runningIsolates.isolates.values.toList(); 252 var isolates = runningIsolates.isolates.values.toList();
160 253
161 // Collect the mapping from request uris to responses. 254 // Collect the mapping from request uris to responses.
162 var responses = { 255 var responses = {
163 }; 256 };
164 257
165 // Request VM. 258 // Request VM.
166 var getVM = Uri.parse('getVM'); 259 var getVM = Uri.parse('getVM');
167 var getVmResponse = JSON.decode( 260 var getVmResponse = JSON.decode(
168 await new Message.fromUri(getVM).sendToVM()); 261 await new Message.fromUri(client, getVM).sendToVM());
169 responses[getVM.toString()] = getVmResponse['result']; 262 responses[getVM.toString()] = getVmResponse['result'];
170 263
171 // Request command line flags. 264 // Request command line flags.
172 var getFlagList = Uri.parse('getFlagList'); 265 var getFlagList = Uri.parse('getFlagList');
173 var getFlagListResponse = JSON.decode( 266 var getFlagListResponse = JSON.decode(
174 await new Message.fromUri(getFlagList).sendToVM()); 267 await new Message.fromUri(client, getFlagList).sendToVM());
175 responses[getFlagList.toString()] = getFlagListResponse['result']; 268 responses[getFlagList.toString()] = getFlagListResponse['result'];
176 269
177 // Make requests to each isolate. 270 // Make requests to each isolate.
178 for (var isolate in isolates) { 271 for (var isolate in isolates) {
179 for (var request in perIsolateRequests) { 272 for (var request in perIsolateRequests) {
180 var message = new Message.forIsolate(request, isolate); 273 var message = new Message.forIsolate(request, isolate);
181 // Decode the JSON and and insert it into the map. The map key 274 // Decode the JSON and and insert it into the map. The map key
182 // is the request Uri. 275 // is the request Uri.
183 var response = JSON.decode(await isolate.route(message)); 276 var response = JSON.decode(await isolate.route(message));
184 responses[message.toUri().toString()] = response['result']; 277 responses[message.toUri().toString()] = response['result'];
185 } 278 }
186 // Dump the object id ring requests. 279 // Dump the object id ring requests.
187 var message = 280 var message =
188 new Message.forIsolate(Uri.parse('_dumpIdZone'), isolate); 281 new Message.forIsolate(client, Uri.parse('_dumpIdZone'), isolate);
189 var response = JSON.decode(await isolate.route(message)); 282 var response = JSON.decode(await isolate.route(message));
190 // Insert getObject requests into responses map. 283 // Insert getObject requests into responses map.
191 for (var object in response['result']['objects']) { 284 for (var object in response['result']['objects']) {
192 final requestUri = 285 final requestUri =
193 'getObject&isolateId=${isolate.serviceId}?objectId=${object["id"]}'; 286 'getObject&isolateId=${isolate.serviceId}?objectId=${object["id"]}';
194 responses[requestUri] = object; 287 responses[requestUri] = object;
195 } 288 }
196 } 289 }
197 290
198 // Encode the entire crash dump. 291 // Encode the entire crash dump.
199 return JSON.encode({ 292 return _encodeResult(message, responses);
200 'id' : null,
201 'result' : responses,
202 });
203 } 293 }
204 294
205 Future<String> route(Message message) { 295 Future<String> route(Message message) {
206 if (message.completed) { 296 if (message.completed) {
207 return message.response; 297 return message.response;
208 } 298 }
209 // TODO(turnidge): Update to json rpc. BEFORE SUBMIT. 299 // TODO(turnidge): Update to json rpc. BEFORE SUBMIT.
210 if ((message.path.length == 1) && (message.path[0] == 'clients')) { 300 if ((message.path.length == 1) && (message.path[0] == 'clients')) {
211 _clientCollection(message); 301 _clientCollection(message);
212 return message.response; 302 return message.response;
213 } 303 }
214 if (message.method == '_getCrashDump') { 304 if (message.method == '_getCrashDump') {
215 return _getCrashDump(); 305 return _getCrashDump(message);
306 }
307 if (message.method == 'streamListen') {
308 return _streamListen(message);
309 }
310 if (message.method == 'streamCancel') {
311 return _streamCancel(message);
216 } 312 }
217 if (message.params['isolateId'] != null) { 313 if (message.params['isolateId'] != null) {
218 return runningIsolates.route(message); 314 return runningIsolates.route(message);
219 } 315 }
220 return message.sendToVM(); 316 return message.sendToVM();
221 } 317 }
222 } 318 }
223 319
224 RawReceivePort boot() { 320 RawReceivePort boot() {
225 // Return the port we expect isolate startup and shutdown messages on. 321 // Return the port we expect isolate startup and shutdown messages on.
226 return isolateLifecyclePort; 322 return isolateLifecyclePort;
227 } 323 }
228 324
229 void _registerIsolate(int port_id, SendPort sp, String name) { 325 void _registerIsolate(int port_id, SendPort sp, String name) {
230 var service = new VMService(); 326 var service = new VMService();
231 service.runningIsolates.isolateStartup(port_id, sp, name); 327 service.runningIsolates.isolateStartup(port_id, sp, name);
232 } 328 }
233 329
234 void _onStart() native "VMService_OnStart"; 330 void _onStart() native "VMService_OnStart";
235 331
236 void _onExit() native "VMService_OnExit"; 332 void _onExit() native "VMService_OnExit";
333
334 void _vmListenStream(String streamId) native "VMService_ListenStream";
335
336 void _vmCancelStream(String streamId) native "VMService_CancelStream";
OLDNEW
« no previous file with comments | « runtime/vm/service/service.md ('k') | runtime/vm/service_event.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698