Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(164)

Side by Side Diff: appengine/swarming/handlers_backend.py

Issue 2386793002: Reimplement Machine Provider integration (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/cron.yaml ('k') | appengine/swarming/handlers_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Main entry point for Swarming backend handlers.""" 5 """Main entry point for Swarming backend handlers."""
6 6
7 import datetime 7 import datetime
8 import json 8 import json
9 import logging 9 import logging
10 10
11 import webapp2 11 import webapp2
12 from google.appengine.api import app_identity
13 from google.appengine.api import datastore_errors 12 from google.appengine.api import datastore_errors
14 from google.appengine.api import taskqueue 13 from google.appengine.api import taskqueue
14 from google.appengine.ext import ndb
15 15
16 from components import utils 16 from components import utils
17 17
18 import mapreduce_jobs 18 import mapreduce_jobs
19 from components import decorators 19 from components import decorators
20 from components import datastore_utils 20 from components import datastore_utils
21 from components import machine_provider 21 from components import machine_provider
22 from server import bot_management 22 from server import bot_management
23 from server import config 23 from server import config
24 from server import lease_management 24 from server import lease_management
(...skipping 30 matching lines...) Expand all
55 """Triggers task to delete orphaned blobs.""" 55 """Triggers task to delete orphaned blobs."""
56 56
57 @decorators.require_cronjob 57 @decorators.require_cronjob
58 def get(self): 58 def get(self):
59 taskqueue.add(method='POST', url='/internal/taskqueue/cleanup_data', 59 taskqueue.add(method='POST', url='/internal/taskqueue/cleanup_data',
60 queue_name='cleanup') 60 queue_name='cleanup')
61 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 61 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
62 self.response.out.write('Success.') 62 self.response.out.write('Success.')
63 63
64 64
65 class CronMachineProviderBotHandler(webapp2.RequestHandler): 65 class CronMachineProviderConfigHandler(webapp2.RequestHandler):
66 """Handles bots leased from the Machine Provider.""" 66 """Configures entities to lease bots from the Machine Provider."""
67 67
68 @decorators.require_cronjob 68 @decorators.require_cronjob
69 def get(self): 69 def get(self):
70 BATCH_SIZE = 50
71
72 if not config.settings().mp.enabled: 70 if not config.settings().mp.enabled:
73 logging.info('MP support is disabled') 71 logging.info('MP support is disabled')
74 return 72 return
75 73
76 if config.settings().mp.server: 74 if config.settings().mp.server:
77 new_server = config.settings().mp.server 75 new_server = config.settings().mp.server
78 current_config = machine_provider.MachineProviderConfiguration().cached() 76 current_config = machine_provider.MachineProviderConfiguration().cached()
79 if new_server != current_config.instance_url: 77 if new_server != current_config.instance_url:
80 logging.info('Updating Machine Provider server to %s', new_server) 78 logging.info('Updating Machine Provider server to %s', new_server)
81 current_config.modify(instance_url=new_server) 79 current_config.modify(instance_url=new_server)
82 80
83 app_id = app_identity.get_application_id() 81 lease_management.ensure_entities_exist()
84 swarming_server = 'https://%s' % app_identity.get_default_version_hostname() 82 lease_management.drain_excess()
85 found = 0
86 for machine_type_key in lease_management.MachineType.query().fetch(
87 keys_only=True):
88 lease_requests = lease_management.generate_lease_requests(
89 machine_type_key, app_id, swarming_server)
90 found += len(lease_requests)
91 responses = []
92 while lease_requests:
93 response = machine_provider.lease_machines(lease_requests[:BATCH_SIZE])
94 responses.extend(response.get('responses', []))
95 lease_requests = lease_requests[BATCH_SIZE:]
96 if responses:
97 lease_management.update_leases(machine_type_key, responses)
98 logging.info('Updated %d', found)
99 83
100 84
101 class CronMachineProviderCleanUpHandler(webapp2.RequestHandler): 85 class CronMachineProviderManagementHandler(webapp2.RequestHandler):
102 """Cleans up leftover BotInfo entities.""" 86 """Manages leases for bots from the Machine Provider."""
103 87
104 @decorators.require_cronjob 88 @decorators.require_cronjob
105 def get(self): 89 def get(self):
106 if not config.settings().mp.enabled: 90 if not config.settings().mp.enabled:
107 logging.info('MP support is disabled') 91 logging.info('MP support is disabled')
108 return 92 return
109 93
110 lease_management.clean_up_bots() 94 lease_management.schedule_lease_management()
111 95
112 96
113 class CronBotsDimensionAggregationHandler(webapp2.RequestHandler): 97 class CronBotsDimensionAggregationHandler(webapp2.RequestHandler):
114 """Aggregates all bots dimensions (except id) in the fleet.""" 98 """Aggregates all bots dimensions (except id) in the fleet."""
115 99
116 @decorators.require_cronjob 100 @decorators.require_cronjob
117 def get(self): 101 def get(self):
118 seen = {} 102 seen = {}
119 now = utils.utcnow() 103 now = utils.utcnow()
120 for b in bot_management.BotInfo.query(): 104 for b in bot_management.BotInfo.query():
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
166 task_result.TagValues(tag=k, values=sorted(values or [])) 150 task_result.TagValues(tag=k, values=sorted(values or []))
167 for k, values in sorted(seen.iteritems()) 151 for k, values in sorted(seen.iteritems())
168 ] 152 ]
169 logging.info('From %d tasks, saw %d tags', count, len(tags)) 153 logging.info('From %d tasks, saw %d tags', count, len(tags))
170 task_result.TagAggregation( 154 task_result.TagAggregation(
171 key=task_result.TagAggregation.KEY, 155 key=task_result.TagAggregation.KEY,
172 tags=tags, 156 tags=tags,
173 ts=now).put() 157 ts=now).put()
174 158
175 159
176 class CronMachineProviderPubSubHandler(webapp2.RequestHandler):
177 """Listens for Pub/Sub communication from Machine Provider."""
178
179 @decorators.require_cronjob
180 def get(self):
181 if not config.settings().mp.enabled:
182 logging.info('MP support is disabled')
183 return
184
185 taskqueue.add(
186 method='POST', url='/internal/taskqueue/pubsub/machine_provider',
187 queue_name='machine-provider-pubsub')
188 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
189 self.response.out.write('Success.')
190
191
192 class TaskCleanupDataHandler(webapp2.RequestHandler): 160 class TaskCleanupDataHandler(webapp2.RequestHandler):
193 """Deletes orphaned blobs.""" 161 """Deletes orphaned blobs."""
194 162
195 @decorators.silence(datastore_errors.Timeout) 163 @decorators.silence(datastore_errors.Timeout)
196 @decorators.require_taskqueue('cleanup') 164 @decorators.require_taskqueue('cleanup')
197 def post(self): 165 def post(self):
198 # TODO(maruel): Clean up old TaskRequest after a cut-off date. 166 # TODO(maruel): Clean up old TaskRequest after a cut-off date.
199 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 167 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
200 self.response.out.write('Success.') 168 self.response.out.write('Success.')
201 169
202 170
203 class TaskMachineProviderPubSubHandler(webapp2.RequestHandler):
204 """Handles Pub/Sub messages from the Machine Provider."""
205
206 @decorators.require_taskqueue('machine-provider-pubsub')
207 def post(self):
208 app_id = app_identity.get_application_id()
209 lease_management.process_pubsub(app_id)
210
211
212 class TaskSendPubSubMessage(webapp2.RequestHandler): 171 class TaskSendPubSubMessage(webapp2.RequestHandler):
213 """Sends PubSub notification about task completion.""" 172 """Sends PubSub notification about task completion."""
214 173
215 # Add task_id to the URL for better visibility in request logs. 174 # Add task_id to the URL for better visibility in request logs.
216 @decorators.require_taskqueue('pubsub') 175 @decorators.require_taskqueue('pubsub')
217 def post(self, task_id): # pylint: disable=unused-argument 176 def post(self, task_id): # pylint: disable=unused-argument
218 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body)) 177 task_scheduler.task_handle_pubsub_task(json.loads(self.request.body))
219 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' 178 self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
220 self.response.out.write('Success.') 179 self.response.out.write('Success.')
221 180
222 181
182 class TaskMachineProviderManagementHandler(webapp2.RequestHandler):
183 """Manages a lease for a Machine Provider bot."""
184
185 @decorators.require_taskqueue('machine-provider-manage')
186 def post(self):
187 key = ndb.Key(urlsafe=self.request.get('key'))
188 assert key.kind() == 'MachineLease', key
189 lease_management.manage_lease(key)
190
191
223 ### Mapreduce related handlers 192 ### Mapreduce related handlers
224 193
225 194
226 class InternalLaunchMapReduceJobWorkerHandler(webapp2.RequestHandler): 195 class InternalLaunchMapReduceJobWorkerHandler(webapp2.RequestHandler):
227 """Called via task queue or cron to start a map reduce job.""" 196 """Called via task queue or cron to start a map reduce job."""
228 @decorators.require_taskqueue(mapreduce_jobs.MAPREDUCE_TASK_QUEUE) 197 @decorators.require_taskqueue(mapreduce_jobs.MAPREDUCE_TASK_QUEUE)
229 def post(self, job_id): # pylint: disable=R0201 198 def post(self, job_id): # pylint: disable=R0201
230 mapreduce_jobs.launch_job(job_id) 199 mapreduce_jobs.launch_job(job_id)
231 200
232 201
(...skipping 10 matching lines...) Expand all
243 ('/internal/cron/handle_bot_died', CronBotDiedHandler), 212 ('/internal/cron/handle_bot_died', CronBotDiedHandler),
244 ('/internal/cron/abort_expired_task_to_run', 213 ('/internal/cron/abort_expired_task_to_run',
245 CronAbortExpiredShardToRunHandler), 214 CronAbortExpiredShardToRunHandler),
246 215
247 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler), 216 ('/internal/cron/stats/update', stats.InternalStatsUpdateHandler),
248 ('/internal/cron/trigger_cleanup_data', CronTriggerCleanupDataHandler), 217 ('/internal/cron/trigger_cleanup_data', CronTriggerCleanupDataHandler),
249 ('/internal/cron/aggregate_bots_dimensions', 218 ('/internal/cron/aggregate_bots_dimensions',
250 CronBotsDimensionAggregationHandler), 219 CronBotsDimensionAggregationHandler),
251 ('/internal/cron/aggregate_tasks_tags', 220 ('/internal/cron/aggregate_tasks_tags',
252 CronTasksTagsAggregationHandler), 221 CronTasksTagsAggregationHandler),
253 ('/internal/cron/machine_provider', CronMachineProviderBotHandler), 222
254 ('/internal/cron/machine_provider_cleanup', 223 # Machine Provider.
255 CronMachineProviderCleanUpHandler), 224 ('/internal/cron/machine_provider_config',
256 ('/internal/cron/machine_provider_pubsub', 225 CronMachineProviderConfigHandler),
257 CronMachineProviderPubSubHandler), 226 ('/internal/cron/machine_provider_manage',
227 CronMachineProviderManagementHandler),
258 228
259 # Task queues. 229 # Task queues.
260 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler), 230 ('/internal/taskqueue/cleanup_data', TaskCleanupDataHandler),
261 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage), 231 (r'/internal/taskqueue/pubsub/<task_id:[0-9a-f]+>', TaskSendPubSubMessage),
262 ('/internal/taskqueue/pubsub/machine_provider', 232 ('/internal/taskqueue/machine-provider-manage',
263 TaskMachineProviderPubSubHandler), 233 TaskMachineProviderManagementHandler),
264 234
265 # Mapreduce related urls. 235 # Mapreduce related urls.
266 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>', 236 (r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>',
267 InternalLaunchMapReduceJobWorkerHandler), 237 InternalLaunchMapReduceJobWorkerHandler),
268 ] 238 ]
269 return [webapp2.Route(*a) for a in routes] 239 return [webapp2.Route(*a) for a in routes]
OLDNEW
« no previous file with comments | « appengine/swarming/cron.yaml ('k') | appengine/swarming/handlers_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698