Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(598)

Unified Diff: lib/registry.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/ports.dart ('k') | lib/runner.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/registry.dart
diff --git a/lib/registry.dart b/lib/registry.dart
new file mode 100644
index 0000000000000000000000000000000000000000..9e87c11132aab1cd79b2eddb2dd9b89d839c19be
--- /dev/null
+++ b/lib/registry.dart
@@ -0,0 +1,497 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * An isolate-compatible object registry and lookup service.
+ */
+library dart.pkg.isolate.registry;
+
+import "dart:async" show Future, Completer, TimeoutException;
+import "dart:isolate" show RawReceivePort, SendPort, Capability;
+import "dart:collection" show HashMap, HashSet;
+import "ports.dart";
+import "src/lists.dart";
+
+// Command tags.
+const int _ADD = 0;
+const int _REMOVE = 1;
+const int _ADD_TAGS = 2;
+const int _REMOVE_TAGS = 3;
+const int _GET_TAGS = 4;
+const int _FIND = 5;
+
+/**
+ * An isolate-compatible object registry.
+ *
+ * Objects can be stored as elements of a registry,
+ * have "tags" assigned to them, and be looked up by tag.
+ *
+ * A `Registry` object caches objects found using the [lookup]
+ * method, or added using [add], and returns the same object every time
+ * they are requested.
+ * A different `Registry` object that works on the same registry will not
+ * preserve the identity of elements
+ *
+ * It is recommended to only have one `Registry` object working on the
+ * same registry in each isolate.
+ *
+ * When the registry is shared accross isolates, both elements and tags must
+ * be sendable between the isolates.
+ * Between isolates spawned using [Isolate.spawn] from the same initial
+ * isolate, most objectes can be sent.
+ * Only simple objects can be sent between isolates originating from different
+ * [Isolate.spawnUri] calls.
+ */
+class Registry<T> {
+ // Most operations fail if they haven't received a response for this duration.
+ final Duration _timeout;
+
+ // Each `Registry` object has a cache of objects being controlled by it.
+ // The cache is stored in an [Expando], not on the object.
+ // This allows sending the `Registry` object through a `SendPort` without
+ // also copying the cache.
+ static Expando _caches = new Expando();
+
+ /**
+ * Port for sending command to the central registry mananger.
+ */
+ SendPort _commandPort;
+
+ /**
+ * Create a registry linked to a [RegistryManager] through [commandPort].
+ *
+ * In most cases, a registry is created by using the
+ * [RegistryManager.registry] getter.
+ *
+ * If a registry is used between isolates created using [Isolate.spawnUri],
+ * the `Registry` object can't be sent between the isolates directly.
+ * Instead the [RegistryManager.commandPort] port can be sent and a
+ * `Registry` created from the command port using this constructor.
+ *
+ * The optional [timeout] parameter can be set to the duration
+ * this registry should wait before assuming that an operation
+ * has failed.
+ */
+ Registry.fromPort(SendPort commandPort,
+ {Duration timeout: const Duration(seconds: 5)})
+ : _commandPort = commandPort,
+ _timeout = timeout;
+
+ _RegistryCache get _cache {
+ _RegistryCache cache = _caches[this];
+ if (cache != null) return cache;
+ cache = new _RegistryCache();
+ _caches[this] = cache;
+ return cache;
+ }
+
+ /**
+ * Check and get the identity of an element.
+ *
+ * Throws if [element] is not an element in the registry.
+ */
+ int _getId(T element) {
+ int id = _cache.id(element);
+ if (id == null) {
+ throw new StateError("Not an element: ${Error.safeToString(element)}");
+ }
+ return id;
+ }
+
+ /**
+ * Adds [element] to the registry with the provided tags.
+ *
+ * Fails if [element] is already in this registry.
+ * An object is already in the registry if it has been added using [add],
+ * or if it was returned by a [lookup] call on this registry object.
+ *
+ * Returns a capability that can be used with [remove] to remove
+ * the element from the registry again.
+ *
+ * The [tags] can be used to distinguish some of the elements
+ * from other elements. Any object can be used as a tag, as long as
+ * it preserves equality when sent through a [SendPort].
+ * This makes [Capability] objects a good choice for tags.
+ */
+ Future<Capability> add(T element, {Iterable tags}) {
+ _RegistryCache cache = _cache;
+ if (cache.contains(element)) {
+ return new Future<Capability>.sync(() {
+ throw new StateError(
+ "Object already in registry: ${Error.safeToString(element)}");
+ });
+ }
+ Completer completer = new Completer<Capability>();
+ SendPort port = singleCompletePort(completer, callback: (List response) {
+ assert(cache.isAdding(element));
+ int id = response[0];
+ Capability removeCapability = response[1];
+ cache.register(id, element);
+ return removeCapability;
+ }, timeout: _timeout, onTimeout: () {
+ cache.stopAdding(element);
+ throw new TimeoutException("Future not completed", _timeout);
+ });
+ if (tags != null) tags = tags.toList(growable: false);
+ cache.setAdding(element);
+ _commandPort.send(list4(_ADD, element, tags, port));
+ return completer.future;
+ }
+
+ /**
+ * Remove the element from the registry.
+ *
+ * Returns `true` if removing the element succeeded, or `false` if the
+ * elements wasn't in the registry, or if it couldn't be removed.
+ *
+ * The [removeCapability] must be the same capability returned by [add]
+ * when the object was added. If the capability is wrong, the
+ * object is not removed, and this function returns false.
+ */
+ Future<bool> remove(T element, Capability removeCapability) {
+ int id = _cache.id(element);
+ if (id == null) {
+ return new Future<bool>.value(false);
+ }
+ Completer completer = new Completer<bool>();
+ SendPort port = singleCompletePort(completer, callback: (bool result) {
+ _cache.remove(id);
+ return result;
+ }, timeout: _timeout);
+ _commandPort.send(list4(_REMOVE, id, removeCapability, port));
+ return completer.future;
+ }
+
+ /**
+ * Add tags to an object in the registry.
+ *
+ * Each element of the registry has a number of tags associated with
+ * it. A tag is either associated with an element or not, adding it more
+ * than once does not make any difference.
+ *
+ * Tags are compared using [Object.==] equality.
+ *
+ * Fails if any of the elements are not in the registry.
+ */
+ Future addTags(Iterable<T> elements, Iterable tags) {
+ List ids = elements.map(_getId).toList(growable: false);
+ return _addTags(ids, tags);
+ }
+
+ /**
+ * Remove tags from an object in the registry.
+ *
+ * After this operation, the [elements] will not be associated to the [tags].
+ * It doesn't matter whether the elements were associated with the tags
+ * before or not.
+ *
+ * Fails if any of the elements are not in the registry.
+ */
+ Future removeTags(Iterable<T> elements, Iterable tags) {
+ List ids = elements.map(_getId).toList(growable: false);
+ tags = tags.toList(growable: false);
+ Completer completer = new Completer();
+ SendPort port = singleCompletePort(completer, timeout: _timeout);
+ _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
+ return completer.future;
+ }
+
+ Future _addTags(List<int> ids, Iterable tags) {
+ tags = tags.toList(growable: false);
+ Completer completer = new Completer();
+ SendPort port = singleCompletePort(completer, timeout: _timeout);
+ _commandPort.send(list4(_ADD_TAGS, ids, tags, port));
+ return completer.future;
+ }
+
+ /**
+ * Finds a number of elements that have all the desired [tags].
+ *
+ * If [tags] is omitted or empty, any element of the registry can be
+ * returned.
+ *
+ * If [max] is specified, it must be greater than zero.
+ * In that case, at most the first `max` results are returned,
+ * in whatever order the registry finds its results.
+ * Otherwise all matching elements are returned.
+ */
+ Future<List<T>> lookup({Iterable tags, int max}) {
+ if (max != null && max < 1) {
+ throw new RangeError.range(max, 1, null, "max");
+ }
+ if (tags != null) tags = tags.toList(growable: false);
+ Completer completer = new Completer<List<T>>();
+ SendPort port = singleCompletePort(completer, callback: (List response) {
+ // Response is even-length list of (id, element) pairs.
+ _RegistryCache cache = _cache;
+ int count = response.length ~/ 2;
+ List result = new List(count);
+ for (int i = 0; i < count; i++) {
+ int id = response[i * 2];
+ var element = response[i * 2 + 1];
+ element = cache.register(id, element);
+ result[i] = element;
+ }
+ return result;
+ }, timeout: _timeout);
+ _commandPort.send(list4(_FIND, tags, max, port));
+ return completer.future;
+ }
+}
+
+/**
+ * Isolate-local cache used by a [Registry].
+ *
+ * Maps between id-numbers and elements.
+ * An object is considered an element of the registry if it
+ */
+class _RegistryCache {
+ // Temporary marker until an object gets an id.
+ static const int _BEING_ADDED = -1;
+
+ final Map<int, Object> id2object = new HashMap();
+ final Map<Object, int> object2id = new HashMap.identity();
+
+ int id(Object object) {
+ int result = object2id[object];
+ if (result == _BEING_ADDED) return null;
+ return result;
+ }
+
+ Object operator[](int id) => id2object[id];
+
+ // Register a pair of id/object in the cache.
+ // if the id is already in the cache, just return the existing
+ // object.
+ Object register(int id, Object object) {
+ object = id2object.putIfAbsent(id, () {
+ object2id[object] = id;
+ return object;
+ });
+ return object;
+ }
+
+ bool isAdding(element) => object2id[element] == _BEING_ADDED;
+
+ void setAdding(element) {
+ assert(!contains(element));
+ object2id[element] = _BEING_ADDED;
+ }
+
+ void stopAdding(element) {
+ assert(object2id[element] == _BEING_ADDED);
+ object2id.remove(element);
+ }
+
+ void remove(int id) {
+ var element = id2object.remove(id);
+ if (element != null) {
+ object2id.remove(element);
+ }
+ }
+
+ bool contains(element) => object2id.containsKey(element);
+}
+
+/**
+ * The central repository used by distributed [Registry] instances.
+ */
+class RegistryManager {
+ final Duration _timeout;
+ int _nextId = 0;
+ RawReceivePort _commandPort;
+
+ /**
+ * Maps id to entry. Each entry contains the id, the element, its tags,
+ * and a capability required to remove it again.
+ */
+ Map<int, _RegistryEntry> _entries = new HashMap();
+ Map<Object, Set<int>> _tag2id = new HashMap();
+
+ /**
+ * Create a new registry managed by the created [RegistryManager].
+ *
+ * The optional [timeout] parameter can be set to the duration
+ * registry objects should wait before assuming that an operation
+ * has failed.
+ */
+ RegistryManager({timeout: const Duration(seconds: 5)})
+ : _timeout = timeout,
+ _commandPort = new RawReceivePort() {
+ _commandPort.handler = _handleCommand;
+ }
+
+ /**
+ * The command port receiving commands for the registry manager.
+ *
+ * Use this port with [Registry.fromPort] to link a registry to the
+ * manager in isolates where you can't send a [Registry] object directly.
+ */
+ SendPort get commandPort => _commandPort.sendPort;
+
+ /**
+ * Get a registry backed by this manager.
+ *
+ * This registry can be sent to other isolates created using
+ * [Isolate.spawn].
+ */
+ Registry get registry => new Registry.fromPort(_commandPort.sendPort,
+ timeout: _timeout);
+
+ // Used as argument to putIfAbsent.
+ static Set _createSet() => new HashSet();
+
+ void _handleCommand(List command) {
+ switch(command[0]) {
+ case _ADD:
+ _add(command[1], command[2], command[3]);
+ return;
+ case _REMOVE:
+ _remove(command[1], command[2], command[3]);
+ return;
+ case _ADD_TAGS:
+ _addTags(command[1], command[2], command[3]);
+ return;
+ case _REMOVE_TAGS:
+ _removeTags(command[1], command[2], command[3]);
+ return;
+ case _GET_TAGS:
+ _getTags(command[1], command[2]);
+ return;
+ case _FIND:
+ _find(command[1], command[2], command[3]);
+ return;
+ default:
+ throw new UnsupportedError("Unknown command: ${command[0]}");
+ }
+ }
+
+ void _add(Object object, List tags, SendPort replyPort) {
+ int id = ++_nextId;
+ var entry = new _RegistryEntry(id, object);
+ _entries[id] = entry;
+ if (tags != null) {
+ for (var tag in tags) {
+ entry.tags.add(tag);
+ _tag2id.putIfAbsent(tag, _createSet).add(id);
+ }
+ }
+ replyPort.send(list2(id, entry.removeCapability));
+ }
+
+ void _remove(int id, Capability removeCapability, SendPort replyPort) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null || entry.removeCapability != removeCapability) {
+ replyPort.send(false);
+ return;
+ }
+ _entries.remove(id);
+ for (var tag in entry.tags) {
+ _tag2id[tag].remove(id);
+ }
+ replyPort.send(true);
+ }
+
+ void _addTags(List<int> ids, List tags, SendPort replyPort) {
+ assert(tags != null);
+ assert(tags.isNotEmpty);
+ for (int id in ids) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null) continue; // Entry was removed.
+ entry.tags.addAll(tags);
+ for (var tag in tags) {
+ Set ids = _tag2id.putIfAbsent(tag, _createSet);
+ ids.add(id);
+ }
+ }
+ replyPort.send(null);
+ }
+
+ void _removeTags(List<int> ids, List tags, SendPort replyPort) {
+ assert(tags != null);
+ assert(tags.isNotEmpty);
+ for (int id in ids) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null) continue; // Object was removed.
+ entry.tags.removeAll(tags);
+ }
+ for (var tag in tags) {
+ Set tagIds = _tag2id[tag];
+ if (tagIds == null) continue;
+ tagIds.removeAll(ids);
+ }
+ replyPort.send(null);
+ }
+
+ void _getTags(int id, SendPort replyPort) {
+ _RegistryEntry entry = _entries[id];
+ if (entry != null) {
+ replyPort.send(entry.tags.toList(growable: false));
+ } else {
+ replyPort.send(const []);
+ }
+ }
+
+ Iterable<int> _findTaggedIds(List tags) {
+ var matchingFirstTagIds = _tag2id[tags[0]];
+ if (matchingFirstTagIds == null) {
+ return const [];
+ }
+ if (matchingFirstTagIds.isEmpty || tags.length == 1) {
+ return matchingFirstTagIds;
+ }
+ // Create new set, then start removing ids not also matched
+ // by other tags.
+ Set<int> matchingIds = matchingFirstTagIds.toSet();
+ for (int i = 1; i < tags.length; i++) {
+ var tagIds = _tag2id[tags[i]];
+ if (tagIds == null) return const [];
+ matchingIds.retainAll(tagIds);
+ if (matchingIds.isEmpty) break;
+ }
+ return matchingIds;
+ }
+
+ void _find(List tags, int max, SendPort replyPort) {
+ assert(max == null || max > 0);
+ List result = [];
+ if (tags == null || tags.isEmpty) {
+ var entries = _entries.values;
+ if (max != null) entries = entries.take(max);
+ for (_RegistryEntry entry in entries) {
+ result.add(entry.id);
+ result.add(entry.element);
+ }
+ replyPort.send(result);
+ return;
+ }
+ var matchingIds = _findTaggedIds(tags);
+ if (max == null) max = matchingIds.length; // All results.
+ for (var id in matchingIds) {
+ result.add(id);
+ result.add(_entries[id].element);
+ max--;
+ if (max == 0) break;
+ }
+ replyPort.send(result);
+ }
+
+ /**
+ * Shut down the registry service.
+ *
+ * After this, all [Registry] operations will time out.
+ */
+ void close() {
+ _commandPort.close();
+ }
+}
+
+/** Entry in [RegistryManager]. */
+class _RegistryEntry {
+ final int id;
+ final Object element;
+ final Set tags = new HashSet();
+ final Capability removeCapability = new Capability();
+ _RegistryEntry(this.id, this.element);
+}
« no previous file with comments | « lib/ports.dart ('k') | lib/runner.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698