OLD | NEW |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 """ | 3 """ |
4 Copyright 2013 Google Inc. | 4 Copyright 2013 Google Inc. |
5 | 5 |
6 Use of this source code is governed by a BSD-style license that can be | 6 Use of this source code is governed by a BSD-style license that can be |
7 found in the LICENSE file. | 7 found in the LICENSE file. |
8 | 8 |
9 Calulate differences between image pairs, and store them in a database. | 9 Calulate differences between image pairs, and store them in a database. |
10 """ | 10 """ |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
48 # NOTE: Keep these in sync with static/constants.js | 48 # NOTE: Keep these in sync with static/constants.js |
49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' | 49 KEY__DIFFERENCES__MAX_DIFF_PER_CHANNEL = 'maxDiffPerChannel' |
50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' | 50 KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' |
51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' | 51 KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' |
52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' | 52 KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
53 | 53 |
54 # Special values within ImageDiffDB._diff_dict | 54 # Special values within ImageDiffDB._diff_dict |
55 _DIFFRECORD_FAILED = 'failed' | 55 _DIFFRECORD_FAILED = 'failed' |
56 _DIFFRECORD_PENDING = 'pending' | 56 _DIFFRECORD_PENDING = 'pending' |
57 | 57 |
58 # How often to report tasks_queue size | |
59 QUEUE_LOGGING_GRANULARITY = 1000 | |
60 | |
58 # Temporary variable to keep track of how many times we download | 61 # Temporary variable to keep track of how many times we download |
59 # the same file in multiple threads. | 62 # the same file in multiple threads. |
60 # TODO(epoger): Delete this, once we see that the number stays close to 0. | 63 # TODO(epoger): Delete this, once we see that the number stays close to 0. |
61 global_file_collisions = 0 | 64 global_file_collisions = 0 |
62 | 65 |
63 | 66 |
64 class DiffRecord(object): | 67 class DiffRecord(object): |
65 """ Record of differences between two images. """ | 68 """ Record of differences between two images. """ |
66 | 69 |
67 def __init__(self, gs, storage_root, | 70 def __init__(self, gs, storage_root, |
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
233 """ | 236 """ |
234 Args: | 237 Args: |
235 storage_root: string; root path within the DB will store all of its stuff | 238 storage_root: string; root path within the DB will store all of its stuff |
236 gs: instance of GSUtils object we can use to download images | 239 gs: instance of GSUtils object we can use to download images |
237 num_worker_threads: how many threads that download images and | 240 num_worker_threads: how many threads that download images and |
238 generate diffs simultaneously | 241 generate diffs simultaneously |
239 """ | 242 """ |
240 self._storage_root = storage_root | 243 self._storage_root = storage_root |
241 self._gs = gs | 244 self._gs = gs |
242 | 245 |
246 # Mechanism for reporting queue size periodically. | |
247 self._last_queue_size_reported = None | |
248 self._queue_size_report_lock = threading.RLock() | |
249 | |
243 # Dictionary of DiffRecords, keyed by (expected_image_locator, | 250 # Dictionary of DiffRecords, keyed by (expected_image_locator, |
244 # actual_image_locator) tuples. | 251 # actual_image_locator) tuples. |
245 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. | 252 # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
246 # | 253 # |
247 # Any thread that modifies _diff_dict must first acquire | 254 # Any thread that modifies _diff_dict must first acquire |
248 # _diff_dict_writelock! | 255 # _diff_dict_writelock! |
249 # | 256 # |
250 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably | 257 # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably |
251 # remove items from self._diff_dict if they haven't been accessed for a | 258 # remove items from self._diff_dict if they haven't been accessed for a |
252 # long time. We can always regenerate them by diffing the images we | 259 # long time. We can always regenerate them by diffing the images we |
(...skipping 10 matching lines...) Expand all Loading... | |
263 # The queue maxsize must be 0 (infinite size queue), so that asynchronous | 270 # The queue maxsize must be 0 (infinite size queue), so that asynchronous |
264 # calls can return as soon as possible. | 271 # calls can return as soon as possible. |
265 self._tasks_queue = Queue.Queue(maxsize=0) | 272 self._tasks_queue = Queue.Queue(maxsize=0) |
266 self._workers = [] | 273 self._workers = [] |
267 for i in range(num_worker_threads): | 274 for i in range(num_worker_threads): |
268 worker = threading.Thread(target=self.worker, args=(i,)) | 275 worker = threading.Thread(target=self.worker, args=(i,)) |
269 worker.daemon = True | 276 worker.daemon = True |
270 worker.start() | 277 worker.start() |
271 self._workers.append(worker) | 278 self._workers.append(worker) |
272 | 279 |
280 def log_queue_size_if_changed(self, limit_verbosity=True): | |
281 """Log the size of self._tasks_queue, if it has changed since the last call. | |
282 | |
283 Reports the current queue size, using log.info(), unless the queue is the | |
284 same size as the last time we reported it. | |
285 | |
286 Args: | |
287 limit_verbosity: if True, only log if the queue size is a multiple of | |
288 QUEUE_LOGGING_GRANULARITY | |
289 """ | |
290 self._queue_size_report_lock.acquire() | |
291 try: | |
292 size = self._tasks_queue.qsize() | |
293 if size == self._last_queue_size_reported: | |
294 return | |
295 if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0): | |
296 return | |
297 logging.info('tasks_queue size is %d' % size) | |
298 self._last_queue_size_reported = size | |
299 finally: | |
300 self._queue_size_report_lock.release() | |
301 | |
stephana
2014/08/11 13:35:18
This is a nit, but why do you need a lock for read
epoger
2014/08/11 18:49:09
Actually, we need it for self._last_queue_size_rep
| |
273 def worker(self, worker_num): | 302 def worker(self, worker_num): |
274 """Launch a worker thread that pulls tasks off self._tasks_queue. | 303 """Launch a worker thread that pulls tasks off self._tasks_queue. |
275 | 304 |
276 Args: | 305 Args: |
277 worker_num: (integer) which worker this is | 306 worker_num: (integer) which worker this is |
278 """ | 307 """ |
279 while True: | 308 while True: |
309 self.log_queue_size_if_changed() | |
280 params = self._tasks_queue.get() | 310 params = self._tasks_queue.get() |
281 key, expected_image_url, actual_image_url = params | 311 key, expected_image_url, actual_image_url = params |
282 try: | 312 try: |
283 diff_record = DiffRecord( | 313 diff_record = DiffRecord( |
284 self._gs, self._storage_root, | 314 self._gs, self._storage_root, |
285 expected_image_url=expected_image_url, | 315 expected_image_url=expected_image_url, |
286 expected_image_locator=key[0], | 316 expected_image_locator=key[0], |
287 actual_image_url=actual_image_url, | 317 actual_image_url=actual_image_url, |
288 actual_image_locator=key[1]) | 318 actual_image_locator=key[1]) |
289 except Exception: | 319 except Exception: |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
336 if not key in self._diff_dict: | 366 if not key in self._diff_dict: |
337 # If we have already requested a diff between these two images, | 367 # If we have already requested a diff between these two images, |
338 # we don't need to request it again. | 368 # we don't need to request it again. |
339 must_add_to_queue = True | 369 must_add_to_queue = True |
340 self._diff_dict[key] = _DIFFRECORD_PENDING | 370 self._diff_dict[key] = _DIFFRECORD_PENDING |
341 finally: | 371 finally: |
342 self._diff_dict_writelock.release() | 372 self._diff_dict_writelock.release() |
343 | 373 |
344 if must_add_to_queue: | 374 if must_add_to_queue: |
345 self._tasks_queue.put((key, expected_image_url, actual_image_url)) | 375 self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
376 self.log_queue_size_if_changed() | |
346 | 377 |
347 def get_diff_record(self, expected_image_locator, actual_image_locator): | 378 def get_diff_record(self, expected_image_locator, actual_image_locator): |
348 """Returns the DiffRecord for this image pair. | 379 """Returns the DiffRecord for this image pair. |
349 | 380 |
350 This call will block until the diff record is available, or we were unable | 381 This call will block until the diff record is available, or we were unable |
351 to generate it. | 382 to generate it. |
352 | 383 |
353 Args: | 384 Args: |
354 expected_image_locator: a unique ID string under which we will store the | 385 expected_image_locator: a unique ID string under which we will store the |
355 expected image within storage_root (probably including a checksum to | 386 expected image within storage_root (probably including a checksum to |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
457 | 488 |
458 Args: | 489 Args: |
459 expected_image_locator: locator string pointing at expected image | 490 expected_image_locator: locator string pointing at expected image |
460 actual_image_locator: locator string pointing at actual image | 491 actual_image_locator: locator string pointing at actual image |
461 | 492 |
462 Returns: already-sanitized locator where the diffs between expected and | 493 Returns: already-sanitized locator where the diffs between expected and |
463 actual images can be found | 494 actual images can be found |
464 """ | 495 """ |
465 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), | 496 return "%s-vs-%s" % (_sanitize_locator(expected_image_locator), |
466 _sanitize_locator(actual_image_locator)) | 497 _sanitize_locator(actual_image_locator)) |
OLD | NEW |