Chromium Code Reviews| Index: gm/rebaseline_server/imagediffdb.py |
| diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py |
| index 1f939407a8a2e93b2af7d89031f0506a8b39f307..ab0dcfc7a8b4f0b5e317a82a8e3550bf6ee6a10c 100644 |
| --- a/gm/rebaseline_server/imagediffdb.py |
| +++ b/gm/rebaseline_server/imagediffdb.py |
| @@ -55,6 +55,9 @@ KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
| _DIFFRECORD_FAILED = 'failed' |
| _DIFFRECORD_PENDING = 'pending' |
| +# How often to report tasks_queue size |
| +QUEUE_LOGGING_GRANULARITY = 1000 |
| + |
| # Temporary variable to keep track of how many times we download |
| # the same file in multiple threads. |
| # TODO(epoger): Delete this, once we see that the number stays close to 0. |
| @@ -240,6 +243,10 @@ class ImageDiffDB(object): |
| self._storage_root = storage_root |
| self._gs = gs |
| + # Mechanism for reporting queue size periodically. |
| + self._last_queue_size_reported = None |
| + self._queue_size_report_lock = threading.RLock() |
| + |
| # Dictionary of DiffRecords, keyed by (expected_image_locator, |
| # actual_image_locator) tuples. |
| # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
| @@ -270,6 +277,28 @@ class ImageDiffDB(object): |
| worker.start() |
| self._workers.append(worker) |
| + def log_queue_size_if_changed(self, limit_verbosity=True): |
| + """Log the size of self._tasks_queue, if it has changed since the last call. |
| + |
| + Reports the current queue size, using log.info(), unless the queue is the |
| + same size as the last time we reported it. |
| + |
| + Args: |
| + limit_verbosity: if True, only log if the queue size is a multiple of |
| + QUEUE_LOGGING_GRANULARITY |
| + """ |
| + self._queue_size_report_lock.acquire() |
| + try: |
| + size = self._tasks_queue.qsize() |
| + if size == self._last_queue_size_reported: |
| + return |
| + if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0): |
| + return |
| + logging.info('tasks_queue size is %d' % size) |
| + self._last_queue_size_reported = size |
| + finally: |
| + self._queue_size_report_lock.release() |
| + |
|
stephana
2014/08/11 13:35:18
This is a nit, but why do you need a lock for read
epoger
2014/08/11 18:49:09
Actually, we need it for self._last_queue_size_rep
|
| def worker(self, worker_num): |
| """Launch a worker thread that pulls tasks off self._tasks_queue. |
| @@ -277,6 +306,7 @@ class ImageDiffDB(object): |
| worker_num: (integer) which worker this is |
| """ |
| while True: |
| + self.log_queue_size_if_changed() |
| params = self._tasks_queue.get() |
| key, expected_image_url, actual_image_url = params |
| try: |
| @@ -343,6 +373,7 @@ class ImageDiffDB(object): |
| if must_add_to_queue: |
| self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
| + self.log_queue_size_if_changed() |
| def get_diff_record(self, expected_image_locator, actual_image_locator): |
| """Returns the DiffRecord for this image pair. |