OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import datetime | |
6 import math | 7 import math |
7 import os | 8 import os |
8 import sys | 9 import sys |
9 import time | 10 import time |
10 | 11 |
11 import cloudstorage | 12 import cloudstorage |
12 import flask | 13 import flask |
13 from google.appengine.api import (app_identity, taskqueue) | 14 from google.appengine.api import (app_identity, taskqueue) |
14 from google.appengine.ext import deferred | 15 from google.appengine.ext import deferred |
15 from oauth2client.client import GoogleCredentials | 16 from oauth2client.client import GoogleCredentials |
16 | 17 |
17 import common.clovis_paths | 18 import common.clovis_paths |
18 from common.clovis_task import ClovisTask | 19 from common.clovis_task import ClovisTask |
19 import common.google_bigquery_helper | 20 import common.google_bigquery_helper |
20 import common.google_instance_helper | 21 import common.google_instance_helper |
21 from common.loading_trace_database import LoadingTraceDatabase | 22 from common.loading_trace_database import LoadingTraceDatabase |
22 import email_helper | 23 import email_helper |
24 from frontend_job import FrontendJob | |
23 from memory_logs import MemoryLogs | 25 from memory_logs import MemoryLogs |
24 | 26 |
25 | 27 |
26 # Global variables. | 28 # Global variables. |
27 logging.Formatter.converter = time.gmtime | 29 logging.Formatter.converter = time.gmtime |
28 clovis_logger = logging.getLogger('clovis_frontend') | 30 clovis_logger = logging.getLogger('clovis_frontend') |
29 clovis_logger.setLevel(logging.DEBUG) | 31 clovis_logger.setLevel(logging.DEBUG) |
30 project_name = app_identity.get_application_id() | 32 project_name = app_identity.get_application_id() |
31 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 33 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
32 credentials=GoogleCredentials.get_application_default(), | 34 credentials=GoogleCredentials.get_application_default(), |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
92 email_address (str): Email address of the user to be notified. | 94 email_address (str): Email address of the user to be notified. |
93 status (str): Status of the task, indicating the success or the cause of | 95 status (str): Status of the task, indicating the success or the cause of |
94 failure. | 96 failure. |
95 task_url (str): URL where the results of the task can be found. | 97 task_url (str): URL where the results of the task can be found. |
96 """ | 98 """ |
97 email_helper.SendEmailTaskComplete( | 99 email_helper.SendEmailTaskComplete( |
98 to_address=email_address, tag=tag, status=status, task_url=task_url, | 100 to_address=email_address, tag=tag, status=status, task_url=task_url, |
99 logger=clovis_logger) | 101 logger=clovis_logger) |
100 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) | 102 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) |
101 deferred.defer(DeleteInstanceGroup, tag) | 103 deferred.defer(DeleteInstanceGroup, tag) |
104 FrontendJob.DeleteForTag(tag) | |
102 | 105 |
103 | 106 |
104 def GetEstimatedTaskDurationInSeconds(task): | 107 def GetEstimatedTaskDurationInSeconds(task): |
105 """Returns an estimation of the time required to run the task. | 108 """Returns an estimation of the time required to run the task. |
106 | 109 |
107 Args: | 110 Args: |
108 task: (ClovisTask) The task. | 111 task: (ClovisTask) The task. |
109 | 112 |
110 Returns: | 113 Returns: |
111 float: Time estimation in seconds, or -1 in case of failure. | 114 float: Time estimation in seconds, or -1 in case of failure. |
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
317 | 320 |
318 # Load the task from JSON. | 321 # Load the task from JSON. |
319 task = ClovisTask.FromJsonString(http_body_str) | 322 task = ClovisTask.FromJsonString(http_body_str) |
320 if not task: | 323 if not task: |
321 clovis_logger.error('Invalid JSON task.') | 324 clovis_logger.error('Invalid JSON task.') |
322 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 325 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
323 | 326 |
324 task_tag = task.BackendParams()['tag'] | 327 task_tag = task.BackendParams()['tag'] |
325 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | 328 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
326 task_tag)) | 329 task_tag)) |
330 user_email = email_helper.GetUserEmail() | |
331 | |
332 # Write the job to the datastore. | |
333 frontend_job = FrontendJob.CreateForTag(task_tag) | |
334 frontend_job.email = user_email | |
335 frontend_job.status = 'not_started' | |
336 frontend_job.clovis_task = task.ToJsonString() | |
337 frontend_job.put() | |
338 | |
339 # Process the job on the queue, to avoid timeout issues. | |
340 deferred.defer(SpawnTasksOnBackgroundQueue, task_tag) | |
341 | |
342 return Render( | |
343 flask.Markup( | |
344 '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)), | |
345 memory_logs) | |
346 | |
347 | |
348 def SpawnTasksOnBackgroundQueue(task_tag): | |
349 """Spawns Clovis tasks associated with task_tag from the backgound queue. | |
350 | |
351 This function is mostly a wrapper around SpawnTasks() that catches exceptions. | |
352 It is assumed that a FrontendJob for task_tag exists. | |
353 """ | |
354 memory_logs = MemoryLogs(clovis_logger) | |
355 memory_logs.Start() | |
356 clovis_logger.info('Spawning tasks on background queue.') | |
357 | |
358 try: | |
359 frontend_job = FrontendJob.GetFromTag(task_tag) | |
360 frontend_job.status = 'will_start' | |
361 SpawnTasks(frontend_job) | |
362 except Exception: | |
363 pass # Catch all exceptions so that AppEngine does not retry indefinitely. | |
mattcary
2016/06/09 15:40:06
Log the exception so if tasks disappear we have a
droger
2016/06/10 12:14:04
Done.
| |
364 | |
365 # Update the task. | |
366 if frontend_job: | |
367 frontend_job.log = memory_logs.Flush() | |
368 frontend_job.put() | |
369 | |
370 | |
371 def SpawnTasks(frontend_job): | |
372 """ Spawns Clovis tasks associated with the frontend job.""" | |
373 user_email = frontend_job.email | |
374 task = ClovisTask.FromJsonString(frontend_job.clovis_task) | |
375 task_tag = task.BackendParams()['tag'] | |
376 | |
327 # Compute the task directory. | 377 # Compute the task directory. |
378 frontend_job.status = 'building_task_dir' | |
328 task_dir_components = [] | 379 task_dir_components = [] |
329 user_email = email_helper.GetUserEmail() | |
330 user_name = None | 380 user_name = None |
331 if user_email: | 381 if user_email: |
332 user_name = user_email[:user_email.find('@')] | 382 user_name = user_email[:user_email.find('@')] |
333 if user_name: | 383 if user_name: |
334 task_dir_components.append(user_name) | 384 task_dir_components.append(user_name) |
335 task_name = task.BackendParams().get('task_name') | 385 task_name = task.BackendParams().get('task_name') |
336 if task_name: | 386 if task_name: |
337 task_dir_components.append(task_name) | 387 task_dir_components.append(task_name) |
338 task_dir_components.append(task_tag) | 388 task_dir_components.append(task_tag) |
339 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) | 389 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) |
340 | 390 |
341 # Build the URL where the result will live. | 391 # Build the URL where the result will live. |
392 frontend_job.status = 'building_task_url' | |
mattcary
2016/06/09 15:40:06
Do you need to put these status update values, or
droger
2016/06/10 12:14:04
You need to put.
This is only done at the end (li
mattcary
2016/06/10 12:17:27
Shouldn't we put after each return, so we have the
droger
2016/06/10 12:58:27
put() will be executed after each return, by the c
mattcary
2016/06/10 13:24:20
Ah! I was just looking in SpawnTasks. Thanks.
| |
342 task_url = GetTaskURL(task, task_dir) | 393 task_url = GetTaskURL(task, task_dir) |
343 if task_url: | 394 if task_url: |
344 clovis_logger.info('Task result URL: ' + task_url) | 395 clovis_logger.info('Task result URL: ' + task_url) |
396 frontend_job.task_url = task_url | |
345 else: | 397 else: |
346 return Render('Could not build the task URL.', memory_logs) | 398 frontend_job.status = 'task_url_error' |
399 return | |
347 | 400 |
348 # Split the task in smaller tasks. | 401 # Split the task in smaller tasks. |
402 frontend_job.status = 'splitting_task' | |
349 sub_tasks = SplitClovisTask(task) | 403 sub_tasks = SplitClovisTask(task) |
350 if not sub_tasks: | 404 if not sub_tasks: |
351 return Render('Task split failed.', memory_logs) | 405 frontend_job.status = 'task_split_error' |
406 return | |
352 | 407 |
353 # Compute estimates for the work duration, in order to compute the instance | 408 # Compute estimates for the work duration, in order to compute the instance |
354 # count and the timeout. | 409 # count and the timeout. |
410 frontend_job.status = 'estimating_duration' | |
355 sequential_duration_s = \ | 411 sequential_duration_s = \ |
356 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) | 412 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) |
357 if sequential_duration_s <= 0: | 413 if sequential_duration_s <= 0: |
358 return Render('Time estimation failed.', memory_logs) | 414 frontend_job.status = 'time_estimation_error' |
415 return | |
359 | 416 |
360 # Compute the number of required instances if not specified. | 417 # Compute the number of required instances if not specified. |
361 if not task.BackendParams().get('instance_count'): | 418 if task.BackendParams().get('instance_count') is None: |
419 frontend_job.status = 'estimating_instance_count' | |
362 target_parallel_duration_s = 1800.0 # 30 minutes. | 420 target_parallel_duration_s = 1800.0 # 30 minutes. |
363 task.BackendParams()['instance_count'] = math.ceil( | 421 task.BackendParams()['instance_count'] = math.ceil( |
364 sequential_duration_s / target_parallel_duration_s) | 422 sequential_duration_s / target_parallel_duration_s) |
365 | 423 |
366 # Check the instance quotas. | 424 # Check the instance quotas. |
367 clovis_logger.info( | 425 clovis_logger.info( |
368 'Requesting %i instances.' % task.BackendParams()['instance_count']) | 426 'Requesting %i instances.' % task.BackendParams()['instance_count']) |
427 frontend_job.status = 'checking_instance_quotas' | |
369 max_instances = instance_helper.GetAvailableInstanceCount() | 428 max_instances = instance_helper.GetAvailableInstanceCount() |
370 if max_instances == -1: | 429 if max_instances == -1: |
371 return Render('Failed to count the available instances.', memory_logs) | 430 frontend_job.status = 'instance_count_error' |
431 return | |
372 elif task.BackendParams()['instance_count'] == 0: | 432 elif task.BackendParams()['instance_count'] == 0: |
373 return Render('Cannot create instances, quota exceeded.', memory_logs) | 433 frontend_job.status = 'no_instance_available_error' |
434 return | |
374 elif max_instances < task.BackendParams()['instance_count']: | 435 elif max_instances < task.BackendParams()['instance_count']: |
375 clovis_logger.warning( | 436 clovis_logger.warning( |
376 'Instance count limited by quota: %i available / %i requested.' % ( | 437 'Instance count limited by quota: %i available / %i requested.' % ( |
377 max_instances, task.BackendParams()['instance_count'])) | 438 max_instances, task.BackendParams()['instance_count'])) |
378 task.BackendParams()['instance_count'] = max_instances | 439 task.BackendParams()['instance_count'] = max_instances |
379 | 440 |
380 # Compute the timeout if there is none specified. | 441 # Compute the timeout if there is none specified. |
381 expected_duration_h = sequential_duration_s / ( | 442 expected_duration_s = sequential_duration_s / ( |
382 task.BackendParams()['instance_count'] * 3600.0) | 443 task.BackendParams()['instance_count']) |
444 frontend_job.eta = datetime.datetime.now() + datetime.timedelta( | |
445 seconds=expected_duration_s) | |
383 if not task.BackendParams().get('timeout_hours'): | 446 if not task.BackendParams().get('timeout_hours'): |
384 # Timeout is at least 1 hour. | 447 # Timeout is at least 1 hour. |
385 task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) | 448 task.BackendParams()['timeout_hours'] = max( |
449 1, 5 * expected_duration_s / 3600.0) | |
386 clovis_logger.info( | 450 clovis_logger.info( |
387 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) | 451 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) |
388 | 452 |
453 frontend_job.status = 'queueing_tasks' | |
389 if not EnqueueTasks(sub_tasks, task_tag): | 454 if not EnqueueTasks(sub_tasks, task_tag): |
390 return Render('Task creation failed.', memory_logs) | 455 frontend_job.status = 'task_creation_error' |
456 return | |
391 | 457 |
392 # Start polling the progress. | 458 # Start polling the progress. |
393 clovis_logger.info('Creating worker polling task.') | 459 clovis_logger.info('Creating worker polling task.') |
394 first_poll_delay_minutes = 10 | 460 first_poll_delay_minutes = 10 |
395 deferred.defer(PollWorkers, task_tag, time.time(), | 461 deferred.defer(PollWorkers, task_tag, time.time(), |
396 task.BackendParams()['timeout_hours'], user_email, | 462 task.BackendParams()['timeout_hours'], user_email, |
397 task_url, _countdown=(60 * first_poll_delay_minutes)) | 463 task_url, _countdown=(60 * first_poll_delay_minutes)) |
398 | 464 |
399 # Start the instances if required. | 465 # Start the instances if required. |
466 frontend_job.status = 'creating_instances' | |
400 if not CreateInstanceTemplate(task, task_dir): | 467 if not CreateInstanceTemplate(task, task_dir): |
401 return Render('Instance template creation failed.', memory_logs) | 468 frontend_job.status = 'instance_template_error' |
469 return | |
402 if not CreateInstances(task): | 470 if not CreateInstances(task): |
403 return Render('Instance creation failed.', memory_logs) | 471 frontend_job.status = 'instance_creation_error' |
472 return | |
404 | 473 |
405 return Render(flask.Markup( | 474 frontend_job.status = 'started' |
406 'Success!<br>Your task %s has started.<br>' | |
407 'Expected duration: %.1f hours.<br>' | |
408 'You will be notified at %s when completed.') % ( | |
409 task_tag, expected_duration_h, user_email), | |
410 memory_logs) | |
411 | |
412 | 475 |
413 def EnqueueTasks(tasks, task_tag): | 476 def EnqueueTasks(tasks, task_tag): |
414 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 477 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
415 Google Compute Engine. | 478 Google Compute Engine. |
416 """ | 479 """ |
417 q = taskqueue.Queue('clovis-queue') | 480 q = taskqueue.Queue('clovis-queue') |
418 # Add tasks to the queue by groups. | 481 # Add tasks to the queue by groups. |
419 # TODO(droger): This supports thousands of tasks, but maybe not millions. | 482 # TODO(droger): This supports thousands of tasks, but maybe not millions. |
420 # Defer the enqueuing if it times out. | 483 # Defer the enqueuing if it times out. |
421 group_size = 100 | 484 group_size = 100 |
(...skipping 26 matching lines...) Expand all Loading... | |
448 @app.route('/form_sent', methods=['POST']) | 511 @app.route('/form_sent', methods=['POST']) |
449 def StartFromForm(): | 512 def StartFromForm(): |
450 """HTML form endpoint.""" | 513 """HTML form endpoint.""" |
451 data_stream = flask.request.files.get('json_task') | 514 data_stream = flask.request.files.get('json_task') |
452 if not data_stream: | 515 if not data_stream: |
453 return Render('Failed, no content.') | 516 return Render('Failed, no content.') |
454 http_body_str = data_stream.read() | 517 http_body_str = data_stream.read() |
455 return StartFromJsonString(http_body_str) | 518 return StartFromJsonString(http_body_str) |
456 | 519 |
457 | 520 |
521 @app.route('/list_jobs') | |
522 def ShowTaskList(): | |
523 """Shows a list of all active jobs.""" | |
524 tags = FrontendJob.ListJobs() | |
525 | |
526 if not tags: | |
527 Render('No active jobs.') | |
528 | |
529 html = flask.Markup('Active tasks:<ul>') | |
530 for tag in tags: | |
531 html += flask.Markup( | |
532 '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag) | |
533 html += flask.Markup('</ul>') | |
534 return Render(html) | |
535 | |
536 | |
537 @app.route(FrontendJob.SHOW_JOB_URL) | |
538 def ShowTask(): | |
539 """Shows basic information abour a job.""" | |
540 tag = flask.request.args.get('tag') | |
541 if not tag: | |
542 return Render('Invalid task tag.') | |
543 | |
544 frontend_job = FrontendJob.GetFromTag(tag) | |
545 | |
546 if not frontend_job: | |
547 return Render('Task not found.') | |
548 | |
549 message = flask.Markup('Task details:' + frontend_job.RenderAsHtml()) | |
550 | |
551 log = None | |
552 if frontend_job.log: | |
553 log = frontend_job.log.split('\n') | |
554 | |
555 return flask.render_template('log.html', body=message, log=log) | |
556 | |
557 | |
458 @app.errorhandler(404) | 558 @app.errorhandler(404) |
459 def PageNotFound(e): # pylint: disable=unused-argument | 559 def PageNotFound(e): # pylint: disable=unused-argument |
460 """Return a custom 404 error.""" | 560 """Return a custom 404 error.""" |
461 return 'Sorry, Nothing at this URL.', 404 | 561 return 'Sorry, Nothing at this URL.', 404 |
462 | 562 |
463 | 563 |
464 @app.errorhandler(500) | 564 @app.errorhandler(500) |
465 def ApplicationError(e): | 565 def ApplicationError(e): |
466 """Return a custom 500 error.""" | 566 """Return a custom 500 error.""" |
467 return 'Sorry, unexpected error: {}'.format(e), 499 | 567 return 'Sorry, unexpected error: {}'.format(e), 499 |
OLD | NEW |