OLD | NEW |
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import os | 6 import os |
7 import sys | 7 import sys |
8 import time | 8 import time |
9 | 9 |
| 10 import cloudstorage |
10 import flask | 11 import flask |
11 from google.appengine.api import (app_identity, taskqueue) | 12 from google.appengine.api import (app_identity, taskqueue) |
12 from google.appengine.ext import deferred | 13 from google.appengine.ext import deferred |
13 from oauth2client.client import GoogleCredentials | 14 from oauth2client.client import GoogleCredentials |
14 | 15 |
15 from common.clovis_task import ClovisTask | 16 from common.clovis_task import ClovisTask |
16 import common.google_instance_helper | 17 import common.google_instance_helper |
| 18 from common.loading_trace_database import LoadingTraceDatabase |
17 import email_helper | 19 import email_helper |
18 from memory_logs import MemoryLogs | 20 from memory_logs import MemoryLogs |
19 | 21 |
20 | 22 |
21 # Global variables. | 23 # Global variables. |
22 clovis_logger = logging.getLogger('clovis_frontend') | 24 clovis_logger = logging.getLogger('clovis_frontend') |
23 clovis_logger.setLevel(logging.DEBUG) | 25 clovis_logger.setLevel(logging.DEBUG) |
24 project_name = app_identity.get_application_id() | 26 project_name = app_identity.get_application_id() |
25 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 27 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
26 credentials=GoogleCredentials.get_application_default(), | 28 credentials=GoogleCredentials.get_application_default(), |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
142 clovis_logger.info('Instance template destruction for tag: ' + tag) | 144 clovis_logger.info('Instance template destruction for tag: ' + tag) |
143 if not instance_helper.DeleteTemplate(tag): | 145 if not instance_helper.DeleteTemplate(tag): |
144 clovis_logger.info('Instance template destruction failed for: ' + tag) | 146 clovis_logger.info('Instance template destruction failed for: ' + tag) |
145 if try_count <= 5: | 147 if try_count <= 5: |
146 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) | 148 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) |
147 return | 149 return |
148 clovis_logger.error('Giving up template destruction for: ' + tag) | 150 clovis_logger.error('Giving up template destruction for: ' + tag) |
149 clovis_logger.info('Cleanup complete for tag: ' + tag) | 151 clovis_logger.info('Cleanup complete for tag: ' + tag) |
150 | 152 |
151 | 153 |
| 154 def SplitClovisTask(task): |
| 155 """Splits a ClovisTask in smaller ClovisTasks. |
| 156 |
| 157 Args: |
| 158 task: (ClovisTask) The task to split. |
| 159 |
| 160 Returns: |
| 161 list: The list of ClovisTasks. |
| 162 """ |
| 163 # For report task, need to find the traces first. |
| 164 if task.Action() == 'report': |
| 165 bucket = task.BackendParams().get('storage_bucket') |
| 166 if not bucket: |
| 167 clovis_logger.error('Missing storage bucket for report task.') |
| 168 return None |
| 169 traces = GetTracePaths(bucket) |
| 170 if not traces: |
| 171 clovis_logger.error('No traces found in bucket: ' + bucket) |
| 172 return None |
| 173 task.ActionParams()['traces'] = traces |
| 174 |
| 175 # Compute the split key. |
| 176 split_params_for_action = {'trace': ('urls', 1), 'report': ('traces', 5)} |
| 177 (split_key, slice_size) = split_params_for_action.get(task.Action(), |
| 178 (None, 0)) |
| 179 if not split_key: |
| 180 clovis_logger.error('Cannot split task with action: ' + task.Action()) |
| 181 return None |
| 182 |
| 183 # Split the task using the split key. |
| 184 clovis_logger.debug('Splitting task by: ' + split_key) |
| 185 action_params = task.ActionParams() |
| 186 values = action_params[split_key] |
| 187 sub_tasks = [] |
| 188 for i in range(0, len(values), slice_size): |
| 189 sub_task_params = action_params.copy() |
| 190 sub_task_params[split_key] = [v for v in values[i:i+slice_size]] |
| 191 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
| 192 task.BackendParams())) |
| 193 return sub_tasks |
| 194 |
| 195 |
| 196 def GetTracePaths(bucket): |
| 197 """Returns a list of trace files in a bucket. |
| 198 |
| 199 Finds and loads the trace databases, and returns their content as a list of |
| 200 paths. |
| 201 |
| 202 This function assumes a specific structure for the files in the bucket. These |
| 203 assumptions must match the behavior of the backend: |
| 204 - The trace databases are located under a 'trace' directory under |bucket|. |
| 205 - The trace databases files are the only objects with the 'trace_database' |
| 206 prefix in their name. |
| 207 |
| 208 Returns: |
| 209 list: The list of paths to traces, as strings. |
| 210 """ |
| 211 traces = [] |
| 212 prefix = os.path.join('/', bucket, 'trace', 'trace_database') |
| 213 file_stats = cloudstorage.listbucket(prefix) |
| 214 |
| 215 for file_stat in file_stats: |
| 216 database_file = file_stat.filename |
| 217 clovis_logger.info('Loading trace database: ' + database_file) |
| 218 |
| 219 with cloudstorage.open(database_file) as remote_file: |
| 220 json_string = remote_file.read() |
| 221 if not json_string: |
| 222 clovis_logger.warning('Failed to download: ' + database_file) |
| 223 continue |
| 224 |
| 225 database = LoadingTraceDatabase.FromJsonString(json_string) |
| 226 if not database: |
| 227 clovis_logger.warning('Failed to parse: ' + database_file) |
| 228 continue |
| 229 |
| 230 for path in database.ToJsonDict(): |
| 231 traces.append(path) |
| 232 |
| 233 return traces |
| 234 |
| 235 |
152 def StartFromJsonString(http_body_str): | 236 def StartFromJsonString(http_body_str): |
153 """Main function handling a JSON task posted by the user.""" | 237 """Main function handling a JSON task posted by the user.""" |
154 # Set up logging. | 238 # Set up logging. |
155 memory_logs = MemoryLogs(clovis_logger) | 239 memory_logs = MemoryLogs(clovis_logger) |
156 memory_logs.Start() | 240 memory_logs.Start() |
157 | 241 |
158 # Load the task from JSON. | 242 # Load the task from JSON. |
159 task = ClovisTask.FromJsonString(http_body_str) | 243 task = ClovisTask.FromJsonString(http_body_str) |
160 if not task: | 244 if not task: |
161 clovis_logger.error('Invalid JSON task.') | 245 clovis_logger.error('Invalid JSON task.') |
162 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 246 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
163 | 247 |
164 task_tag = task.BackendParams()['tag'] | 248 task_tag = task.BackendParams()['tag'] |
| 249 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
| 250 task_tag)) |
165 | 251 |
166 # Create the instance template if required. | 252 # Create the instance template if required. |
167 if not CreateInstanceTemplate(task): | 253 if not CreateInstanceTemplate(task): |
168 return Render('Template creation failed.', memory_logs) | 254 return Render('Template creation failed.', memory_logs) |
169 | 255 |
170 # Split the task in smaller tasks. | 256 # Build the URL where the result will live. |
171 sub_tasks = [] | |
172 task_url = None | 257 task_url = None |
173 if task.Action() == 'trace': | 258 if task.Action() == 'trace': |
174 bucket = task.BackendParams().get('storage_bucket') | 259 bucket = task.BackendParams().get('storage_bucket') |
175 if bucket: | 260 if bucket: |
176 task_url = 'https://console.cloud.google.com/storage/' + bucket | 261 task_url = 'https://console.cloud.google.com/storage/' + bucket |
177 sub_tasks = SplitTraceTask(task) | 262 elif task.Action() == 'report': |
| 263 # This must match the table path defined in ReportTaskHandler. |
| 264 table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') |
| 265 table_name = 'clovis_dataset.report_' + table_id |
| 266 task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, |
| 267 table_name) |
178 else: | 268 else: |
179 error_string = 'Unsupported action: %s.' % task.Action() | 269 error_string = 'Unsupported action: %s.' % task.Action() |
180 clovis_logger.error(error_string) | 270 clovis_logger.error(error_string) |
181 return Render(error_string, memory_logs) | 271 return Render(error_string, memory_logs) |
| 272 clovis_logger.info('Task result URL: ' + task_url) |
| 273 |
| 274 # Split the task in smaller tasks. |
| 275 sub_tasks = SplitClovisTask(task) |
| 276 if not sub_tasks: |
| 277 return Render('Task split failed.', memory_logs) |
182 | 278 |
183 if not EnqueueTasks(sub_tasks, task_tag): | 279 if not EnqueueTasks(sub_tasks, task_tag): |
184 return Render('Task creation failed.', memory_logs) | 280 return Render('Task creation failed.', memory_logs) |
185 | 281 |
186 # Start the instances if required. | 282 # Start the instances if required. |
187 if not CreateInstances(task): | 283 if not CreateInstances(task): |
188 return Render('Instance creation failed.', memory_logs) | 284 return Render('Instance creation failed.', memory_logs) |
189 | 285 |
190 # Start polling the progress. | 286 # Start polling the progress. |
191 clovis_logger.info('Creating worker polling task.') | 287 clovis_logger.info('Creating worker polling task.') |
192 first_poll_delay_minutes = 10 | 288 first_poll_delay_minutes = 10 |
193 timeout_hours = task.BackendParams().get('timeout_hours', 5) | 289 timeout_hours = task.BackendParams().get('timeout_hours', 5) |
194 user_email = email_helper.GetUserEmail() | 290 user_email = email_helper.GetUserEmail() |
195 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, | 291 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, |
196 task_url, _countdown=(60 * first_poll_delay_minutes)) | 292 task_url, _countdown=(60 * first_poll_delay_minutes)) |
197 | 293 |
198 return Render(flask.Markup( | 294 return Render(flask.Markup( |
199 'Success!<br>Your task %s has started.<br>' | 295 'Success!<br>Your task %s has started.<br>' |
200 'You will be notified at %s when completed.') % (task_tag, user_email), | 296 'You will be notified at %s when completed.') % (task_tag, user_email), |
201 memory_logs) | 297 memory_logs) |
202 | 298 |
203 | 299 |
204 def SplitTraceTask(task): | |
205 """Splits a tracing task with potentially many URLs into several tracing tasks | |
206 with few URLs. | |
207 """ | |
208 clovis_logger.debug('Splitting trace task.') | |
209 action_params = task.ActionParams() | |
210 urls = action_params['urls'] | |
211 | |
212 # Split the task in smaller tasks with fewer URLs each. | |
213 urls_per_task = 1 | |
214 sub_tasks = [] | |
215 for i in range(0, len(urls), urls_per_task): | |
216 sub_task_params = action_params.copy() | |
217 sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] | |
218 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, | |
219 task.BackendParams())) | |
220 return sub_tasks | |
221 | |
222 | |
223 def EnqueueTasks(tasks, task_tag): | 300 def EnqueueTasks(tasks, task_tag): |
224 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 301 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
225 Google Compute Engine. | 302 Google Compute Engine. |
226 """ | 303 """ |
227 q = taskqueue.Queue('clovis-queue') | 304 q = taskqueue.Queue('clovis-queue') |
228 # Add tasks to the queue by groups. | 305 # Add tasks to the queue by groups. |
229 # TODO(droger): This supports thousands of tasks, but maybe not millions. | 306 # TODO(droger): This supports thousands of tasks, but maybe not millions. |
230 # Defer the enqueuing if it times out. | 307 # Defer the enqueuing if it times out. |
231 group_size = 100 | 308 group_size = 100 |
232 callbacks = [] | 309 callbacks = [] |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
268 @app.errorhandler(404) | 345 @app.errorhandler(404) |
269 def PageNotFound(e): # pylint: disable=unused-argument | 346 def PageNotFound(e): # pylint: disable=unused-argument |
270 """Return a custom 404 error.""" | 347 """Return a custom 404 error.""" |
271 return 'Sorry, Nothing at this URL.', 404 | 348 return 'Sorry, Nothing at this URL.', 404 |
272 | 349 |
273 | 350 |
274 @app.errorhandler(500) | 351 @app.errorhandler(500) |
275 def ApplicationError(e): | 352 def ApplicationError(e): |
276 """Return a custom 500 error.""" | 353 """Return a custom 500 error.""" |
277 return 'Sorry, unexpected error: {}'.format(e), 499 | 354 return 'Sorry, unexpected error: {}'.format(e), 499 |
OLD | NEW |