Chromium Code Reviews| Index: appengine/swarming/handlers_backend.py |
| diff --git a/appengine/swarming/handlers_backend.py b/appengine/swarming/handlers_backend.py |
| index ee398790eeacbe1e2a39a0ee8a833567ce537c23..c3368496cc1304716cfda94d8142f324e5eef801 100644 |
| --- a/appengine/swarming/handlers_backend.py |
| +++ b/appengine/swarming/handlers_backend.py |
| @@ -4,6 +4,7 @@ |
| """Main entry point for Swarming backend handlers.""" |
| +import datetime |
| import json |
| import logging |
| @@ -16,11 +17,13 @@ from components import utils |
| import mapreduce_jobs |
| from components import decorators |
| +from components import datastore_utils |
| from components import machine_provider |
| from server import bot_management |
| from server import config |
| from server import lease_management |
| from server import stats |
| +from server import task_result |
| from server import task_scheduler |
| @@ -124,6 +127,40 @@ class CronBotsDimensionAggregationHandler(webapp2.RequestHandler): |
| ts=now).put() |
| +class CronTasksTagsAggregationHandler(webapp2.RequestHandler): |
| + """Aggregates all task tags from the last 48 hours.""" |
| + |
| + @decorators.require_cronjob |
| + def get(self): |
| + seen = {} |
| + now = utils.utcnow() |
| + count = 0 |
| + q = task_result.get_result_summaries_query( |
| + now - datetime.timedelta(days=2), None, 'created_ts', 'all', None) |
| + cursor = None |
| + while True: |
| + tasks, cursor = datastore_utils.fetch_page(q, 1000, cursor) |
|
M-A Ruel
2016/08/16 15:37:21
fetching 1000 at a time is going to be very slow.
kjlubick
2016/08/16 17:20:44
I tried iterating one at a time, even with a filte
|
| + count += len(tasks) |
| + for t in tasks: |
| + for i in t.tags: |
| + k, v = i.split(':', 1) |
| + if k != 'cron_invocation_id': |
|
M-A Ruel
2016/08/16 15:37:21
what's that?
kjlubick
2016/08/16 17:20:44
It appeared to be something that was different for
|
| + seen.setdefault(k, set()).add(v) |
| + if not cursor or len(tasks) == 0: |
| + break |
| + |
| + tags = [ |
| + task_result.TagValues(tag=k,values=sorted(values)) |
|
M-A Ruel
2016/08/16 15:37:21
task_result.TagValues(tag=k, values=sorted(values)
kjlubick
2016/08/16 17:20:44
Done.
|
| + for k, values in sorted(seen.iteritems()) |
| + ] |
| + |
| + logging.info('From %d tasks, saw tags %s', count, tags) |
| + task_result.TagAggregation( |
| + key=task_result.TagAggregation.KEY, |
| + tags=tags, |
| + ts=now).put() |
| + |
| + |
| class CronMachineProviderPubSubHandler(webapp2.RequestHandler): |
| """Listens for Pub/Sub communication from Machine Provider.""" |
| @@ -199,6 +236,8 @@ def get_routes(): |
| ('/internal/cron/trigger_cleanup_data', CronTriggerCleanupDataHandler), |
| ('/internal/cron/aggregate_bots_dimensions', |
| CronBotsDimensionAggregationHandler), |
| + ('/internal/cron/aggregate_tasks_tags', |
| + CronTasksTagsAggregationHandler), |
| ('/internal/cron/machine_provider', CronMachineProviderBotHandler), |
| ('/internal/cron/machine_provider_cleanup', |
| CronMachineProviderCleanUpHandler), |