Index: sdk/lib/vmservice/vmservice.dart |
diff --git a/sdk/lib/vmservice/vmservice.dart b/sdk/lib/vmservice/vmservice.dart |
index 5b60ad814c53ea14d677ce05a1ea7e7dde012713..0de475ee143d47893289aa2bf5cad0e627260e79 100644 |
--- a/sdk/lib/vmservice/vmservice.dart |
+++ b/sdk/lib/vmservice/vmservice.dart |
@@ -20,6 +20,7 @@ part 'running_isolate.dart'; |
part 'running_isolates.dart'; |
part 'message.dart'; |
part 'message_router.dart'; |
+part 'named_lookup.dart'; |
final RawReceivePort isolateControlPort = new RawReceivePort(); |
final RawReceivePort scriptLoadPort = new RawReceivePort(); |
@@ -51,11 +52,27 @@ final Map<int, IsolateEmbedderData> isolateEmbedderData = |
new Map<int, IsolateEmbedderData>(); |
// These must be kept in sync with the declarations in vm/json_stream.h. |
+const kParseError = -32700; |
+const kInvalidRequest = -32600; |
+const kMethodNotFound = -32601; |
const kInvalidParams = -32602; |
const kInternalError = -32603; |
+ |
+const kExtensionError = -32000; |
+ |
const kFeatureDisabled = 100; |
+const kCannotAddBreakpoint = 102; |
const kStreamAlreadySubscribed = 103; |
const kStreamNotSubscribed = 104; |
+const kIsolateMustBeRunnable = 105; |
+const kIsolateMustBePaused = 106; |
+const kCannotResume = 107; |
+const kIsolateIsReloading = 108; |
+const kIsolateReloadBarred = 109; |
+const kServiceAlreadyRegistered = 110; |
+const kServiceDisappeared = 111; |
+ |
+// Experimental (used in private rpcs). |
const kFileSystemAlreadyExists = 1001; |
const kFileSystemDoesNotExist = 1002; |
const kFileDoesNotExist = 1003; |
@@ -69,6 +86,8 @@ var _errorMessages = { |
kFileSystemAlreadyExists: 'File system already exists', |
kFileSystemDoesNotExist: 'File system does not exist', |
kFileDoesNotExist: 'File does not exist', |
+ kServiceAlreadyRegistered: 'Service already registered', |
+ kServiceDisappeared: 'Service has disappeared', |
}; |
String encodeRpcError(Message message, int code, {String details}) { |
@@ -141,6 +160,9 @@ typedef Future<List<int>> ReadFileCallback(Uri path); |
/// Called to list all files under some path. |
typedef Future<List<Map<String, String>>> ListFilesCallback(Uri path); |
+/// Called when we need information about the server. |
+typedef Future<Uri> ServerInformamessage_routertionCallback(); |
+ |
/// Called when we need information about the server. |
typedef Future<Uri> ServerInformationCallback(); |
@@ -165,8 +187,12 @@ class VMServiceEmbedderHooks { |
class VMService extends MessageRouter { |
static VMService _instance; |
+ static const serviceNamespace = 's'; |
+ |
/// Collection of currently connected clients. |
- final Set<Client> clients = new Set<Client>(); |
+ final NamedLookup<Client> clients = |
+ new NamedLookup<Client>(prologue: serviceNamespace); |
+ final IdGenerator _serviceRequests = new IdGenerator(prologue: 'sr'); |
/// Collection of currently running isolates. |
RunningIsolates runningIsolates = new RunningIsolates(); |
@@ -178,16 +204,41 @@ class VMService extends MessageRouter { |
void _addClient(Client client) { |
assert(client.streams.isEmpty); |
+ assert(client.services.isEmpty); |
clients.add(client); |
} |
void _removeClient(Client client) { |
+ final namespace = clients.keyOf(client); |
clients.remove(client); |
for (var streamId in client.streams) { |
if (!_isAnyClientSubscribed(streamId)) { |
_vmCancelStream(streamId); |
} |
} |
+ for (var service in client.services.keys) { |
+ _eventMessageHandler([ |
+ '_Service', |
+ JSON.encode({ |
+ 'jsonrpc': '2.0', |
+ 'method': 'streamNotify', |
+ 'params': { |
+ 'streamId': '_Service', |
+ 'event': { |
+ "type": "Event", |
+ "kind": "ServiceUnregistered", |
+ 'timestamp': new DateTime.now().millisecondsSinceEpoch, |
+ 'service': service, |
+ 'method': namespace + '.' + service, |
+ } |
+ } |
+ }) |
+ ]); |
+ } |
+ // Complete all requestes as failed |
+ for (var handle in client.serviceHandles.values) { |
+ handle(null); |
+ } |
} |
void _eventMessageHandler(List eventMessage) { |
@@ -313,6 +364,9 @@ class VMService extends MessageRouter { |
return false; |
} |
+ static const kServiceStream = '_Service'; |
+ static const serviceStreams = const [kServiceStream]; |
+ |
Future<String> _streamListen(Message message) async { |
var client = message.client; |
var streamId = message.params['streamId']; |
@@ -321,13 +375,26 @@ class VMService extends MessageRouter { |
return encodeRpcError(message, kStreamAlreadySubscribed); |
} |
if (!_isAnyClientSubscribed(streamId)) { |
- if (!_vmListenStream(streamId)) { |
+ if (!serviceStreams.contains(streamId) && !_vmListenStream(streamId)) { |
return encodeRpcError(message, kInvalidParams, |
details: "streamListen: invalid 'streamId' parameter: ${streamId}"); |
} |
} |
- client.streams.add(streamId); |
+ // Some streams can generate events or side effects after registration |
+ switch (streamId) { |
+ case kServiceStream: |
+ for (Client c in clients) { |
+ if (c == client) continue; |
+ for (String service in c.services.keys) { |
+ _sendServiceRegisteredEvent(c, service, target: client); |
+ } |
+ } |
+ ; |
+ break; |
+ } |
+ |
+ client.streams.add(streamId); |
return encodeSuccess(message); |
} |
@@ -339,13 +406,101 @@ class VMService extends MessageRouter { |
return encodeRpcError(message, kStreamNotSubscribed); |
} |
client.streams.remove(streamId); |
- if (!_isAnyClientSubscribed(streamId)) { |
+ if (!serviceStreams.contains(streamId) && |
+ !_isAnyClientSubscribed(streamId)) { |
_vmCancelStream(streamId); |
} |
return encodeSuccess(message); |
} |
+ static bool _hasNamespace(String method) => |
+ method.contains('.') && |
+ _getNamespace(method).startsWith(serviceNamespace); |
+ static String _getNamespace(String method) => method.split('.').first; |
+ static String _getMethod(String method) => method.split('.').last; |
+ |
+ Future<String> _registerService(Message message) async { |
+ final client = message.client; |
+ final service = message.params['service']; |
+ final alias = message.params['alias']; |
+ |
+ if (service is! String || service == '') { |
+ return encodeRpcError(message, kInvalidParams, |
+ details: "registerService: invalid 'service' parameter: ${service}"); |
+ } |
+ if (alias is! String || alias == '') { |
+ return encodeRpcError(message, kInvalidParams, |
+ details: "registerService: invalid 'alias' parameter: ${alias}"); |
+ } |
+ if (client.services.containsKey(service)) { |
+ return encodeRpcError(message, kServiceAlreadyRegistered); |
+ } |
+ client.services[service] = alias; |
+ |
+ bool removed; |
+ try { |
+ // Do not send streaming events to the client which registers the service |
+ removed = client.streams.remove(kServiceStream); |
+ await _sendServiceRegisteredEvent(client, service); |
+ } finally { |
+ if (removed) client.streams.add(kServiceStream); |
+ } |
+ |
+ return encodeSuccess(message); |
+ } |
+ |
+ _sendServiceRegisteredEvent(Client client, String service, |
+ {Client target}) async { |
+ final namespace = clients.keyOf(client); |
+ final alias = client.services[service]; |
+ final event = JSON.encode({ |
+ 'jsonrpc': '2.0', |
+ 'method': 'streamNotify', |
+ 'params': { |
+ 'streamId': kServiceStream, |
+ 'event': { |
+ "type": "Event", |
+ "kind": "ServiceRegistered", |
+ 'timestamp': new DateTime.now().millisecondsSinceEpoch, |
+ 'service': service, |
+ 'method': namespace + '.' + service, |
+ 'alias': alias |
+ } |
+ } |
+ }); |
+ if (target == null) { |
+ _eventMessageHandler([kServiceStream, event]); |
+ } else { |
+ target.post(event); |
+ } |
+ } |
+ |
+ Future<String> _handleService(Message message) async { |
+ final namespace = _getNamespace(message.method); |
+ final method = _getMethod(message.method); |
+ final client = clients[namespace]; |
+ if (client != null) { |
+ if (client.services.containsKey(method)) { |
+ final id = _serviceRequests.newId(); |
+ final oldId = message.serial; |
+ final completer = new Completer<String>(); |
+ client.serviceHandles[id] = (Message m) { |
+ if (m != null) { |
+ completer.complete(JSON.encode(m.forwardToJson({'id': oldId}))); |
+ } else { |
+ completer.complete(encodeRpcError(message, kServiceDisappeared)); |
+ } |
+ }; |
+ client.post( |
+ JSON.encode(message.forwardToJson({'id': id, 'method': method}))); |
+ return completer.future; |
+ } |
+ } |
+ return encodeRpcError(message, kMethodNotFound, |
+ details: "Unknown service: ${message.method}"); |
+ } |
+ |
Future<String> _spawnUri(Message message) async { |
var token = message.params['token']; |
if (token == null) { |
@@ -423,13 +578,13 @@ class VMService extends MessageRouter { |
var message = new Message.forIsolate(client, request, isolate); |
// Decode the JSON and and insert it into the map. The map key |
// is the request Uri. |
- var response = responseAsJson(await isolate.route(message)); |
+ var response = responseAsJson(await isolate.routeRequest(message)); |
responses[message.toUri().toString()] = response['result']; |
} |
// Dump the object id ring requests. |
var message = |
new Message.forIsolate(client, Uri.parse('_dumpIdZone'), isolate); |
- var response = responseAsJson(await isolate.route(message)); |
+ var response = responseAsJson(await isolate.routeRequest(message)); |
// Insert getObject requests into responses map. |
for (var object in response['result']['objects']) { |
final requestUri = |
@@ -442,30 +597,49 @@ class VMService extends MessageRouter { |
return encodeResult(message, responses); |
} |
- Future<String> route(Message message) { |
- if (message.completed) { |
+ Future routeRequest(Message message) async { |
+ try { |
+ if (message.completed) { |
+ return await message.response; |
+ } |
+ // TODO(turnidge): Update to json rpc. BEFORE SUBMIT. |
+ if (message.method == '_getCrashDump') { |
+ return await _getCrashDump(message); |
+ } |
+ if (message.method == 'streamListen') { |
+ return await _streamListen(message); |
+ } |
+ if (message.method == 'streamCancel') { |
+ return await _streamCancel(message); |
+ } |
+ if (message.method == '_registerService') { |
+ return await _registerService(message); |
+ } |
+ if (message.method == '_spawnUri') { |
+ return await _spawnUri(message); |
+ } |
+ if (devfs.shouldHandleMessage(message)) { |
+ return await devfs.handleMessage(message); |
+ } |
+ if (_hasNamespace(message.method)) { |
+ return await _handleService(message); |
+ } |
+ if (message.params['isolateId'] != null) { |
+ return await runningIsolates.routeRequest(message); |
+ } |
+ return await message.sendToVM(); |
+ } catch (e, st) { |
+ message.setErrorResponse(kInternalError, 'Unexpected exception:$e\n$st'); |
return message.response; |
} |
- // TODO(turnidge): Update to json rpc. BEFORE SUBMIT. |
- if (message.method == '_getCrashDump') { |
- return _getCrashDump(message); |
- } |
- if (message.method == 'streamListen') { |
- return _streamListen(message); |
- } |
- if (message.method == 'streamCancel') { |
- return _streamCancel(message); |
- } |
- if (message.method == '_spawnUri') { |
- return _spawnUri(message); |
- } |
- if (devfs.shouldHandleMessage(message)) { |
- return devfs.handleMessage(message); |
- } |
- if (message.params['isolateId'] != null) { |
- return runningIsolates.route(message); |
+ } |
+ |
+ void routeResponse(message) { |
+ final client = message.client; |
+ if (client.serviceHandles.containsKey(message.serial)) { |
+ client.serviceHandles.remove(message.serial)(message); |
+ _serviceRequests.release(message.serial); |
} |
- return message.sendToVM(); |
} |
} |