#!/usr/bin/env python
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
from __future__ import print_function

import logging
import os
import re
import sys
import zipfile

logging.basicConfig(level=logging.INFO, format='INFO: %(message)s')

# Captures the codec part of a lowercased fuzzer name,
# i.e. whatever sits between 'codec_id_' and the first '_fuzzer'.
CODEC_NAME_REGEXP = re.compile(r'codec_id_(.+?)_fuzzer')

# Generic words cut out of codec names such as 'mpeg1video' or 'msvideo1'.
# Order matters: they are applied one after another, exactly in this order.
_COMMON_CODEC_WORDS = ('video', 'audio', 'subtitle', 'text')


def get_fuzzer_tags(fuzzer_name):
    """Extract tags (used to filter samples) from the given fuzzer name.

    Args:
      fuzzer_name: File name of a fuzzer; matching is case-insensitive.

    Returns:
      A list of lowercase tags, each at most 3 characters long. The list may
      be empty and may contain duplicates. 'sub' is added whenever the name
      contains 'subtitle'.
    """
    tags = []
    fuzzer_name = fuzzer_name.lower()

    # All subtitle samples are in 'sub' directory, so add the 'sub' tag manually.
    if 'subtitle' in fuzzer_name:
        tags.append('sub')

    match = CODEC_NAME_REGEXP.search(fuzzer_name)
    if not match:
        return tags

    # Some names are complex: split them and filter out the common words.
    for codec in match.group(1).split('_'):
        # Keep only what precedes each common word ('mpeg1video' -> 'mpeg1').
        for common_word in _COMMON_CODEC_WORDS:
            codec = codec.split(common_word)[0]

        if codec:
            # Some codec names have trailing characters: 'VP6F', 'FLV1',
            # 'JPEGLS'. Use only the first 3 characters; shorter names are
            # kept whole by the slice.
            tags.append(codec[:3])

    return tags


def parse_corpus(corpus_directory):
    """Recursively list all files in the given directory, ignoring checksums.

    Args:
      corpus_directory: Directory to walk.

    Returns:
      A list of paths (each one joined onto the walked directory) for every
      file whose name does not contain 'md5sum'.
    """
    all_corpus_files = []

    for root, _dirs, files in os.walk(corpus_directory):
        for filename in files:
            # Skip checksum files, they are useless in corpus.
            if 'md5sum' in filename:
                continue

            all_corpus_files.append(os.path.join(root, filename))

    logging.info('Parsed %d corpus files from %s', len(all_corpus_files),
                 corpus_directory)
    return all_corpus_files


def parse_fuzzers(fuzzers_directory):
    """List all ffmpeg fuzzers located directly in the given directory.

    Only the top level of the directory is inspected (no recursion).

    Args:
      fuzzers_directory: Directory to list.

    Returns:
      A list of paths for entries named 'ffmpeg_*_fuzzer'.
    """
    all_fuzzers = []

    for filename in os.listdir(fuzzers_directory):
        # Skip non-ffmpeg and non-fuzzer files in the given directory.
        if not (filename.startswith('ffmpeg_')
                and filename.endswith('_fuzzer')):
            continue

        all_fuzzers.append(os.path.join(fuzzers_directory, filename))

    logging.info('Parsed %d fuzzers from %s', len(all_fuzzers),
                 fuzzers_directory)
    return all_fuzzers


def _find_relevant_corpus(corpus_files, tags):
    """Return the set of corpus paths that contain at least one of the tags."""
    # An empty tag is a substring of every path and would select everything.
    tags = [tag for tag in tags if tag]

    relevant_corpus_files = set()
    for filename in corpus_files:
        # Drop 'ffmpeg' (in any letter case) from the path so that it does not
        # make every file match tags derived from 'MPEG' codecs.
        sanitized_filename = filename.lower().replace('ffmpeg', '')
        if any(tag in sanitized_filename for tag in tags):
            relevant_corpus_files.add(filename)

    return relevant_corpus_files


def zip_relevant_corpus(corpus_files, fuzzers):
    """Find relevant corpus files and archive them for every fuzzer given.

    For each fuzzer the corpus is first filtered by the exact tags of the
    fuzzer. Only when that finds nothing at all, the whole corpus is filtered
    again with the last character of every tag removed. A fuzzer with no
    relevant files gets no archive.

    Args:
      corpus_files: Paths of candidate corpus files; they must be readable,
        as matching files are added to the archive under the same path.
      fuzzers: Paths of fuzzers; the archive for each one is written to
        '<fuzzer path>_seed_corpus.zip'.
    """
    for fuzzer in fuzzers:
        fuzzer_name = os.path.basename(fuzzer)
        fuzzer_tags = get_fuzzer_tags(fuzzer_name)

        relevant_corpus_files = _find_relevant_corpus(corpus_files,
                                                      fuzzer_tags)
        if not relevant_corpus_files:
            # Strip last symbol from tags if we haven't found relevant corpus.
            # It helps for such codecs as 'RV40' ('RV4' -> 'RV') or 'PCX'
            # (-> 'PC'). Done as a separate full pass so that the outcome
            # does not depend on the order of corpus_files.
            shortened_tags = [tag[:-1] for tag in fuzzer_tags]
            relevant_corpus_files = _find_relevant_corpus(corpus_files,
                                                          shortened_tags)

        logging.info('Found %d relevant samples for %s',
                     len(relevant_corpus_files), fuzzer_name)

        if not relevant_corpus_files:
            continue

        zip_archive_name = fuzzer + "_seed_corpus.zip"
        with zipfile.ZipFile(zip_archive_name, 'w') as archive:
            # Sorted so that the member order of the archive is reproducible.
            for filename in sorted(relevant_corpus_files):
                archive.write(filename)


def main():
    """Build seed corpus archives from the directories given on the command line.

    Expects the seed corpus directory as the first argument and the fuzzers
    directory as the second one; prints usage and exits with status 1 when
    fewer than two arguments are given.
    """
    if len(sys.argv) < 3:
        print('Usage: %s <seed_corpus_directory> <fuzzers_directory>' %
              __file__)
        sys.exit(1)

    seed_corpus_directory = sys.argv[1]
    fuzzers_directory = sys.argv[2]

    corpus_files = parse_corpus(seed_corpus_directory)
    fuzzers = parse_fuzzers(fuzzers_directory)

    zip_relevant_corpus(corpus_files, fuzzers)


if __name__ == '__main__':
    sys.exit(main())