OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import flask |
| 6 from google.appengine.api import taskqueue |
| 7 import json |
| 8 import os |
| 9 import sys |
| 10 import uuid |
| 11 |
| 12 from common.clovis_task import ClovisTask |
| 13 |
| 14 |
# Flask application object; the routes and error handlers of this module are
# registered on it through its decorators.
app = flask.Flask(import_name=__name__)
| 16 |
| 17 |
def StartFromJson(http_body_str):
  """Creates a new batch of tasks from its JSON representation.

  Args:
    http_body_str: JSON representation of the task to start.

  Returns:
    A human-readable status string: an error message when the task is invalid
    or its action is unsupported, otherwise whatever EnqueueTasks() reports.
  """
  task = ClovisTask.FromJsonDict(http_body_str)
  if not task:
    return 'Invalid JSON task:\n%s\n' % http_body_str

  # Tasks without a tag get a unique one. It is converted to a string because
  # the tag is handed to taskqueue.Task, which expects a string rather than a
  # uuid.UUID object.
  task_tag = task.TaskqueueTag() or str(uuid.uuid1())

  # Tracing is the only action that can currently be split into sub-tasks.
  if task.Action() != 'trace':
    return 'Unsupported action: %s\n' % task.Action()

  return EnqueueTasks(SplitTraceTask(task), task_tag)
| 35 |
| 36 |
def SplitTraceTask(task, urls_per_task=1):
  """Splits a tracing task with potentially many URLs into several tracing
  tasks with few URLs.

  Args:
    task: The tracing task to split. Its Params() must hold a 'urls' sequence.
    urls_per_task: Maximum number of URLs in each sub-task. Must be positive.

  Returns:
    A list of ClovisTask built with the action and tag of |task|, and a copy of
    its parameters where 'urls' is replaced by one slice of the original URLs.
    The list is empty when |task| has no URLs.

  Raises:
    ValueError: if |urls_per_task| is not positive.
  """
  if urls_per_task < 1:
    raise ValueError('urls_per_task must be positive: %r' % (urls_per_task,))

  params = task.Params()
  urls = params['urls']

  sub_tasks = []
  for start in range(0, len(urls), urls_per_task):
    # Shallow copy: only the 'urls' entry differs from one sub-task to another.
    sub_task_params = params.copy()
    sub_task_params['urls'] = list(urls[start:start + urls_per_task])
    sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
                                task.TaskqueueTag()))
  return sub_tasks
| 53 |
| 54 |
def EnqueueTasks(tasks, task_tag):
  """Pushes |tasks| to the 'clovis-queue' task queue as pull tasks, for
  consumption by Google Compute Engine.

  Args:
    tasks: List of tasks to enqueue; each one is serialized with ToJsonDict().
    task_tag: Tag attached to every enqueued task.

  Returns:
    A human-readable status string: the number of tasks pushed and their tag
    on success, or the name and arguments of the exception on failure.
  """
  queue = taskqueue.Queue('clovis-queue')
  retry_options = taskqueue.TaskRetryOptions(task_retry_limit=3)
  # Tasks are added to the queue in batches, one asynchronous RPC per batch.
  # TODO(droger): This supports thousands of tasks, but maybe not millions.
  # Defer the enqueuing if it times out.
  batch_size = 100
  pending_rpcs = []
  try:
    for start in range(0, len(tasks), batch_size):
      batch = [
          taskqueue.Task(payload=task.ToJsonDict(), method='PULL',
                         tag=task_tag, retry_options=retry_options)
          for task in tasks[start:start + batch_size]]
      rpc = taskqueue.create_rpc()
      queue.add_async(task=batch, rpc=rpc)
      pending_rpcs.append(rpc)
    # Collect the result of every batch before reporting success.
    for rpc in pending_rpcs:
      rpc.get_result()
  except Exception as e:
    return 'Exception:%s %s\n' % (type(e).__name__, str(e.args))
  return 'pushed %i tasks with tag: %s\n' % (len(tasks), task_tag)
| 82 |
| 83 |
@app.route('/')
def Root():
  """Home page: sends the client to the static form."""
  form_url = '/static/form.html'
  return flask.redirect(form_url)
| 88 |
| 89 |
@app.route('/form_sent', methods=['POST'])
def StartFromForm():
  """HTML form endpoint: starts tasks from the uploaded 'json_task' file.

  Returns:
    The status string of StartFromJson(), or 'failed' when the request carries
    no usable 'json_task' file.
  """
  uploaded_file = flask.request.files.get('json_task')
  if uploaded_file:
    return StartFromJson(uploaded_file.read())
  return 'failed'
| 98 |
| 99 |
@app.errorhandler(404)
def PageNotFound(e):  # pylint: disable=unused-argument
  """Handler for 404 errors: replies with a short message and status 404."""
  message = 'Sorry, Nothing at this URL.'
  return message, 404
| 104 |
| 105 |
@app.errorhandler(500)
def ApplicationError(e):
  """Return a custom 500 error.

  Args:
    e: The error that triggered this handler.

  Returns:
    A (body, status) tuple: a short message embedding |e|, and the 500 status
    code matching the error this handler is registered for.
  """
  return 'Sorry, unexpected error: {}'.format(e), 500
OLD | NEW |