From 805174eda2356df1b01752c8bc57019e696e0a75 Mon Sep 17 00:00:00 2001 From: Yilun Chong Date: Tue, 10 Apr 2018 13:26:17 -0700 Subject: Add script for run and upload the benchmark result to bq --- benchmarks/util/__init__.py | 0 benchmarks/util/big_query_utils.py | 188 ++++++++++++++++++++++++ benchmarks/util/run_and_upload.py | 290 +++++++++++++++++++++++++++++++++++++ 3 files changed, 478 insertions(+) create mode 100644 benchmarks/util/__init__.py create mode 100755 benchmarks/util/big_query_utils.py create mode 100755 benchmarks/util/run_and_upload.py (limited to 'benchmarks/util') diff --git a/benchmarks/util/__init__.py b/benchmarks/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/benchmarks/util/big_query_utils.py b/benchmarks/util/big_query_utils.py new file mode 100755 index 00000000..14105aa6 --- /dev/null +++ b/benchmarks/util/big_query_utils.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python2.7 + +import argparse +import json +import uuid +import httplib2 + +from apiclient import discovery +from apiclient.errors import HttpError +from oauth2client.client import GoogleCredentials + +# 30 days in milliseconds +_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000 +NUM_RETRIES = 3 + + +def create_big_query(): + """Authenticates with cloud platform and gets a BiqQuery service object + """ + creds = GoogleCredentials.get_application_default() + return discovery.build( + 'bigquery', 'v2', credentials=creds, cache_discovery=False) + + +def create_dataset(biq_query, project_id, dataset_id): + is_success = True + body = { + 'datasetReference': { + 'projectId': project_id, + 'datasetId': dataset_id + } + } + + try: + dataset_req = biq_query.datasets().insert( + projectId=project_id, body=body) + dataset_req.execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: The dataset %s already exists' % dataset_id + else: + # Note: For more debugging info, print "http_error.content" + print 'Error in creating dataset: %s. Err: %s' % (dataset_id, + http_error) + is_success = False + return is_success + + +def create_table(big_query, project_id, dataset_id, table_id, table_schema, + description): + fields = [{ + 'name': field_name, + 'type': field_type, + 'description': field_description + } for (field_name, field_type, field_description) in table_schema] + return create_table2(big_query, project_id, dataset_id, table_id, fields, + description) + + +def create_partitioned_table(big_query, + project_id, + dataset_id, + table_id, + table_schema, + description, + partition_type='DAY', + expiration_ms=_EXPIRATION_MS): + """Creates a partitioned table. By default, a date-paritioned table is created with + each partition lasting 30 days after it was last modified. + """ + fields = [{ + 'name': field_name, + 'type': field_type, + 'description': field_description + } for (field_name, field_type, field_description) in table_schema] + return create_table2(big_query, project_id, dataset_id, table_id, fields, + description, partition_type, expiration_ms) + + +def create_table2(big_query, + project_id, + dataset_id, + table_id, + fields_schema, + description, + partition_type=None, + expiration_ms=None): + is_success = True + + body = { + 'description': description, + 'schema': { + 'fields': fields_schema + }, + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id + } + } + + if partition_type and expiration_ms: + body["timePartitioning"] = { + "type": partition_type, + "expirationMs": expiration_ms + } + + try: + table_req = big_query.tables().insert( + projectId=project_id, datasetId=dataset_id, body=body) + res = table_req.execute(num_retries=NUM_RETRIES) + print 'Successfully created %s "%s"' % (res['kind'], res['id']) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: Table %s already exists' % table_id + else: + print 'Error in creating table: %s. Err: %s' % (table_id, + http_error) + is_success = False + return is_success + + +def patch_table(big_query, project_id, dataset_id, table_id, fields_schema): + is_success = True + + body = { + 'schema': { + 'fields': fields_schema + }, + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id + } + } + + try: + table_req = big_query.tables().patch( + projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=body) + res = table_req.execute(num_retries=NUM_RETRIES) + print 'Successfully patched %s "%s"' % (res['kind'], res['id']) + except HttpError as http_error: + print 'Error in creating table: %s. Err: %s' % (table_id, http_error) + is_success = False + return is_success + + +def insert_rows(big_query, project_id, dataset_id, table_id, rows_list): + is_success = True + body = {'rows': rows_list} + try: + insert_req = big_query.tabledata().insertAll( + projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=body) + res = insert_req.execute(num_retries=NUM_RETRIES) + if res.get('insertErrors', None): + print 'Error inserting rows! Response: %s' % res + is_success = False + except HttpError as http_error: + print 'Error inserting rows to the table %s' % table_id + is_success = False + + return is_success + + +def sync_query_job(big_query, project_id, query, timeout=5000): + query_data = {'query': query, 'timeoutMs': timeout} + query_job = None + try: + query_job = big_query.jobs().query( + projectId=project_id, + body=query_data).execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + print 'Query execute job failed with error: %s' % http_error + print http_error.content + return query_job + + + # List of (column name, column type, description) tuples +def make_row(unique_row_id, row_values_dict): + """row_values_dict is a dictionary of column name and column value. + """ + return {'insertId': unique_row_id, 'json': row_values_dict} diff --git a/benchmarks/util/run_and_upload.py b/benchmarks/util/run_and_upload.py new file mode 100755 index 00000000..ae22a668 --- /dev/null +++ b/benchmarks/util/run_and_upload.py @@ -0,0 +1,290 @@ +import argparse +import os +import re +import copy +import uuid +import calendar +import time +import big_query_utils +import datetime +import json +# This import depends on the automake rule protoc_middleman, please make sure +# protoc_middleman has been built before run this file. +import os.path, sys +sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) +import tmp.benchmarks_pb2 as benchmarks_pb2 +from click.types import STRING + +_PROJECT_ID = 'grpc-testing' +_DATASET = 'protobuf_benchmark_result' +_TABLE = 'opensource_result_v1' +_NOW = "%d%02d%02d" % (datetime.datetime.now().year, + datetime.datetime.now().month, + datetime.datetime.now().day) + +file_size_map = {} + +def get_data_size(file_name): + if file_name in file_size_map: + return file_size_map[file_name] + benchmark_dataset = benchmarks_pb2.BenchmarkDataset() + benchmark_dataset.ParseFromString( + open(os.path.dirname(os.path.abspath(__file__)) + "/../" + file_name).read()) + size = 0 + count = 0 + for payload in benchmark_dataset.payload: + size += len(payload) + count += 1 + file_size_map[file_name] = (size, 1.0 * size / count) + return size, 1.0 * size / count + + +def extract_file_name(file_name): + name_list = re.split("[/\.]", file_name) + short_file_name = "" + for name in name_list: + if name[:14] == "google_message": + short_file_name = name + return short_file_name + + +cpp_result = [] +python_result = [] +java_result = [] +go_result = [] + + +# CPP results example: +# [ +# "benchmarks": [ +# { +# "bytes_per_second": int, +# "cpu_time": int, +# "name: string, +# "time_unit: string, +# ... +# }, +# ... +# ], +# ... +# ] +def parse_cpp_result(filename): + global cpp_result + if filename == "": + return + if filename[0] != '/': + filename = os.path.dirname(os.path.abspath(__file__)) + '/' + filename + with open(filename) as f: + results = json.loads(f.read()) + for benchmark in results["benchmarks"]: + data_filename = "".join( + re.split("(_parse_|_serialize)", benchmark["name"])[0]) + behavior = benchmark["name"][len(data_filename) + 1:] + cpp_result.append({ + "language": "cpp", + "dataFileName": data_filename, + "behavior": behavior, + "throughput": benchmark["bytes_per_second"] / 2.0 ** 20 + }) + + +# Python results example: +# [ +# [ +# { +# "filename": string, +# "benchmarks": { +# behavior: results, +# ... +# }, +# "message_name": STRING +# }, +# ... +# ], #pure-python +# ... +# ] +def parse_python_result(filename): + global python_result + if filename == "": + return + if filename[0] != '/': + filename = os.path.dirname(os.path.abspath(__file__)) + '/' + filename + with open(filename) as f: + results_list = json.loads(f.read()) + for results in results_list: + for result in results: + _, avg_size = get_data_size(result["filename"]) + for behavior in result["benchmarks"]: + python_result.append({ + "language": "python", + "dataFileName": extract_file_name(result["filename"]), + "behavior": behavior, + "throughput": avg_size / + result["benchmarks"][behavior] * 1e9 / 2 ** 20 + }) + + +# Java results example: +# [ +# { +# "id": string, +# "instrumentSpec": {...}, +# "measurements": [ +# { +# "weight": float, +# "value": { +# "magnitude": float, +# "unit": string +# }, +# ... +# }, +# ... +# ], +# "run": {...}, +# "scenario": { +# "benchmarkSpec": { +# "methodName": string, +# "parameters": { +# defined parameters in the benchmark: parameters value +# }, +# ... +# }, +# ... +# } +# +# }, +# ... +# ] +def parse_java_result(filename): + global average_bytes_per_message, java_result + if filename == "": + return + if filename[0] != '/': + filename = os.path.dirname(os.path.abspath(__file__)) + '/' + filename + with open(filename) as f: + results = json.loads(f.read()) + for result in results: + total_weight = 0 + total_value = 0 + for measurement in result["measurements"]: + total_weight += measurement["weight"] + total_value += measurement["value"]["magnitude"] + avg_time = total_value * 1.0 / total_weight + total_size, _ = get_data_size( + result["scenario"]["benchmarkSpec"]["parameters"]["dataFile"]) + java_result.append({ + "language": "java", + "throughput": total_size / avg_time * 1e9 / 2 ** 20, + "behavior": result["scenario"]["benchmarkSpec"]["methodName"], + "dataFileName": extract_file_name( + result["scenario"]["benchmarkSpec"]["parameters"]["dataFile"]) + }) + + +# Go benchmark results: +# +# goos: linux +# goarch: amd64 +# Benchmark/.././datasets/google_message2/dataset.google_message2.pb/Unmarshal-12 3000 705784 ns/op +# Benchmark/.././datasets/google_message2/dataset.google_message2.pb/Marshal-12 2000 634648 ns/op +# Benchmark/.././datasets/google_message2/dataset.google_message2.pb/Size-12 5000 244174 ns/op +# Benchmark/.././datasets/google_message2/dataset.google_message2.pb/Clone-12 300 4120954 ns/op +# Benchmark/.././datasets/google_message2/dataset.google_message2.pb/Merge-12 300 4108632 ns/op +# PASS +# ok _/usr/local/google/home/yilunchong/mygit/protobuf/benchmarks 124.173s +def parse_go_result(filename): + global go_result + if filename == "": + return + if filename[0] != '/': + filename = os.path.dirname(os.path.abspath(__file__)) + '/' + filename + with open(filename) as f: + for line in f: + result_list = re.split("[\ \t]+", line) + if result_list[0][:9] != "Benchmark": + continue + first_slash_index = result_list[0].find('/') + last_slash_index = result_list[0].rfind('/') + full_filename = result_list[0][first_slash_index+4:last_slash_index] # delete ../ prefix + total_bytes, _ = get_data_size(full_filename) + behavior_with_suffix = result_list[0][last_slash_index+1:] + last_dash = behavior_with_suffix.rfind("-") + if last_dash == -1: + behavior = behavior_with_suffix + else: + behavior = behavior_with_suffix[:last_dash] + go_result.append({ + "dataFilename": extract_file_name(full_filename), + "throughput": total_bytes / float(result_list[2]) * 1e9 / 2 ** 20, + "behavior": behavior, + "language": "go" + }) + + +def get_metadata(): + build_number = os.getenv('BUILD_NUMBER') + build_url = os.getenv('BUILD_URL') + job_name = os.getenv('JOB_NAME') + git_commit = os.getenv('GIT_COMMIT') + # actual commit is the actual head of PR that is getting tested + git_actual_commit = os.getenv('ghprbActualCommit') + + utc_timestamp = str(calendar.timegm(time.gmtime())) + metadata = {'created': utc_timestamp} + + if build_number: + metadata['buildNumber'] = build_number + if build_url: + metadata['buildUrl'] = build_url + if job_name: + metadata['jobName'] = job_name + if git_commit: + metadata['gitCommit'] = git_commit + if git_actual_commit: + metadata['gitActualCommit'] = git_actual_commit + + return metadata + + +def upload_result(result_list, metadata): + for result in result_list: + new_result = copy.deepcopy(result) + new_result['metadata'] = metadata + bq = big_query_utils.create_big_query() + row = big_query_utils.make_row(str(uuid.uuid4()), new_result) + if not big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET, + _TABLE + "$" + _NOW, + [row]): + print 'Error when uploading result', new_result + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-cpp", "--cpp_input_file", + help="The CPP benchmark result file's name", + default="") + parser.add_argument("-java", "--java_input_file", + help="The Java benchmark result file's name", + default="") + parser.add_argument("-python", "--python_input_file", + help="The Python benchmark result file's name", + default="") + parser.add_argument("-go", "--go_input_file", + help="The golang benchmark result file's name", + default="") + args = parser.parse_args() + + parse_cpp_result(args.cpp_input_file) + parse_python_result(args.python_input_file) + parse_java_result(args.java_input_file) + parse_go_result(args.go_input_file) + + metadata = get_metadata() + print "uploading cpp results..." + upload_result(cpp_result, metadata) + print "uploading java results..." + upload_result(java_result, metadata) + print "uploading python results..." + upload_result(python_result, metadata) + print "uploading go results..." + upload_result(go_result, metadata) -- cgit v1.2.3