Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(411)

Side by Side Diff: lib/registry.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/ports.dart ('k') | lib/runner.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /**
6 * An isolate-compatible object registry and lookup service.
7 */
8 library dart.pkg.isolate.registry;
9
10 import "dart:async" show Future, Completer, TimeoutException;
11 import "dart:isolate" show RawReceivePort, SendPort, Capability;
12 import "dart:collection" show HashMap, HashSet;
13 import "ports.dart";
14 import "src/lists.dart";
15
16 // Command tags.
17 const int _ADD = 0;
18 const int _REMOVE = 1;
19 const int _ADD_TAGS = 2;
20 const int _REMOVE_TAGS = 3;
21 const int _GET_TAGS = 4;
22 const int _FIND = 5;
23
24 /**
25 * An isolate-compatible object registry.
26 *
27 * Objects can be stored as elements of a registry,
28 * have "tags" assigned to them, and be looked up by tag.
29 *
30 * A `Registry` object caches objects found using the [lookup]
31 * method, or added using [add], and returns the same object every time
32 * they are requested.
33 * A different `Registry` object that works on the same registry will not
34 * preserve the identity of elements
35 *
36 * It is recommended to only have one `Registry` object working on the
37 * same registry in each isolate.
38 *
39 * When the registry is shared accross isolates, both elements and tags must
40 * be sendable between the isolates.
41 * Between isolates spawned using [Isolate.spawn] from the same initial
42 * isolate, most objectes can be sent.
43 * Only simple objects can be sent between isolates originating from different
44 * [Isolate.spawnUri] calls.
45 */
46 class Registry<T> {
47 // Most operations fail if they haven't received a response for this duration.
48 final Duration _timeout;
49
50 // Each `Registry` object has a cache of objects being controlled by it.
51 // The cache is stored in an [Expando], not on the object.
52 // This allows sending the `Registry` object through a `SendPort` without
53 // also copying the cache.
54 static Expando _caches = new Expando();
55
56 /**
57 * Port for sending command to the central registry mananger.
58 */
59 SendPort _commandPort;
60
61 /**
62 * Create a registry linked to a [RegistryManager] through [commandPort].
63 *
64 * In most cases, a registry is created by using the
65 * [RegistryManager.registry] getter.
66 *
67 * If a registry is used between isolates created using [Isolate.spawnUri],
68 * the `Registry` object can't be sent between the isolates directly.
69 * Instead the [RegistryManager.commandPort] port can be sent and a
70 * `Registry` created from the command port using this constructor.
71 *
72 * The optional [timeout] parameter can be set to the duration
73 * this registry should wait before assuming that an operation
74 * has failed.
75 */
76 Registry.fromPort(SendPort commandPort,
77 {Duration timeout: const Duration(seconds: 5)})
78 : _commandPort = commandPort,
79 _timeout = timeout;
80
81 _RegistryCache get _cache {
82 _RegistryCache cache = _caches[this];
83 if (cache != null) return cache;
84 cache = new _RegistryCache();
85 _caches[this] = cache;
86 return cache;
87 }
88
89 /**
90 * Check and get the identity of an element.
91 *
92 * Throws if [element] is not an element in the registry.
93 */
94 int _getId(T element) {
95 int id = _cache.id(element);
96 if (id == null) {
97 throw new StateError("Not an element: ${Error.safeToString(element)}");
98 }
99 return id;
100 }
101
102 /**
103 * Adds [element] to the registry with the provided tags.
104 *
105 * Fails if [element] is already in this registry.
106 * An object is already in the registry if it has been added using [add],
107 * or if it was returned by a [lookup] call on this registry object.
108 *
109 * Returns a capability that can be used with [remove] to remove
110 * the element from the registry again.
111 *
112 * The [tags] can be used to distinguish some of the elements
113 * from other elements. Any object can be used as a tag, as long as
114 * it preserves equality when sent through a [SendPort].
115 * This makes [Capability] objects a good choice for tags.
116 */
117 Future<Capability> add(T element, {Iterable tags}) {
118 _RegistryCache cache = _cache;
119 if (cache.contains(element)) {
120 return new Future<Capability>.sync(() {
121 throw new StateError(
122 "Object already in registry: ${Error.safeToString(element)}");
123 });
124 }
125 Completer completer = new Completer<Capability>();
126 SendPort port = singleCompletePort(completer, callback: (List response) {
127 assert(cache.isAdding(element));
128 int id = response[0];
129 Capability removeCapability = response[1];
130 cache.register(id, element);
131 return removeCapability;
132 }, timeout: _timeout, onTimeout: () {
133 cache.stopAdding(element);
134 throw new TimeoutException("Future not completed", _timeout);
135 });
136 if (tags != null) tags = tags.toList(growable: false);
137 cache.setAdding(element);
138 _commandPort.send(list4(_ADD, element, tags, port));
139 return completer.future;
140 }
141
142 /**
143 * Remove the element from the registry.
144 *
145 * Returns `true` if removing the element succeeded, or `false` if the
146 * elements wasn't in the registry, or if it couldn't be removed.
147 *
148 * The [removeCapability] must be the same capability returned by [add]
149 * when the object was added. If the capability is wrong, the
150 * object is not removed, and this function returns false.
151 */
152 Future<bool> remove(T element, Capability removeCapability) {
153 int id = _cache.id(element);
154 if (id == null) {
155 return new Future<bool>.value(false);
156 }
157 Completer completer = new Completer<bool>();
158 SendPort port = singleCompletePort(completer, callback: (bool result) {
159 _cache.remove(id);
160 return result;
161 }, timeout: _timeout);
162 _commandPort.send(list4(_REMOVE, id, removeCapability, port));
163 return completer.future;
164 }
165
166 /**
167 * Add tags to an object in the registry.
168 *
169 * Each element of the registry has a number of tags associated with
170 * it. A tag is either associated with an element or not, adding it more
171 * than once does not make any difference.
172 *
173 * Tags are compared using [Object.==] equality.
174 *
175 * Fails if any of the elements are not in the registry.
176 */
177 Future addTags(Iterable<T> elements, Iterable tags) {
178 List ids = elements.map(_getId).toList(growable: false);
179 return _addTags(ids, tags);
180 }
181
182 /**
183 * Remove tags from an object in the registry.
184 *
185 * After this operation, the [elements] will not be associated to the [tags].
186 * It doesn't matter whether the elements were associated with the tags
187 * before or not.
188 *
189 * Fails if any of the elements are not in the registry.
190 */
191 Future removeTags(Iterable<T> elements, Iterable tags) {
192 List ids = elements.map(_getId).toList(growable: false);
193 tags = tags.toList(growable: false);
194 Completer completer = new Completer();
195 SendPort port = singleCompletePort(completer, timeout: _timeout);
196 _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
197 return completer.future;
198 }
199
200 Future _addTags(List<int> ids, Iterable tags) {
201 tags = tags.toList(growable: false);
202 Completer completer = new Completer();
203 SendPort port = singleCompletePort(completer, timeout: _timeout);
204 _commandPort.send(list4(_ADD_TAGS, ids, tags, port));
205 return completer.future;
206 }
207
208 /**
209 * Finds a number of elements that have all the desired [tags].
210 *
211 * If [tags] is omitted or empty, any element of the registry can be
212 * returned.
213 *
214 * If [max] is specified, it must be greater than zero.
215 * In that case, at most the first `max` results are returned,
216 * in whatever order the registry finds its results.
217 * Otherwise all matching elements are returned.
218 */
219 Future<List<T>> lookup({Iterable tags, int max}) {
220 if (max != null && max < 1) {
221 throw new RangeError.range(max, 1, null, "max");
222 }
223 if (tags != null) tags = tags.toList(growable: false);
224 Completer completer = new Completer<List<T>>();
225 SendPort port = singleCompletePort(completer, callback: (List response) {
226 // Response is even-length list of (id, element) pairs.
227 _RegistryCache cache = _cache;
228 int count = response.length ~/ 2;
229 List result = new List(count);
230 for (int i = 0; i < count; i++) {
231 int id = response[i * 2];
232 var element = response[i * 2 + 1];
233 element = cache.register(id, element);
234 result[i] = element;
235 }
236 return result;
237 }, timeout: _timeout);
238 _commandPort.send(list4(_FIND, tags, max, port));
239 return completer.future;
240 }
241 }
242
243 /**
244 * Isolate-local cache used by a [Registry].
245 *
246 * Maps between id-numbers and elements.
247 * An object is considered an element of the registry if it
248 */
249 class _RegistryCache {
250 // Temporary marker until an object gets an id.
251 static const int _BEING_ADDED = -1;
252
253 final Map<int, Object> id2object = new HashMap();
254 final Map<Object, int> object2id = new HashMap.identity();
255
256 int id(Object object) {
257 int result = object2id[object];
258 if (result == _BEING_ADDED) return null;
259 return result;
260 }
261
262 Object operator[](int id) => id2object[id];
263
264 // Register a pair of id/object in the cache.
265 // if the id is already in the cache, just return the existing
266 // object.
267 Object register(int id, Object object) {
268 object = id2object.putIfAbsent(id, () {
269 object2id[object] = id;
270 return object;
271 });
272 return object;
273 }
274
275 bool isAdding(element) => object2id[element] == _BEING_ADDED;
276
277 void setAdding(element) {
278 assert(!contains(element));
279 object2id[element] = _BEING_ADDED;
280 }
281
282 void stopAdding(element) {
283 assert(object2id[element] == _BEING_ADDED);
284 object2id.remove(element);
285 }
286
287 void remove(int id) {
288 var element = id2object.remove(id);
289 if (element != null) {
290 object2id.remove(element);
291 }
292 }
293
294 bool contains(element) => object2id.containsKey(element);
295 }
296
297 /**
298 * The central repository used by distributed [Registry] instances.
299 */
300 class RegistryManager {
301 final Duration _timeout;
302 int _nextId = 0;
303 RawReceivePort _commandPort;
304
305 /**
306 * Maps id to entry. Each entry contains the id, the element, its tags,
307 * and a capability required to remove it again.
308 */
309 Map<int, _RegistryEntry> _entries = new HashMap();
310 Map<Object, Set<int>> _tag2id = new HashMap();
311
312 /**
313 * Create a new registry managed by the created [RegistryManager].
314 *
315 * The optional [timeout] parameter can be set to the duration
316 * registry objects should wait before assuming that an operation
317 * has failed.
318 */
319 RegistryManager({timeout: const Duration(seconds: 5)})
320 : _timeout = timeout,
321 _commandPort = new RawReceivePort() {
322 _commandPort.handler = _handleCommand;
323 }
324
325 /**
326 * The command port receiving commands for the registry manager.
327 *
328 * Use this port with [Registry.fromPort] to link a registry to the
329 * manager in isolates where you can't send a [Registry] object directly.
330 */
331 SendPort get commandPort => _commandPort.sendPort;
332
333 /**
334 * Get a registry backed by this manager.
335 *
336 * This registry can be sent to other isolates created using
337 * [Isolate.spawn].
338 */
339 Registry get registry => new Registry.fromPort(_commandPort.sendPort,
340 timeout: _timeout);
341
342 // Used as argument to putIfAbsent.
343 static Set _createSet() => new HashSet();
344
345 void _handleCommand(List command) {
346 switch(command[0]) {
347 case _ADD:
348 _add(command[1], command[2], command[3]);
349 return;
350 case _REMOVE:
351 _remove(command[1], command[2], command[3]);
352 return;
353 case _ADD_TAGS:
354 _addTags(command[1], command[2], command[3]);
355 return;
356 case _REMOVE_TAGS:
357 _removeTags(command[1], command[2], command[3]);
358 return;
359 case _GET_TAGS:
360 _getTags(command[1], command[2]);
361 return;
362 case _FIND:
363 _find(command[1], command[2], command[3]);
364 return;
365 default:
366 throw new UnsupportedError("Unknown command: ${command[0]}");
367 }
368 }
369
370 void _add(Object object, List tags, SendPort replyPort) {
371 int id = ++_nextId;
372 var entry = new _RegistryEntry(id, object);
373 _entries[id] = entry;
374 if (tags != null) {
375 for (var tag in tags) {
376 entry.tags.add(tag);
377 _tag2id.putIfAbsent(tag, _createSet).add(id);
378 }
379 }
380 replyPort.send(list2(id, entry.removeCapability));
381 }
382
383 void _remove(int id, Capability removeCapability, SendPort replyPort) {
384 _RegistryEntry entry = _entries[id];
385 if (entry == null || entry.removeCapability != removeCapability) {
386 replyPort.send(false);
387 return;
388 }
389 _entries.remove(id);
390 for (var tag in entry.tags) {
391 _tag2id[tag].remove(id);
392 }
393 replyPort.send(true);
394 }
395
396 void _addTags(List<int> ids, List tags, SendPort replyPort) {
397 assert(tags != null);
398 assert(tags.isNotEmpty);
399 for (int id in ids) {
400 _RegistryEntry entry = _entries[id];
401 if (entry == null) continue; // Entry was removed.
402 entry.tags.addAll(tags);
403 for (var tag in tags) {
404 Set ids = _tag2id.putIfAbsent(tag, _createSet);
405 ids.add(id);
406 }
407 }
408 replyPort.send(null);
409 }
410
411 void _removeTags(List<int> ids, List tags, SendPort replyPort) {
412 assert(tags != null);
413 assert(tags.isNotEmpty);
414 for (int id in ids) {
415 _RegistryEntry entry = _entries[id];
416 if (entry == null) continue; // Object was removed.
417 entry.tags.removeAll(tags);
418 }
419 for (var tag in tags) {
420 Set tagIds = _tag2id[tag];
421 if (tagIds == null) continue;
422 tagIds.removeAll(ids);
423 }
424 replyPort.send(null);
425 }
426
427 void _getTags(int id, SendPort replyPort) {
428 _RegistryEntry entry = _entries[id];
429 if (entry != null) {
430 replyPort.send(entry.tags.toList(growable: false));
431 } else {
432 replyPort.send(const []);
433 }
434 }
435
436 Iterable<int> _findTaggedIds(List tags) {
437 var matchingFirstTagIds = _tag2id[tags[0]];
438 if (matchingFirstTagIds == null) {
439 return const [];
440 }
441 if (matchingFirstTagIds.isEmpty || tags.length == 1) {
442 return matchingFirstTagIds;
443 }
444 // Create new set, then start removing ids not also matched
445 // by other tags.
446 Set<int> matchingIds = matchingFirstTagIds.toSet();
447 for (int i = 1; i < tags.length; i++) {
448 var tagIds = _tag2id[tags[i]];
449 if (tagIds == null) return const [];
450 matchingIds.retainAll(tagIds);
451 if (matchingIds.isEmpty) break;
452 }
453 return matchingIds;
454 }
455
456 void _find(List tags, int max, SendPort replyPort) {
457 assert(max == null || max > 0);
458 List result = [];
459 if (tags == null || tags.isEmpty) {
460 var entries = _entries.values;
461 if (max != null) entries = entries.take(max);
462 for (_RegistryEntry entry in entries) {
463 result.add(entry.id);
464 result.add(entry.element);
465 }
466 replyPort.send(result);
467 return;
468 }
469 var matchingIds = _findTaggedIds(tags);
470 if (max == null) max = matchingIds.length; // All results.
471 for (var id in matchingIds) {
472 result.add(id);
473 result.add(_entries[id].element);
474 max--;
475 if (max == 0) break;
476 }
477 replyPort.send(result);
478 }
479
480 /**
481 * Shut down the registry service.
482 *
483 * After this, all [Registry] operations will time out.
484 */
485 void close() {
486 _commandPort.close();
487 }
488 }
489
490 /** Entry in [RegistryManager]. */
491 class _RegistryEntry {
492 final int id;
493 final Object element;
494 final Set tags = new HashSet();
495 final Capability removeCapability = new Capability();
496 _RegistryEntry(this.id, this.element);
497 }
OLDNEW
« no previous file with comments | « lib/ports.dart ('k') | lib/runner.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698