diff options
Diffstat (limited to 'gm/rebaseline_server/imagediffdb.py')
-rw-r--r-- | gm/rebaseline_server/imagediffdb.py | 220 |
1 files changed, 170 insertions, 50 deletions
diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py index 89f9fef319..fbe7121140 100644 --- a/gm/rebaseline_server/imagediffdb.py +++ b/gm/rebaseline_server/imagediffdb.py @@ -11,12 +11,16 @@ Calulate differences between image pairs, and store them in a database. # System-level imports import contextlib +import errno import json import logging import os +import Queue import re import shutil import tempfile +import threading +import time import urllib # Must fix up PYTHONPATH before importing from within Skia @@ -24,11 +28,16 @@ import fix_pythonpath # pylint: disable=W0611 # Imports from within Skia import find_run_binary +from py.utils import gs_utils + SKPDIFF_BINARY = find_run_binary.find_path_to_program('skpdiff') DEFAULT_IMAGE_SUFFIX = '.png' DEFAULT_IMAGES_SUBDIR = 'images' +# TODO(epoger): Figure out a better default number of threads; for now, +# using a conservative default value. +DEFAULT_NUM_WORKER_THREADS = 1 DISALLOWED_FILEPATH_CHAR_REGEX = re.compile('[^\w\-]') @@ -42,11 +51,20 @@ KEY__DIFFERENCES__NUM_DIFF_PIXELS = 'numDifferingPixels' KEY__DIFFERENCES__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference' +# Special values within ImageDiffDB._diff_dict +_DIFFRECORD_FAILED = 'failed' +_DIFFRECORD_PENDING = 'pending' + +# Temporary variable to keep track of how many times we download +# the same file in multiple threads. +# TODO(epoger): Delete this, once we see that the number stays close to 0. +global_file_collisions = 0 + class DiffRecord(object): """ Record of differences between two images. """ - def __init__(self, storage_root, + def __init__(self, gs, storage_root, expected_image_url, expected_image_locator, actual_image_url, actual_image_locator, expected_images_subdir=DEFAULT_IMAGES_SUBDIR, @@ -55,18 +73,16 @@ class DiffRecord(object): """Download this pair of images (unless we already have them on local disk), and prepare a DiffRecord for them. - TODO(epoger): Make this asynchronously download images, rather than blocking - until the images have been downloaded and processed. - Args: + gs: instance of GSUtils object we can use to download images storage_root: root directory on local disk within which we store all images - expected_image_url: file or HTTP url from which we will download the + expected_image_url: file, GS, or HTTP url from which we will download the expected image expected_image_locator: a unique ID string under which we will store the expected image within storage_root (probably including a checksum to guarantee uniqueness) - actual_image_url: file or HTTP url from which we will download the + actual_image_url: file, GS, or HTTP url from which we will download the actual image actual_image_locator: a unique ID string under which we will store the actual image within storage_root (probably including a checksum to @@ -79,8 +95,6 @@ class DiffRecord(object): actual_image_locator = _sanitize_locator(actual_image_locator) # Download the expected/actual images, if we don't have them already. - # TODO(rmistry): Add a parameter that just tries to use already-present - # image files rather than downloading them. expected_image_file = os.path.join( storage_root, expected_images_subdir, str(expected_image_locator) + image_suffix) @@ -88,13 +102,13 @@ class DiffRecord(object): storage_root, actual_images_subdir, str(actual_image_locator) + image_suffix) try: - _download_file(expected_image_file, expected_image_url) + _download_file(gs, expected_image_file, expected_image_url) except Exception: logging.exception('unable to download expected_image_url %s to file %s' % (expected_image_url, expected_image_file)) raise try: - _download_file(actual_image_file, actual_image_url) + _download_file(gs, actual_image_file, actual_image_url) except Exception: logging.exception('unable to download actual_image_url %s to file %s' % (actual_image_url, actual_image_file)) @@ -112,8 +126,12 @@ class DiffRecord(object): actual_img = os.path.join(storage_root, actual_images_subdir, str(actual_image_locator) + image_suffix) - # TODO: Call skpdiff ONCE for all image pairs, instead of calling it - # repeatedly. This will allow us to parallelize a lot more work. + # TODO(epoger): Consider calling skpdiff ONCE for all image pairs, + # instead of calling it separately for each image pair. + # Pro: we'll incur less overhead from making repeated system calls, + # spinning up the skpdiff binary, etc. + # Con: we would have to wait until all image pairs were loaded before + # generating any of the diffs? find_run_binary.run_command( [SKPDIFF_BINARY, '-p', expected_img, actual_img, '--jsonp', 'false', @@ -211,16 +229,71 @@ class ImageDiffDB(object): """ Calculates differences between image pairs, maintaining a database of them for download.""" - def __init__(self, storage_root): + def __init__(self, storage_root, gs=None, + num_worker_threads=DEFAULT_NUM_WORKER_THREADS): """ Args: storage_root: string; root path within the DB will store all of its stuff + gs: instance of GSUtils object we can use to download images + num_worker_threads: how many threads that download images and + generate diffs simultaneously """ self._storage_root = storage_root + self._gs = gs # Dictionary of DiffRecords, keyed by (expected_image_locator, # actual_image_locator) tuples. + # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED. + # + # Any thread that modifies _diff_dict must first acquire + # _diff_dict_writelock! + # + # TODO(epoger): Disk is limitless, but RAM is not... so, we should probably + # remove items from self._diff_dict if they haven't been accessed for a + # long time. We can always regenerate them by diffing the images we + # previously downloaded to local disk. + # I guess we should figure out how expensive it is to download vs diff the + # image pairs... if diffing them is expensive too, we can write these + # _diff_dict objects out to disk if there's too many to hold in RAM. + # Or we could use virtual memory to handle that automatically. self._diff_dict = {} + self._diff_dict_writelock = threading.RLock() + + # Set up the queue for asynchronously loading DiffRecords, and start the + # worker threads reading from it. + self._tasks_queue = Queue.Queue(maxsize=2*num_worker_threads) + self._workers = [] + for i in range(num_worker_threads): + worker = threading.Thread(target=self.worker, args=(i,)) + worker.daemon = True + worker.start() + self._workers.append(worker) + + def worker(self, worker_num): + """Launch a worker thread that pulls tasks off self._tasks_queue. + + Args: + worker_num: (integer) which worker this is + """ + while True: + params = self._tasks_queue.get() + key, expected_image_url, actual_image_url = params + try: + diff_record = DiffRecord( + self._gs, self._storage_root, + expected_image_url=expected_image_url, + expected_image_locator=key[0], + actual_image_url=actual_image_url, + actual_image_locator=key[1]) + except Exception: + logging.exception( + 'exception while creating DiffRecord for key %s' % str(key)) + diff_record = _DIFFRECORD_FAILED + self._diff_dict_writelock.acquire() + try: + self._diff_dict[key] = diff_record + finally: + self._diff_dict_writelock.release() @property def storage_root(self): @@ -229,24 +302,21 @@ class ImageDiffDB(object): def add_image_pair(self, expected_image_url, expected_image_locator, actual_image_url, actual_image_locator): - """Download this pair of images (unless we already have them on local disk), - and prepare a DiffRecord for them. + """Asynchronously prepare a DiffRecord for a pair of images. + + This method will return quickly; calls to get_diff_record() will block + until the DiffRecord is available (or we have given up on creating it). - TODO(epoger): Make this asynchronously download images, rather than blocking - until the images have been downloaded and processed. - When we do that, we should probably add a new method that will block - until all of the images have been downloaded and processed. Otherwise, - we won't know when it's safe to start calling get_diff_record(). - jcgregorio notes: maybe just make ImageDiffDB thread-safe and create a - thread-pool/worker queue at a higher level that just uses ImageDiffDB? + If we already have a DiffRecord for this particular image pair, no work + will be done. Args: - expected_image_url: file or HTTP url from which we will download the + expected_image_url: file, GS, or HTTP url from which we will download the expected image expected_image_locator: a unique ID string under which we will store the expected image within storage_root (probably including a checksum to guarantee uniqueness) - actual_image_url: file or HTTP url from which we will download the + actual_image_url: file, GS, or HTTP url from which we will download the actual image actual_image_locator: a unique ID string under which we will store the actual image within storage_root (probably including a checksum to @@ -255,49 +325,96 @@ class ImageDiffDB(object): expected_image_locator = _sanitize_locator(expected_image_locator) actual_image_locator = _sanitize_locator(actual_image_locator) key = (expected_image_locator, actual_image_locator) - if not key in self._diff_dict: - try: - new_diff_record = DiffRecord( - self._storage_root, - expected_image_url=expected_image_url, - expected_image_locator=expected_image_locator, - actual_image_url=actual_image_url, - actual_image_locator=actual_image_locator) - except Exception: - # If we can't create a real DiffRecord for this (expected, actual) pair, - # store None and the UI will show whatever information we DO have. - # Fixes http://skbug.com/2368 . - logging.exception( - 'got exception while creating a DiffRecord for ' - 'expected_image_url=%s , actual_image_url=%s; returning None' % ( - expected_image_url, actual_image_url)) - new_diff_record = None - self._diff_dict[key] = new_diff_record + must_add_to_queue = False + + self._diff_dict_writelock.acquire() + try: + if not key in self._diff_dict: + # If we have already requested a diff between these two images, + # we don't need to request it again. + must_add_to_queue = True + self._diff_dict[key] = _DIFFRECORD_PENDING + finally: + self._diff_dict_writelock.release() + + if must_add_to_queue: + self._tasks_queue.put((key, expected_image_url, actual_image_url)) def get_diff_record(self, expected_image_locator, actual_image_locator): """Returns the DiffRecord for this image pair. - Raises a KeyError if we don't have a DiffRecord for this image pair. + This call will block until the diff record is available, or we were unable + to generate it. + + Args: + expected_image_locator: a unique ID string under which we will store the + expected image within storage_root (probably including a checksum to + guarantee uniqueness) + actual_image_locator: a unique ID string under which we will store the + actual image within storage_root (probably including a checksum to + guarantee uniqueness) + + Returns the DiffRecord for this image pair, or None if we were unable to + generate one. """ key = (_sanitize_locator(expected_image_locator), _sanitize_locator(actual_image_locator)) - return self._diff_dict[key] + diff_record = self._diff_dict[key] + + # If we have no results yet, block until we do. + while diff_record == _DIFFRECORD_PENDING: + time.sleep(1) + diff_record = self._diff_dict[key] + + # Once we have the result... + if diff_record == _DIFFRECORD_FAILED: + logging.error( + 'failed to create a DiffRecord for expected_image_locator=%s , ' + 'actual_image_locator=%s' % ( + expected_image_locator, actual_image_locator)) + return None + else: + return diff_record # Utility functions -def _download_file(local_filepath, url): +def _download_file(gs, local_filepath, url): """Download a file from url to local_filepath, unless it is already there. Args: + gs: instance of GSUtils object, in case the url points at Google Storage local_filepath: path on local disk where the image should be stored - url: URL from which we can download the image if we don't have it yet + url: HTTP or GS URL from which we can download the image if we don't have + it yet """ + global global_file_collisions if not os.path.exists(local_filepath): _mkdir_unless_exists(os.path.dirname(local_filepath)) - with contextlib.closing(urllib.urlopen(url)) as url_handle: - with open(local_filepath, 'wb') as file_handle: - shutil.copyfileobj(fsrc=url_handle, fdst=file_handle) + + # First download the file contents into a unique filename, and + # then rename that file. That way, if multiple threads are downloading + # the same filename at the same time, they won't interfere with each + # other (they will both download the file, and one will "win" in the end) + temp_filename = '%s-%d' % (local_filepath, + threading.current_thread().ident) + if gs_utils.GSUtils.is_gs_url(url): + (bucket, path) = gs_utils.GSUtils.split_gs_url(url) + gs.download_file(source_bucket=bucket, source_path=path, + dest_path=temp_filename) + else: + with contextlib.closing(urllib.urlopen(url)) as url_handle: + with open(temp_filename, 'wb') as file_handle: + shutil.copyfileobj(fsrc=url_handle, fdst=file_handle) + + # Rename the file to its real filename. + # Keep count of how many colliding downloads we encounter; + # if it's a large number, we may want to change our download strategy + # to minimize repeated downloads. + if os.path.exists(local_filepath): + global_file_collisions += 1 + else: + os.rename(temp_filename, local_filepath) def _mkdir_unless_exists(path): @@ -306,8 +423,11 @@ def _mkdir_unless_exists(path): Args: path: path on local disk """ - if not os.path.isdir(path): + try: os.makedirs(path) + except OSError as e: + if e.errno == errno.EEXIST: + pass def _sanitize_locator(locator): |