diff options
author | commit-bot@chromium.org <commit-bot@chromium.org@2bbb7eff-a529-9590-31e7-b0007b416f81> | 2014-04-14 18:15:29 +0000 |
---|---|---|
committer | commit-bot@chromium.org <commit-bot@chromium.org@2bbb7eff-a529-9590-31e7-b0007b416f81> | 2014-04-14 18:15:29 +0000 |
commit | 280ea8296ff83663a427b8c9aa0ee98b7839f926 (patch) | |
tree | b3b2226cb3b8d1eeb979a42ba64f5a1bc7a076e0 | |
parent | 4431e7757cfcb8cfa99535eed0e9f156dabf95c2 (diff) |
rebaseline_server: multithreaded loading/diffing of images
BUG=skia:2414
NOTRY=True
R=rmistry@google.com
Author: epoger@google.com
Review URL: https://codereview.chromium.org/235923002
git-svn-id: http://skia.googlecode.com/svn/trunk@14184 2bbb7eff-a529-9590-31e7-b0007b416f81
-rwxr-xr-x | gm/rebaseline_server/compare_to_expectations.py | 2 | ||||
-rw-r--r-- | gm/rebaseline_server/imagediffdb.py | 171 | ||||
-rw-r--r-- | gm/rebaseline_server/imagepair.py | 49 | ||||
-rw-r--r-- | gm/rebaseline_server/imagepairset.py | 8 |
4 files changed, 190 insertions, 40 deletions
diff --git a/gm/rebaseline_server/compare_to_expectations.py b/gm/rebaseline_server/compare_to_expectations.py index 2389b61dad..ab16b36290 100755 --- a/gm/rebaseline_server/compare_to_expectations.py +++ b/gm/rebaseline_server/compare_to_expectations.py @@ -89,6 +89,8 @@ class ExpectationComparisons(results.BaseComparisons): self._expected_root = expected_root self._load_actual_and_expected() self._timestamp = int(time.time()) + logging.info('Number of download file collisions: %s' % + imagediffdb.global_file_collisions) logging.info('Results complete; took %d seconds.' % (self._timestamp - time_start)) diff --git a/gm/rebaseline_server/imagediffdb.py b/gm/rebaseline_server/imagediffdb.py index 3b1eb3ebc0..10fcc98f3b 100644 --- a/gm/rebaseline_server/imagediffdb.py +++ b/gm/rebaseline_server/imagediffdb.py @@ -11,12 +11,16 @@ Calulate differences between image pairs, and store them in a database. import contextlib import csv +import errno import logging +import Queue import os import re import shutil import sys import tempfile +import time +import threading import urllib try: from PIL import Image, ImageChops @@ -35,6 +39,7 @@ SKPDIFF_BINARY = find_run_binary.find_path_to_program('skpdiff') DEFAULT_IMAGE_SUFFIX = '.png' DEFAULT_IMAGES_SUBDIR = 'images' +DEFAULT_NUM_WORKERS = 8 DISALLOWED_FILEPATH_CHAR_REGEX = re.compile('[^\w\-]') @@ -51,6 +56,14 @@ KEY__DIFFERENCE_DATA__PERCENT_DIFF_PIXELS = 'percentDifferingPixels' KEY__DIFFERENCE_DATA__PERCEPTUAL_DIFF = 'perceptualDifference' KEY__DIFFERENCE_DATA__WEIGHTED_DIFF = 'weightedDiffMeasure' +# Special values within ImageDiffDB._diff_dict +DIFFRECORD_FAILED = 'failed' +DIFFRECORD_PENDING = 'pending' + +# TODO(epoger): Temporary(?) list to keep track of how many times we download +# the same file in multiple threads. +global_file_collisions = 0 + class DiffRecord(object): """ Record of differences between two images. """ @@ -64,9 +77,6 @@ class DiffRecord(object): """Download this pair of images (unless we already have them on local disk), and prepare a DiffRecord for them. - TODO(epoger): Make this asynchronously download images, rather than blocking - until the images have been downloaded and processed. - Args: storage_root: root directory on local disk within which we store all images @@ -219,30 +229,59 @@ class ImageDiffDB(object): """ Calculates differences between image pairs, maintaining a database of them for download.""" - def __init__(self, storage_root): + def __init__(self, storage_root, num_workers=DEFAULT_NUM_WORKERS): """ Args: storage_root: string; root path within the DB will store all of its stuff + num_workers: integer; number of worker threads to spawn """ self._storage_root = storage_root # Dictionary of DiffRecords, keyed by (expected_image_locator, # actual_image_locator) tuples. + # Values can also be DIFFRECORD_PENDING, DIFFRECORD_FAILED. self._diff_dict = {} + # Set up the queue for asynchronously loading DiffRecords, and start the + # worker threads reading from it. + self._tasks_queue = Queue.Queue(maxsize=2*num_workers) + self._workers = [] + for i in range(num_workers): + worker = threading.Thread(target=self.worker, args=(i,)) + worker.daemon = True + worker.start() + self._workers.append(worker) + + def worker(self, worker_num): + """Launch a worker thread that pulls tasks off self._tasks_queue. + + Args: + worker_num: (integer) which worker this is + """ + while True: + params = self._tasks_queue.get() + key, expected_image_url, actual_image_url = params + try: + diff_record = DiffRecord( + self._storage_root, + expected_image_url=expected_image_url, + expected_image_locator=key[0], + actual_image_url=actual_image_url, + actual_image_locator=key[1]) + except Exception: + logging.exception( + 'exception while creating DiffRecord for key %s' % str(key)) + diff_record = DIFFRECORD_FAILED + self._diff_dict[key] = diff_record + def add_image_pair(self, expected_image_url, expected_image_locator, actual_image_url, actual_image_locator): """Download this pair of images (unless we already have them on local disk), and prepare a DiffRecord for them. - TODO(epoger): Make this asynchronously download images, rather than blocking - until the images have been downloaded and processed. - When we do that, we should probably add a new method that will block - until all of the images have been downloaded and processed. Otherwise, - we won't know when it's safe to start calling get_diff_record(). - jcgregorio notes: maybe just make ImageDiffDB thread-safe and create a - thread-pool/worker queue at a higher level that just uses ImageDiffDB? + This method will block until the images are downloaded and DiffRecord is + available by calling get_diff_record(). Args: expected_image_url: file or HTTP url from which we will download the @@ -255,10 +294,11 @@ class ImageDiffDB(object): actual_image_locator: a unique ID string under which we will store the actual image within storage_root (probably including a checksum to guarantee uniqueness) + + Raises: + Exception if we are unable to create a DiffRecord for this image pair. """ - expected_image_locator = _sanitize_locator(expected_image_locator) - actual_image_locator = _sanitize_locator(actual_image_locator) - key = (expected_image_locator, actual_image_locator) + key = _generate_key(expected_image_locator, actual_image_locator) if not key in self._diff_dict: try: new_diff_record = DiffRecord( @@ -278,14 +318,70 @@ class ImageDiffDB(object): new_diff_record = None self._diff_dict[key] = new_diff_record + def add_image_pair_async(self, + expected_image_url, expected_image_locator, + actual_image_url, actual_image_locator): + """Download this pair of images (unless we already have them on local disk), + and prepare a DiffRecord for them. + + This method will return quickly; calls to get_diff_record() will block + until the DiffRecord is available (or we have given up on creating it). + + Args: + expected_image_url: file or HTTP url from which we will download the + expected image + expected_image_locator: a unique ID string under which we will store the + expected image within storage_root (probably including a checksum to + guarantee uniqueness) + actual_image_url: file or HTTP url from which we will download the + actual image + actual_image_locator: a unique ID string under which we will store the + actual image within storage_root (probably including a checksum to + guarantee uniqueness) + """ + key = _generate_key(expected_image_locator, actual_image_locator) + if not key in self._diff_dict: + # If we have already requested a diff between these two images, + # we don't need to request it again. + # + # Threading note: If multiple threads called into this method with the + # same key at the same time, there will be multiple tasks on the queue + # with the same key. But that's OK; they will both complete successfully, + # and just waste a little time in the process. Nothing will break. + self._diff_dict[key] = DIFFRECORD_PENDING + self._tasks_queue.put((key, expected_image_url, actual_image_url)) + def get_diff_record(self, expected_image_locator, actual_image_locator): """Returns the DiffRecord for this image pair. - Raises a KeyError if we don't have a DiffRecord for this image pair. + Args: + expected_image_locator: a unique ID string under which we will store the + expected image within storage_root (probably including a checksum to + guarantee uniqueness) + actual_image_locator: a unique ID string under which we will store the + actual image within storage_root (probably including a checksum to + guarantee uniqueness) + + Returns the DiffRecord for this image pair, or None if we were unable to + generate one. """ - key = (_sanitize_locator(expected_image_locator), - _sanitize_locator(actual_image_locator)) - return self._diff_dict[key] + key = _generate_key(expected_image_locator, actual_image_locator) + diff_record = self._diff_dict[key] + + # If we have no results yet, block until we do. + while diff_record == DIFFRECORD_PENDING: + time.sleep(1) + diff_record = self._diff_dict[key] + + # Once we have the result... + if diff_record == DIFFRECORD_FAILED: + logging.error( + 'failed to create a DiffRecord for expected_image_locator=%s , ' + 'actual_image_locator=%s' % ( + expected_image_locator, actual_image_locator)) + return None + else: + return diff_record # Utility functions @@ -374,11 +470,28 @@ def _download_and_open_image(local_filepath, url): Returns: a PIL image object """ + global global_file_collisions if not os.path.exists(local_filepath): _mkdir_unless_exists(os.path.dirname(local_filepath)) with contextlib.closing(urllib.urlopen(url)) as url_handle: - with open(local_filepath, 'wb') as file_handle: + + # First download the file contents into a unique filename, and + # then rename that file. That way, if multiple threads are downloading + # the same filename at the same time, they won't interfere with each + # other (they will both download the file, and one will "win" in the end) + temp_filename = '%s-%d' % (local_filepath, + threading.current_thread().ident) + with open(temp_filename, 'wb') as file_handle: shutil.copyfileobj(fsrc=url_handle, fdst=file_handle) + + # Keep count of how many colliding downloads we encounter; + # if it's a large number, we may want to change our download strategy + # to minimize repeated downloads. + if os.path.exists(local_filepath): + global_file_collisions += 1 + else: + os.rename(temp_filename, local_filepath) + return _open_image(local_filepath) @@ -419,8 +532,11 @@ def _mkdir_unless_exists(path): Args: path: path on local disk """ - if not os.path.isdir(path): + try: os.makedirs(path) + except OSError as e: + if e.errno == errno.EEXIST: + pass def _sanitize_locator(locator): @@ -433,6 +549,21 @@ def _sanitize_locator(locator): return DISALLOWED_FILEPATH_CHAR_REGEX.sub('_', str(locator)) +def _generate_key(expected_image_locator, actual_image_locator): + """Returns a key suitable for looking up this image pair. + + Args: + expected_image_locator: a unique ID string under which we will store the + expected image within storage_root (probably including a checksum to + guarantee uniqueness) + actual_image_locator: a unique ID string under which we will store the + actual image within storage_root (probably including a checksum to + guarantee uniqueness) + """ + return (_sanitize_locator(expected_image_locator), + _sanitize_locator(actual_image_locator)) + + def _get_difference_locator(expected_image_locator, actual_image_locator): """Returns the locator string used to look up the diffs between expected_image and actual_image. diff --git a/gm/rebaseline_server/imagepair.py b/gm/rebaseline_server/imagepair.py index 33385ab522..a89066fca3 100644 --- a/gm/rebaseline_server/imagepair.py +++ b/gm/rebaseline_server/imagepair.py @@ -48,27 +48,43 @@ class ImagePair(object): self.extra_columns_dict = extra_columns if not imageA_relative_url or not imageB_relative_url: self._is_different = True - self.diff_record = None + self._diff_record = None + self._diff_record_set = True elif imageA_relative_url == imageB_relative_url: self._is_different = False - self.diff_record = None + self._diff_record = None + self._diff_record_set = True else: - # TODO(epoger): Rather than blocking until image_diff_db can read in - # the image pair and generate diffs, it would be better to do it - # asynchronously: tell image_diff_db to download a bunch of file pairs, - # and only block later if we're still waiting for diff_records to come - # back. - self._is_different = True - image_diff_db.add_image_pair( + # Tell image_diff_db to add this ImagePair. + # It will do so in a separate thread so as not to block this one; + # when you call self.get_diff_record(), it will block until the results + # are ready. + image_diff_db.add_image_pair_async( expected_image_locator=imageA_relative_url, expected_image_url=posixpath.join(base_url, imageA_relative_url), actual_image_locator=imageB_relative_url, actual_image_url=posixpath.join(base_url, imageB_relative_url)) - self.diff_record = image_diff_db.get_diff_record( - expected_image_locator=imageA_relative_url, - actual_image_locator=imageB_relative_url) - if self.diff_record and self.diff_record.get_num_pixels_differing() == 0: + self._image_diff_db = image_diff_db + self._diff_record_set = False + + def get_diff_record(self): + """Returns the DiffRecord associated with this ImagePair. + + Returns None if the images are identical, or one is missing. + This method will block until the DiffRecord is available. + """ + if not self._diff_record_set: + self._diff_record = self._image_diff_db.get_diff_record( + expected_image_locator=self.imageA_relative_url, + actual_image_locator=self.imageB_relative_url) + self._image_diff_db = None # release reference, no longer needed + if (self._diff_record and + self._diff_record.get_num_pixels_differing() == 0): self._is_different = False + else: + self._is_different = True + self._diff_record_set = True + return self._diff_record def as_dict(self): """Returns a dictionary describing this ImagePair. @@ -79,11 +95,12 @@ class ImagePair(object): KEY__IMAGE_A_URL: self.imageA_relative_url, KEY__IMAGE_B_URL: self.imageB_relative_url, } - asdict[KEY__IS_DIFFERENT] = self._is_different if self.expectations_dict: asdict[KEY__EXPECTATIONS_DATA] = self.expectations_dict if self.extra_columns_dict: asdict[KEY__EXTRA_COLUMN_VALUES] = self.extra_columns_dict - if self.diff_record and (self.diff_record.get_num_pixels_differing() > 0): - asdict[KEY__DIFFERENCE_DATA] = self.diff_record.as_dict() + diff_record = self.get_diff_record() + if diff_record and (diff_record.get_num_pixels_differing() > 0): + asdict[KEY__DIFFERENCE_DATA] = diff_record.as_dict() + asdict[KEY__IS_DIFFERENT] = self._is_different return asdict diff --git a/gm/rebaseline_server/imagepairset.py b/gm/rebaseline_server/imagepairset.py index 04aea90342..ae02d8a811 100644 --- a/gm/rebaseline_server/imagepairset.py +++ b/gm/rebaseline_server/imagepairset.py @@ -50,20 +50,20 @@ class ImagePairSet(object): self._descriptions = descriptions or DEFAULT_DESCRIPTIONS self._extra_column_tallies = {} # maps column_id -> values # -> instances_per_value - self._image_pair_dicts = [] + self._image_pairs = [] self._image_base_url = None self._diff_base_url = diff_base_url def add_image_pair(self, image_pair): """Adds an ImagePair; this may be repeated any number of times.""" # Special handling when we add the first ImagePair... - if not self._image_pair_dicts: + if not self._image_pairs: self._image_base_url = image_pair.base_url if image_pair.base_url != self._image_base_url: raise Exception('added ImagePair with base_url "%s" instead of "%s"' % ( image_pair.base_url, self._image_base_url)) - self._image_pair_dicts.append(image_pair.as_dict()) + self._image_pairs.append(image_pair) extra_columns_dict = image_pair.extra_columns_dict if extra_columns_dict: for column_id, value in extra_columns_dict.iteritems(): @@ -142,7 +142,7 @@ class ImagePairSet(object): key_base_url = KEY__IMAGESETS__FIELD__BASE_URL return { KEY__EXTRACOLUMNHEADERS: self._column_headers_as_dict(), - KEY__IMAGEPAIRS: self._image_pair_dicts, + KEY__IMAGEPAIRS: [pair.as_dict() for pair in self._image_pairs], KEY__IMAGESETS: { KEY__IMAGESETS__SET__IMAGE_A: { key_description: self._descriptions[0], |