| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Main entry point for Swarming backend handlers.""" | 5 """Main entry point for Swarming backend handlers.""" |
| 6 | 6 |
| 7 import datetime | 7 import datetime |
| 8 import json | 8 import json |
| 9 import logging | 9 import logging |
| 10 | 10 |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 150 tasks, cursor = datastore_utils.fetch_page(q, 1000, cursor) | 150 tasks, cursor = datastore_utils.fetch_page(q, 1000, cursor) |
| 151 count += len(tasks) | 151 count += len(tasks) |
| 152 for t in tasks: | 152 for t in tasks: |
| 153 for i in t.tags: | 153 for i in t.tags: |
| 154 k, v = i.split(':', 1) | 154 k, v = i.split(':', 1) |
| 155 s = seen.setdefault(k, set()) | 155 s = seen.setdefault(k, set()) |
| 156 if s is not None: | 156 if s is not None: |
| 157 s.add(v) | 157 s.add(v) |
| 158 # 128 is arbitrary large number to avoid OOM | 158 # 128 is arbitrary large number to avoid OOM |
| 159 if len(s) >= 128: | 159 if len(s) >= 128: |
| 160 logging.info('Stripping tag %s because there are too many', k) | 160 logging.info('Limiting tag %s because there are too many', k) |
| 161 seen[k] = None | 161 seen[k] = None |
| 162 if not cursor or len(tasks) == 0: | 162 if not cursor or len(tasks) == 0: |
| 163 break | 163 break |
| 164 | 164 |
| 165 tags = [ | 165 tags = [ |
| 166 task_result.TagValues(tag=k, values=sorted(values)) | 166 task_result.TagValues(tag=k, values=sorted(values or [])) |
| 167 for k, values in sorted(seen.iteritems()) if values is not None | 167 for k, values in sorted(seen.iteritems()) |
| 168 ] | 168 ] |
| 169 | |
| 170 logging.info('From %d tasks, saw %d tags', count, len(tags)) | 169 logging.info('From %d tasks, saw %d tags', count, len(tags)) |
| 171 task_result.TagAggregation( | 170 task_result.TagAggregation( |
| 172 key=task_result.TagAggregation.KEY, | 171 key=task_result.TagAggregation.KEY, |
| 173 tags=tags, | 172 tags=tags, |
| 174 ts=now).put() | 173 ts=now).put() |
| 175 | 174 |
| 176 | 175 |
| 177 class CronMachineProviderPubSubHandler(webapp2.RequestHandler): | 176 class CronMachineProviderPubSubHandler(webapp2.RequestHandler): |
| 178 """Listens for Pub/Sub communication from Machine Provider.""" | 177 """Listens for Pub/Sub communication from Machine Provider.""" |
| 179 | 178 |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 261 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler), | 260 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler), |
| 262 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), | 261 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), |
| 263 ('/internal/taskqueue/pubsub/machine_provider', | 262 ('/internal/taskqueue/pubsub/machine_provider', |
| 264 TaskMachineProviderPubSubHandler), | 263 TaskMachineProviderPubSubHandler), |
| 265 | 264 |
| 266 # Mapreduce related urls. | 265 # Mapreduce related urls. |
| 267 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', | 266 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', |
| 268 InternalLaunchMapReduceJobWorkerHandler), | 267 InternalLaunchMapReduceJobWorkerHandler), |
| 269 ] | 268 ] |
| 270 return [webapp2.Route(*a) for a in routes] | 269 return [webapp2.Route(*a) for a in routes] |
| OLD | NEW |