| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Main entry point for Swarming backend handlers.""" | 5 """Main entry point for Swarming backend handlers.""" |
| 6 | 6 |
| 7 import datetime | 7 import datetime |
| 8 import json | 8 import json |
| 9 import logging | 9 import logging |
| 10 | 10 |
| 11 import webapp2 | 11 import webapp2 |
| 12 from google.appengine.api import app_identity | |
| 13 from google.appengine.api import datastore_errors | 12 from google.appengine.api import datastore_errors |
| 14 from google.appengine.api import taskqueue | 13 from google.appengine.api import taskqueue |
| 14 from google.appengine.ext import ndb |
| 15 | 15 |
| 16 from components import utils | 16 from components import utils |
| 17 | 17 |
| 18 import mapreduce_jobs | 18 import mapreduce_jobs |
| 19 from components import decorators | 19 from components import decorators |
| 20 from components import datastore_utils | 20 from components import datastore_utils |
| 21 from components import machine_provider | 21 from components import machine_provider |
| 22 from server import bot_management | 22 from server import bot_management |
| 23 from server import config | 23 from server import config |
| 24 from server import lease_management | 24 from server import lease_management |
| (...skipping 30 matching lines...) Expand all Loading... |
| 55 """Triggers task to delete orphaned blobs.""" | 55 """Triggers task to delete orphaned blobs.""" |
| 56 | 56 |
| 57 @decorators.require_cronjob | 57 @decorators.require_cronjob |
| 58 def get(self): | 58 def get(self): |
| 59 taskqueue.add(method='POST', url='/internal/taskqueue/cleanup_data', | 59 taskqueue.add(method='POST', url='/internal/taskqueue/cleanup_data', |
| 60 queue_name='cleanup') | 60 queue_name='cleanup') |
| 61 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' | 61 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' |
| 62 self.response.out.write('Success.') | 62 self.response.out.write('Success.') |
| 63 | 63 |
| 64 | 64 |
| 65 class CronMachineProviderBotHandler(webapp2.RequestHandler): | 65 class CronMachineProviderConfigHandler(webapp2.RequestHandler): |
| 66 """Handles bots leased from the Machine Provider.""" | 66 """Configures entities to lease bots from the Machine Provider.""" |
| 67 | 67 |
| 68 @decorators.require_cronjob | 68 @decorators.require_cronjob |
| 69 def get(self): | 69 def get(self): |
| 70 BATCH_SIZE = 50 | |
| 71 | |
| 72 if not config.settings().mp.enabled: | 70 if not config.settings().mp.enabled: |
| 73 logging.info('MP support is disabled') | 71 logging.info('MP support is disabled') |
| 74 return | 72 return |
| 75 | 73 |
| 76 if config.settings().mp.server: | 74 if config.settings().mp.server: |
| 77 new_server = config.settings().mp.server | 75 new_server = config.settings().mp.server |
| 78 current_config = machine_provider.MachineProviderConfiguration().cached() | 76 current_config = machine_provider.MachineProviderConfiguration().cached() |
| 79 if new_server != current_config.instance_url: | 77 if new_server != current_config.instance_url: |
| 80 logging.info('Updating Machine Provider server to %s', new_server) | 78 logging.info('Updating Machine Provider server to %s', new_server) |
| 81 current_config.modify(instance_url=new_server) | 79 current_config.modify(instance_url=new_server) |
| 82 | 80 |
| 83 app_id = app_identity.get_application_id() | 81 lease_management.ensure_entities_exist() |
| 84 swarming_server = 'https://%s' % app_identity.get_default_version_hostname() | 82 lease_management.drain_excess() |
| 85 found = 0 | |
| 86 for machine_type_key in lease_management.MachineType.query().fetch( | |
| 87 keys_only=True): | |
| 88 lease_requests = lease_management.generate_lease_requests( | |
| 89 machine_type_key, app_id, swarming_server) | |
| 90 found += len(lease_requests) | |
| 91 responses = [] | |
| 92 while lease_requests: | |
| 93 response = machine_provider.lease_machines(lease_requests[:BATCH_SIZE]) | |
| 94 responses.extend(response.get('responses', [])) | |
| 95 lease_requests = lease_requests[BATCH_SIZE:] | |
| 96 if responses: | |
| 97 lease_management.update_leases(machine_type_key, responses) | |
| 98 logging.info('Updated %d', found) | |
| 99 | 83 |
| 100 | 84 |
| 101 class CronMachineProviderCleanUpHandler(webapp2.RequestHandler): | 85 class CronMachineProviderManagementHandler(webapp2.RequestHandler): |
| 102 """Cleans up leftover BotInfo entities.""" | 86 """Manages leases for bots from the Machine Provider.""" |
| 103 | 87 |
| 104 @decorators.require_cronjob | 88 @decorators.require_cronjob |
| 105 def get(self): | 89 def get(self): |
| 106 if not config.settings().mp.enabled: | 90 if not config.settings().mp.enabled: |
| 107 logging.info('MP support is disabled') | 91 logging.info('MP support is disabled') |
| 108 return | 92 return |
| 109 | 93 |
| 110 lease_management.clean_up_bots() | 94 lease_management.schedule_lease_management() |
| 111 | 95 |
| 112 | 96 |
| 113 class CronBotsDimensionAggregationHandler(webapp2.RequestHandler): | 97 class CronBotsDimensionAggregationHandler(webapp2.RequestHandler): |
| 114 """Aggregates all bots dimensions (except id) in the fleet.""" | 98 """Aggregates all bots dimensions (except id) in the fleet.""" |
| 115 | 99 |
| 116 @decorators.require_cronjob | 100 @decorators.require_cronjob |
| 117 def get(self): | 101 def get(self): |
| 118 seen = {} | 102 seen = {} |
| 119 now = utils.utcnow() | 103 now = utils.utcnow() |
| 120 for b in bot_management.BotInfo.query(): | 104 for b in bot_management.BotInfo.query(): |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 166 task_result.TagValues(tag=k, values=sorted(values or [])) | 150 task_result.TagValues(tag=k, values=sorted(values or [])) |
| 167 for k, values in sorted(seen.iteritems()) | 151 for k, values in sorted(seen.iteritems()) |
| 168 ] | 152 ] |
| 169 logging.info('From %d tasks, saw %d tags', count, len(tags)) | 153 logging.info('From %d tasks, saw %d tags', count, len(tags)) |
| 170 task_result.TagAggregation( | 154 task_result.TagAggregation( |
| 171 key=task_result.TagAggregation.KEY, | 155 key=task_result.TagAggregation.KEY, |
| 172 tags=tags, | 156 tags=tags, |
| 173 ts=now).put() | 157 ts=now).put() |
| 174 | 158 |
| 175 | 159 |
| 176 class CronMachineProviderPubSubHandler(webapp2.RequestHandler): | |
| 177 """Listens for Pub/Sub communication from Machine Provider.""" | |
| 178 | |
| 179 @decorators.require_cronjob | |
| 180 def get(self): | |
| 181 if not config.settings().mp.enabled: | |
| 182 logging.info('MP support is disabled') | |
| 183 return | |
| 184 | |
| 185 taskqueue.add( | |
| 186 method='POST', url='/internal/taskqueue/pubsub/machine_provider', | |
| 187 queue_name='machine-provider-pubsub') | |
| 188 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' | |
| 189 self.response.out.write('Success.') | |
| 190 | |
| 191 | |
| 192 class TaskCleanupDataHandler(webapp2.RequestHandler): | 160 class TaskCleanupDataHandler(webapp2.RequestHandler): |
| 193 """Deletes orphaned blobs.""" | 161 """Deletes orphaned blobs.""" |
| 194 | 162 |
| 195 @decorators.silence(datastore_errors.Timeout) | 163 @decorators.silence(datastore_errors.Timeout) |
| 196 @decorators.require_taskqueue('cleanup') | 164 @decorators.require_taskqueue('cleanup') |
| 197 def post(self): | 165 def post(self): |
| 198 # TODO(maruel): Clean up old TaskRequest after a cut-off date. | 166 # TODO(maruel): Clean up old TaskRequest after a cut-off date. |
| 199 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' | 167 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' |
| 200 self.response.out.write('Success.') | 168 self.response.out.write('Success.') |
| 201 | 169 |
| 202 | 170 |
| 203 class TaskMachineProviderPubSubHandler(webapp2.RequestHandler): | |
| 204 """Handles Pub/Sub messages from the Machine Provider.""" | |
| 205 | |
| 206 @decorators.require_taskqueue('machine-provider-pubsub') | |
| 207 def post(self): | |
| 208 app_id = app_identity.get_application_id() | |
| 209 lease_management.process_pubsub(app_id) | |
| 210 | |
| 211 | |
| 212 class TaskSendPubSubMessage(webapp2.RequestHandler): | 171 class TaskSendPubSubMessage(webapp2.RequestHandler): |
| 213 """Sends PubSub notification about task completion.""" | 172 """Sends PubSub notification about task completion.""" |
| 214 | 173 |
| 215 # Add task_id to the URL for better visibility in request logs. | 174 # Add task_id to the URL for better visibility in request logs. |
| 216 @decorators.require_taskqueue('pubsub') | 175 @decorators.require_taskqueue('pubsub') |
| 217 def post(self, task_id): # pylint: disable=unused-argument | 176 def post(self, task_id): # pylint: disable=unused-argument |
| 218 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body)) | 177 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body)) |
| 219 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' | 178 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' |
| 220 self.response.out.write('Success.') | 179 self.response.out.write('Success.') |
| 221 | 180 |
| 222 | 181 |
| 182 class TaskMachineProviderManagementHandler(webapp2.RequestHandler): |
| 183 """Manages a lease for a Machine Provider bot.""" |
| 184 |
| 185 @decorators.require_taskqueue('machine-provider-manage') |
| 186 def post(self): |
| 187 key = ndb.Key(urlsafe=self.request.get('key')) |
| 188 assert key.kind() == 'MachineLease', key |
| 189 lease_management.manage_lease(key) |
| 190 |
| 191 |
| 223 ### Mapreduce related handlers | 192 ### Mapreduce related handlers |
| 224 | 193 |
| 225 | 194 |
| 226 class InternalLaunchMapReduceJobWorkerHandler(webapp2.RequestHandler): | 195 class InternalLaunchMapReduceJobWorkerHandler(webapp2.RequestHandler): |
| 227 """Called via task queue or cron to start a map reduce job.""" | 196 """Called via task queue or cron to start a map reduce job.""" |
| 228 @decorators.require_taskqueue(mapreduce_jobs.MAPREDUCE_TASK_QUEUE) | 197 @decorators.require_taskqueue(mapreduce_jobs.MAPREDUCE_TASK_QUEUE) |
| 229 def post(self, job_id): # pylint: disable=R0201 | 198 def post(self, job_id): # pylint: disable=R0201 |
| 230 mapreduce_jobs.launch_job(job_id) | 199 mapreduce_jobs.launch_job(job_id) |
| 231 | 200 |
| 232 | 201 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 243 ('/internal/cron/handle_bot_died', CronBotDiedHandler), | 212 ('/internal/cron/handle_bot_died', CronBotDiedHandler), |
| 244 ('/internal/cron/abort_expired_task_to_run', | 213 ('/internal/cron/abort_expired_task_to_run', |
| 245 CronAbortExpiredShardToRunHandler), | 214 CronAbortExpiredShardToRunHandler), |
| 246 | 215 |
| 247 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler), | 216 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler), |
| 248 ('/internal/cron/trigger_cleanup_data', CronTriggerCleanupDataHandler), | 217 ('/internal/cron/trigger_cleanup_data', CronTriggerCleanupDataHandler), |
| 249 ('/internal/cron/aggregate_bots_dimensions', | 218 ('/internal/cron/aggregate_bots_dimensions', |
| 250 CronBotsDimensionAggregationHandler), | 219 CronBotsDimensionAggregationHandler), |
| 251 ('/internal/cron/aggregate_tasks_tags', | 220 ('/internal/cron/aggregate_tasks_tags', |
| 252 CronTasksTagsAggregationHandler), | 221 CronTasksTagsAggregationHandler), |
| 253 ('/internal/cron/machine_provider', CronMachineProviderBotHandler), | 222 |
| 254 ('/internal/cron/machine_provider_cleanup', | 223 # Machine Provider. |
| 255 CronMachineProviderCleanUpHandler), | 224 ('/internal/cron/machine_provider_config', |
| 256 ('/internal/cron/machine_provider_pubsub', | 225 CronMachineProviderConfigHandler), |
| 257 CronMachineProviderPubSubHandler), | 226 ('/internal/cron/machine_provider_manage', |
| 227 CronMachineProviderManagementHandler), |
| 258 | 228 |
| 259 # Task queues. | 229 # Task queues. |
| 260 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler), | 230 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler), |
| 261 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), | 231 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), |
| 262 ('/internal/taskqueue/pubsub/machine_provider', | 232 ('/internal/taskqueue/machine-provider-manage', |
| 263 TaskMachineProviderPubSubHandler), | 233 TaskMachineProviderManagementHandler), |
| 264 | 234 |
| 265 # Mapreduce related urls. | 235 # Mapreduce related urls. |
| 266 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', | 236 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', |
| 267 InternalLaunchMapReduceJobWorkerHandler), | 237 InternalLaunchMapReduceJobWorkerHandler), |
| 268 ] | 238 ] |
| 269 return [webapp2.Route(*a) for a in routes] | 239 return [webapp2.Route(*a) for a in routes] |
| OLD | NEW |