OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 """ | 3 """ |
4 Copyright 2013 Google Inc. | 4 Copyright 2013 Google Inc. |
5 | 5 |
6 Use of this source code is governed by a BSD-style license that can be | 6 Use of this source code is governed by a BSD-style license that can be |
7 found in the LICENSE file. | 7 found in the LICENSE file. |
8 | 8 |
9 Calulate differences between image pairs, and store them in a database. | 9 Calulate differences between image pairs, and store them in a database. |
10 """ | 10 """ |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
48 # NOTE: Keep these in sync with static/constants.js | 48 # NOTE: Keep these in sync with static/constants.js |
49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' | 49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' |
50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' | 50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' |
51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' | 51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' |
52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' | 52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
53 | 53 |
54 # Special values within ImageDiffDB._diff_dict | 54 # Special values within ImageDiffDB._diff_dict |
55 _DIFFRECORD_FAILED = 'failed' | 55 _DIFFRECORD_FAILED = 'failed' |
56 _DIFFRECORD_PENDING = 'pending' | 56 _DIFFRECORD_PENDING = 'pending' |
57 | 57 |
| 58 # How often to report tasks_queue size |
| 59 QUEUE_LOGGING_GRANULARITY = 1000 |
| 60 |
58 # Temporary variable to keep track of how many times we download | 61 # Temporary variable to keep track of how many times we download |
59 # the same file in multiple threads. | 62 # the same file in multiple threads. |
60 # TODO(epoger): Delete this, once we see that the number stays close to 0. | 63 # TODO(epoger): Delete this, once we see that the number stays close to 0. |
61 global_file_collisions = 0 | 64 global_file_collisions = 0 |
62 | 65 |
63 | 66 |
64 class DiffRecord(object): | 67 class DiffRecord(object): |
65 """ Record of differences between two images. """ | 68 """ Record of differences between two images. """ |
66 | 69 |
67 def __init__(self, gs, storage_root, | 70 def __init__(self, gs, storage_root, |
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
233 """ | 236 """ |
234 Args: | 237 Args: |
235 storage_root: string; root path within the DB will store all of its stuff | 238 storage_root: string; root path within the DB will store all of its stuff |
236 gs: instance of GSUtils object we can use to download images | 239 gs: instance of GSUtils object we can use to download images |
237 num_worker_threads: how many threads that download images and | 240 num_worker_threads: how many threads that download images and |
238 generate diffs simultaneously | 241 generate diffs simultaneously |
239 """ | 242 """ |
240 self._storage_root = storage_root | 243 self._storage_root = storage_root |
241 self._gs = gs | 244 self._gs = gs |
242 | 245 |
| 246 # Mechanism for reporting queue size periodically. |
| 247 self._last_queue_size_reported = None |
| 248 self._queue_size_report_lock = threading.RLock() |
| 249 |
243 # Dictionary of DiffRecords, keyed by (expected_image_locator, | 250 # Dictionary of DiffRecords, keyed by (expected_image_locator, |
244 # actual_image_locator) tuples. | 251 # actual_image_locator) tuples. |
245 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. | 252 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
246 # | 253 # |
247 # Any thread that modifies _diff_dict must first acquire | 254 # Any thread that modifies _diff_dict must first acquire |
248 # _diff_dict_writelock! | 255 # _diff_dict_writelock! |
249 # | 256 # |
250 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably | 257 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably |
251 # remove items from self._diff_dict if they haven't been accessed for a | 258 # remove items from self._diff_dict if they haven't been accessed for a |
252 # long time. We can always regenerate them by diffing the images we | 259 # long time. We can always regenerate them by diffing the images we |
(...skipping 10 matching lines...) Expand all Loading... |
263 # The queue maxsize must be 0 (infinite size queue), so that asynchronous | 270 # The queue maxsize must be 0 (infinite size queue), so that asynchronous |
264 # calls can return as soon as possible. | 271 # calls can return as soon as possible. |
265 self._tasks_queue = Queue.Queue(maxsize=0) | 272 self._tasks_queue = Queue.Queue(maxsize=0) |
266 self._workers = [] | 273 self._workers = [] |
267 for i in range(num_worker_threads): | 274 for i in range(num_worker_threads): |
268 worker = threading.Thread(target=self.worker, args=(i,)) | 275 worker = threading.Thread(target=self.worker, args=(i,)) |
269 worker.daemon = True | 276 worker.daemon = True |
270 worker.start() | 277 worker.start() |
271 self._workers.append(worker) | 278 self._workers.append(worker) |
272 | 279 |
| 280 def log_queue_size_if_changed(self, limit_verbosity=True): |
| 281 """Log the size of self._tasks_queue, if it has changed since the last call. |
| 282 |
| 283 Reports the current queue size, using log.info(), unless the queue is the |
| 284 same size as the last time we reported it. |
| 285 |
| 286 Args: |
| 287 limit_verbosity: if True, only log if the queue size is a multiple of |
| 288 QUEUE_LOGGING_GRANULARITY |
| 289 """ |
| 290 # Acquire the lock, to synchronize access to self._last_queue_size_reported |
| 291 self._queue_size_report_lock.acquire() |
| 292 try: |
| 293 size = self._tasks_queue.qsize() |
| 294 if size == self._last_queue_size_reported: |
| 295 return |
| 296 if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0): |
| 297 return |
| 298 logging.info('tasks_queue size is %d' % size) |
| 299 self._last_queue_size_reported = size |
| 300 finally: |
| 301 self._queue_size_report_lock.release() |
| 302 |
273 def worker(self, worker_num): | 303 def worker(self, worker_num): |
274 """Launch a worker thread that pulls tasks off self._tasks_queue. | 304 """Launch a worker thread that pulls tasks off self._tasks_queue. |
275 | 305 |
276 Args: | 306 Args: |
277 worker_num: (integer) which worker this is | 307 worker_num: (integer) which worker this is |
278 """ | 308 """ |
279 while True: | 309 while True: |
| 310 self.log_queue_size_if_changed() |
280 params = self._tasks_queue.get() | 311 params = self._tasks_queue.get() |
281 key, expected_image_url, actual_image_url = params | 312 key, expected_image_url, actual_image_url = params |
282 try: | 313 try: |
283 diff_record = DiffRecord( | 314 diff_record = DiffRecord( |
284 self._gs, self._storage_root, | 315 self._gs, self._storage_root, |
285 expected_image_url=expected_image_url, | 316 expected_image_url=expected_image_url, |
286 expected_image_locator=key[0], | 317 expected_image_locator=key[0], |
287 actual_image_url=actual_image_url, | 318 actual_image_url=actual_image_url, |
288 actual_image_locator=key[1]) | 319 actual_image_locator=key[1]) |
289 except Exception: | 320 except Exception: |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
336 if not key in self._diff_dict: | 367 if not key in self._diff_dict: |
337 # If we have already requested a diff between these two images, | 368 # If we have already requested a diff between these two images, |
338 # we don't need to request it again. | 369 # we don't need to request it again. |
339 must_add_to_queue = True | 370 must_add_to_queue = True |
340 self._diff_dict[key] = _DIFFRECORD_PENDING | 371 self._diff_dict[key] = _DIFFRECORD_PENDING |
341 finally: | 372 finally: |
342 self._diff_dict_writelock.release() | 373 self._diff_dict_writelock.release() |
343 | 374 |
344 if must_add_to_queue: | 375 if must_add_to_queue: |
345 self._tasks_queue.put((key, expected_image_url, actual_image_url)) | 376 self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
| 377 self.log_queue_size_if_changed() |
346 | 378 |
347 def get_diff_record(self, expected_image_locator, actual_image_locator): | 379 def get_diff_record(self, expected_image_locator, actual_image_locator): |
348 """Returns the DiffRecord for this image pair. | 380 """Returns the DiffRecord for this image pair. |
349 | 381 |
350 This call will block until the diff record is available, or we were unable | 382 This call will block until the diff record is available, or we were unable |
351 to generate it. | 383 to generate it. |
352 | 384 |
353 Args: | 385 Args: |
354 expected_image_locator: a unique ID string under which we will store the | 386 expected_image_locator: a unique ID string under which we will store the |
355 expected image within storage_root (probably including a checksum to | 387 expected image within storage_root (probably including a checksum to |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
457 | 489 |
458 Args: | 490 Args: |
459 expected_image_locator: locator string pointing at expected image | 491 expected_image_locator: locator string pointing at expected image |
460 actual_image_locator: locator string pointing at actual image | 492 actual_image_locator: locator string pointing at actual image |
461 | 493 |
462 Returns: already-sanitized locator where the diffs between expected and | 494 Returns: already-sanitized locator where the diffs between expected and |
463 actual images can be found | 495 actual images can be found |
464 """ | 496 """ |
465 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), | 497 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), |
466 _sanitize_locator(actual_image_locator)) | 498 _sanitize_locator(actual_image_locator)) |
OLD | NEW |