OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import os | 6 import os |
7 import sys | 7 import sys |
8 import time | 8 import time |
9 | 9 |
10 import cloudstorage | |
10 import flask | 11 import flask |
11 from google.appengine.api import (app_identity, taskqueue) | 12 from google.appengine.api import (app_identity, taskqueue) |
12 from google.appengine.ext import deferred | 13 from google.appengine.ext import deferred |
13 from oauth2client.client import GoogleCredentials | 14 from oauth2client.client import GoogleCredentials |
14 | 15 |
15 from common.clovis_task import ClovisTask | 16 from common.clovis_task import ClovisTask |
16 import common.google_instance_helper | 17 import common.google_instance_helper |
18 from common.loading_trace_database import LoadingTraceDatabase | |
17 import email_helper | 19 import email_helper |
18 from memory_logs import MemoryLogs | 20 from memory_logs import MemoryLogs |
19 | 21 |
20 | 22 |
21 # Global variables. | 23 # Global variables. |
22 clovis_logger = logging.getLogger('clovis_frontend') | 24 clovis_logger = logging.getLogger('clovis_frontend') |
23 clovis_logger.setLevel(logging.DEBUG) | 25 clovis_logger.setLevel(logging.DEBUG) |
24 project_name = app_identity.get_application_id() | 26 project_name = app_identity.get_application_id() |
25 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 27 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
26 credentials=GoogleCredentials.get_application_default(), | 28 credentials=GoogleCredentials.get_application_default(), |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
142 clovis_logger.info('Instance template destruction for tag: ' + tag) | 144 clovis_logger.info('Instance template destruction for tag: ' + tag) |
143 if not instance_helper.DeleteTemplate(tag): | 145 if not instance_helper.DeleteTemplate(tag): |
144 clovis_logger.info('Instance template destruction failed for: ' + tag) | 146 clovis_logger.info('Instance template destruction failed for: ' + tag) |
145 if try_count <= 5: | 147 if try_count <= 5: |
146 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) | 148 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) |
147 return | 149 return |
148 clovis_logger.error('Giving up template destruction for: ' + tag) | 150 clovis_logger.error('Giving up template destruction for: ' + tag) |
149 clovis_logger.info('Cleanup complete for tag: ' + tag) | 151 clovis_logger.info('Cleanup complete for tag: ' + tag) |
150 | 152 |
151 | 153 |
154 def SplitClovisTask(task): | |
155 """Splits a ClovisTask in smaller ClovisTasks. | |
156 | |
157 Args: | |
158 task: (ClovisTask) The task to split. | |
159 | |
160 Returns: | |
161 list: The list of ClovisTasks. | |
162 """ | |
163 # For report task, need to find the traces first. | |
Benoit L
2016/05/13 15:23:39
nit: task.
| |
164 if task.Action() == 'report': | |
165 bucket = task.BackendParams().get('storage_bucket') | |
166 if not bucket: | |
167 clovis_logger.error('Missing storage bucket for report task.') | |
168 return None | |
169 traces = GetTracePaths(bucket) | |
170 if not traces: | |
171 clovis_logger.error('No traces found in bucket: ' + bucket) | |
172 return None | |
173 task.ActionParams()['traces'] = traces | |
174 | |
175 # Compute the split key. | |
176 split_params_for_action = {'trace': ('urls',1), 'report': ('traces',5)} | |
Benoit L
2016/05/13 15:23:39
tiny, tiny nit: one space after a comma.
| |
177 (split_key, slice_size) = split_params_for_action.get(task.Action(), | |
178 (None, 0)) | |
179 if not split_key: | |
180 clovis_logger.error('Cannot split task with action: ' + task.Action()) | |
181 return None | |
182 | |
183 # Split the task using the split key. | |
184 clovis_logger.debug('Splitting task by: ' + split_key) | |
185 action_params = task.ActionParams() | |
186 values = action_params[split_key] | |
187 sub_tasks = [] | |
188 for i in range(0, len(values), slice_size): | |
189 sub_task_params = action_params.copy() | |
190 sub_task_params[split_key] = [v for v in values[i:i+slice_size]] | |
191 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, | |
192 task.BackendParams())) | |
193 return sub_tasks | |
194 | |
195 | |
196 def GetTracePaths(bucket): | |
197 """Returns a list of trace files in a bucket. | |
198 | |
199 Finds and loads the trace databases, and returns their content as a list of | |
200 paths. | |
201 | |
202 This function assumes a specific structure for the files in the bucket. These | |
203 assumptions must match the behavior of the backend: | |
204 - The trace databases are located under a 'trace' directory under |bucket|. | |
205 - The trace database files are the only objects with the 'trace_database' | |
206 prefix in their name. | |
207 | |
208 Returns: | |
209 list: The list of paths to traces, as strings. | |
210 """ | |
211 traces = [] | |
212 prefix = os.path.join('/', bucket, 'trace', 'trace_database') | |
213 file_stats = cloudstorage.listbucket(prefix) | |
214 | |
215 for file_stat in file_stats: | |
216 database_file = file_stat.filename | |
217 clovis_logger.info('Loading trace database: ' + database_file) | |
218 | |
219 with cloudstorage.open(database_file) as remote_file: | |
220 json_string = remote_file.read() | |
221 if not json_string: | |
222 clovis_logger.warning('Failed to download: ' + database_file) | |
223 continue | |
224 | |
225 database = LoadingTraceDatabase.FromJsonString(json_string) | |
226 if not database: | |
227 clovis_logger.warning('Failed to parse: ' + database_file) | |
228 continue | |
229 | |
230 for path in database.ToJsonDict(): | |
231 traces.append(path) | |
232 | |
233 return traces | |
234 | |
235 | |
152 def StartFromJsonString(http_body_str): | 236 def StartFromJsonString(http_body_str): |
153 """Main function handling a JSON task posted by the user.""" | 237 """Main function handling a JSON task posted by the user.""" |
154 # Set up logging. | 238 # Set up logging. |
155 memory_logs = MemoryLogs(clovis_logger) | 239 memory_logs = MemoryLogs(clovis_logger) |
156 memory_logs.Start() | 240 memory_logs.Start() |
157 | 241 |
158 # Load the task from JSON. | 242 # Load the task from JSON. |
159 task = ClovisTask.FromJsonString(http_body_str) | 243 task = ClovisTask.FromJsonString(http_body_str) |
160 if not task: | 244 if not task: |
161 clovis_logger.error('Invalid JSON task.') | 245 clovis_logger.error('Invalid JSON task.') |
162 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 246 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
163 | 247 |
164 task_tag = task.BackendParams()['tag'] | 248 task_tag = task.BackendParams()['tag'] |
249 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | |
250 task_tag)) | |
165 | 251 |
166 # Create the instance template if required. | 252 # Create the instance template if required. |
167 if not CreateInstanceTemplate(task): | 253 if not CreateInstanceTemplate(task): |
168 return Render('Template creation failed.', memory_logs) | 254 return Render('Template creation failed.', memory_logs) |
169 | 255 |
170 # Split the task in smaller tasks. | 256 # Build the URL where the result will live. |
171 sub_tasks = [] | |
172 task_url = None | 257 task_url = None |
173 if task.Action() == 'trace': | 258 if task.Action() == 'trace': |
174 bucket = task.BackendParams().get('storage_bucket') | 259 bucket = task.BackendParams().get('storage_bucket') |
175 if bucket: | 260 if bucket: |
176 task_url = 'https://console.cloud.google.com/storage/' + bucket | 261 task_url = 'https://console.cloud.google.com/storage/' + bucket |
177 sub_tasks = SplitTraceTask(task) | 262 elif task.Action() == 'report': |
263 # This must match the table path defined in ReportTaskHandler. | |
264 table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') | |
265 table_name = 'clovis_dataset.report_' + table_id | |
266 task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, | |
267 table_name) | |
178 else: | 268 else: |
179 error_string = 'Unsupported action: %s.' % task.Action() | 269 error_string = 'Unsupported action: %s.' % task.Action() |
180 clovis_logger.error(error_string) | 270 clovis_logger.error(error_string) |
181 return Render(error_string, memory_logs) | 271 return Render(error_string, memory_logs) |
272 clovis_logger.info('Task result URL: ' + task_url) | |
273 | |
274 # Split the task in smaller tasks. | |
275 sub_tasks = SplitClovisTask(task) | |
276 if not sub_tasks: | |
277 return Render('Task split failed.', memory_logs) | |
182 | 278 |
183 if not EnqueueTasks(sub_tasks, task_tag): | 279 if not EnqueueTasks(sub_tasks, task_tag): |
184 return Render('Task creation failed.', memory_logs) | 280 return Render('Task creation failed.', memory_logs) |
185 | 281 |
186 # Start the instances if required. | 282 # Start the instances if required. |
187 if not CreateInstances(task): | 283 if not CreateInstances(task): |
188 return Render('Instance creation failed.', memory_logs) | 284 return Render('Instance creation failed.', memory_logs) |
189 | 285 |
190 # Start polling the progress. | 286 # Start polling the progress. |
191 clovis_logger.info('Creating worker polling task.') | 287 clovis_logger.info('Creating worker polling task.') |
192 first_poll_delay_minutes = 10 | 288 first_poll_delay_minutes = 10 |
193 timeout_hours = task.BackendParams().get('timeout_hours', 5) | 289 timeout_hours = task.BackendParams().get('timeout_hours', 5) |
194 user_email = email_helper.GetUserEmail() | 290 user_email = email_helper.GetUserEmail() |
195 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, | 291 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, |
196 task_url, _countdown=(60 * first_poll_delay_minutes)) | 292 task_url, _countdown=(60 * first_poll_delay_minutes)) |
197 | 293 |
198 return Render(flask.Markup( | 294 return Render(flask.Markup( |
199 'Success!<br>Your task %s has started.<br>' | 295 'Success!<br>Your task %s has started.<br>' |
200 'You will be notified at %s when completed.') % (task_tag, user_email), | 296 'You will be notified at %s when completed.') % (task_tag, user_email), |
201 memory_logs) | 297 memory_logs) |
202 | 298 |
203 | 299 |
204 def SplitTraceTask(task): | |
205 """Splits a tracing task with potentially many URLs into several tracing tasks | |
206 with few URLs. | |
207 """ | |
208 clovis_logger.debug('Splitting trace task.') | |
209 action_params = task.ActionParams() | |
210 urls = action_params['urls'] | |
211 | |
212 # Split the task in smaller tasks with fewer URLs each. | |
213 urls_per_task = 1 | |
214 sub_tasks = [] | |
215 for i in range(0, len(urls), urls_per_task): | |
216 sub_task_params = action_params.copy() | |
217 sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] | |
218 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, | |
219 task.BackendParams())) | |
220 return sub_tasks | |
221 | |
222 | |
223 def EnqueueTasks(tasks, task_tag): | 300 def EnqueueTasks(tasks, task_tag): |
224 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 301 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
225 Google Compute Engine. | 302 Google Compute Engine. |
226 """ | 303 """ |
227 q = taskqueue.Queue('clovis-queue') | 304 q = taskqueue.Queue('clovis-queue') |
228 # Add tasks to the queue by groups. | 305 # Add tasks to the queue by groups. |
229 # TODO(droger): This supports thousands of tasks, but maybe not millions. | 306 # TODO(droger): This supports thousands of tasks, but maybe not millions. |
230 # Defer the enqueuing if it times out. | 307 # Defer the enqueuing if it times out. |
231 group_size = 100 | 308 group_size = 100 |
232 callbacks = [] | 309 callbacks = [] |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
268 @app.errorhandler(404) | 345 @app.errorhandler(404) |
269 def PageNotFound(e): # pylint: disable=unused-argument | 346 def PageNotFound(e): # pylint: disable=unused-argument |
270 """Return a custom 404 error.""" | 347 """Return a custom 404 error.""" |
271 return 'Sorry, Nothing at this URL.', 404 | 348 return 'Sorry, Nothing at this URL.', 404 |
272 | 349 |
273 | 350 |
274 @app.errorhandler(500) | 351 @app.errorhandler(500) |
275 def ApplicationError(e): | 352 def ApplicationError(e): |
276 """Return a custom 500 error.""" | 353 """Return a custom 500 error.""" |
277 return 'Sorry, unexpected error: {}'.format(e), 499 | 354 return 'Sorry, unexpected error: {}'.format(e), 499 |
OLD | NEW |