Index: runtime/vm/service/vmservice.dart |
diff --git a/runtime/vm/service/vmservice.dart b/runtime/vm/service/vmservice.dart |
index 4566e494f2b535f99b92cd87f3d323985de7a0e8..25d1fa0c6749ae59f8fe7ce5ae65858b6f252228 100644 |
--- a/runtime/vm/service/vmservice.dart |
+++ b/runtime/vm/service/vmservice.dart |
@@ -36,17 +36,25 @@ class VMService extends MessageRouter { |
ShutdownCallback onShutdown; |
void _addClient(Client client) { |
+ assert(client.streams.isEmpty); |
clients.add(client); |
} |
void _removeClient(Client client) { |
clients.remove(client); |
+ for (var streamId in client.streams) { |
+ if (!_isAnyClientSubscribed(streamId)) { |
+ _vmCancelStream(streamId); |
+ } |
+ } |
} |
- void _eventMessageHandler(dynamic eventMessage) { |
+ void _eventMessageHandler(List eventMessage) { |
+ var streamId = eventMessage[0]; |
+ var event = eventMessage[1]; |
for (var client in clients) { |
- if (client.sendEvents) { |
- client.post(eventMessage); |
+ if (client.sendEvents && client.streams.contains(streamId)) { |
+ client.post(event); |
} |
} |
} |
@@ -81,20 +89,14 @@ class VMService extends MessageRouter { |
} |
void messageHandler(message) { |
- if (message is String) { |
- // This is an event intended for all clients. |
- _eventMessageHandler(message); |
- return; |
- } |
- if (message is Uint8List) { |
- // This is "raw" data intended for a specific client. |
- // |
- // TODO(turnidge): Do not broadcast this data to all clients. |
- _eventMessageHandler(message); |
- return; |
- } |
if (message is List) { |
- // This is an internal vm service event. |
+ if (message.length == 2) { |
+ // This is an event. |
+ assert(message[0] is String); |
+ assert(message[1] is String || message[1] is Uint8List); |
+ _eventMessageHandler(message); |
+ return; |
+ } |
if (message.length == 1) { |
// This is a control message directing the vm service to exit. |
assert(message[0] == Constants.SERVICE_EXIT_MESSAGE_ID); |
@@ -108,7 +110,6 @@ class VMService extends MessageRouter { |
return; |
} |
} |
- |
Logger.root.severe( |
'Internal vm-service error: ignoring illegal message: $message'); |
} |
@@ -142,9 +143,92 @@ class VMService extends MessageRouter { |
message.setResponse(JSON.encode(result)); |
} |
+ // These must be kept in sync with the declarations in vm/json_stream.h. |
+ static const _kInvalidStream = 100; |
+ static const _kStreamAlreadySubscribed = 101; |
+ static const _kStreamNotSubscribed = 102; |
+ |
+ var _errorMessages = { |
+ _kInvalidStream: 'Invalid stream', |
+ _kStreamAlreadySubscribed: 'Stream already subscribed', |
+ _kStreamNotSubscribed: 'Stream not subscribed', |
+ }; |
+ |
+ String _encodeError(Message message, int code) { |
+ var response = { |
+ 'id' : message.serial, |
+ 'error' : { |
+ 'code': code, |
+ 'message': _errorMessages[code], |
+ }, |
+ }; |
+ return JSON.encode(response); |
+ } |
+ |
+ String _encodeResult(Message message, Map result) { |
+ var response = { |
+ 'id' : message.serial, |
+ 'result' : result, |
+ }; |
+ return JSON.encode(response); |
+ } |
+ |
+ bool _isValidStream(String streamId) { |
+ var validStreams = [ 'Isolate', 'Debug', 'GC', '_Echo', '_Graph' ]; |
+ return validStreams.contains(streamId); |
+ } |
+ |
+ bool _isAnyClientSubscribed(String streamId) { |
+ for (var client in clients) { |
+ if (client.streams.contains(streamId)) { |
+ return true; |
+ } |
+ } |
+ return false; |
+ } |
+ |
+ Future<String> _streamListen(Message message) async { |
+ var client = message.client; |
+ var streamId = message.params['streamId']; |
+ |
+ if (!_isValidStream(streamId)) { |
+ return _encodeError(message, _kInvalidStream); |
+ } |
+ if (client.streams.contains(streamId)) { |
+ return _encodeError(message, _kStreamAlreadySubscribed); |
+ } |
+ if (!_isAnyClientSubscribed(streamId)) { |
+ _vmListenStream(streamId); |
+ } |
+ client.streams.add(streamId); |
+ |
+ var result = { 'type' : 'Success' }; |
+ return _encodeResult(message, result); |
+ } |
+ |
+ Future<String> _streamCancel(Message message) async { |
+ var client = message.client; |
+ var streamId = message.params['streamId']; |
+ |
+ if (!_isValidStream(streamId)) { |
+ return _encodeError(message, _kInvalidStream); |
+ } |
+ if (!client.streams.contains(streamId)) { |
+ return _encodeError(message, _kStreamNotSubscribed); |
+ } |
+ client.streams.remove(streamId); |
+ if (!_isAnyClientSubscribed(streamId)) { |
+ _vmCancelStream(streamId); |
+ } |
+ |
+ var result = { 'type' : 'Success' }; |
+ return _encodeResult(message, result); |
+ } |
+ |
// TODO(johnmccutchan): Turn this into a command line tool that uses the |
// service library. |
- Future<String> _getCrashDump() async { |
+ Future<String> _getCrashDump(Message message) async { |
+ var client = message.client; |
final perIsolateRequests = [ |
// ?isolateId=<isolate id> will be appended to each of these requests. |
// Isolate information. |
@@ -165,13 +249,13 @@ class VMService extends MessageRouter { |
// Request VM. |
var getVM = Uri.parse('getVM'); |
var getVmResponse = JSON.decode( |
- await new Message.fromUri(getVM).sendToVM()); |
+ await new Message.fromUri(client, getVM).sendToVM()); |
responses[getVM.toString()] = getVmResponse['result']; |
// Request command line flags. |
var getFlagList = Uri.parse('getFlagList'); |
var getFlagListResponse = JSON.decode( |
- await new Message.fromUri(getFlagList).sendToVM()); |
+ await new Message.fromUri(client, getFlagList).sendToVM()); |
responses[getFlagList.toString()] = getFlagListResponse['result']; |
// Make requests to each isolate. |
@@ -185,7 +269,7 @@ class VMService extends MessageRouter { |
} |
// Dump the object id ring requests. |
var message = |
- new Message.forIsolate(Uri.parse('_dumpIdZone'), isolate); |
+ new Message.forIsolate(client, Uri.parse('_dumpIdZone'), isolate); |
var response = JSON.decode(await isolate.route(message)); |
// Insert getObject requests into responses map. |
for (var object in response['result']['objects']) { |
@@ -196,10 +280,7 @@ class VMService extends MessageRouter { |
} |
// Encode the entire crash dump. |
- return JSON.encode({ |
- 'id' : null, |
- 'result' : responses, |
- }); |
+ return _encodeResult(message, responses); |
} |
Future<String> route(Message message) { |
@@ -212,7 +293,13 @@ class VMService extends MessageRouter { |
return message.response; |
} |
if (message.method == '_getCrashDump') { |
- return _getCrashDump(); |
+ return _getCrashDump(message); |
+ } |
+ if (message.method == 'streamListen') { |
+ return _streamListen(message); |
+ } |
+ if (message.method == 'streamCancel') { |
+ return _streamCancel(message); |
} |
if (message.params['isolateId'] != null) { |
return runningIsolates.route(message); |
@@ -234,3 +321,7 @@ void _registerIsolate(int port_id, SendPort sp, String name) { |
void _onStart() native "VMService_OnStart"; |
void _onExit() native "VMService_OnExit"; |
+ |
+void _vmListenStream(String streamId) native "VMService_ListenStream"; |
+ |
+void _vmCancelStream(String streamId) native "VMService_CancelStream"; |