| Index: gm/rebaseline_server/imagediffdb.py
|
| diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py
|
| index 1f939407a8a2e93b2af7d89031f0506a8b39f307..167134aaf4b2a2efb9f41f8de1b2e7601bf60690 100644
|
| --- a/gm/rebaseline_server/imagediffdb.py
|
| +++ b/gm/rebaseline_server/imagediffdb.py
|
| @@ -55,6 +55,9 @@ KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference'
|
| _DIFFRECORD_FAILED = 'failed'
|
| _DIFFRECORD_PENDING = 'pending'
|
|
|
| +# How often to report tasks_queue size
|
| +QUEUE_LOGGING_GRANULARITY = 1000
|
| +
|
| # Temporary variable to keep track of how many times we download
|
| # the same file in multiple threads.
|
| # TODO(epoger): Delete this, once we see that the number stays close to 0.
|
| @@ -240,6 +243,10 @@ class ImageDiffDB(object):
|
| self._storage_root = storage_root
|
| self._gs = gs
|
|
|
| + # Mechanism for reporting queue size periodically.
|
| + self._last_queue_size_reported = None
|
| + self._queue_size_report_lock = threading.RLock()
|
| +
|
| # Dictionary of DiffRecords, keyed by (expected_image_locator,
|
| # actual_image_locator) tuples.
|
| # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED.
|
| @@ -270,6 +277,29 @@ class ImageDiffDB(object):
|
| worker.start()
|
| self._workers.append(worker)
|
|
|
| + def log_queue_size_if_changed(self, limit_verbosity=True):
|
| + """Log the size of self._tasks_queue, if it has changed since the last call.
|
| +
|
| + Reports the current queue size, using log.info(), unless the queue is the
|
| + same size as the last time we reported it.
|
| +
|
| + Args:
|
| + limit_verbosity: if True, only log if the queue size is a multiple of
|
| + QUEUE_LOGGING_GRANULARITY
|
| + """
|
| + # Acquire the lock, to synchronize access to self._last_queue_size_reported
|
| + self._queue_size_report_lock.acquire()
|
| + try:
|
| + size = self._tasks_queue.qsize()
|
| + if size == self._last_queue_size_reported:
|
| + return
|
| + if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0):
|
| + return
|
| + logging.info('tasks_queue size is %d' % size)
|
| + self._last_queue_size_reported = size
|
| + finally:
|
| + self._queue_size_report_lock.release()
|
| +
|
| def worker(self, worker_num):
|
| """Launch a worker thread that pulls tasks off self._tasks_queue.
|
|
|
| @@ -277,6 +307,7 @@ class ImageDiffDB(object):
|
| worker_num: (integer) which worker this is
|
| """
|
| while True:
|
| + self.log_queue_size_if_changed()
|
| params = self._tasks_queue.get()
|
| key, expected_image_url, actual_image_url = params
|
| try:
|
| @@ -343,6 +374,7 @@ class ImageDiffDB(object):
|
|
|
| if must_add_to_queue:
|
| self._tasks_queue.put((key, expected_image_url, actual_image_url))
|
| + self.log_queue_size_if_changed()
|
|
|
| def get_diff_record(self, expected_image_locator, actual_image_locator):
|
| """Returns the DiffRecord for this image pair.
|
|
|