| OLD | NEW |
| 1 #!/usr/bin/python | 1 #!/usr/bin/python |
| 2 | 2 |
| 3 """ | 3 """ |
| 4 Copyright 2013 Google Inc. | 4 Copyright 2013 Google Inc. |
| 5 | 5 |
| 6 Use of this source code is governed by a BSD-style license that can be | 6 Use of this source code is governed by a BSD-style license that can be |
| 7 found in the LICENSE file. | 7 found in the LICENSE file. |
| 8 | 8 |
| 9 Calulate differences between image pairs, and store them in a database. | 9 Calulate differences between image pairs, and store them in a database. |
| 10 """ | 10 """ |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 48 # NOTE: Keep these in sync with static/constants.js | 48 # NOTE: Keep these in sync with static/constants.js |
| 49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' | 49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' |
| 50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' | 50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' |
| 51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' | 51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' |
| 52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' | 52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
| 53 | 53 |
| 54 # Special values within ImageDiffDB._diff_dict | 54 # Special values within ImageDiffDB._diff_dict |
| 55 _DIFFRECORD_FAILED = 'failed' | 55 _DIFFRECORD_FAILED = 'failed' |
| 56 _DIFFRECORD_PENDING = 'pending' | 56 _DIFFRECORD_PENDING = 'pending' |
| 57 | 57 |
| 58 # How often to report tasks_queue size |
| 59 QUEUE_LOGGING_GRANULARITY = 1000 |
| 60 |
| 58 # Temporary variable to keep track of how many times we download | 61 # Temporary variable to keep track of how many times we download |
| 59 # the same file in multiple threads. | 62 # the same file in multiple threads. |
| 60 # TODO(epoger): Delete this, once we see that the number stays close to 0. | 63 # TODO(epoger): Delete this, once we see that the number stays close to 0. |
| 61 global_file_collisions = 0 | 64 global_file_collisions = 0 |
| 62 | 65 |
| 63 | 66 |
| 64 class DiffRecord(object): | 67 class DiffRecord(object): |
| 65 """ Record of differences between two images. """ | 68 """ Record of differences between two images. """ |
| 66 | 69 |
| 67 def __init__(self, gs, storage_root, | 70 def __init__(self, gs, storage_root, |
| (...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 233 """ | 236 """ |
| 234 Args: | 237 Args: |
| 235 storage_root: string; root path within the DB will store all of its stuff | 238 storage_root: string; root path within the DB will store all of its stuff |
| 236 gs: instance of GSUtils object we can use to download images | 239 gs: instance of GSUtils object we can use to download images |
| 237 num_worker_threads: how many threads that download images and | 240 num_worker_threads: how many threads that download images and |
| 238 generate diffs simultaneously | 241 generate diffs simultaneously |
| 239 """ | 242 """ |
| 240 self._storage_root = storage_root | 243 self._storage_root = storage_root |
| 241 self._gs = gs | 244 self._gs = gs |
| 242 | 245 |
| 246 # Mechanism for reporting queue size periodically. |
| 247 self._last_queue_size_reported = None |
| 248 self._queue_size_report_lock = threading.RLock() |
| 249 |
| 243 # Dictionary of DiffRecords, keyed by (expected_image_locator, | 250 # Dictionary of DiffRecords, keyed by (expected_image_locator, |
| 244 # actual_image_locator) tuples. | 251 # actual_image_locator) tuples. |
| 245 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. | 252 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
| 246 # | 253 # |
| 247 # Any thread that modifies _diff_dict must first acquire | 254 # Any thread that modifies _diff_dict must first acquire |
| 248 # _diff_dict_writelock! | 255 # _diff_dict_writelock! |
| 249 # | 256 # |
| 250 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably | 257 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably |
| 251 # remove items from self._diff_dict if they haven't been accessed for a | 258 # remove items from self._diff_dict if they haven't been accessed for a |
| 252 # long time. We can always regenerate them by diffing the images we | 259 # long time. We can always regenerate them by diffing the images we |
| (...skipping 10 matching lines...) Expand all Loading... |
| 263 # The queue maxsize must be 0 (infinite size queue), so that asynchronous | 270 # The queue maxsize must be 0 (infinite size queue), so that asynchronous |
| 264 # calls can return as soon as possible. | 271 # calls can return as soon as possible. |
| 265 self._tasks_queue = Queue.Queue(maxsize=0) | 272 self._tasks_queue = Queue.Queue(maxsize=0) |
| 266 self._workers = [] | 273 self._workers = [] |
| 267 for i in range(num_worker_threads): | 274 for i in range(num_worker_threads): |
| 268 worker = threading.Thread(target=self.worker, args=(i,)) | 275 worker = threading.Thread(target=self.worker, args=(i,)) |
| 269 worker.daemon = True | 276 worker.daemon = True |
| 270 worker.start() | 277 worker.start() |
| 271 self._workers.append(worker) | 278 self._workers.append(worker) |
| 272 | 279 |
| 280 def log_queue_size_if_changed(self, limit_verbosity=True): |
| 281 """Log the size of self._tasks_queue, if it has changed since the last call. |
| 282 |
| 283 Reports the current queue size, using log.info(), unless the queue is the |
| 284 same size as the last time we reported it. |
| 285 |
| 286 Args: |
| 287 limit_verbosity: if True, only log if the queue size is a multiple of |
| 288 QUEUE_LOGGING_GRANULARITY |
| 289 """ |
| 290 # Acquire the lock, to synchronize access to self._last_queue_size_reported |
| 291 self._queue_size_report_lock.acquire() |
| 292 try: |
| 293 size = self._tasks_queue.qsize() |
| 294 if size == self._last_queue_size_reported: |
| 295 return |
| 296 if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0): |
| 297 return |
| 298 logging.info('tasks_queue size is %d' % size) |
| 299 self._last_queue_size_reported = size |
| 300 finally: |
| 301 self._queue_size_report_lock.release() |
| 302 |
| 273 def worker(self, worker_num): | 303 def worker(self, worker_num): |
| 274 """Launch a worker thread that pulls tasks off self._tasks_queue. | 304 """Launch a worker thread that pulls tasks off self._tasks_queue. |
| 275 | 305 |
| 276 Args: | 306 Args: |
| 277 worker_num: (integer) which worker this is | 307 worker_num: (integer) which worker this is |
| 278 """ | 308 """ |
| 279 while True: | 309 while True: |
| 310 self.log_queue_size_if_changed() |
| 280 params = self._tasks_queue.get() | 311 params = self._tasks_queue.get() |
| 281 key, expected_image_url, actual_image_url = params | 312 key, expected_image_url, actual_image_url = params |
| 282 try: | 313 try: |
| 283 diff_record = DiffRecord( | 314 diff_record = DiffRecord( |
| 284 self._gs, self._storage_root, | 315 self._gs, self._storage_root, |
| 285 expected_image_url=expected_image_url, | 316 expected_image_url=expected_image_url, |
| 286 expected_image_locator=key[0], | 317 expected_image_locator=key[0], |
| 287 actual_image_url=actual_image_url, | 318 actual_image_url=actual_image_url, |
| 288 actual_image_locator=key[1]) | 319 actual_image_locator=key[1]) |
| 289 except Exception: | 320 except Exception: |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 336 if not key in self._diff_dict: | 367 if not key in self._diff_dict: |
| 337 # If we have already requested a diff between these two images, | 368 # If we have already requested a diff between these two images, |
| 338 # we don't need to request it again. | 369 # we don't need to request it again. |
| 339 must_add_to_queue = True | 370 must_add_to_queue = True |
| 340 self._diff_dict[key] = _DIFFRECORD_PENDING | 371 self._diff_dict[key] = _DIFFRECORD_PENDING |
| 341 finally: | 372 finally: |
| 342 self._diff_dict_writelock.release() | 373 self._diff_dict_writelock.release() |
| 343 | 374 |
| 344 if must_add_to_queue: | 375 if must_add_to_queue: |
| 345 self._tasks_queue.put((key, expected_image_url, actual_image_url)) | 376 self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
| 377 self.log_queue_size_if_changed() |
| 346 | 378 |
| 347 def get_diff_record(self, expected_image_locator, actual_image_locator): | 379 def get_diff_record(self, expected_image_locator, actual_image_locator): |
| 348 """Returns the DiffRecord for this image pair. | 380 """Returns the DiffRecord for this image pair. |
| 349 | 381 |
| 350 This call will block until the diff record is available, or we were unable | 382 This call will block until the diff record is available, or we were unable |
| 351 to generate it. | 383 to generate it. |
| 352 | 384 |
| 353 Args: | 385 Args: |
| 354 expected_image_locator: a unique ID string under which we will store the | 386 expected_image_locator: a unique ID string under which we will store the |
| 355 expected image within storage_root (probably including a checksum to | 387 expected image within storage_root (probably including a checksum to |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 457 | 489 |
| 458 Args: | 490 Args: |
| 459 expected_image_locator: locator string pointing at expected image | 491 expected_image_locator: locator string pointing at expected image |
| 460 actual_image_locator: locator string pointing at actual image | 492 actual_image_locator: locator string pointing at actual image |
| 461 | 493 |
| 462 Returns: already-sanitized locator where the diffs between expected and | 494 Returns: already-sanitized locator where the diffs between expected and |
| 463 actual images can be found | 495 actual images can be found |
| 464 """ | 496 """ |
| 465 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), | 497 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), |
| 466 _sanitize_locator(actual_image_locator)) | 498 _sanitize_locator(actual_image_locator)) |
| OLD | NEW |