| OLD | NEW |
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /// An isolate-compatible object registry and lookup service. |
| 6 library isolate.registry; |
| 7 |
| 8 import 'dart:async' show Future, Completer, TimeoutException; |
| 9 import 'dart:collection' show HashMap, HashSet; |
| 10 import 'dart:isolate' show RawReceivePort, SendPort, Capability; |
| 11 |
| 12 import 'ports.dart'; |
| 13 import 'src/lists.dart'; |
| 14 |
| 15 // Command tags. |
| 16 const int _ADD = 0; |
| 17 const int _REMOVE = 1; |
| 18 const int _ADD_TAGS = 2; |
| 19 const int _REMOVE_TAGS = 3; |
| 20 const int _GET_TAGS = 4; |
| 21 const int _FIND = 5; |
| 22 |
| 23 /// An isolate-compatible object registry. |
| 24 /// |
| 25 /// Objects can be stored as elements of a registry, |
| 26 /// have "tags" assigned to them, and be looked up by tag. |
| 27 /// |
| 28 /// A `Registry` object caches objects found using the [lookup] |
| 29 /// method, or added using [add], and returns the same object every time |
| 30 /// they are requested. |
| 31 /// A different `Registry` object that works on the same registry will not |
| 32 /// preserve the identity of elements |
| 33 /// |
| 34 /// It is recommended to only have one `Registry` object working on the |
| 35 /// same registry in each isolate. |
| 36 /// |
| 37 /// When the registry is shared across isolates, both elements and tags must |
| 38 /// be sendable between the isolates. |
| 39 /// Between isolates spawned using [Isolate.spawn] from the same initial |
| 40 /// isolate, most objects can be sent. |
| 41 /// Only simple objects can be sent between isolates originating from different |
| 42 /// [Isolate.spawnUri] calls. |
| 43 class Registry<T> { |
| 44 // Most operations fail if they haven't received a response for this duration. |
| 45 final Duration _timeout; |
| 46 |
| 47 // Each `Registry` object has a cache of objects being controlled by it. |
| 48 // The cache is stored in an [Expando], not on the object. |
| 49 // This allows sending the `Registry` object through a `SendPort` without |
| 50 // also copying the cache. |
| 51 static Expando _caches = new Expando(); |
| 52 |
| 53 /// Port for sending command to the central registry manager. |
| 54 SendPort _commandPort; |
| 55 |
| 56 /// Create a registry linked to a [RegistryManager] through [commandPort]. |
| 57 /// |
| 58 /// In most cases, a registry is created by using the |
| 59 /// [RegistryManager.registry] getter. |
| 60 /// |
| 61 /// If a registry is used between isolates created using [Isolate.spawnUri], |
| 62 /// the `Registry` object can't be sent between the isolates directly. |
| 63 /// Instead the [RegistryManager.commandPort] port can be sent and a |
| 64 /// `Registry` created from the command port using this constructor. |
| 65 /// |
| 66 /// The optional [timeout] parameter can be set to the duration |
| 67 /// this registry should wait before assuming that an operation |
| 68 /// has failed. |
| 69 Registry.fromPort(SendPort commandPort, |
| 70 {Duration timeout: const Duration(seconds: 5)}) |
| 71 : _commandPort = commandPort, |
| 72 _timeout = timeout; |
| 73 |
| 74 _RegistryCache get _cache { |
| 75 _RegistryCache cache = _caches[this]; |
| 76 if (cache != null) return cache; |
| 77 cache = new _RegistryCache(); |
| 78 _caches[this] = cache; |
| 79 return cache; |
| 80 } |
| 81 |
| 82 /// Check and get the identity of an element. |
| 83 /// |
| 84 /// Throws if [element] is not an element in the registry. |
| 85 int _getId(T element) { |
| 86 int id = _cache.id(element); |
| 87 if (id == null) { |
| 88 throw new StateError("Not an element: ${Error.safeToString(element)}"); |
| 89 } |
| 90 return id; |
| 91 } |
| 92 |
| 93 /// Adds [element] to the registry with the provided tags. |
| 94 /// |
| 95 /// Fails if [element] is already in this registry. |
| 96 /// An object is already in the registry if it has been added using [add], |
| 97 /// or if it was returned by a [lookup] call on this registry object. |
| 98 /// |
| 99 /// Returns a capability that can be used with [remove] to remove |
| 100 /// the element from the registry again. |
| 101 /// |
| 102 /// The [tags] can be used to distinguish some of the elements |
| 103 /// from other elements. Any object can be used as a tag, as long as |
| 104 /// it preserves equality when sent through a [SendPort]. |
| 105 /// This makes [Capability] objects a good choice for tags. |
| 106 Future<Capability> add(T element, {Iterable tags}) { |
| 107 _RegistryCache cache = _cache; |
| 108 if (cache.contains(element)) { |
| 109 return new Future<Capability>.sync(() { |
| 110 throw new StateError( |
| 111 "Object already in registry: ${Error.safeToString(element)}"); |
| 112 }); |
| 113 } |
| 114 Completer completer = new Completer<Capability>(); |
| 115 SendPort port = singleCompletePort(completer, callback: (List response) { |
| 116 assert(cache.isAdding(element)); |
| 117 int id = response[0]; |
| 118 Capability removeCapability = response[1]; |
| 119 cache.register(id, element); |
| 120 return removeCapability; |
| 121 }, timeout: _timeout, onTimeout: () { |
| 122 cache.stopAdding(element); |
| 123 throw new TimeoutException("Future not completed", _timeout); |
| 124 }); |
| 125 if (tags != null) tags = tags.toList(growable: false); |
| 126 cache.setAdding(element); |
| 127 _commandPort.send(list4(_ADD, element, tags, port)); |
| 128 return completer.future; |
| 129 } |
| 130 |
| 131 /// Remove the element from the registry. |
| 132 /// |
| 133 /// Returns `true` if removing the element succeeded, or `false` if the |
| 134 /// elements wasn't in the registry, or if it couldn't be removed. |
| 135 /// |
| 136 /// The [removeCapability] must be the same capability returned by [add] |
| 137 /// when the object was added. If the capability is wrong, the |
| 138 /// object is not removed, and this function returns false. |
| 139 Future<bool> remove(T element, Capability removeCapability) { |
| 140 int id = _cache.id(element); |
| 141 if (id == null) { |
| 142 return new Future<bool>.value(false); |
| 143 } |
| 144 Completer completer = new Completer<bool>(); |
| 145 SendPort port = singleCompletePort(completer, callback: (bool result) { |
| 146 _cache.remove(id); |
| 147 return result; |
| 148 }, timeout: _timeout); |
| 149 _commandPort.send(list4(_REMOVE, id, removeCapability, port)); |
| 150 return completer.future; |
| 151 } |
| 152 |
| 153 /// Add tags to an object in the registry. |
| 154 /// |
| 155 /// Each element of the registry has a number of tags associated with |
| 156 /// it. A tag is either associated with an element or not, adding it more |
| 157 /// than once does not make any difference. |
| 158 /// |
| 159 /// Tags are compared using [Object.==] equality. |
| 160 /// |
| 161 /// Fails if any of the elements are not in the registry. |
| 162 Future addTags(Iterable<T> elements, Iterable tags) { |
| 163 List ids = elements.map(_getId).toList(growable: false); |
| 164 return _addTags(ids, tags); |
| 165 } |
| 166 |
| 167 /// Remove tags from an object in the registry. |
| 168 /// |
| 169 /// After this operation, the [elements] will not be associated to the [tags]. |
| 170 /// It doesn't matter whether the elements were associated with the tags |
| 171 /// before or not. |
| 172 /// |
| 173 /// Fails if any of the elements are not in the registry. |
| 174 Future removeTags(Iterable<T> elements, Iterable tags) { |
| 175 List ids = elements.map(_getId).toList(growable: false); |
| 176 tags = tags.toList(growable: false); |
| 177 Completer completer = new Completer(); |
| 178 SendPort port = singleCompletePort(completer, timeout: _timeout); |
| 179 _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port)); |
| 180 return completer.future; |
| 181 } |
| 182 |
| 183 Future _addTags(List<int> ids, Iterable tags) { |
| 184 tags = tags.toList(growable: false); |
| 185 Completer completer = new Completer(); |
| 186 SendPort port = singleCompletePort(completer, timeout: _timeout); |
| 187 _commandPort.send(list4(_ADD_TAGS, ids, tags, port)); |
| 188 return completer.future; |
| 189 } |
| 190 |
| 191 /// Finds a number of elements that have all the desired [tags]. |
| 192 /// |
| 193 /// If [tags] is omitted or empty, any element of the registry can be |
| 194 /// returned. |
| 195 /// |
| 196 /// If [max] is specified, it must be greater than zero. |
| 197 /// In that case, at most the first `max` results are returned, |
| 198 /// in whatever order the registry finds its results. |
| 199 /// Otherwise all matching elements are returned. |
| 200 Future<List<T>> lookup({Iterable tags, int max}) { |
| 201 if (max != null && max < 1) { |
| 202 throw new RangeError.range(max, 1, null, "max"); |
| 203 } |
| 204 if (tags != null) tags = tags.toList(growable: false); |
| 205 Completer completer = new Completer<List<T>>(); |
| 206 SendPort port = singleCompletePort(completer, callback: (List response) { |
| 207 // Response is even-length list of (id, element) pairs. |
| 208 _RegistryCache cache = _cache; |
| 209 int count = response.length ~/ 2; |
| 210 List result = new List(count); |
| 211 for (int i = 0; i < count; i++) { |
| 212 int id = response[i * 2]; |
| 213 var element = response[i * 2 + 1]; |
| 214 element = cache.register(id, element); |
| 215 result[i] = element; |
| 216 } |
| 217 return result; |
| 218 }, timeout: _timeout); |
| 219 _commandPort.send(list4(_FIND, tags, max, port)); |
| 220 return completer.future; |
| 221 } |
| 222 } |
| 223 |
| 224 /// Isolate-local cache used by a [Registry]. |
| 225 /// |
| 226 /// Maps between id-numbers and elements. |
| 227 /// An object is considered an element of the registry if it |
| 228 class _RegistryCache { |
| 229 // Temporary marker until an object gets an id. |
| 230 static const int _BEING_ADDED = -1; |
| 231 |
| 232 final Map<int, Object> id2object = new HashMap(); |
| 233 final Map<Object, int> object2id = new HashMap.identity(); |
| 234 |
| 235 int id(Object object) { |
| 236 int result = object2id[object]; |
| 237 if (result == _BEING_ADDED) return null; |
| 238 return result; |
| 239 } |
| 240 |
| 241 Object operator [](int id) => id2object[id]; |
| 242 |
| 243 // Register a pair of id/object in the cache. |
| 244 // if the id is already in the cache, just return the existing |
| 245 // object. |
| 246 Object register(int id, Object object) { |
| 247 object = id2object.putIfAbsent(id, () { |
| 248 object2id[object] = id; |
| 249 return object; |
| 250 }); |
| 251 return object; |
| 252 } |
| 253 |
| 254 bool isAdding(element) => object2id[element] == _BEING_ADDED; |
| 255 |
| 256 void setAdding(element) { |
| 257 assert(!contains(element)); |
| 258 object2id[element] = _BEING_ADDED; |
| 259 } |
| 260 |
| 261 void stopAdding(element) { |
| 262 assert(object2id[element] == _BEING_ADDED); |
| 263 object2id.remove(element); |
| 264 } |
| 265 |
| 266 void remove(int id) { |
| 267 var element = id2object.remove(id); |
| 268 if (element != null) { |
| 269 object2id.remove(element); |
| 270 } |
| 271 } |
| 272 |
| 273 bool contains(element) => object2id.containsKey(element); |
| 274 } |
| 275 |
| 276 /// The central repository used by distributed [Registry] instances. |
| 277 class RegistryManager { |
| 278 final Duration _timeout; |
| 279 int _nextId = 0; |
| 280 RawReceivePort _commandPort; |
| 281 |
| 282 /// Maps id to entry. Each entry contains the id, the element, its tags, |
| 283 /// and a capability required to remove it again. |
| 284 Map<int, _RegistryEntry> _entries = new HashMap(); |
| 285 Map<Object, Set<int>> _tag2id = new HashMap(); |
| 286 |
| 287 /// Create a new registry managed by the created [RegistryManager]. |
| 288 /// |
| 289 /// The optional [timeout] parameter can be set to the duration |
| 290 /// registry objects should wait before assuming that an operation |
| 291 /// has failed. |
| 292 RegistryManager({timeout: const Duration(seconds: 5)}) |
| 293 : _timeout = timeout, |
| 294 _commandPort = new RawReceivePort() { |
| 295 _commandPort.handler = _handleCommand; |
| 296 } |
| 297 |
| 298 /// The command port receiving commands for the registry manager. |
| 299 /// |
| 300 /// Use this port with [Registry.fromPort] to link a registry to the |
| 301 /// manager in isolates where you can't send a [Registry] object directly. |
| 302 SendPort get commandPort => _commandPort.sendPort; |
| 303 |
| 304 /// Get a registry backed by this manager. |
| 305 /// |
| 306 /// This registry can be sent to other isolates created using |
| 307 /// [Isolate.spawn]. |
| 308 Registry get registry => |
| 309 new Registry.fromPort(_commandPort.sendPort, timeout: _timeout); |
| 310 |
| 311 // Used as argument to putIfAbsent. |
| 312 static Set<int> _createSet() => new HashSet<int>(); |
| 313 |
| 314 void _handleCommand(List command) { |
| 315 switch (command[0]) { |
| 316 case _ADD: |
| 317 _add(command[1], command[2], command[3]); |
| 318 return; |
| 319 case _REMOVE: |
| 320 _remove(command[1], command[2], command[3]); |
| 321 return; |
| 322 case _ADD_TAGS: |
| 323 _addTags(command[1], command[2], command[3]); |
| 324 return; |
| 325 case _REMOVE_TAGS: |
| 326 _removeTags(command[1], command[2], command[3]); |
| 327 return; |
| 328 case _GET_TAGS: |
| 329 _getTags(command[1], command[2]); |
| 330 return; |
| 331 case _FIND: |
| 332 _find(command[1], command[2], command[3]); |
| 333 return; |
| 334 default: |
| 335 throw new UnsupportedError("Unknown command: ${command[0]}"); |
| 336 } |
| 337 } |
| 338 |
| 339 void _add(Object object, List tags, SendPort replyPort) { |
| 340 int id = ++_nextId; |
| 341 var entry = new _RegistryEntry(id, object); |
| 342 _entries[id] = entry; |
| 343 if (tags != null) { |
| 344 for (var tag in tags) { |
| 345 entry.tags.add(tag); |
| 346 _tag2id.putIfAbsent(tag, _createSet).add(id); |
| 347 } |
| 348 } |
| 349 replyPort.send(list2(id, entry.removeCapability)); |
| 350 } |
| 351 |
| 352 void _remove(int id, Capability removeCapability, SendPort replyPort) { |
| 353 _RegistryEntry entry = _entries[id]; |
| 354 if (entry == null || entry.removeCapability != removeCapability) { |
| 355 replyPort.send(false); |
| 356 return; |
| 357 } |
| 358 _entries.remove(id); |
| 359 for (var tag in entry.tags) { |
| 360 _tag2id[tag].remove(id); |
| 361 } |
| 362 replyPort.send(true); |
| 363 } |
| 364 |
| 365 void _addTags(List<int> ids, List tags, SendPort replyPort) { |
| 366 assert(tags != null); |
| 367 assert(tags.isNotEmpty); |
| 368 for (int id in ids) { |
| 369 _RegistryEntry entry = _entries[id]; |
| 370 if (entry == null) continue; // Entry was removed. |
| 371 entry.tags.addAll(tags); |
| 372 for (var tag in tags) { |
| 373 Set ids = _tag2id.putIfAbsent(tag, _createSet); |
| 374 ids.add(id); |
| 375 } |
| 376 } |
| 377 replyPort.send(null); |
| 378 } |
| 379 |
| 380 void _removeTags(List<int> ids, List tags, SendPort replyPort) { |
| 381 assert(tags != null); |
| 382 assert(tags.isNotEmpty); |
| 383 for (int id in ids) { |
| 384 _RegistryEntry entry = _entries[id]; |
| 385 if (entry == null) continue; // Object was removed. |
| 386 entry.tags.removeAll(tags); |
| 387 } |
| 388 for (var tag in tags) { |
| 389 Set tagIds = _tag2id[tag]; |
| 390 if (tagIds == null) continue; |
| 391 tagIds.removeAll(ids); |
| 392 } |
| 393 replyPort.send(null); |
| 394 } |
| 395 |
| 396 void _getTags(int id, SendPort replyPort) { |
| 397 _RegistryEntry entry = _entries[id]; |
| 398 if (entry != null) { |
| 399 replyPort.send(entry.tags.toList(growable: false)); |
| 400 } else { |
| 401 replyPort.send(const []); |
| 402 } |
| 403 } |
| 404 |
| 405 Iterable<int> _findTaggedIds(List tags) { |
| 406 var matchingFirstTagIds = _tag2id[tags[0]]; |
| 407 if (matchingFirstTagIds == null) { |
| 408 return const []; |
| 409 } |
| 410 if (matchingFirstTagIds.isEmpty || tags.length == 1) { |
| 411 return matchingFirstTagIds; |
| 412 } |
| 413 // Create new set, then start removing ids not also matched |
| 414 // by other tags. |
| 415 Set<int> matchingIds = matchingFirstTagIds.toSet(); |
| 416 for (int i = 1; i < tags.length; i++) { |
| 417 var tagIds = _tag2id[tags[i]]; |
| 418 if (tagIds == null) return const []; |
| 419 matchingIds.retainAll(tagIds); |
| 420 if (matchingIds.isEmpty) break; |
| 421 } |
| 422 return matchingIds; |
| 423 } |
| 424 |
| 425 void _find(List tags, int max, SendPort replyPort) { |
| 426 assert(max == null || max > 0); |
| 427 List result = []; |
| 428 if (tags == null || tags.isEmpty) { |
| 429 var entries = _entries.values; |
| 430 if (max != null) entries = entries.take(max); |
| 431 for (_RegistryEntry entry in entries) { |
| 432 result.add(entry.id); |
| 433 result.add(entry.element); |
| 434 } |
| 435 replyPort.send(result); |
| 436 return; |
| 437 } |
| 438 var matchingIds = _findTaggedIds(tags); |
| 439 if (max == null) max = matchingIds.length; // All results. |
| 440 for (var id in matchingIds) { |
| 441 result.add(id); |
| 442 result.add(_entries[id].element); |
| 443 max--; |
| 444 if (max == 0) break; |
| 445 } |
| 446 replyPort.send(result); |
| 447 } |
| 448 |
| 449 /// Shut down the registry service. |
| 450 /// |
| 451 /// After this, all [Registry] operations will time out. |
| 452 void close() { |
| 453 _commandPort.close(); |
| 454 } |
| 455 } |
| 456 |
| 457 /// Entry in [RegistryManager]. |
| 458 class _RegistryEntry { |
| 459 final int id; |
| 460 final Object element; |
| 461 final Set tags = new HashSet(); |
| 462 final Capability removeCapability = new Capability(); |
| 463 _RegistryEntry(this.id, this.element); |
| 464 } |
| OLD | NEW |