Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Unified Diff: sdk/lib/vmservice/vmservice.dart

Issue 2980733003: Introduced support for external services registration in the ServiceProtocol (Closed)
Patch Set: Address comments Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/vmservice/running_isolates.dart ('k') | sdk/lib/vmservice/vmservice_sources.gypi » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/vmservice/vmservice.dart
diff --git a/sdk/lib/vmservice/vmservice.dart b/sdk/lib/vmservice/vmservice.dart
index 5b60ad814c53ea14d677ce05a1ea7e7dde012713..0de475ee143d47893289aa2bf5cad0e627260e79 100644
--- a/sdk/lib/vmservice/vmservice.dart
+++ b/sdk/lib/vmservice/vmservice.dart
@@ -20,6 +20,7 @@ part 'running_isolate.dart';
part 'running_isolates.dart';
part 'message.dart';
part 'message_router.dart';
+part 'named_lookup.dart';
final RawReceivePort isolateControlPort = new RawReceivePort();
final RawReceivePort scriptLoadPort = new RawReceivePort();
@@ -51,11 +52,27 @@ final Map<int, IsolateEmbedderData> isolateEmbedderData =
new Map<int, IsolateEmbedderData>();
// These must be kept in sync with the declarations in vm/json_stream.h.
+const kParseError = -32700;
+const kInvalidRequest = -32600;
+const kMethodNotFound = -32601;
const kInvalidParams = -32602;
const kInternalError = -32603;
+
+const kExtensionError = -32000;
+
const kFeatureDisabled = 100;
+const kCannotAddBreakpoint = 102;
const kStreamAlreadySubscribed = 103;
const kStreamNotSubscribed = 104;
+const kIsolateMustBeRunnable = 105;
+const kIsolateMustBePaused = 106;
+const kCannotResume = 107;
+const kIsolateIsReloading = 108;
+const kIsolateReloadBarred = 109;
+const kServiceAlreadyRegistered = 110;
+const kServiceDisappeared = 111;
+
+// Experimental (used in private rpcs).
const kFileSystemAlreadyExists = 1001;
const kFileSystemDoesNotExist = 1002;
const kFileDoesNotExist = 1003;
@@ -69,6 +86,8 @@ var _errorMessages = {
kFileSystemAlreadyExists: 'File system already exists',
kFileSystemDoesNotExist: 'File system does not exist',
kFileDoesNotExist: 'File does not exist',
+ kServiceAlreadyRegistered: 'Service already registered',
+ kServiceDisappeared: 'Service has disappeared',
};
String encodeRpcError(Message message, int code, {String details}) {
@@ -141,6 +160,9 @@ typedef Future<List<int>> ReadFileCallback(Uri path);
/// Called to list all files under some path.
typedef Future<List<Map<String, String>>> ListFilesCallback(Uri path);
+/// Called when we need information about the server.
+typedef Future<Uri> ServerInformamessage_routertionCallback();
+
/// Called when we need information about the server.
typedef Future<Uri> ServerInformationCallback();
@@ -165,8 +187,12 @@ class VMServiceEmbedderHooks {
class VMService extends MessageRouter {
static VMService _instance;
+ static const serviceNamespace = 's';
+
/// Collection of currently connected clients.
- final Set<Client> clients = new Set<Client>();
+ final NamedLookup<Client> clients =
+ new NamedLookup<Client>(prologue: serviceNamespace);
+ final IdGenerator _serviceRequests = new IdGenerator(prologue: 'sr');
/// Collection of currently running isolates.
RunningIsolates runningIsolates = new RunningIsolates();
@@ -178,16 +204,41 @@ class VMService extends MessageRouter {
void _addClient(Client client) {
assert(client.streams.isEmpty);
+ assert(client.services.isEmpty);
clients.add(client);
}
void _removeClient(Client client) {
+ final namespace = clients.keyOf(client);
clients.remove(client);
for (var streamId in client.streams) {
if (!_isAnyClientSubscribed(streamId)) {
_vmCancelStream(streamId);
}
}
+ for (var service in client.services.keys) {
+ _eventMessageHandler([
+ '_Service',
+ JSON.encode({
+ 'jsonrpc': '2.0',
+ 'method': 'streamNotify',
+ 'params': {
+ 'streamId': '_Service',
+ 'event': {
+ "type": "Event",
+ "kind": "ServiceUnregistered",
+ 'timestamp': new DateTime.now().millisecondsSinceEpoch,
+ 'service': service,
+ 'method': namespace + '.' + service,
+ }
+ }
+ })
+ ]);
+ }
+ // Complete all requestes as failed
+ for (var handle in client.serviceHandles.values) {
+ handle(null);
+ }
}
void _eventMessageHandler(List eventMessage) {
@@ -313,6 +364,9 @@ class VMService extends MessageRouter {
return false;
}
+ static const kServiceStream = '_Service';
+ static const serviceStreams = const [kServiceStream];
+
Future<String> _streamListen(Message message) async {
var client = message.client;
var streamId = message.params['streamId'];
@@ -321,13 +375,26 @@ class VMService extends MessageRouter {
return encodeRpcError(message, kStreamAlreadySubscribed);
}
if (!_isAnyClientSubscribed(streamId)) {
- if (!_vmListenStream(streamId)) {
+ if (!serviceStreams.contains(streamId) && !_vmListenStream(streamId)) {
return encodeRpcError(message, kInvalidParams,
details: "streamListen: invalid 'streamId' parameter: ${streamId}");
}
}
- client.streams.add(streamId);
+ // Some streams can generate events or side effects after registration
+ switch (streamId) {
+ case kServiceStream:
+ for (Client c in clients) {
+ if (c == client) continue;
+ for (String service in c.services.keys) {
+ _sendServiceRegisteredEvent(c, service, target: client);
+ }
+ }
+ ;
+ break;
+ }
+
+ client.streams.add(streamId);
return encodeSuccess(message);
}
@@ -339,13 +406,101 @@ class VMService extends MessageRouter {
return encodeRpcError(message, kStreamNotSubscribed);
}
client.streams.remove(streamId);
- if (!_isAnyClientSubscribed(streamId)) {
+ if (!serviceStreams.contains(streamId) &&
+ !_isAnyClientSubscribed(streamId)) {
_vmCancelStream(streamId);
}
return encodeSuccess(message);
}
+ static bool _hasNamespace(String method) =>
+ method.contains('.') &&
+ _getNamespace(method).startsWith(serviceNamespace);
+ static String _getNamespace(String method) => method.split('.').first;
+ static String _getMethod(String method) => method.split('.').last;
+
+ Future<String> _registerService(Message message) async {
+ final client = message.client;
+ final service = message.params['service'];
+ final alias = message.params['alias'];
+
+ if (service is! String || service == '') {
+ return encodeRpcError(message, kInvalidParams,
+ details: "registerService: invalid 'service' parameter: ${service}");
+ }
+ if (alias is! String || alias == '') {
+ return encodeRpcError(message, kInvalidParams,
+ details: "registerService: invalid 'alias' parameter: ${alias}");
+ }
+ if (client.services.containsKey(service)) {
+ return encodeRpcError(message, kServiceAlreadyRegistered);
+ }
+ client.services[service] = alias;
+
+ bool removed;
+ try {
+ // Do not send streaming events to the client which registers the service
+ removed = client.streams.remove(kServiceStream);
+ await _sendServiceRegisteredEvent(client, service);
+ } finally {
+ if (removed) client.streams.add(kServiceStream);
+ }
+
+ return encodeSuccess(message);
+ }
+
+ _sendServiceRegisteredEvent(Client client, String service,
+ {Client target}) async {
+ final namespace = clients.keyOf(client);
+ final alias = client.services[service];
+ final event = JSON.encode({
+ 'jsonrpc': '2.0',
+ 'method': 'streamNotify',
+ 'params': {
+ 'streamId': kServiceStream,
+ 'event': {
+ "type": "Event",
+ "kind": "ServiceRegistered",
+ 'timestamp': new DateTime.now().millisecondsSinceEpoch,
+ 'service': service,
+ 'method': namespace + '.' + service,
+ 'alias': alias
+ }
+ }
+ });
+ if (target == null) {
+ _eventMessageHandler([kServiceStream, event]);
+ } else {
+ target.post(event);
+ }
+ }
+
+ Future<String> _handleService(Message message) async {
+ final namespace = _getNamespace(message.method);
+ final method = _getMethod(message.method);
+ final client = clients[namespace];
+ if (client != null) {
+ if (client.services.containsKey(method)) {
+ final id = _serviceRequests.newId();
+ final oldId = message.serial;
+ final completer = new Completer<String>();
+ client.serviceHandles[id] = (Message m) {
+ if (m != null) {
+ completer.complete(JSON.encode(m.forwardToJson({'id': oldId})));
+ } else {
+ completer.complete(encodeRpcError(message, kServiceDisappeared));
+ }
+ };
+ client.post(
+ JSON.encode(message.forwardToJson({'id': id, 'method': method})));
+ return completer.future;
+ }
+ }
+ return encodeRpcError(message, kMethodNotFound,
+ details: "Unknown service: ${message.method}");
+ }
+
Future<String> _spawnUri(Message message) async {
var token = message.params['token'];
if (token == null) {
@@ -423,13 +578,13 @@ class VMService extends MessageRouter {
var message = new Message.forIsolate(client, request, isolate);
// Decode the JSON and and insert it into the map. The map key
// is the request Uri.
- var response = responseAsJson(await isolate.route(message));
+ var response = responseAsJson(await isolate.routeRequest(message));
responses[message.toUri().toString()] = response['result'];
}
// Dump the object id ring requests.
var message =
new Message.forIsolate(client, Uri.parse('_dumpIdZone'), isolate);
- var response = responseAsJson(await isolate.route(message));
+ var response = responseAsJson(await isolate.routeRequest(message));
// Insert getObject requests into responses map.
for (var object in response['result']['objects']) {
final requestUri =
@@ -442,30 +597,49 @@ class VMService extends MessageRouter {
return encodeResult(message, responses);
}
- Future<String> route(Message message) {
- if (message.completed) {
+ Future routeRequest(Message message) async {
+ try {
+ if (message.completed) {
+ return await message.response;
+ }
+ // TODO(turnidge): Update to json rpc. BEFORE SUBMIT.
+ if (message.method == '_getCrashDump') {
+ return await _getCrashDump(message);
+ }
+ if (message.method == 'streamListen') {
+ return await _streamListen(message);
+ }
+ if (message.method == 'streamCancel') {
+ return await _streamCancel(message);
+ }
+ if (message.method == '_registerService') {
+ return await _registerService(message);
+ }
+ if (message.method == '_spawnUri') {
+ return await _spawnUri(message);
+ }
+ if (devfs.shouldHandleMessage(message)) {
+ return await devfs.handleMessage(message);
+ }
+ if (_hasNamespace(message.method)) {
+ return await _handleService(message);
+ }
+ if (message.params['isolateId'] != null) {
+ return await runningIsolates.routeRequest(message);
+ }
+ return await message.sendToVM();
+ } catch (e, st) {
+ message.setErrorResponse(kInternalError, 'Unexpected exception:$e\n$st');
return message.response;
}
- // TODO(turnidge): Update to json rpc. BEFORE SUBMIT.
- if (message.method == '_getCrashDump') {
- return _getCrashDump(message);
- }
- if (message.method == 'streamListen') {
- return _streamListen(message);
- }
- if (message.method == 'streamCancel') {
- return _streamCancel(message);
- }
- if (message.method == '_spawnUri') {
- return _spawnUri(message);
- }
- if (devfs.shouldHandleMessage(message)) {
- return devfs.handleMessage(message);
- }
- if (message.params['isolateId'] != null) {
- return runningIsolates.route(message);
+ }
+
+ void routeResponse(message) {
+ final client = message.client;
+ if (client.serviceHandles.containsKey(message.serial)) {
+ client.serviceHandles.remove(message.serial)(message);
+ _serviceRequests.release(message.serial);
}
- return message.sendToVM();
}
}
« no previous file with comments | « sdk/lib/vmservice/running_isolates.dart ('k') | sdk/lib/vmservice/vmservice_sources.gypi » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698