Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: appengine/swarming/handlers_backend.py

Issue 2832203002: task_queues: Add more scaffolding (Closed)
Patch Set: Addressed comments Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/cron.yaml ('k') | appengine/swarming/handlers_endpoints_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Main entry point for Swarming backend handlers.""" 5 """Main entry point for Swarming backend handlers."""
6 6
7 import datetime 7 import datetime
8 import json 8 import json
9 import logging 9 import logging
10 10
11 import webapp2 11 import webapp2
12 from google.appengine.api import datastore_errors 12 from google.appengine.api import datastore_errors
13 from google.appengine.ext import ndb 13 from google.appengine.ext import ndb
14 14
15 from components import utils 15 from components import utils
16 16
17 import mapreduce_jobs 17 import mapreduce_jobs
18 from components import decorators 18 from components import decorators
19 from components import datastore_utils 19 from components import datastore_utils
20 from components import machine_provider 20 from components import machine_provider
21 from server import bot_management 21 from server import bot_management
22 from server import config 22 from server import config
23 from server import lease_management 23 from server import lease_management
24 from server import stats 24 from server import stats
25 from server import task_pack 25 from server import task_pack
26 from server import task_queues
26 from server import task_result 27 from server import task_result
27 from server import task_scheduler 28 from server import task_scheduler
28 import ts_mon_metrics 29 import ts_mon_metrics
29 30
30 31
31 class CronBotDiedHandler(webapp2.RequestHandler): 32 class CronBotDiedHandler(webapp2.RequestHandler):
32 @decorators.require_cronjob 33 @decorators.require_cronjob
33 def get(self): 34 def get(self):
34 try: 35 try:
35 task_scheduler.cron_handle_bot_died(self.request.host_url) 36 task_scheduler.cron_handle_bot_died(self.request.host_url)
36 except datastore_errors.NeedIndexError as e: 37 except datastore_errors.NeedIndexError as e:
37 # When a fresh new instance is deployed, it takes a few minutes for the 38 # When a fresh new instance is deployed, it takes a few minutes for the
38 # composite indexes to be created even if they are empty. Ignore the case 39 # composite indexes to be created even if they are empty. Ignore the case
39 # where the index is defined but still being created by AppEngine. 40 # where the index is defined but still being created by AppEngine.
40 if not str(e).startswith( 41 if not str(e).startswith(
41 'NeedIndexError: The index for this query is not ready to serve.'): 42 'NeedIndexError: The index for this query is not ready to serve.'):
42 raise 43 raise
43 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 44 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
44 self.response.out.write('Success.') 45 self.response.out.write('Success.')
45 46
46 47
47 class CronAbortExpiredShardToRunHandler(webapp2.RequestHandler): 48 class CronAbortExpiredShardToRunHandler(webapp2.RequestHandler):
48 @decorators.require_cronjob 49 @decorators.require_cronjob
49 def get(self): 50 def get(self):
50 task_scheduler.cron_abort_expired_task_to_run(self.request.host_url) 51 task_scheduler.cron_abort_expired_task_to_run(self.request.host_url)
51 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 52 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
52 self.response.out.write('Success.') 53 self.response.out.write('Success.')
53 54
54 55
56 class CronTaskQueues(webapp2.RequestHandler):
57 @decorators.require_cronjob
58 def get(self):
59 task_queues.tidy_stale()
60 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
61 self.response.out.write('Success.')
62
63
55 class CronMachineProviderBotsUtilizationHandler(webapp2.RequestHandler): 64 class CronMachineProviderBotsUtilizationHandler(webapp2.RequestHandler):
56 """Determines Machine Provider bot utilization.""" 65 """Determines Machine Provider bot utilization."""
57 66
58 @decorators.require_cronjob 67 @decorators.require_cronjob
59 def get(self): 68 def get(self):
60 if not config.settings().mp.enabled: 69 if not config.settings().mp.enabled:
61 logging.info('MP support is disabled') 70 logging.info('MP support is disabled')
62 return 71 return
63 72
64 lease_management.compute_utilization() 73 lease_management.compute_utilization()
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 continue 185 continue
177 request_obj = request_key.get() 186 request_obj = request_key.get()
178 if not request_obj: 187 if not request_obj:
179 logging.error('Request for %s was not found.', request_key.id()) 188 logging.error('Request for %s was not found.', request_key.id())
180 continue 189 continue
181 ok, was_running = task_scheduler.cancel_task(request_obj, result_key) 190 ok, was_running = task_scheduler.cancel_task(request_obj, result_key)
182 logging.info('task %s canceled: %s was running: %s', 191 logging.info('task %s canceled: %s was running: %s',
183 task_id, ok, was_running) 192 task_id, ok, was_running)
184 193
185 194
195 class TaskDimensionsHandler(webapp2.RequestHandler):
196 @decorators.require_taskqueue('task-dimensions')
197 def post(self):
198 self.tidy_stale(self.request.body)
199 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
200 self.response.out.write('Success.')
201
202 @staticmethod
203 def tidy_stale(body):
204 payload = body.split('\n')
205 dimensions_hash = int(payload[0])
206 dimensions_flat = payload[1:]
207 task_queues.rebuild_task_cache(dimensions_hash, dimensions_flat)
208
209
186 class TaskSendPubSubMessage(webapp2.RequestHandler): 210 class TaskSendPubSubMessage(webapp2.RequestHandler):
187 """Sends PubSub notification about task completion.""" 211 """Sends PubSub notification about task completion."""
188 212
189 # Add task_id to the URL for better visibility in request logs. 213 # Add task_id to the URL for better visibility in request logs.
190 @decorators.require_taskqueue('pubsub') 214 @decorators.require_taskqueue('pubsub')
191 def post(self, task_id): # pylint: disable=unused-argument 215 def post(self, task_id): # pylint: disable=unused-argument
192 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body)) 216 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body))
193 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 217 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
194 self.response.out.write('Success.') 218 self.response.out.write('Success.')
195 219
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
228 mapreduce_jobs.launch_job(job_id) 252 mapreduce_jobs.launch_job(job_id)
229 253
230 254
231 ### 255 ###
232 256
233 257
234 def get_routes(): 258 def get_routes():
235 """Returns internal urls that should only be accessible via the backend.""" 259 """Returns internal urls that should only be accessible via the backend."""
236 routes = [ 260 routes = [
237 # Cron jobs. 261 # Cron jobs.
238 # TODO(maruel): Rename cron.yaml job url. Doing so is a bit annoying since
239 # the app version has to be running an already compatible version already.
240 ('/internal/cron/abort_bot_died', CronBotDiedHandler), 262 ('/internal/cron/abort_bot_died', CronBotDiedHandler),
241 ('/internal/cron/handle_bot_died', CronBotDiedHandler), 263 ('/internal/cron/handle_bot_died', CronBotDiedHandler),
242 ('/internal/cron/abort_expired_task_to_run', 264 ('/internal/cron/abort_expired_task_to_run',
243 CronAbortExpiredShardToRunHandler), 265 CronAbortExpiredShardToRunHandler),
266 ('/internal/cron/task_queues_tidy', CronTaskQueues),
244 267
245 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler), 268 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler),
246 ('/internal/cron/aggregate_bots_dimensions', 269 ('/internal/cron/aggregate_bots_dimensions',
247 CronBotsDimensionAggregationHandler), 270 CronBotsDimensionAggregationHandler),
248 ('/internal/cron/aggregate_tasks_tags', 271 ('/internal/cron/aggregate_tasks_tags',
249 CronTasksTagsAggregationHandler), 272 CronTasksTagsAggregationHandler),
250 273
251 # Machine Provider. 274 # Machine Provider.
252 ('/internal/cron/machine_provider_bot_usage', 275 ('/internal/cron/machine_provider_bot_usage',
253 CronMachineProviderBotsUtilizationHandler), 276 CronMachineProviderBotsUtilizationHandler),
254 ('/internal/cron/machine_provider_config', 277 ('/internal/cron/machine_provider_config',
255 CronMachineProviderConfigHandler), 278 CronMachineProviderConfigHandler),
256 ('/internal/cron/machine_provider_manage', 279 ('/internal/cron/machine_provider_manage',
257 CronMachineProviderManagementHandler), 280 CronMachineProviderManagementHandler),
258 281
259 # Task queues. 282 # Task queues.
260 ('/internal/taskqueue/cancel-tasks', CancelTasksHandler), 283 ('/internal/taskqueue/cancel-tasks', CancelTasksHandler),
284 ('/internal/taskqueue/task-dimensions', TaskDimensionsHandler),
261 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), 285 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage),
262 ('/internal/taskqueue/machine-provider-manage', 286 ('/internal/taskqueue/machine-provider-manage',
263 TaskMachineProviderManagementHandler), 287 TaskMachineProviderManagementHandler),
264 (r'/internal/taskqueue/tsmon/<kind:[0-9A-Za-z_]+>', TaskGlobalMetrics), 288 (r'/internal/taskqueue/tsmon/<kind:[0-9A-Za-z_]+>', TaskGlobalMetrics),
265 289
266 # Mapreduce related urls. 290 # Mapreduce related urls.
267 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', 291 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>',
268 InternalLaunchMapReduceJobWorkerHandler), 292 InternalLaunchMapReduceJobWorkerHandler),
269 ] 293 ]
270 return [webapp2.Route(*a) for a in routes] 294 return [webapp2.Route(*a) for a in routes]
271 295
272 296
273 def create_application(debug): 297 def create_application(debug):
274 return webapp2.WSGIApplication(get_routes(), debug=debug) 298 return webapp2.WSGIApplication(get_routes(), debug=debug)
OLDNEW
« no previous file with comments | « appengine/swarming/cron.yaml ('k') | appengine/swarming/handlers_endpoints_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698