Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(502)

Side by Side Diff: appengine/gce-backend/utilities.py

Issue 2713533002: Refactor task enqueuing (Closed)
Patch Set: Remove metric Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/gce-backend/metadata.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Utilities for GCE Backend.""" 5 """Utilities for GCE Backend."""
6 6
7 import hashlib 7 import hashlib
8 import json 8 import json
9 9
10 from google.appengine.ext import ndb 10 from google.appengine.ext import ndb
11 11
12 from components import utils
13
12 14
13 def batch_process_async(items, f, max_concurrent=50): 15 def batch_process_async(items, f, max_concurrent=50):
14 """Processes asynchronous calls in parallel, but batched. 16 """Processes asynchronous calls in parallel, but batched.
15 17
16 Args: 18 Args:
17 items: List of items to process. 19 items: List of items to process.
18 f: f(item) -> ndb.Future. Asynchronous function to apply to each item. 20 f: f(item) -> ndb.Future. Asynchronous function to apply to each item.
19 max_concurrent: Maximum number of futures to have pending concurrently. 21 max_concurrent: Maximum number of futures to have pending concurrently.
20 """ 22 """
21 futures = [] 23 futures = []
22 while items: 24 while items:
23 num_futures = len(futures) 25 num_futures = len(futures)
24 if num_futures < max_concurrent: 26 if num_futures < max_concurrent:
25 futures.extend([f(item) for item in items[:max_concurrent - num_futures]]) 27 futures.extend([f(item) for item in items[:max_concurrent - num_futures]])
26 items = items[max_concurrent - num_futures:] 28 items = items[max_concurrent - num_futures:]
27 ndb.Future.wait_any(futures) 29 ndb.Future.wait_any(futures)
28 futures = [future for future in futures if not future.done()] 30 futures = [future for future in futures if not future.done()]
29 if futures: 31 if futures:
30 ndb.Future.wait_all(futures) 32 ndb.Future.wait_all(futures)
31 33
32 34
33 def compute_checksum(json_encodable): 35 def compute_checksum(json_encodable):
34 """Computes a checksum from a JSON-encodable dict or list.""" 36 """Computes a checksum from a JSON-encodable dict or list."""
35 return hashlib.sha1(json.dumps(json_encodable, sort_keys=True)).hexdigest() 37 return hashlib.sha1(json.dumps(json_encodable, sort_keys=True)).hexdigest()
38
39
40 def enqueue_task(taskqueue, key):
41 """Enqueues a task for the specified task queue to process the given key.
42
43 Args:
44 taskqueue: Name of the task queue.
45 key: ndb.Key to pass as a parameter to the task queue.
46 """
47 utils.enqueue_task(
48 '/internal/queues/%s' % taskqueue,
49 taskqueue,
50 params={
51 'key': key.urlsafe(),
52 },
53 )
OLDNEW
« no previous file with comments | « appengine/gce-backend/metadata.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698