Chromium Code Reviews| Index: sdk/lib/vmservice/vmservice.dart |
| diff --git a/sdk/lib/vmservice/vmservice.dart b/sdk/lib/vmservice/vmservice.dart |
| index 5b60ad814c53ea14d677ce05a1ea7e7dde012713..abdefc344e69bf392453b14752b60c0216b19ef8 100644 |
| --- a/sdk/lib/vmservice/vmservice.dart |
| +++ b/sdk/lib/vmservice/vmservice.dart |
| @@ -20,6 +20,7 @@ part 'running_isolate.dart'; |
| part 'running_isolates.dart'; |
| part 'message.dart'; |
| part 'message_router.dart'; |
| +part 'named_lookup.dart'; |
| final RawReceivePort isolateControlPort = new RawReceivePort(); |
| final RawReceivePort scriptLoadPort = new RawReceivePort(); |
| @@ -51,11 +52,27 @@ final Map<int, IsolateEmbedderData> isolateEmbedderData = |
| new Map<int, IsolateEmbedderData>(); |
| // These must be kept in sync with the declarations in vm/json_stream.h. |
| +const kParseError = -32700; |
| +const kInvalidRequest = -32600; |
| +const kMethodNotFound = -32601; |
| const kInvalidParams = -32602; |
| const kInternalError = -32603; |
| + |
| +const kExtensionError = -32000; |
| + |
| const kFeatureDisabled = 100; |
| +const kCannotAddBreakpoint = 102; |
| const kStreamAlreadySubscribed = 103; |
| const kStreamNotSubscribed = 104; |
| +const kIsolateMustBeRunnable = 105; |
| +const kIsolateMustBePaused = 106; |
| +const kCannotResume = 107; |
| +const kIsolateIsReloading = 108; |
| +const kIsolateReloadBarred = 109; |
| +const kServiceAlreadyRegistered = 110; |
| +const kServiceDisappeared = 111; |
| + |
| +// Experimental (used in private rpcs). |
| const kFileSystemAlreadyExists = 1001; |
| const kFileSystemDoesNotExist = 1002; |
| const kFileDoesNotExist = 1003; |
| @@ -69,6 +86,8 @@ var _errorMessages = { |
| kFileSystemAlreadyExists: 'File system already exists', |
| kFileSystemDoesNotExist: 'File system does not exist', |
| kFileDoesNotExist: 'File does not exist', |
| + kServiceAlreadyRegistered: 'Service already registered', |
| + kServiceDisappeared: 'Service is disappeared', |
|
bkonyi
2017/07/13 23:00:45
'Service has disappeared' might make more sense fo
cbernaschina
2017/07/13 23:32:36
Done.
|
| }; |
| String encodeRpcError(Message message, int code, {String details}) { |
| @@ -141,6 +160,9 @@ typedef Future<List<int>> ReadFileCallback(Uri path); |
| /// Called to list all files under some path. |
| typedef Future<List<Map<String, String>>> ListFilesCallback(Uri path); |
| +/// Called when we need information about the server. |
| +typedef Future<Uri> ServerInformamessage_routertionCallback(); |
| + |
| /// Called when we need information about the server. |
| typedef Future<Uri> ServerInformationCallback(); |
| @@ -165,8 +187,12 @@ class VMServiceEmbedderHooks { |
| class VMService extends MessageRouter { |
| static VMService _instance; |
| + static const serviceNamespace = 's'; |
| + |
| /// Collection of currently connected clients. |
| - final Set<Client> clients = new Set<Client>(); |
| + final NamedLookup<Client> clients = |
| + new NamedLookup<Client>(prologue: serviceNamespace); |
| + final IdGenerator _serviceRequests = new IdGenerator(prologue: 'sr'); |
| /// Collection of currently running isolates. |
| RunningIsolates runningIsolates = new RunningIsolates(); |
| @@ -178,16 +204,41 @@ class VMService extends MessageRouter { |
| void _addClient(Client client) { |
| assert(client.streams.isEmpty); |
| + assert(client.services.isEmpty); |
| clients.add(client); |
| } |
| void _removeClient(Client client) { |
| + final namespace = clients.keyOf(client); |
| clients.remove(client); |
| for (var streamId in client.streams) { |
| if (!_isAnyClientSubscribed(streamId)) { |
| _vmCancelStream(streamId); |
| } |
| } |
| + for (var service in client.services.keys) { |
| + _eventMessageHandler([ |
| + '_Service', |
| + JSON.encode({ |
| + 'jsonrpc': '2.0', |
| + 'method': 'streamNotify', |
| + 'params': { |
| + 'streamId': '_Service', |
| + 'event': { |
| + "type": "Event", |
| + "kind": "ServiceUnregistered", |
| + 'timestamp': new DateTime.now().millisecondsSinceEpoch, |
| + 'service': service, |
| + 'method': namespace + '.' + service, |
| + } |
| + } |
| + }) |
| + ]); |
| + } |
| + // Complete all requestes as failed |
| + for (var handle in client.serviceHandles.values) { |
| + handle(null); |
| + } |
| } |
| void _eventMessageHandler(List eventMessage) { |
| @@ -313,6 +364,9 @@ class VMService extends MessageRouter { |
| return false; |
| } |
| + static const kServiceStream = '_Service'; |
| + static const serviceStreams = const [kServiceStream]; |
| + |
| Future<String> _streamListen(Message message) async { |
| var client = message.client; |
| var streamId = message.params['streamId']; |
| @@ -321,13 +375,26 @@ class VMService extends MessageRouter { |
| return encodeRpcError(message, kStreamAlreadySubscribed); |
| } |
| if (!_isAnyClientSubscribed(streamId)) { |
| - if (!_vmListenStream(streamId)) { |
| + if (!serviceStreams.contains(streamId) && !_vmListenStream(streamId)) { |
| return encodeRpcError(message, kInvalidParams, |
| details: "streamListen: invalid 'streamId' parameter: ${streamId}"); |
| } |
| } |
| - client.streams.add(streamId); |
| + // Some streams can generate events or side effects after registration |
| + switch (streamId) { |
| + case kServiceStream: |
| + for (Client c in clients) { |
| + if (c == client) continue; |
| + for (String service in c.services.keys) { |
| + _sendServiceRegisteredEvent(c, service, target: client); |
| + } |
| + } |
| + ; |
| + break; |
| + } |
| + |
| + client.streams.add(streamId); |
| return encodeSuccess(message); |
| } |
| @@ -339,13 +406,101 @@ class VMService extends MessageRouter { |
| return encodeRpcError(message, kStreamNotSubscribed); |
| } |
| client.streams.remove(streamId); |
| - if (!_isAnyClientSubscribed(streamId)) { |
| + if (!serviceStreams.contains(streamId) && |
| + !_isAnyClientSubscribed(streamId)) { |
| _vmCancelStream(streamId); |
| } |
| return encodeSuccess(message); |
| } |
| + static bool _hasNamespace(String method) => |
| + method.contains('.') && |
| + _getNamespace(method).startsWith(serviceNamespace); |
| + static String _getNamespace(String method) => method.split('.').first; |
| + static String _getMethod(String method) => method.split('.').last; |
| + |
| + Future<String> _registerService(Message message) async { |
| + final client = message.client; |
| + final service = message.params['service']; |
| + final alias = message.params['alias']; |
| + |
| + if (service is! String || service == '') { |
| + return encodeRpcError(message, kInvalidParams, |
| + details: "registerService: invalid 'service' parameter: ${service}"); |
| + } |
| + if (alias is! String || alias == '') { |
| + return encodeRpcError(message, kInvalidParams, |
| + details: "registerService: invalid 'alias' parameter: ${alias}"); |
| + } |
| + if (client.services.containsKey(service)) { |
| + return encodeRpcError(message, kServiceAlreadyRegistered); |
| + } |
| + client.services[service] = alias; |
| + |
| + bool removed; |
| + try { |
| + // Do not send streaming events to the client which registers the service |
| + removed = client.streams.remove(kServiceStream); |
| + await _sendServiceRegisteredEvent(client, service); |
| + } finally { |
| + if (removed) client.streams.add(kServiceStream); |
| + } |
| + |
| + return encodeSuccess(message); |
| + } |
| + |
| + _sendServiceRegisteredEvent(Client client, String service, |
| + {Client target}) async { |
| + final namespace = clients.keyOf(client); |
| + final alias = client.services[service]; |
| + final event = JSON.encode({ |
| + 'jsonrpc': '2.0', |
| + 'method': 'streamNotify', |
| + 'params': { |
| + 'streamId': kServiceStream, |
| + 'event': { |
| + "type": "Event", |
| + "kind": "ServiceRegistered", |
| + 'timestamp': new DateTime.now().millisecondsSinceEpoch, |
| + 'service': service, |
| + 'method': namespace + '.' + service, |
| + 'alias': alias |
| + } |
| + } |
| + }); |
| + if (target == null) { |
| + _eventMessageHandler([kServiceStream, event]); |
| + } else { |
| + target.post(event); |
| + } |
| + } |
| + |
| + Future<String> _handleService(Message message) async { |
| + final namespace = _getNamespace(message.method); |
| + final method = _getMethod(message.method); |
| + final client = clients[namespace]; |
| + if (client != null) { |
| + if (client.services.containsKey(method)) { |
| + final id = _serviceRequests.newId(); |
| + final oldId = message.serial; |
| + final completer = new Completer<String>(); |
| + client.serviceHandles[id] = (Message m) { |
| + if (m != null) { |
| + completer.complete(JSON.encode(m.forwardToJson({'id': oldId}))); |
| + } else { |
| + completer.complete(encodeRpcError(message, kServiceDisappeared)); |
| + } |
| + }; |
| + client.post( |
| + JSON.encode(message.forwardToJson({'id': id, 'method': method}))); |
| + return completer.future; |
| + } |
| + } |
| + return encodeRpcError(message, kMethodNotFound, |
| + details: "Unknown service: ${message.method}"); |
| + } |
| + |
| Future<String> _spawnUri(Message message) async { |
| var token = message.params['token']; |
| if (token == null) { |
| @@ -423,13 +578,13 @@ class VMService extends MessageRouter { |
| var message = new Message.forIsolate(client, request, isolate); |
| // Decode the JSON and and insert it into the map. The map key |
| // is the request Uri. |
| - var response = responseAsJson(await isolate.route(message)); |
| + var response = responseAsJson(await isolate.routeRequest(message)); |
| responses[message.toUri().toString()] = response['result']; |
| } |
| // Dump the object id ring requests. |
| var message = |
| new Message.forIsolate(client, Uri.parse('_dumpIdZone'), isolate); |
| - var response = responseAsJson(await isolate.route(message)); |
| + var response = responseAsJson(await isolate.routeRequest(message)); |
| // Insert getObject requests into responses map. |
| for (var object in response['result']['objects']) { |
| final requestUri = |
| @@ -442,30 +597,49 @@ class VMService extends MessageRouter { |
| return encodeResult(message, responses); |
| } |
| - Future<String> route(Message message) { |
| - if (message.completed) { |
| + Future routeRequest(Message message) async { |
| + try { |
| + if (message.completed) { |
| + return await message.response; |
| + } |
| + // TODO(turnidge): Update to json rpc. BEFORE SUBMIT. |
| + if (message.method == '_getCrashDump') { |
| + return await _getCrashDump(message); |
| + } |
| + if (message.method == 'streamListen') { |
| + return await _streamListen(message); |
| + } |
| + if (message.method == 'streamCancel') { |
| + return await _streamCancel(message); |
| + } |
| + if (message.method == '_registerService') { |
| + return await _registerService(message); |
| + } |
| + if (message.method == '_spawnUri') { |
| + return await _spawnUri(message); |
| + } |
| + if (devfs.shouldHandleMessage(message)) { |
| + return await devfs.handleMessage(message); |
| + } |
| + if (_hasNamespace(message.method)) { |
| + return await _handleService(message); |
| + } |
| + if (message.params['isolateId'] != null) { |
| + return await runningIsolates.routeRequest(message); |
| + } |
| + return await message.sendToVM(); |
| + } catch (e, st) { |
| + message.setErrorResponse(kInternalError, 'Unexpected exception:$e\n$st'); |
| return message.response; |
| } |
| - // TODO(turnidge): Update to json rpc. BEFORE SUBMIT. |
| - if (message.method == '_getCrashDump') { |
| - return _getCrashDump(message); |
| - } |
| - if (message.method == 'streamListen') { |
| - return _streamListen(message); |
| - } |
| - if (message.method == 'streamCancel') { |
| - return _streamCancel(message); |
| - } |
| - if (message.method == '_spawnUri') { |
| - return _spawnUri(message); |
| - } |
| - if (devfs.shouldHandleMessage(message)) { |
| - return devfs.handleMessage(message); |
| - } |
| - if (message.params['isolateId'] != null) { |
| - return runningIsolates.route(message); |
| + } |
| + |
| + void routeResponse(message) { |
| + final client = message.client; |
| + if (client.serviceHandles.containsKey(message.serial)) { |
| + client.serviceHandles.remove(message.serial)(message); |
| + _serviceRequests.release(message.serial); |
| } |
| - return message.sendToVM(); |
| } |
| } |