Index: gm/rebaseline_server/imagediffdb.py |
diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py |
index 1f939407a8a2e93b2af7d89031f0506a8b39f307..167134aaf4b2a2efb9f41f8de1b2e7601bf60690 100644 |
--- a/gm/rebaseline_server/imagediffdb.py |
+++ b/gm/rebaseline_server/imagediffdb.py |
@@ -55,6 +55,9 @@ KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
_DIFFRECORD_FAILED = 'failed' |
_DIFFRECORD_PENDING = 'pending' |
+# By default, report tasks_queue size only when it is a multiple of this value |
+QUEUE_LOGGING_GRANULARITY = 1000 |
+ |
# Temporary variable to keep track of how many times we download |
# the same file in multiple threads. |
# TODO(epoger): Delete this, once we see that the number stays close to 0. |
@@ -240,6 +243,10 @@ class ImageDiffDB(object): |
self._storage_root = storage_root |
self._gs = gs |
+ # Mechanism for reporting queue size periodically. |
+ self._last_queue_size_reported = None |
+ self._queue_size_report_lock = threading.RLock() |
+ |
# Dictionary of DiffRecords, keyed by (expected_image_locator, |
# actual_image_locator) tuples. |
# Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
@@ -270,6 +277,29 @@ class ImageDiffDB(object): |
worker.start() |
self._workers.append(worker) |
+ def log_queue_size_if_changed(self, limit_verbosity=True): |
+ """Log the size of self._tasks_queue, if it differs from the last report. |
+ |
+ Reports the current queue size, using logging.info(), unless the queue is |
+ the same size as the last time we reported it. |
+ |
+ Args: |
+ limit_verbosity: if True, only log if the queue size is a multiple of |
+ QUEUE_LOGGING_GRANULARITY |
+ """ |
+ # Hold the lock while reading and updating self._last_queue_size_reported |
+ self._queue_size_report_lock.acquire() |
+ try: |
+ size = self._tasks_queue.qsize() |
+ if size == self._last_queue_size_reported: |
+ return |
+ if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0): |
+ return |
+ logging.info('tasks_queue size is %d' % size) |
+ self._last_queue_size_reported = size |
+ finally: |
+ self._queue_size_report_lock.release() |
+ |
def worker(self, worker_num): |
"""Launch a worker thread that pulls tasks off self._tasks_queue. |
@@ -277,6 +307,7 @@ class ImageDiffDB(object): |
worker_num: (integer) which worker this is |
""" |
while True: |
+ self.log_queue_size_if_changed() |
params = self._tasks_queue.get() |
key, expected_image_url, actual_image_url = params |
try: |
@@ -343,6 +374,7 @@ class ImageDiffDB(object): |
if must_add_to_queue: |
self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
+ self.log_queue_size_if_changed() |
def get_diff_record(self, expected_image_locator, actual_image_locator): |
"""Returns the DiffRecord for this image pair. |