Index: lib/registry.dart |
diff --git a/lib/registry.dart b/lib/registry.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9e87c11132aab1cd79b2eddb2dd9b89d839c19be |
--- /dev/null |
+++ b/lib/registry.dart |
@@ -0,0 +1,497 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+/** |
+ * An isolate-compatible object registry and lookup service. |
+ */ |
+library dart.pkg.isolate.registry; |
+ |
+import "dart:async" show Future, Completer, TimeoutException; |
+import "dart:isolate" show RawReceivePort, SendPort, Capability; |
+import "dart:collection" show HashMap, HashSet; |
+import "ports.dart"; |
+import "src/lists.dart"; |
+ |
+// Command tags. |
+const int _ADD = 0; |
+const int _REMOVE = 1; |
+const int _ADD_TAGS = 2; |
+const int _REMOVE_TAGS = 3; |
+const int _GET_TAGS = 4; |
+const int _FIND = 5; |
+ |
+/** |
+ * An isolate-compatible object registry. |
+ * |
+ * Objects can be stored as elements of a registry, |
+ * have "tags" assigned to them, and be looked up by tag. |
+ * |
+ * A `Registry` object caches objects found using the [lookup] |
+ * method, or added using [add], and returns the same object every time |
+ * they are requested. |
+ * A different `Registry` object that works on the same registry will not |
+ * preserve the identity of elements |
+ * |
+ * It is recommended to only have one `Registry` object working on the |
+ * same registry in each isolate. |
+ * |
+ * When the registry is shared accross isolates, both elements and tags must |
+ * be sendable between the isolates. |
+ * Between isolates spawned using [Isolate.spawn] from the same initial |
+ * isolate, most objectes can be sent. |
+ * Only simple objects can be sent between isolates originating from different |
+ * [Isolate.spawnUri] calls. |
+ */ |
+class Registry<T> { |
+ // Most operations fail if they haven't received a response for this duration. |
+ final Duration _timeout; |
+ |
+ // Each `Registry` object has a cache of objects being controlled by it. |
+ // The cache is stored in an [Expando], not on the object. |
+ // This allows sending the `Registry` object through a `SendPort` without |
+ // also copying the cache. |
+ static Expando _caches = new Expando(); |
+ |
+ /** |
+ * Port for sending command to the central registry mananger. |
+ */ |
+ SendPort _commandPort; |
+ |
+ /** |
+ * Create a registry linked to a [RegistryManager] through [commandPort]. |
+ * |
+ * In most cases, a registry is created by using the |
+ * [RegistryManager.registry] getter. |
+ * |
+ * If a registry is used between isolates created using [Isolate.spawnUri], |
+ * the `Registry` object can't be sent between the isolates directly. |
+ * Instead the [RegistryManager.commandPort] port can be sent and a |
+ * `Registry` created from the command port using this constructor. |
+ * |
+ * The optional [timeout] parameter can be set to the duration |
+ * this registry should wait before assuming that an operation |
+ * has failed. |
+ */ |
+ Registry.fromPort(SendPort commandPort, |
+ {Duration timeout: const Duration(seconds: 5)}) |
+ : _commandPort = commandPort, |
+ _timeout = timeout; |
+ |
+ _RegistryCache get _cache { |
+ _RegistryCache cache = _caches[this]; |
+ if (cache != null) return cache; |
+ cache = new _RegistryCache(); |
+ _caches[this] = cache; |
+ return cache; |
+ } |
+ |
+ /** |
+ * Check and get the identity of an element. |
+ * |
+ * Throws if [element] is not an element in the registry. |
+ */ |
+ int _getId(T element) { |
+ int id = _cache.id(element); |
+ if (id == null) { |
+ throw new StateError("Not an element: ${Error.safeToString(element)}"); |
+ } |
+ return id; |
+ } |
+ |
+ /** |
+ * Adds [element] to the registry with the provided tags. |
+ * |
+ * Fails if [element] is already in this registry. |
+ * An object is already in the registry if it has been added using [add], |
+ * or if it was returned by a [lookup] call on this registry object. |
+ * |
+ * Returns a capability that can be used with [remove] to remove |
+ * the element from the registry again. |
+ * |
+ * The [tags] can be used to distinguish some of the elements |
+ * from other elements. Any object can be used as a tag, as long as |
+ * it preserves equality when sent through a [SendPort]. |
+ * This makes [Capability] objects a good choice for tags. |
+ */ |
+ Future<Capability> add(T element, {Iterable tags}) { |
+ _RegistryCache cache = _cache; |
+ if (cache.contains(element)) { |
+ return new Future<Capability>.sync(() { |
+ throw new StateError( |
+ "Object already in registry: ${Error.safeToString(element)}"); |
+ }); |
+ } |
+ Completer completer = new Completer<Capability>(); |
+ SendPort port = singleCompletePort(completer, callback: (List response) { |
+ assert(cache.isAdding(element)); |
+ int id = response[0]; |
+ Capability removeCapability = response[1]; |
+ cache.register(id, element); |
+ return removeCapability; |
+ }, timeout: _timeout, onTimeout: () { |
+ cache.stopAdding(element); |
+ throw new TimeoutException("Future not completed", _timeout); |
+ }); |
+ if (tags != null) tags = tags.toList(growable: false); |
+ cache.setAdding(element); |
+ _commandPort.send(list4(_ADD, element, tags, port)); |
+ return completer.future; |
+ } |
+ |
+ /** |
+ * Remove the element from the registry. |
+ * |
+ * Returns `true` if removing the element succeeded, or `false` if the |
+ * elements wasn't in the registry, or if it couldn't be removed. |
+ * |
+ * The [removeCapability] must be the same capability returned by [add] |
+ * when the object was added. If the capability is wrong, the |
+ * object is not removed, and this function returns false. |
+ */ |
+ Future<bool> remove(T element, Capability removeCapability) { |
+ int id = _cache.id(element); |
+ if (id == null) { |
+ return new Future<bool>.value(false); |
+ } |
+ Completer completer = new Completer<bool>(); |
+ SendPort port = singleCompletePort(completer, callback: (bool result) { |
+ _cache.remove(id); |
+ return result; |
+ }, timeout: _timeout); |
+ _commandPort.send(list4(_REMOVE, id, removeCapability, port)); |
+ return completer.future; |
+ } |
+ |
+ /** |
+ * Add tags to an object in the registry. |
+ * |
+ * Each element of the registry has a number of tags associated with |
+ * it. A tag is either associated with an element or not, adding it more |
+ * than once does not make any difference. |
+ * |
+ * Tags are compared using [Object.==] equality. |
+ * |
+ * Fails if any of the elements are not in the registry. |
+ */ |
+ Future addTags(Iterable<T> elements, Iterable tags) { |
+ List ids = elements.map(_getId).toList(growable: false); |
+ return _addTags(ids, tags); |
+ } |
+ |
+ /** |
+ * Remove tags from an object in the registry. |
+ * |
+ * After this operation, the [elements] will not be associated to the [tags]. |
+ * It doesn't matter whether the elements were associated with the tags |
+ * before or not. |
+ * |
+ * Fails if any of the elements are not in the registry. |
+ */ |
+ Future removeTags(Iterable<T> elements, Iterable tags) { |
+ List ids = elements.map(_getId).toList(growable: false); |
+ tags = tags.toList(growable: false); |
+ Completer completer = new Completer(); |
+ SendPort port = singleCompletePort(completer, timeout: _timeout); |
+ _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port)); |
+ return completer.future; |
+ } |
+ |
+ Future _addTags(List<int> ids, Iterable tags) { |
+ tags = tags.toList(growable: false); |
+ Completer completer = new Completer(); |
+ SendPort port = singleCompletePort(completer, timeout: _timeout); |
+ _commandPort.send(list4(_ADD_TAGS, ids, tags, port)); |
+ return completer.future; |
+ } |
+ |
+ /** |
+ * Finds a number of elements that have all the desired [tags]. |
+ * |
+ * If [tags] is omitted or empty, any element of the registry can be |
+ * returned. |
+ * |
+ * If [max] is specified, it must be greater than zero. |
+ * In that case, at most the first `max` results are returned, |
+ * in whatever order the registry finds its results. |
+ * Otherwise all matching elements are returned. |
+ */ |
+ Future<List<T>> lookup({Iterable tags, int max}) { |
+ if (max != null && max < 1) { |
+ throw new RangeError.range(max, 1, null, "max"); |
+ } |
+ if (tags != null) tags = tags.toList(growable: false); |
+ Completer completer = new Completer<List<T>>(); |
+ SendPort port = singleCompletePort(completer, callback: (List response) { |
+ // Response is even-length list of (id, element) pairs. |
+ _RegistryCache cache = _cache; |
+ int count = response.length ~/ 2; |
+ List result = new List(count); |
+ for (int i = 0; i < count; i++) { |
+ int id = response[i * 2]; |
+ var element = response[i * 2 + 1]; |
+ element = cache.register(id, element); |
+ result[i] = element; |
+ } |
+ return result; |
+ }, timeout: _timeout); |
+ _commandPort.send(list4(_FIND, tags, max, port)); |
+ return completer.future; |
+ } |
+} |
+ |
+/** |
+ * Isolate-local cache used by a [Registry]. |
+ * |
+ * Maps between id-numbers and elements. |
+ * An object is considered an element of the registry if it |
+ */ |
+class _RegistryCache { |
+ // Temporary marker until an object gets an id. |
+ static const int _BEING_ADDED = -1; |
+ |
+ final Map<int, Object> id2object = new HashMap(); |
+ final Map<Object, int> object2id = new HashMap.identity(); |
+ |
+ int id(Object object) { |
+ int result = object2id[object]; |
+ if (result == _BEING_ADDED) return null; |
+ return result; |
+ } |
+ |
+ Object operator[](int id) => id2object[id]; |
+ |
+ // Register a pair of id/object in the cache. |
+ // if the id is already in the cache, just return the existing |
+ // object. |
+ Object register(int id, Object object) { |
+ object = id2object.putIfAbsent(id, () { |
+ object2id[object] = id; |
+ return object; |
+ }); |
+ return object; |
+ } |
+ |
+ bool isAdding(element) => object2id[element] == _BEING_ADDED; |
+ |
+ void setAdding(element) { |
+ assert(!contains(element)); |
+ object2id[element] = _BEING_ADDED; |
+ } |
+ |
+ void stopAdding(element) { |
+ assert(object2id[element] == _BEING_ADDED); |
+ object2id.remove(element); |
+ } |
+ |
+ void remove(int id) { |
+ var element = id2object.remove(id); |
+ if (element != null) { |
+ object2id.remove(element); |
+ } |
+ } |
+ |
+ bool contains(element) => object2id.containsKey(element); |
+} |
+ |
+/** |
+ * The central repository used by distributed [Registry] instances. |
+ */ |
+class RegistryManager { |
+ final Duration _timeout; |
+ int _nextId = 0; |
+ RawReceivePort _commandPort; |
+ |
+ /** |
+ * Maps id to entry. Each entry contains the id, the element, its tags, |
+ * and a capability required to remove it again. |
+ */ |
+ Map<int, _RegistryEntry> _entries = new HashMap(); |
+ Map<Object, Set<int>> _tag2id = new HashMap(); |
+ |
+ /** |
+ * Create a new registry managed by the created [RegistryManager]. |
+ * |
+ * The optional [timeout] parameter can be set to the duration |
+ * registry objects should wait before assuming that an operation |
+ * has failed. |
+ */ |
+ RegistryManager({timeout: const Duration(seconds: 5)}) |
+ : _timeout = timeout, |
+ _commandPort = new RawReceivePort() { |
+ _commandPort.handler = _handleCommand; |
+ } |
+ |
+ /** |
+ * The command port receiving commands for the registry manager. |
+ * |
+ * Use this port with [Registry.fromPort] to link a registry to the |
+ * manager in isolates where you can't send a [Registry] object directly. |
+ */ |
+ SendPort get commandPort => _commandPort.sendPort; |
+ |
+ /** |
+ * Get a registry backed by this manager. |
+ * |
+ * This registry can be sent to other isolates created using |
+ * [Isolate.spawn]. |
+ */ |
+ Registry get registry => new Registry.fromPort(_commandPort.sendPort, |
+ timeout: _timeout); |
+ |
+ // Used as argument to putIfAbsent. |
+ static Set _createSet() => new HashSet(); |
+ |
+ void _handleCommand(List command) { |
+ switch(command[0]) { |
+ case _ADD: |
+ _add(command[1], command[2], command[3]); |
+ return; |
+ case _REMOVE: |
+ _remove(command[1], command[2], command[3]); |
+ return; |
+ case _ADD_TAGS: |
+ _addTags(command[1], command[2], command[3]); |
+ return; |
+ case _REMOVE_TAGS: |
+ _removeTags(command[1], command[2], command[3]); |
+ return; |
+ case _GET_TAGS: |
+ _getTags(command[1], command[2]); |
+ return; |
+ case _FIND: |
+ _find(command[1], command[2], command[3]); |
+ return; |
+ default: |
+ throw new UnsupportedError("Unknown command: ${command[0]}"); |
+ } |
+ } |
+ |
+ void _add(Object object, List tags, SendPort replyPort) { |
+ int id = ++_nextId; |
+ var entry = new _RegistryEntry(id, object); |
+ _entries[id] = entry; |
+ if (tags != null) { |
+ for (var tag in tags) { |
+ entry.tags.add(tag); |
+ _tag2id.putIfAbsent(tag, _createSet).add(id); |
+ } |
+ } |
+ replyPort.send(list2(id, entry.removeCapability)); |
+ } |
+ |
+ void _remove(int id, Capability removeCapability, SendPort replyPort) { |
+ _RegistryEntry entry = _entries[id]; |
+ if (entry == null || entry.removeCapability != removeCapability) { |
+ replyPort.send(false); |
+ return; |
+ } |
+ _entries.remove(id); |
+ for (var tag in entry.tags) { |
+ _tag2id[tag].remove(id); |
+ } |
+ replyPort.send(true); |
+ } |
+ |
+ void _addTags(List<int> ids, List tags, SendPort replyPort) { |
+ assert(tags != null); |
+ assert(tags.isNotEmpty); |
+ for (int id in ids) { |
+ _RegistryEntry entry = _entries[id]; |
+ if (entry == null) continue; // Entry was removed. |
+ entry.tags.addAll(tags); |
+ for (var tag in tags) { |
+ Set ids = _tag2id.putIfAbsent(tag, _createSet); |
+ ids.add(id); |
+ } |
+ } |
+ replyPort.send(null); |
+ } |
+ |
+ void _removeTags(List<int> ids, List tags, SendPort replyPort) { |
+ assert(tags != null); |
+ assert(tags.isNotEmpty); |
+ for (int id in ids) { |
+ _RegistryEntry entry = _entries[id]; |
+ if (entry == null) continue; // Object was removed. |
+ entry.tags.removeAll(tags); |
+ } |
+ for (var tag in tags) { |
+ Set tagIds = _tag2id[tag]; |
+ if (tagIds == null) continue; |
+ tagIds.removeAll(ids); |
+ } |
+ replyPort.send(null); |
+ } |
+ |
+ void _getTags(int id, SendPort replyPort) { |
+ _RegistryEntry entry = _entries[id]; |
+ if (entry != null) { |
+ replyPort.send(entry.tags.toList(growable: false)); |
+ } else { |
+ replyPort.send(const []); |
+ } |
+ } |
+ |
+ Iterable<int> _findTaggedIds(List tags) { |
+ var matchingFirstTagIds = _tag2id[tags[0]]; |
+ if (matchingFirstTagIds == null) { |
+ return const []; |
+ } |
+ if (matchingFirstTagIds.isEmpty || tags.length == 1) { |
+ return matchingFirstTagIds; |
+ } |
+ // Create new set, then start removing ids not also matched |
+ // by other tags. |
+ Set<int> matchingIds = matchingFirstTagIds.toSet(); |
+ for (int i = 1; i < tags.length; i++) { |
+ var tagIds = _tag2id[tags[i]]; |
+ if (tagIds == null) return const []; |
+ matchingIds.retainAll(tagIds); |
+ if (matchingIds.isEmpty) break; |
+ } |
+ return matchingIds; |
+ } |
+ |
+ void _find(List tags, int max, SendPort replyPort) { |
+ assert(max == null || max > 0); |
+ List result = []; |
+ if (tags == null || tags.isEmpty) { |
+ var entries = _entries.values; |
+ if (max != null) entries = entries.take(max); |
+ for (_RegistryEntry entry in entries) { |
+ result.add(entry.id); |
+ result.add(entry.element); |
+ } |
+ replyPort.send(result); |
+ return; |
+ } |
+ var matchingIds = _findTaggedIds(tags); |
+ if (max == null) max = matchingIds.length; // All results. |
+ for (var id in matchingIds) { |
+ result.add(id); |
+ result.add(_entries[id].element); |
+ max--; |
+ if (max == 0) break; |
+ } |
+ replyPort.send(result); |
+ } |
+ |
+ /** |
+ * Shut down the registry service. |
+ * |
+ * After this, all [Registry] operations will time out. |
+ */ |
+ void close() { |
+ _commandPort.close(); |
+ } |
+} |
+ |
+/** Entry in [RegistryManager]. */ |
+class _RegistryEntry { |
+ final int id; |
+ final Object element; |
+ final Set tags = new HashSet(); |
+ final Capability removeCapability = new Capability(); |
+ _RegistryEntry(this.id, this.element); |
+} |