OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /** |
| 6 * An isolate-compatible object registry and lookup service. |
| 7 */ |
| 8 library dart.pkg.isolate.registry; |
| 9 |
| 10 import "dart:async" show Future, Completer, TimeoutException; |
| 11 import "dart:isolate" show RawReceivePort, SendPort, Capability; |
| 12 import "dart:collection" show HashMap, HashSet; |
| 13 import "ports.dart"; |
| 14 import "src/lists.dart"; |
| 15 |
| 16 // Command tags. |
| 17 const int _ADD = 0; |
| 18 const int _REMOVE = 1; |
| 19 const int _ADD_TAGS = 2; |
| 20 const int _REMOVE_TAGS = 3; |
| 21 const int _GET_TAGS = 4; |
| 22 const int _FIND = 5; |
| 23 |
| 24 /** |
| 25 * An isolate-compatible object registry. |
| 26 * |
| 27 * Objects can be stored as elements of a registry, |
| 28 * have "tags" assigned to them, and be looked up by tag. |
| 29 * |
| 30 * A `Registry` object caches objects found using the [lookup] |
| 31 * method, or added using [add], and returns the same object every time |
| 32 * they are requested. |
| 33 * A different `Registry` object that works on the same registry will not |
| 34 * preserve the identity of elements |
| 35 * |
| 36 * It is recommended to only have one `Registry` object working on the |
| 37 * same registry in each isolate. |
| 38 * |
| 39 * When the registry is shared accross isolates, both elements and tags must |
| 40 * be sendable between the isolates. |
| 41 * Between isolates spawned using [Isolate.spawn] from the same initial |
| 42 * isolate, most objectes can be sent. |
| 43 * Only simple objects can be sent between isolates originating from different |
| 44 * [Isolate.spawnUri] calls. |
| 45 */ |
| 46 class Registry<T> { |
| 47 // Most operations fail if they haven't received a response for this duration. |
| 48 final Duration _timeout; |
| 49 |
| 50 // Each `Registry` object has a cache of objects being controlled by it. |
| 51 // The cache is stored in an [Expando], not on the object. |
| 52 // This allows sending the `Registry` object through a `SendPort` without |
| 53 // also copying the cache. |
| 54 static Expando _caches = new Expando(); |
| 55 |
| 56 /** |
| 57 * Port for sending command to the central registry mananger. |
| 58 */ |
| 59 SendPort _commandPort; |
| 60 |
| 61 /** |
| 62 * Create a registry linked to a [RegistryManager] through [commandPort]. |
| 63 * |
| 64 * In most cases, a registry is created by using the |
| 65 * [RegistryManager.registry] getter. |
| 66 * |
| 67 * If a registry is used between isolates created using [Isolate.spawnUri], |
| 68 * the `Registry` object can't be sent between the isolates directly. |
| 69 * Instead the [RegistryManager.commandPort] port can be sent and a |
| 70 * `Registry` created from the command port using this constructor. |
| 71 * |
| 72 * The optional [timeout] parameter can be set to the duration |
| 73 * this registry should wait before assuming that an operation |
| 74 * has failed. |
| 75 */ |
| 76 Registry.fromPort(SendPort commandPort, |
| 77 {Duration timeout: const Duration(seconds: 5)}) |
| 78 : _commandPort = commandPort, |
| 79 _timeout = timeout; |
| 80 |
| 81 _RegistryCache get _cache { |
| 82 _RegistryCache cache = _caches[this]; |
| 83 if (cache != null) return cache; |
| 84 cache = new _RegistryCache(); |
| 85 _caches[this] = cache; |
| 86 return cache; |
| 87 } |
| 88 |
| 89 /** |
| 90 * Check and get the identity of an element. |
| 91 * |
| 92 * Throws if [element] is not an element in the registry. |
| 93 */ |
| 94 int _getId(T element) { |
| 95 int id = _cache.id(element); |
| 96 if (id == null) { |
| 97 throw new StateError("Not an element: ${Error.safeToString(element)}"); |
| 98 } |
| 99 return id; |
| 100 } |
| 101 |
| 102 /** |
| 103 * Adds [element] to the registry with the provided tags. |
| 104 * |
| 105 * Fails if [element] is already in this registry. |
| 106 * An object is already in the registry if it has been added using [add], |
| 107 * or if it was returned by a [lookup] call on this registry object. |
| 108 * |
| 109 * Returns a capability that can be used with [remove] to remove |
| 110 * the element from the registry again. |
| 111 * |
| 112 * The [tags] can be used to distinguish some of the elements |
| 113 * from other elements. Any object can be used as a tag, as long as |
| 114 * it preserves equality when sent through a [SendPort]. |
| 115 * This makes [Capability] objects a good choice for tags. |
| 116 */ |
| 117 Future<Capability> add(T element, {Iterable tags}) { |
| 118 _RegistryCache cache = _cache; |
| 119 if (cache.contains(element)) { |
| 120 return new Future<Capability>.sync(() { |
| 121 throw new StateError( |
| 122 "Object already in registry: ${Error.safeToString(element)}"); |
| 123 }); |
| 124 } |
| 125 Completer completer = new Completer<Capability>(); |
| 126 SendPort port = singleCompletePort(completer, callback: (List response) { |
| 127 assert(cache.isAdding(element)); |
| 128 int id = response[0]; |
| 129 Capability removeCapability = response[1]; |
| 130 cache.register(id, element); |
| 131 return removeCapability; |
| 132 }, timeout: _timeout, onTimeout: () { |
| 133 cache.stopAdding(element); |
| 134 throw new TimeoutException("Future not completed", _timeout); |
| 135 }); |
| 136 if (tags != null) tags = tags.toList(growable: false); |
| 137 cache.setAdding(element); |
| 138 _commandPort.send(list4(_ADD, element, tags, port)); |
| 139 return completer.future; |
| 140 } |
| 141 |
| 142 /** |
| 143 * Remove the element from the registry. |
| 144 * |
| 145 * Returns `true` if removing the element succeeded, or `false` if the |
| 146 * elements wasn't in the registry, or if it couldn't be removed. |
| 147 * |
| 148 * The [removeCapability] must be the same capability returned by [add] |
| 149 * when the object was added. If the capability is wrong, the |
| 150 * object is not removed, and this function returns false. |
| 151 */ |
| 152 Future<bool> remove(T element, Capability removeCapability) { |
| 153 int id = _cache.id(element); |
| 154 if (id == null) { |
| 155 return new Future<bool>.value(false); |
| 156 } |
| 157 Completer completer = new Completer<bool>(); |
| 158 SendPort port = singleCompletePort(completer, callback: (bool result) { |
| 159 _cache.remove(id); |
| 160 return result; |
| 161 }, timeout: _timeout); |
| 162 _commandPort.send(list4(_REMOVE, id, removeCapability, port)); |
| 163 return completer.future; |
| 164 } |
| 165 |
| 166 /** |
| 167 * Add tags to an object in the registry. |
| 168 * |
| 169 * Each element of the registry has a number of tags associated with |
| 170 * it. A tag is either associated with an element or not, adding it more |
| 171 * than once does not make any difference. |
| 172 * |
| 173 * Tags are compared using [Object.==] equality. |
| 174 * |
| 175 * Fails if any of the elements are not in the registry. |
| 176 */ |
| 177 Future addTags(Iterable<T> elements, Iterable tags) { |
| 178 List ids = elements.map(_getId).toList(growable: false); |
| 179 return _addTags(ids, tags); |
| 180 } |
| 181 |
| 182 /** |
| 183 * Remove tags from an object in the registry. |
| 184 * |
| 185 * After this operation, the [elements] will not be associated to the [tags]. |
| 186 * It doesn't matter whether the elements were associated with the tags |
| 187 * before or not. |
| 188 * |
| 189 * Fails if any of the elements are not in the registry. |
| 190 */ |
| 191 Future removeTags(Iterable<T> elements, Iterable tags) { |
| 192 List ids = elements.map(_getId).toList(growable: false); |
| 193 tags = tags.toList(growable: false); |
| 194 Completer completer = new Completer(); |
| 195 SendPort port = singleCompletePort(completer, timeout: _timeout); |
| 196 _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port)); |
| 197 return completer.future; |
| 198 } |
| 199 |
| 200 Future _addTags(List<int> ids, Iterable tags) { |
| 201 tags = tags.toList(growable: false); |
| 202 Completer completer = new Completer(); |
| 203 SendPort port = singleCompletePort(completer, timeout: _timeout); |
| 204 _commandPort.send(list4(_ADD_TAGS, ids, tags, port)); |
| 205 return completer.future; |
| 206 } |
| 207 |
| 208 /** |
| 209 * Finds a number of elements that have all the desired [tags]. |
| 210 * |
| 211 * If [tags] is omitted or empty, any element of the registry can be |
| 212 * returned. |
| 213 * |
| 214 * If [max] is specified, it must be greater than zero. |
| 215 * In that case, at most the first `max` results are returned, |
| 216 * in whatever order the registry finds its results. |
| 217 * Otherwise all matching elements are returned. |
| 218 */ |
| 219 Future<List<T>> lookup({Iterable tags, int max}) { |
| 220 if (max != null && max < 1) { |
| 221 throw new RangeError.range(max, 1, null, "max"); |
| 222 } |
| 223 if (tags != null) tags = tags.toList(growable: false); |
| 224 Completer completer = new Completer<List<T>>(); |
| 225 SendPort port = singleCompletePort(completer, callback: (List response) { |
| 226 // Response is even-length list of (id, element) pairs. |
| 227 _RegistryCache cache = _cache; |
| 228 int count = response.length ~/ 2; |
| 229 List result = new List(count); |
| 230 for (int i = 0; i < count; i++) { |
| 231 int id = response[i * 2]; |
| 232 var element = response[i * 2 + 1]; |
| 233 element = cache.register(id, element); |
| 234 result[i] = element; |
| 235 } |
| 236 return result; |
| 237 }, timeout: _timeout); |
| 238 _commandPort.send(list4(_FIND, tags, max, port)); |
| 239 return completer.future; |
| 240 } |
| 241 } |
| 242 |
| 243 /** |
| 244 * Isolate-local cache used by a [Registry]. |
| 245 * |
| 246 * Maps between id-numbers and elements. |
| 247 * An object is considered an element of the registry if it |
| 248 */ |
| 249 class _RegistryCache { |
| 250 // Temporary marker until an object gets an id. |
| 251 static const int _BEING_ADDED = -1; |
| 252 |
| 253 final Map<int, Object> id2object = new HashMap(); |
| 254 final Map<Object, int> object2id = new HashMap.identity(); |
| 255 |
| 256 int id(Object object) { |
| 257 int result = object2id[object]; |
| 258 if (result == _BEING_ADDED) return null; |
| 259 return result; |
| 260 } |
| 261 |
| 262 Object operator[](int id) => id2object[id]; |
| 263 |
| 264 // Register a pair of id/object in the cache. |
| 265 // if the id is already in the cache, just return the existing |
| 266 // object. |
| 267 Object register(int id, Object object) { |
| 268 object = id2object.putIfAbsent(id, () { |
| 269 object2id[object] = id; |
| 270 return object; |
| 271 }); |
| 272 return object; |
| 273 } |
| 274 |
| 275 bool isAdding(element) => object2id[element] == _BEING_ADDED; |
| 276 |
| 277 void setAdding(element) { |
| 278 assert(!contains(element)); |
| 279 object2id[element] = _BEING_ADDED; |
| 280 } |
| 281 |
| 282 void stopAdding(element) { |
| 283 assert(object2id[element] == _BEING_ADDED); |
| 284 object2id.remove(element); |
| 285 } |
| 286 |
| 287 void remove(int id) { |
| 288 var element = id2object.remove(id); |
| 289 if (element != null) { |
| 290 object2id.remove(element); |
| 291 } |
| 292 } |
| 293 |
| 294 bool contains(element) => object2id.containsKey(element); |
| 295 } |
| 296 |
| 297 /** |
| 298 * The central repository used by distributed [Registry] instances. |
| 299 */ |
| 300 class RegistryManager { |
| 301 final Duration _timeout; |
| 302 int _nextId = 0; |
| 303 RawReceivePort _commandPort; |
| 304 |
| 305 /** |
| 306 * Maps id to entry. Each entry contains the id, the element, its tags, |
| 307 * and a capability required to remove it again. |
| 308 */ |
| 309 Map<int, _RegistryEntry> _entries = new HashMap(); |
| 310 Map<Object, Set<int>> _tag2id = new HashMap(); |
| 311 |
| 312 /** |
| 313 * Create a new registry managed by the created [RegistryManager]. |
| 314 * |
| 315 * The optional [timeout] parameter can be set to the duration |
| 316 * registry objects should wait before assuming that an operation |
| 317 * has failed. |
| 318 */ |
| 319 RegistryManager({timeout: const Duration(seconds: 5)}) |
| 320 : _timeout = timeout, |
| 321 _commandPort = new RawReceivePort() { |
| 322 _commandPort.handler = _handleCommand; |
| 323 } |
| 324 |
| 325 /** |
| 326 * The command port receiving commands for the registry manager. |
| 327 * |
| 328 * Use this port with [Registry.fromPort] to link a registry to the |
| 329 * manager in isolates where you can't send a [Registry] object directly. |
| 330 */ |
| 331 SendPort get commandPort => _commandPort.sendPort; |
| 332 |
| 333 /** |
| 334 * Get a registry backed by this manager. |
| 335 * |
| 336 * This registry can be sent to other isolates created using |
| 337 * [Isolate.spawn]. |
| 338 */ |
| 339 Registry get registry => new Registry.fromPort(_commandPort.sendPort, |
| 340 timeout: _timeout); |
| 341 |
| 342 // Used as argument to putIfAbsent. |
| 343 static Set _createSet() => new HashSet(); |
| 344 |
| 345 void _handleCommand(List command) { |
| 346 switch(command[0]) { |
| 347 case _ADD: |
| 348 _add(command[1], command[2], command[3]); |
| 349 return; |
| 350 case _REMOVE: |
| 351 _remove(command[1], command[2], command[3]); |
| 352 return; |
| 353 case _ADD_TAGS: |
| 354 _addTags(command[1], command[2], command[3]); |
| 355 return; |
| 356 case _REMOVE_TAGS: |
| 357 _removeTags(command[1], command[2], command[3]); |
| 358 return; |
| 359 case _GET_TAGS: |
| 360 _getTags(command[1], command[2]); |
| 361 return; |
| 362 case _FIND: |
| 363 _find(command[1], command[2], command[3]); |
| 364 return; |
| 365 default: |
| 366 throw new UnsupportedError("Unknown command: ${command[0]}"); |
| 367 } |
| 368 } |
| 369 |
| 370 void _add(Object object, List tags, SendPort replyPort) { |
| 371 int id = ++_nextId; |
| 372 var entry = new _RegistryEntry(id, object); |
| 373 _entries[id] = entry; |
| 374 if (tags != null) { |
| 375 for (var tag in tags) { |
| 376 entry.tags.add(tag); |
| 377 _tag2id.putIfAbsent(tag, _createSet).add(id); |
| 378 } |
| 379 } |
| 380 replyPort.send(list2(id, entry.removeCapability)); |
| 381 } |
| 382 |
| 383 void _remove(int id, Capability removeCapability, SendPort replyPort) { |
| 384 _RegistryEntry entry = _entries[id]; |
| 385 if (entry == null || entry.removeCapability != removeCapability) { |
| 386 replyPort.send(false); |
| 387 return; |
| 388 } |
| 389 _entries.remove(id); |
| 390 for (var tag in entry.tags) { |
| 391 _tag2id[tag].remove(id); |
| 392 } |
| 393 replyPort.send(true); |
| 394 } |
| 395 |
| 396 void _addTags(List<int> ids, List tags, SendPort replyPort) { |
| 397 assert(tags != null); |
| 398 assert(tags.isNotEmpty); |
| 399 for (int id in ids) { |
| 400 _RegistryEntry entry = _entries[id]; |
| 401 if (entry == null) continue; // Entry was removed. |
| 402 entry.tags.addAll(tags); |
| 403 for (var tag in tags) { |
| 404 Set ids = _tag2id.putIfAbsent(tag, _createSet); |
| 405 ids.add(id); |
| 406 } |
| 407 } |
| 408 replyPort.send(null); |
| 409 } |
| 410 |
| 411 void _removeTags(List<int> ids, List tags, SendPort replyPort) { |
| 412 assert(tags != null); |
| 413 assert(tags.isNotEmpty); |
| 414 for (int id in ids) { |
| 415 _RegistryEntry entry = _entries[id]; |
| 416 if (entry == null) continue; // Object was removed. |
| 417 entry.tags.removeAll(tags); |
| 418 } |
| 419 for (var tag in tags) { |
| 420 Set tagIds = _tag2id[tag]; |
| 421 if (tagIds == null) continue; |
| 422 tagIds.removeAll(ids); |
| 423 } |
| 424 replyPort.send(null); |
| 425 } |
| 426 |
| 427 void _getTags(int id, SendPort replyPort) { |
| 428 _RegistryEntry entry = _entries[id]; |
| 429 if (entry != null) { |
| 430 replyPort.send(entry.tags.toList(growable: false)); |
| 431 } else { |
| 432 replyPort.send(const []); |
| 433 } |
| 434 } |
| 435 |
| 436 Iterable<int> _findTaggedIds(List tags) { |
| 437 var matchingFirstTagIds = _tag2id[tags[0]]; |
| 438 if (matchingFirstTagIds == null) { |
| 439 return const []; |
| 440 } |
| 441 if (matchingFirstTagIds.isEmpty || tags.length == 1) { |
| 442 return matchingFirstTagIds; |
| 443 } |
| 444 // Create new set, then start removing ids not also matched |
| 445 // by other tags. |
| 446 Set<int> matchingIds = matchingFirstTagIds.toSet(); |
| 447 for (int i = 1; i < tags.length; i++) { |
| 448 var tagIds = _tag2id[tags[i]]; |
| 449 if (tagIds == null) return const []; |
| 450 matchingIds.retainAll(tagIds); |
| 451 if (matchingIds.isEmpty) break; |
| 452 } |
| 453 return matchingIds; |
| 454 } |
| 455 |
| 456 void _find(List tags, int max, SendPort replyPort) { |
| 457 assert(max == null || max > 0); |
| 458 List result = []; |
| 459 if (tags == null || tags.isEmpty) { |
| 460 var entries = _entries.values; |
| 461 if (max != null) entries = entries.take(max); |
| 462 for (_RegistryEntry entry in entries) { |
| 463 result.add(entry.id); |
| 464 result.add(entry.element); |
| 465 } |
| 466 replyPort.send(result); |
| 467 return; |
| 468 } |
| 469 var matchingIds = _findTaggedIds(tags); |
| 470 if (max == null) max = matchingIds.length; // All results. |
| 471 for (var id in matchingIds) { |
| 472 result.add(id); |
| 473 result.add(_entries[id].element); |
| 474 max--; |
| 475 if (max == 0) break; |
| 476 } |
| 477 replyPort.send(result); |
| 478 } |
| 479 |
| 480 /** |
| 481 * Shut down the registry service. |
| 482 * |
| 483 * After this, all [Registry] operations will time out. |
| 484 */ |
| 485 void close() { |
| 486 _commandPort.close(); |
| 487 } |
| 488 } |
| 489 |
| 490 /** Entry in [RegistryManager]. */ |
| 491 class _RegistryEntry { |
| 492 final int id; |
| 493 final Object element; |
| 494 final Set tags = new HashSet(); |
| 495 final Capability removeCapability = new Capability(); |
| 496 _RegistryEntry(this.id, this.element); |
| 497 } |
OLD | NEW |