Chromium Code Reviews| Index: gm/rebaseline_server/imagediffdb.py |
| diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py |
| index 89f9fef319a4045909553c7c840bbc19fb1004f0..687030b62ac5be84215c61328af865219c9a315c 100644 |
| --- a/gm/rebaseline_server/imagediffdb.py |
| +++ b/gm/rebaseline_server/imagediffdb.py |
| @@ -11,12 +11,16 @@ Calculate differences between image pairs, and store them in a database. |
| # System-level imports |
| import contextlib |
| +import errno |
| import json |
| import logging |
| import os |
| +import Queue |
| import re |
| import shutil |
| import tempfile |
| +import threading |
| +import time |
| import urllib |
| # Must fix up PYTHONPATH before importing from within Skia |
| @@ -24,11 +28,16 @@ import fix_pythonpath # pylint: disable=W0611 |
| # Imports from within Skia |
| import find_run_binary |
| +from py.utils import gs_utils |
| + |
| SKPDIFF_BINARY = find_run_binary.find_path_to_program('skpdiff') |
| DEFAULT_IMAGE_SUFFIX = '.png' |
| DEFAULT_IMAGES_SUBDIR = 'images' |
| +# TODO(epoger): Using a conservative default number of threads, to avoid |
| +# the "too many open files" bug we saw in http://skbug.com/2423 |
|
rmistry
2014/08/04 20:52:49
Is this a TODO or a comment?
epoger
2014/08/05 03:34:08
Done.
|
| +DEFAULT_NUM_WORKER_THREADS = 1 |
| DISALLOWED_FILEPATH_CHAR_REGEX = re.compile('[^\w\-]') |
| @@ -42,11 +51,19 @@ KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' |
| KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' |
| KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' |
| +# Special values within ImageDiffDB._diff_dict |
| +_DIFFRECORD_FAILED = 'failed' |
| +_DIFFRECORD_PENDING = 'pending' |
| + |
| +# TODO(epoger): Temporary variable to keep track of how many times we download |
| +# the same file in multiple threads. |
|
rmistry
2014/08/04 20:52:49
TODO or comment?
epoger
2014/08/05 03:34:08
Done.
|
| +global_file_collisions = 0 |
| + |
| class DiffRecord(object): |
| """ Record of differences between two images. """ |
| - def __init__(self, storage_root, |
| + def __init__(self, gs, storage_root, |
| expected_image_url, expected_image_locator, |
| actual_image_url, actual_image_locator, |
| expected_images_subdir=DEFAULT_IMAGES_SUBDIR, |
| @@ -55,18 +72,16 @@ class DiffRecord(object): |
| """Download this pair of images (unless we already have them on local disk), |
| and prepare a DiffRecord for them. |
| - TODO(epoger): Make this asynchronously download images, rather than blocking |
| - until the images have been downloaded and processed. |
| - |
| Args: |
| + gs: instance of GSUtils object we can use to download images |
| storage_root: root directory on local disk within which we store all |
| images |
| - expected_image_url: file or HTTP url from which we will download the |
| + expected_image_url: file, GS, or HTTP url from which we will download the |
| expected image |
| expected_image_locator: a unique ID string under which we will store the |
| expected image within storage_root (probably including a checksum to |
| guarantee uniqueness) |
| - actual_image_url: file or HTTP url from which we will download the |
| + actual_image_url: file, GS, or HTTP url from which we will download the |
| actual image |
| actual_image_locator: a unique ID string under which we will store the |
| actual image within storage_root (probably including a checksum to |
| @@ -79,8 +94,6 @@ class DiffRecord(object): |
| actual_image_locator = _sanitize_locator(actual_image_locator) |
| # Download the expected/actual images, if we don't have them already. |
| - # TODO(rmistry): Add a parameter that just tries to use already-present |
| - # image files rather than downloading them. |
| expected_image_file = os.path.join( |
| storage_root, expected_images_subdir, |
| str(expected_image_locator) + image_suffix) |
| @@ -88,13 +101,13 @@ class DiffRecord(object): |
| storage_root, actual_images_subdir, |
| str(actual_image_locator) + image_suffix) |
| try: |
| - _download_file(expected_image_file, expected_image_url) |
| + _download_file(gs, expected_image_file, expected_image_url) |
| except Exception: |
| logging.exception('unable to download expected_image_url %s to file %s' % |
| (expected_image_url, expected_image_file)) |
| raise |
| try: |
| - _download_file(actual_image_file, actual_image_url) |
| + _download_file(gs, actual_image_file, actual_image_url) |
| except Exception: |
| logging.exception('unable to download actual_image_url %s to file %s' % |
| (actual_image_url, actual_image_file)) |
| @@ -112,8 +125,12 @@ class DiffRecord(object): |
| actual_img = os.path.join(storage_root, actual_images_subdir, |
| str(actual_image_locator) + image_suffix) |
| - # TODO: Call skpdiff ONCE for all image pairs, instead of calling it |
| - # repeatedly. This will allow us to parallelize a lot more work. |
| + # TODO(epoger): Consider calling skpdiff ONCE for all image pairs, |
| + # instead of calling it separately for each image pair. |
| + # Pro: we'll incur less overhead from making repeated system calls, |
| + # spinning up the skpdiff binary, etc. |
| + # Con: we would have to wait until all image pairs were loaded before |
| + # generating any of the diffs? |
| find_run_binary.run_command( |
| [SKPDIFF_BINARY, '-p', expected_img, actual_img, |
| '--jsonp', 'false', |
| @@ -211,16 +228,71 @@ class ImageDiffDB(object): |
| """ Calculates differences between image pairs, maintaining a database of |
| them for download.""" |
| - def __init__(self, storage_root): |
| + def __init__(self, storage_root, gs=None, |
| + num_worker_threads=DEFAULT_NUM_WORKER_THREADS): |
| """ |
| Args: |
| storage_root: string; root path within which the DB will store all of its stuff |
| + gs: instance of GSUtils object we can use to download images |
| + num_worker_threads: number of worker threads that download images and |
| + generate diffs simultaneously |
| """ |
| self._storage_root = storage_root |
| + self._gs = gs |
| # Dictionary of DiffRecords, keyed by (expected_image_locator, |
| # actual_image_locator) tuples. |
| + # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. |
| + # |
| + # Any thread that modifies _diff_dict must first acquire |
| + # _diff_dict_writelock! |
| + # |
| + # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably |
| + # remove items from self._diff_dict if they haven't been accessed for a |
| + # long time. We can always regenerate them by diffing the images we |
| + # previously downloaded to local disk. |
| + # I guess we should figure out how expensive it is to download vs diff the |
| + # image pairs... if diffing them is expensive too, we can write these |
| + # _diff_dict objects out to disk if there are too many to hold in RAM. |
| + # Or we could use virtual memory to handle that automatically. |
| self._diff_dict = {} |
| + self._diff_dict_writelock = threading.RLock() |
| + |
| + # Set up the queue for asynchronously loading DiffRecords, and start the |
| + # worker threads reading from it. |
| + self._tasks_queue = Queue.Queue(maxsize=2*num_worker_threads) |
| + self._workers = [] |
| + for i in range(num_worker_threads): |
| + worker = threading.Thread(target=self.worker, args=(i,)) |
| + worker.daemon = True |
| + worker.start() |
| + self._workers.append(worker) |
| + |
| + def worker(self, worker_num): |
| + """Run loop of a worker thread: pulls tasks off self._tasks_queue forever. |
| + |
| + Args: |
| + worker_num: (integer) which worker this is |
| + """ |
| + while True: |
| + params = self._tasks_queue.get() |
| + key, expected_image_url, actual_image_url = params |
| + try: |
| + diff_record = DiffRecord( |
| + self._gs, self._storage_root, |
| + expected_image_url=expected_image_url, |
| + expected_image_locator=key[0], |
| + actual_image_url=actual_image_url, |
| + actual_image_locator=key[1]) |
| + except Exception: |
| + logging.exception( |
| + 'exception while creating DiffRecord for key %s' % str(key)) |
| + diff_record = _DIFFRECORD_FAILED |
| + self._diff_dict_writelock.acquire() |
| + try: |
| + self._diff_dict[key] = diff_record |
| + finally: |
| + self._diff_dict_writelock.release() |
| @property |
| def storage_root(self): |
| @@ -229,24 +301,21 @@ class ImageDiffDB(object): |
| def add_image_pair(self, |
| expected_image_url, expected_image_locator, |
| actual_image_url, actual_image_locator): |
| - """Download this pair of images (unless we already have them on local disk), |
| - and prepare a DiffRecord for them. |
| + """Asynchronously prepare a DiffRecord for a pair of images. |
| + |
| + This method normally returns quickly, though it may block while the task |
| + queue is full; calls to get_diff_record() will block until the DiffRecord |
| + is available (or we have given up on creating it). |
| - TODO(epoger): Make this asynchronously download images, rather than blocking |
| - until the images have been downloaded and processed. |
| - When we do that, we should probably add a new method that will block |
| - until all of the images have been downloaded and processed. Otherwise, |
| - we won't know when it's safe to start calling get_diff_record(). |
| - jcgregorio notes: maybe just make ImageDiffDB thread-safe and create a |
| - thread-pool/worker queue at a higher level that just uses ImageDiffDB? |
| + If we already have a DiffRecord for this particular image pair, no work |
| + will be done. |
| Args: |
| - expected_image_url: file or HTTP url from which we will download the |
| + expected_image_url: file, GS, or HTTP url from which we will download the |
| expected image |
| expected_image_locator: a unique ID string under which we will store the |
| expected image within storage_root (probably including a checksum to |
| guarantee uniqueness) |
| - actual_image_url: file or HTTP url from which we will download the |
| + actual_image_url: file, GS, or HTTP url from which we will download the |
| actual image |
| actual_image_locator: a unique ID string under which we will store the |
| actual image within storage_root (probably including a checksum to |
| @@ -255,49 +324,96 @@ class ImageDiffDB(object): |
| expected_image_locator = _sanitize_locator(expected_image_locator) |
| actual_image_locator = _sanitize_locator(actual_image_locator) |
| key = (expected_image_locator, actual_image_locator) |
| - if not key in self._diff_dict: |
| - try: |
| - new_diff_record = DiffRecord( |
| - self._storage_root, |
| - expected_image_url=expected_image_url, |
| - expected_image_locator=expected_image_locator, |
| - actual_image_url=actual_image_url, |
| - actual_image_locator=actual_image_locator) |
| - except Exception: |
| - # If we can't create a real DiffRecord for this (expected, actual) pair, |
| - # store None and the UI will show whatever information we DO have. |
| - # Fixes http://skbug.com/2368 . |
| - logging.exception( |
| - 'got exception while creating a DiffRecord for ' |
| - 'expected_image_url=%s , actual_image_url=%s; returning None' % ( |
| - expected_image_url, actual_image_url)) |
| - new_diff_record = None |
| - self._diff_dict[key] = new_diff_record |
| + must_add_to_queue = False |
| + |
| + self._diff_dict_writelock.acquire() |
| + try: |
| + if not key in self._diff_dict: |
| + # We have not yet requested a diff between these two images, so mark |
| + # it as pending and remember to queue it up once we release the lock. |
| + must_add_to_queue = True |
| + self._diff_dict[key] = _DIFFRECORD_PENDING |
| + finally: |
| + self._diff_dict_writelock.release() |
| + |
| + if must_add_to_queue: |
| + self._tasks_queue.put((key, expected_image_url, actual_image_url)) |
| def get_diff_record(self, expected_image_locator, actual_image_locator): |
| """Returns the DiffRecord for this image pair. |
| - Raises a KeyError if we don't have a DiffRecord for this image pair. |
| + This call will block until the diff record is available, or we were unable |
| + to generate it. |
| + |
| + Args: |
| + expected_image_locator: a unique ID string under which we will store the |
| + expected image within storage_root (probably including a checksum to |
| + guarantee uniqueness) |
| + actual_image_locator: a unique ID string under which we will store the |
| + actual image within storage_root (probably including a checksum to |
| + guarantee uniqueness) |
| + |
| + Returns the DiffRecord for this image pair, or None if we were unable to |
| + generate one. |
| """ |
| key = (_sanitize_locator(expected_image_locator), |
| _sanitize_locator(actual_image_locator)) |
| - return self._diff_dict[key] |
| + diff_record = self._diff_dict[key] |
| + |
| + # If we have no results yet, block until we do. |
| + while diff_record == _DIFFRECORD_PENDING: |
| + time.sleep(1) |
| + diff_record = self._diff_dict[key] |
| + |
| + # Once we have the result... |
| + if diff_record == _DIFFRECORD_FAILED: |
| + logging.error( |
| + 'failed to create a DiffRecord for expected_image_locator=%s , ' |
| + 'actual_image_locator=%s' % ( |
| + expected_image_locator, actual_image_locator)) |
| + return None |
| + else: |
| + return diff_record |
| # Utility functions |
| -def _download_file(local_filepath, url): |
| +def _download_file(gs, local_filepath, url): |
| """Download a file from url to local_filepath, unless it is already there. |
| Args: |
| + gs: instance of GSUtils object, in case the url points at Google Storage |
| local_filepath: path on local disk where the image should be stored |
| - url: URL from which we can download the image if we don't have it yet |
| + url: HTTP or GS URL from which we can download the image if we don't have |
| + it yet |
| """ |
| + global global_file_collisions |
| if not os.path.exists(local_filepath): |
| _mkdir_unless_exists(os.path.dirname(local_filepath)) |
| - with contextlib.closing(urllib.urlopen(url)) as url_handle: |
| - with open(local_filepath, 'wb') as file_handle: |
| - shutil.copyfileobj(fsrc=url_handle, fdst=file_handle) |
| + |
| + # First download the file contents into a unique filename, and |
| + # then rename that file. That way, if multiple threads are downloading |
| + # the same filename at the same time, they won't interfere with each |
| + # other (they will both download the file, and one will "win" in the end) |
| + temp_filename = '%s-%d' % (local_filepath, |
| + threading.current_thread().ident) |
| + if gs_utils.GSUtils.is_gs_url(url): |
| + (bucket, path) = gs_utils.GSUtils.split_gs_url(url) |
| + gs.download_file(source_bucket=bucket, source_path=path, |
| + dest_path=temp_filename) |
| + else: |
| + with contextlib.closing(urllib.urlopen(url)) as url_handle: |
| + with open(temp_filename, 'wb') as file_handle: |
| + shutil.copyfileobj(fsrc=url_handle, fdst=file_handle) |
| + |
| + # Rename the file to its real filename. |
| + # Keep count of how many colliding downloads we encounter; |
| + # if it's a large number, we may want to change our download strategy |
| + # to minimize repeated downloads. |
| + if os.path.exists(local_filepath): |
| + global_file_collisions += 1 |
| + else: |
| + os.rename(temp_filename, local_filepath) |
| def _mkdir_unless_exists(path): |
| @@ -306,8 +422,11 @@ def _mkdir_unless_exists(path): |
| Args: |
| path: path on local disk |
| """ |
| - if not os.path.isdir(path): |
| + try: |
| os.makedirs(path) |
| + except OSError as e: |
| + if e.errno == errno.EEXIST: |
| + pass |
| def _sanitize_locator(locator): |