Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Side by Side Diff: packages/isolate/lib/registry.dart

Issue 2990843002: Removed fixed dependencies (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/isolate/lib/ports.dart ('k') | packages/isolate/lib/runner.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /// An isolate-compatible object registry and lookup service.
6 library isolate.registry;
7
8 import 'dart:async' show Future, Completer, TimeoutException;
9 import 'dart:collection' show HashMap, HashSet;
10 import 'dart:isolate' show RawReceivePort, SendPort, Capability;
11
12 import 'ports.dart';
13 import 'src/lists.dart';
14
15 // Command tags.
16 const int _ADD = 0;
17 const int _REMOVE = 1;
18 const int _ADD_TAGS = 2;
19 const int _REMOVE_TAGS = 3;
20 const int _GET_TAGS = 4;
21 const int _FIND = 5;
22
23 /// An isolate-compatible object registry.
24 ///
25 /// Objects can be stored as elements of a registry,
26 /// have "tags" assigned to them, and be looked up by tag.
27 ///
28 /// A `Registry` object caches objects found using the [lookup]
29 /// method, or added using [add], and returns the same object every time
30 /// they are requested.
31 /// A different `Registry` object that works on the same registry will not
32 /// preserve the identity of elements
33 ///
34 /// It is recommended to only have one `Registry` object working on the
35 /// same registry in each isolate.
36 ///
37 /// When the registry is shared across isolates, both elements and tags must
38 /// be sendable between the isolates.
39 /// Between isolates spawned using [Isolate.spawn] from the same initial
40 /// isolate, most objects can be sent.
41 /// Only simple objects can be sent between isolates originating from different
42 /// [Isolate.spawnUri] calls.
43 class Registry<T> {
44 // Most operations fail if they haven't received a response for this duration.
45 final Duration _timeout;
46
47 // Each `Registry` object has a cache of objects being controlled by it.
48 // The cache is stored in an [Expando], not on the object.
49 // This allows sending the `Registry` object through a `SendPort` without
50 // also copying the cache.
51 static Expando _caches = new Expando();
52
53 /// Port for sending command to the central registry manager.
54 SendPort _commandPort;
55
56 /// Create a registry linked to a [RegistryManager] through [commandPort].
57 ///
58 /// In most cases, a registry is created by using the
59 /// [RegistryManager.registry] getter.
60 ///
61 /// If a registry is used between isolates created using [Isolate.spawnUri],
62 /// the `Registry` object can't be sent between the isolates directly.
63 /// Instead the [RegistryManager.commandPort] port can be sent and a
64 /// `Registry` created from the command port using this constructor.
65 ///
66 /// The optional [timeout] parameter can be set to the duration
67 /// this registry should wait before assuming that an operation
68 /// has failed.
69 Registry.fromPort(SendPort commandPort,
70 {Duration timeout: const Duration(seconds: 5)})
71 : _commandPort = commandPort,
72 _timeout = timeout;
73
74 _RegistryCache get _cache {
75 _RegistryCache cache = _caches[this];
76 if (cache != null) return cache;
77 cache = new _RegistryCache();
78 _caches[this] = cache;
79 return cache;
80 }
81
82 /// Check and get the identity of an element.
83 ///
84 /// Throws if [element] is not an element in the registry.
85 int _getId(T element) {
86 int id = _cache.id(element);
87 if (id == null) {
88 throw new StateError("Not an element: ${Error.safeToString(element)}");
89 }
90 return id;
91 }
92
93 /// Adds [element] to the registry with the provided tags.
94 ///
95 /// Fails if [element] is already in this registry.
96 /// An object is already in the registry if it has been added using [add],
97 /// or if it was returned by a [lookup] call on this registry object.
98 ///
99 /// Returns a capability that can be used with [remove] to remove
100 /// the element from the registry again.
101 ///
102 /// The [tags] can be used to distinguish some of the elements
103 /// from other elements. Any object can be used as a tag, as long as
104 /// it preserves equality when sent through a [SendPort].
105 /// This makes [Capability] objects a good choice for tags.
106 Future<Capability> add(T element, {Iterable tags}) {
107 _RegistryCache cache = _cache;
108 if (cache.contains(element)) {
109 return new Future<Capability>.sync(() {
110 throw new StateError(
111 "Object already in registry: ${Error.safeToString(element)}");
112 });
113 }
114 Completer completer = new Completer<Capability>();
115 SendPort port = singleCompletePort(completer, callback: (List response) {
116 assert(cache.isAdding(element));
117 int id = response[0];
118 Capability removeCapability = response[1];
119 cache.register(id, element);
120 return removeCapability;
121 }, timeout: _timeout, onTimeout: () {
122 cache.stopAdding(element);
123 throw new TimeoutException("Future not completed", _timeout);
124 });
125 if (tags != null) tags = tags.toList(growable: false);
126 cache.setAdding(element);
127 _commandPort.send(list4(_ADD, element, tags, port));
128 return completer.future;
129 }
130
131 /// Remove the element from the registry.
132 ///
133 /// Returns `true` if removing the element succeeded, or `false` if the
134 /// elements wasn't in the registry, or if it couldn't be removed.
135 ///
136 /// The [removeCapability] must be the same capability returned by [add]
137 /// when the object was added. If the capability is wrong, the
138 /// object is not removed, and this function returns false.
139 Future<bool> remove(T element, Capability removeCapability) {
140 int id = _cache.id(element);
141 if (id == null) {
142 return new Future<bool>.value(false);
143 }
144 Completer completer = new Completer<bool>();
145 SendPort port = singleCompletePort(completer, callback: (bool result) {
146 _cache.remove(id);
147 return result;
148 }, timeout: _timeout);
149 _commandPort.send(list4(_REMOVE, id, removeCapability, port));
150 return completer.future;
151 }
152
153 /// Add tags to an object in the registry.
154 ///
155 /// Each element of the registry has a number of tags associated with
156 /// it. A tag is either associated with an element or not, adding it more
157 /// than once does not make any difference.
158 ///
159 /// Tags are compared using [Object.==] equality.
160 ///
161 /// Fails if any of the elements are not in the registry.
162 Future addTags(Iterable<T> elements, Iterable tags) {
163 List ids = elements.map(_getId).toList(growable: false);
164 return _addTags(ids, tags);
165 }
166
167 /// Remove tags from an object in the registry.
168 ///
169 /// After this operation, the [elements] will not be associated to the [tags].
170 /// It doesn't matter whether the elements were associated with the tags
171 /// before or not.
172 ///
173 /// Fails if any of the elements are not in the registry.
174 Future removeTags(Iterable<T> elements, Iterable tags) {
175 List ids = elements.map(_getId).toList(growable: false);
176 tags = tags.toList(growable: false);
177 Completer completer = new Completer();
178 SendPort port = singleCompletePort(completer, timeout: _timeout);
179 _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
180 return completer.future;
181 }
182
183 Future _addTags(List<int> ids, Iterable tags) {
184 tags = tags.toList(growable: false);
185 Completer completer = new Completer();
186 SendPort port = singleCompletePort(completer, timeout: _timeout);
187 _commandPort.send(list4(_ADD_TAGS, ids, tags, port));
188 return completer.future;
189 }
190
191 /// Finds a number of elements that have all the desired [tags].
192 ///
193 /// If [tags] is omitted or empty, any element of the registry can be
194 /// returned.
195 ///
196 /// If [max] is specified, it must be greater than zero.
197 /// In that case, at most the first `max` results are returned,
198 /// in whatever order the registry finds its results.
199 /// Otherwise all matching elements are returned.
200 Future<List<T>> lookup({Iterable tags, int max}) {
201 if (max != null && max < 1) {
202 throw new RangeError.range(max, 1, null, "max");
203 }
204 if (tags != null) tags = tags.toList(growable: false);
205 Completer completer = new Completer<List<T>>();
206 SendPort port = singleCompletePort(completer, callback: (List response) {
207 // Response is even-length list of (id, element) pairs.
208 _RegistryCache cache = _cache;
209 int count = response.length ~/ 2;
210 List result = new List(count);
211 for (int i = 0; i < count; i++) {
212 int id = response[i * 2];
213 var element = response[i * 2 + 1];
214 element = cache.register(id, element);
215 result[i] = element;
216 }
217 return result;
218 }, timeout: _timeout);
219 _commandPort.send(list4(_FIND, tags, max, port));
220 return completer.future;
221 }
222 }
223
224 /// Isolate-local cache used by a [Registry].
225 ///
226 /// Maps between id-numbers and elements.
227 /// An object is considered an element of the registry if it
228 class _RegistryCache {
229 // Temporary marker until an object gets an id.
230 static const int _BEING_ADDED = -1;
231
232 final Map<int, Object> id2object = new HashMap();
233 final Map<Object, int> object2id = new HashMap.identity();
234
235 int id(Object object) {
236 int result = object2id[object];
237 if (result == _BEING_ADDED) return null;
238 return result;
239 }
240
241 Object operator [](int id) => id2object[id];
242
243 // Register a pair of id/object in the cache.
244 // if the id is already in the cache, just return the existing
245 // object.
246 Object register(int id, Object object) {
247 object = id2object.putIfAbsent(id, () {
248 object2id[object] = id;
249 return object;
250 });
251 return object;
252 }
253
254 bool isAdding(element) => object2id[element] == _BEING_ADDED;
255
256 void setAdding(element) {
257 assert(!contains(element));
258 object2id[element] = _BEING_ADDED;
259 }
260
261 void stopAdding(element) {
262 assert(object2id[element] == _BEING_ADDED);
263 object2id.remove(element);
264 }
265
266 void remove(int id) {
267 var element = id2object.remove(id);
268 if (element != null) {
269 object2id.remove(element);
270 }
271 }
272
273 bool contains(element) => object2id.containsKey(element);
274 }
275
276 /// The central repository used by distributed [Registry] instances.
277 class RegistryManager {
278 final Duration _timeout;
279 int _nextId = 0;
280 RawReceivePort _commandPort;
281
282 /// Maps id to entry. Each entry contains the id, the element, its tags,
283 /// and a capability required to remove it again.
284 Map<int, _RegistryEntry> _entries = new HashMap();
285 Map<Object, Set<int>> _tag2id = new HashMap();
286
287 /// Create a new registry managed by the created [RegistryManager].
288 ///
289 /// The optional [timeout] parameter can be set to the duration
290 /// registry objects should wait before assuming that an operation
291 /// has failed.
292 RegistryManager({timeout: const Duration(seconds: 5)})
293 : _timeout = timeout,
294 _commandPort = new RawReceivePort() {
295 _commandPort.handler = _handleCommand;
296 }
297
298 /// The command port receiving commands for the registry manager.
299 ///
300 /// Use this port with [Registry.fromPort] to link a registry to the
301 /// manager in isolates where you can't send a [Registry] object directly.
302 SendPort get commandPort => _commandPort.sendPort;
303
304 /// Get a registry backed by this manager.
305 ///
306 /// This registry can be sent to other isolates created using
307 /// [Isolate.spawn].
308 Registry get registry =>
309 new Registry.fromPort(_commandPort.sendPort, timeout: _timeout);
310
311 // Used as argument to putIfAbsent.
312 static Set<int> _createSet() => new HashSet<int>();
313
314 void _handleCommand(List command) {
315 switch (command[0]) {
316 case _ADD:
317 _add(command[1], command[2], command[3]);
318 return;
319 case _REMOVE:
320 _remove(command[1], command[2], command[3]);
321 return;
322 case _ADD_TAGS:
323 _addTags(command[1], command[2], command[3]);
324 return;
325 case _REMOVE_TAGS:
326 _removeTags(command[1], command[2], command[3]);
327 return;
328 case _GET_TAGS:
329 _getTags(command[1], command[2]);
330 return;
331 case _FIND:
332 _find(command[1], command[2], command[3]);
333 return;
334 default:
335 throw new UnsupportedError("Unknown command: ${command[0]}");
336 }
337 }
338
339 void _add(Object object, List tags, SendPort replyPort) {
340 int id = ++_nextId;
341 var entry = new _RegistryEntry(id, object);
342 _entries[id] = entry;
343 if (tags != null) {
344 for (var tag in tags) {
345 entry.tags.add(tag);
346 _tag2id.putIfAbsent(tag, _createSet).add(id);
347 }
348 }
349 replyPort.send(list2(id, entry.removeCapability));
350 }
351
352 void _remove(int id, Capability removeCapability, SendPort replyPort) {
353 _RegistryEntry entry = _entries[id];
354 if (entry == null || entry.removeCapability != removeCapability) {
355 replyPort.send(false);
356 return;
357 }
358 _entries.remove(id);
359 for (var tag in entry.tags) {
360 _tag2id[tag].remove(id);
361 }
362 replyPort.send(true);
363 }
364
365 void _addTags(List<int> ids, List tags, SendPort replyPort) {
366 assert(tags != null);
367 assert(tags.isNotEmpty);
368 for (int id in ids) {
369 _RegistryEntry entry = _entries[id];
370 if (entry == null) continue; // Entry was removed.
371 entry.tags.addAll(tags);
372 for (var tag in tags) {
373 Set ids = _tag2id.putIfAbsent(tag, _createSet);
374 ids.add(id);
375 }
376 }
377 replyPort.send(null);
378 }
379
380 void _removeTags(List<int> ids, List tags, SendPort replyPort) {
381 assert(tags != null);
382 assert(tags.isNotEmpty);
383 for (int id in ids) {
384 _RegistryEntry entry = _entries[id];
385 if (entry == null) continue; // Object was removed.
386 entry.tags.removeAll(tags);
387 }
388 for (var tag in tags) {
389 Set tagIds = _tag2id[tag];
390 if (tagIds == null) continue;
391 tagIds.removeAll(ids);
392 }
393 replyPort.send(null);
394 }
395
396 void _getTags(int id, SendPort replyPort) {
397 _RegistryEntry entry = _entries[id];
398 if (entry != null) {
399 replyPort.send(entry.tags.toList(growable: false));
400 } else {
401 replyPort.send(const []);
402 }
403 }
404
405 Iterable<int> _findTaggedIds(List tags) {
406 var matchingFirstTagIds = _tag2id[tags[0]];
407 if (matchingFirstTagIds == null) {
408 return const [];
409 }
410 if (matchingFirstTagIds.isEmpty || tags.length == 1) {
411 return matchingFirstTagIds;
412 }
413 // Create new set, then start removing ids not also matched
414 // by other tags.
415 Set<int> matchingIds = matchingFirstTagIds.toSet();
416 for (int i = 1; i < tags.length; i++) {
417 var tagIds = _tag2id[tags[i]];
418 if (tagIds == null) return const [];
419 matchingIds.retainAll(tagIds);
420 if (matchingIds.isEmpty) break;
421 }
422 return matchingIds;
423 }
424
425 void _find(List tags, int max, SendPort replyPort) {
426 assert(max == null || max > 0);
427 List result = [];
428 if (tags == null || tags.isEmpty) {
429 var entries = _entries.values;
430 if (max != null) entries = entries.take(max);
431 for (_RegistryEntry entry in entries) {
432 result.add(entry.id);
433 result.add(entry.element);
434 }
435 replyPort.send(result);
436 return;
437 }
438 var matchingIds = _findTaggedIds(tags);
439 if (max == null) max = matchingIds.length; // All results.
440 for (var id in matchingIds) {
441 result.add(id);
442 result.add(_entries[id].element);
443 max--;
444 if (max == 0) break;
445 }
446 replyPort.send(result);
447 }
448
449 /// Shut down the registry service.
450 ///
451 /// After this, all [Registry] operations will time out.
452 void close() {
453 _commandPort.close();
454 }
455 }
456
457 /// Entry in [RegistryManager].
458 class _RegistryEntry {
459 final int id;
460 final Object element;
461 final Set tags = new HashSet();
462 final Capability removeCapability = new Capability();
463 _RegistryEntry(this.id, this.element);
464 }
OLDNEW
« no previous file with comments | « packages/isolate/lib/ports.dart ('k') | packages/isolate/lib/runner.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698